From c970e58739ff4c63d891117701be357d3062e5b5 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 16 Feb 2025 22:27:58 +0100 Subject: [PATCH] backend: refactor backend Connections and HasAtomicReplace into Properties --- internal/backend/azure/azure.go | 12 +++++------ internal/backend/b2/b2.go | 12 +++++------ internal/backend/backend.go | 15 ++++++++----- internal/backend/dryrun/dry_backend.go | 8 ++----- internal/backend/gs/gs.go | 12 +++++------ internal/backend/local/local.go | 12 +++++------ internal/backend/mem/mem_backend.go | 12 +++++------ internal/backend/mock/backend.go | 22 +++++++------------- internal/backend/rest/rest.go | 14 ++++++------- internal/backend/retry/backend_retry.go | 2 +- internal/backend/retry/backend_retry_test.go | 7 ++++++- internal/backend/s3/s3.go | 12 +++++------ internal/backend/sema/backend.go | 2 +- internal/backend/sema/backend_test.go | 14 +++++++++++-- internal/backend/sftp/sftp.go | 12 +++++------ internal/backend/swift/swift.go | 12 +++++------ internal/repository/repository.go | 4 ++-- internal/repository/upgrade_repo.go | 2 +- 18 files changed, 89 insertions(+), 97 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index b2ef7ec30..54aeadc4e 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -217,8 +217,11 @@ func (be *Backend) IsPermanentError(err error) bool { return false } -func (be *Backend) Connections() uint { - return be.connections +func (be *Backend) Properties() backend.Properties { + return backend.Properties{ + Connections: be.connections, + HasAtomicReplace: true, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -226,11 +229,6 @@ func (be *Backend) Hasher() hash.Hash { return md5.New() } -// HasAtomicReplace returns whether Save() can atomically replace files -func (be *Backend) HasAtomicReplace() bool { - return true -} - // Path returns the path in the bucket that is used for this backend. func (be *Backend) Path() string { return be.prefix diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 6f66b3673..6ebba570e 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -154,8 +154,11 @@ func (be *b2Backend) SetListMaxItems(i int) { be.listMaxItems = i } -func (be *b2Backend) Connections() uint { - return be.cfg.Connections +func (be *b2Backend) Properties() backend.Properties { + return backend.Properties{ + Connections: be.cfg.Connections, + HasAtomicReplace: true, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -163,11 +166,6 @@ func (be *b2Backend) Hasher() hash.Hash { return nil } -// HasAtomicReplace returns whether Save() can atomically replace files -func (be *b2Backend) HasAtomicReplace() bool { - return true -} - // IsNotExist returns true if the error is caused by a non-existing file. func (be *b2Backend) IsNotExist(err error) bool { // blazer/b2 does not export its error types and values, diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 2529dfab5..d3e47d0b2 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -17,15 +17,12 @@ var ErrNoRepository = fmt.Errorf("repository does not exist") // the context package need not be wrapped, as context cancellation is checked // separately by the retrying logic. type Backend interface { - // Connections returns the maximum number of concurrent backend operations. - Connections() uint + // Properties returns information about the backend + Properties() Properties // Hasher may return a hash function for calculating a content hash for the backend Hasher() hash.Hash - // HasAtomicReplace returns whether Save() can atomically replace files - HasAtomicReplace() bool - // Remove removes a File described by h. Remove(ctx context.Context, h Handle) error @@ -92,6 +89,14 @@ type Backend interface { WarmupWait(ctx context.Context, h []Handle) error } +type Properties struct { + // Connections states the maximum number of concurrent backend operations. + Connections uint + + // HasAtomicReplace states whether Save() can atomically replace files + HasAtomicReplace bool +} + type Unwrapper interface { // Unwrap returns the underlying backend or nil if there is none. Unwrap() Backend diff --git a/internal/backend/dryrun/dry_backend.go b/internal/backend/dryrun/dry_backend.go index fbce41916..383dd9f38 100644 --- a/internal/backend/dryrun/dry_backend.go +++ b/internal/backend/dryrun/dry_backend.go @@ -42,8 +42,8 @@ func (be *Backend) Remove(_ context.Context, _ backend.Handle) error { return nil } -func (be *Backend) Connections() uint { - return be.b.Connections() +func (be *Backend) Properties() backend.Properties { + return be.b.Properties() } // Delete removes all data in the backend. @@ -59,10 +59,6 @@ func (be *Backend) Hasher() hash.Hash { return be.b.Hasher() } -func (be *Backend) HasAtomicReplace() bool { - return be.b.HasAtomicReplace() -} - func (be *Backend) IsNotExist(err error) bool { return be.b.IsNotExist(err) } diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index ab20ca103..bd8f0c22b 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -186,8 +186,11 @@ func (be *Backend) IsPermanentError(err error) bool { return false } -func (be *Backend) Connections() uint { - return be.connections +func (be *Backend) Properties() backend.Properties { + return backend.Properties{ + Connections: be.connections, + HasAtomicReplace: true, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -195,11 +198,6 @@ func (be *Backend) Hasher() hash.Hash { return md5.New() } -// HasAtomicReplace returns whether Save() can atomically replace files -func (be *Backend) HasAtomicReplace() bool { - return true -} - // Path returns the path in the bucket that is used for this backend. func (be *Backend) Path() string { return be.prefix diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index e2065742f..861371b3e 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -84,8 +84,11 @@ func Create(_ context.Context, cfg Config) (*Local, error) { return be, nil } -func (b *Local) Connections() uint { - return b.Config.Connections +func (b *Local) Properties() backend.Properties { + return backend.Properties{ + Connections: b.Config.Connections, + HasAtomicReplace: true, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -93,11 +96,6 @@ func (b *Local) Hasher() hash.Hash { return nil } -// HasAtomicReplace returns whether Save() can atomically replace files -func (b *Local) HasAtomicReplace() bool { - return true -} - // IsNotExist returns true if the error is caused by a non existing file. func (b *Local) IsNotExist(err error) bool { return errors.Is(err, os.ErrNotExist) diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index e5ee297a1..3064a3b88 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -218,8 +218,11 @@ func (be *MemoryBackend) List(ctx context.Context, t backend.FileType, fn func(b return ctx.Err() } -func (be *MemoryBackend) Connections() uint { - return connectionCount +func (be *MemoryBackend) Properties() backend.Properties { + return backend.Properties{ + Connections: connectionCount, + HasAtomicReplace: false, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -227,11 +230,6 @@ func (be *MemoryBackend) Hasher() hash.Hash { return xxhash.New() } -// HasAtomicReplace returns whether Save() can atomically replace files -func (be *MemoryBackend) HasAtomicReplace() bool { - return false -} - // Delete removes all data in the backend. func (be *MemoryBackend) Delete(ctx context.Context) error { be.m.Lock() diff --git a/internal/backend/mock/backend.go b/internal/backend/mock/backend.go index 2083f7e88..7b304d231 100644 --- a/internal/backend/mock/backend.go +++ b/internal/backend/mock/backend.go @@ -22,9 +22,8 @@ type Backend struct { DeleteFn func(ctx context.Context) error WarmupFn func(ctx context.Context, h []backend.Handle) ([]backend.Handle, error) WarmupWaitFn func(ctx context.Context, h []backend.Handle) error - ConnectionsFn func() uint + PropertiesFn func() backend.Properties HasherFn func() hash.Hash - HasAtomicReplaceFn func() bool } // NewBackend returns new mock Backend instance @@ -42,12 +41,15 @@ func (m *Backend) Close() error { return m.CloseFn() } -func (m *Backend) Connections() uint { - if m.ConnectionsFn == nil { - return 2 +func (m *Backend) Properties() backend.Properties { + if m.PropertiesFn == nil { + return backend.Properties{ + Connections: 2, + HasAtomicReplace: false, + } } - return m.ConnectionsFn() + return m.PropertiesFn() } // Hasher may return a hash function for calculating a content hash for the backend @@ -59,14 +61,6 @@ func (m *Backend) Hasher() hash.Hash { return m.HasherFn() } -// HasAtomicReplace returns whether Save() can atomically replace files -func (m *Backend) HasAtomicReplace() bool { - if m.HasAtomicReplaceFn == nil { - return false - } - return m.HasAtomicReplaceFn() -} - // IsNotExist returns true if the error is caused by a missing file. func (m *Backend) IsNotExist(err error) bool { if m.IsNotExistFn == nil { diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index 2c5f59b4e..8df91beb3 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -116,8 +116,12 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er return be, nil } -func (b *Backend) Connections() uint { - return b.connections +func (b *Backend) Properties() backend.Properties { + return backend.Properties{ + Connections: b.connections, + // rest-server prevents overwriting + HasAtomicReplace: false, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -125,12 +129,6 @@ func (b *Backend) Hasher() hash.Hash { return nil } -// HasAtomicReplace returns whether Save() can atomically replace files -func (b *Backend) HasAtomicReplace() bool { - // rest-server prevents overwriting - return false -} - // Save stores data in the backend at the handle. func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error { ctx, cancel := context.WithCancel(ctx) diff --git a/internal/backend/retry/backend_retry.go b/internal/backend/retry/backend_retry.go index aa48bde77..869380d8b 100644 --- a/internal/backend/retry/backend_retry.go +++ b/internal/backend/retry/backend_retry.go @@ -166,7 +166,7 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind return nil } - if be.Backend.HasAtomicReplace() { + if be.Backend.Properties().HasAtomicReplace { debug.Log("Save(%v) failed with error: %v", h, err) // there is no need to remove files from backends which can atomically replace files // in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file diff --git a/internal/backend/retry/backend_retry_test.go b/internal/backend/retry/backend_retry_test.go index 9259144d4..c305a50f7 100644 --- a/internal/backend/retry/backend_retry_test.go +++ b/internal/backend/retry/backend_retry_test.go @@ -69,7 +69,12 @@ func TestBackendSaveRetryAtomic(t *testing.T) { calledRemove = true return nil }, - HasAtomicReplaceFn: func() bool { return true }, + PropertiesFn: func() backend.Properties { + return backend.Properties{ + Connections: 2, + HasAtomicReplace: true, + } + }, } TestFastRetries(t) diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index e0d8ea623..8170e0738 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -261,8 +261,11 @@ func (be *Backend) IsPermanentError(err error) bool { return false } -func (be *Backend) Connections() uint { - return be.cfg.Connections +func (be *Backend) Properties() backend.Properties { + return backend.Properties{ + Connections: be.cfg.Connections, + HasAtomicReplace: true, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -270,11 +273,6 @@ func (be *Backend) Hasher() hash.Hash { return nil } -// HasAtomicReplace returns whether Save() can atomically replace files -func (be *Backend) HasAtomicReplace() bool { - return true -} - // Path returns the path in the bucket that is used for this backend. func (be *Backend) Path() string { return be.cfg.Prefix diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go index 1d69c52ac..c545a7302 100644 --- a/internal/backend/sema/backend.go +++ b/internal/backend/sema/backend.go @@ -22,7 +22,7 @@ type connectionLimitedBackend struct { // NewBackend creates a backend that limits the concurrent operations on the underlying backend func NewBackend(be backend.Backend) backend.Backend { - sem, err := newSemaphore(be.Connections()) + sem, err := newSemaphore(be.Properties().Connections) if err != nil { panic(err) } diff --git a/internal/backend/sema/backend_test.go b/internal/backend/sema/backend_test.go index d220f48a3..0c73c5cb0 100644 --- a/internal/backend/sema/backend_test.go +++ b/internal/backend/sema/backend_test.go @@ -106,7 +106,12 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b m := mock.NewBackend() setup(m) - m.ConnectionsFn = func() uint { return uint(expectBlocked) } + m.PropertiesFn = func() backend.Properties { + return backend.Properties{ + Connections: uint(expectBlocked), + HasAtomicReplace: false, + } + } be := sema.NewBackend(m) var wg errgroup.Group @@ -206,7 +211,12 @@ func TestFreeze(t *testing.T) { atomic.AddInt64(&counter, 1) return nil } - m.ConnectionsFn = func() uint { return 2 } + m.PropertiesFn = func() backend.Properties { + return backend.Properties{ + Connections: 2, + HasAtomicReplace: false, + } + } be := sema.NewBackend(m) fb := be.(backend.FreezeBackend) diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index df7c3b14a..a65db464c 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -264,8 +264,11 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) { return open(sftp, cfg) } -func (r *SFTP) Connections() uint { - return r.Config.Connections +func (r *SFTP) Properties() backend.Properties { + return backend.Properties{ + Connections: r.Config.Connections, + HasAtomicReplace: r.posixRename, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -273,11 +276,6 @@ func (r *SFTP) Hasher() hash.Hash { return nil } -// HasAtomicReplace returns whether Save() can atomically replace files -func (r *SFTP) HasAtomicReplace() bool { - return r.posixRename -} - // tempSuffix generates a random string suffix that should be sufficiently long // to avoid accidental conflicts func tempSuffix() string { diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 090d00512..ae6a13462 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -111,8 +111,11 @@ func (be *beSwift) createContainer(ctx context.Context, policy string) error { return be.conn.ContainerCreate(ctx, be.container, h) } -func (be *beSwift) Connections() uint { - return be.connections +func (be *beSwift) Properties() backend.Properties { + return backend.Properties{ + Connections: be.connections, + HasAtomicReplace: true, + } } // Hasher may return a hash function for calculating a content hash for the backend @@ -120,11 +123,6 @@ func (be *beSwift) Hasher() hash.Hash { return md5.New() } -// HasAtomicReplace returns whether Save() can atomically replace files -func (be *beSwift) HasAtomicReplace() bool { - return true -} - // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *beSwift) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { diff --git a/internal/repository/repository.go b/internal/repository/repository.go index aee0db103..d7249d7ee 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -552,7 +552,7 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) innerWg, ctx := errgroup.WithContext(ctx) r.packerWg = innerWg - r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) + r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections()) r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker) r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker) @@ -587,7 +587,7 @@ func (r *Repository) flushPacks(ctx context.Context) error { } func (r *Repository) Connections() uint { - return r.be.Connections() + return r.be.Properties().Connections } func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob { diff --git a/internal/repository/upgrade_repo.go b/internal/repository/upgrade_repo.go index 0a91b1093..8f6bc0320 100644 --- a/internal/repository/upgrade_repo.go +++ b/internal/repository/upgrade_repo.go @@ -33,7 +33,7 @@ func (err *upgradeRepoV2Error) Unwrap() error { func upgradeRepository(ctx context.Context, repo *Repository) error { h := backend.Handle{Type: backend.ConfigFile} - if !repo.be.HasAtomicReplace() { + if !repo.be.Properties().HasAtomicReplace { // remove the original file for backends which do not support atomic overwriting err := repo.be.Remove(ctx, h) if err != nil {