mirror of
https://github.com/restic/restic.git
synced 2025-03-30 00:00:14 +01:00
Merge pull request #5251 from MichaelEischer/rclone-retries
Retry temporary rclone backend errors
This commit is contained in:
commit
445477312c
20 changed files with 138 additions and 105 deletions
9
changelog/unreleased/pull-5251
Normal file
9
changelog/unreleased/pull-5251
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
Enhancement: Improve handling of flaky rclone backends
|
||||||
|
|
||||||
|
Since restic 0.17.0, the backend retry mechanisms relies on backends correctly
|
||||||
|
reporting when a file does not exist. This is not always the case for some rclone
|
||||||
|
backends, causing restic to stop retrying after the first failure.
|
||||||
|
|
||||||
|
For rclone, failed requests are now retried up to 5 times before giving up.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/pull/5251
|
|
@ -217,8 +217,11 @@ func (be *Backend) IsPermanentError(err error) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *Backend) Connections() uint {
|
func (be *Backend) Properties() backend.Properties {
|
||||||
return be.connections
|
return backend.Properties{
|
||||||
|
Connections: be.connections,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -226,11 +229,6 @@ func (be *Backend) Hasher() hash.Hash {
|
||||||
return md5.New()
|
return md5.New()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (be *Backend) HasAtomicReplace() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Path returns the path in the bucket that is used for this backend.
|
// Path returns the path in the bucket that is used for this backend.
|
||||||
func (be *Backend) Path() string {
|
func (be *Backend) Path() string {
|
||||||
return be.prefix
|
return be.prefix
|
||||||
|
|
|
@ -154,8 +154,11 @@ func (be *b2Backend) SetListMaxItems(i int) {
|
||||||
be.listMaxItems = i
|
be.listMaxItems = i
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *b2Backend) Connections() uint {
|
func (be *b2Backend) Properties() backend.Properties {
|
||||||
return be.cfg.Connections
|
return backend.Properties{
|
||||||
|
Connections: be.cfg.Connections,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -163,11 +166,6 @@ func (be *b2Backend) Hasher() hash.Hash {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (be *b2Backend) HasAtomicReplace() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist returns true if the error is caused by a non-existing file.
|
// IsNotExist returns true if the error is caused by a non-existing file.
|
||||||
func (be *b2Backend) IsNotExist(err error) bool {
|
func (be *b2Backend) IsNotExist(err error) bool {
|
||||||
// blazer/b2 does not export its error types and values,
|
// blazer/b2 does not export its error types and values,
|
||||||
|
|
|
@ -17,15 +17,12 @@ var ErrNoRepository = fmt.Errorf("repository does not exist")
|
||||||
// the context package need not be wrapped, as context cancellation is checked
|
// the context package need not be wrapped, as context cancellation is checked
|
||||||
// separately by the retrying logic.
|
// separately by the retrying logic.
|
||||||
type Backend interface {
|
type Backend interface {
|
||||||
// Connections returns the maximum number of concurrent backend operations.
|
// Properties returns information about the backend
|
||||||
Connections() uint
|
Properties() Properties
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
Hasher() hash.Hash
|
Hasher() hash.Hash
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
HasAtomicReplace() bool
|
|
||||||
|
|
||||||
// Remove removes a File described by h.
|
// Remove removes a File described by h.
|
||||||
Remove(ctx context.Context, h Handle) error
|
Remove(ctx context.Context, h Handle) error
|
||||||
|
|
||||||
|
@ -92,6 +89,18 @@ type Backend interface {
|
||||||
WarmupWait(ctx context.Context, h []Handle) error
|
WarmupWait(ctx context.Context, h []Handle) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Properties struct {
|
||||||
|
// Connections states the maximum number of concurrent backend operations.
|
||||||
|
Connections uint
|
||||||
|
|
||||||
|
// HasAtomicReplace states whether Save() can atomically replace files
|
||||||
|
HasAtomicReplace bool
|
||||||
|
|
||||||
|
// HasFlakyErrors states whether the backend may temporarily return errors
|
||||||
|
// that are considered as permanent for existing files.
|
||||||
|
HasFlakyErrors bool
|
||||||
|
}
|
||||||
|
|
||||||
type Unwrapper interface {
|
type Unwrapper interface {
|
||||||
// Unwrap returns the underlying backend or nil if there is none.
|
// Unwrap returns the underlying backend or nil if there is none.
|
||||||
Unwrap() Backend
|
Unwrap() Backend
|
||||||
|
|
|
@ -42,8 +42,8 @@ func (be *Backend) Remove(_ context.Context, _ backend.Handle) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *Backend) Connections() uint {
|
func (be *Backend) Properties() backend.Properties {
|
||||||
return be.b.Connections()
|
return be.b.Properties()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes all data in the backend.
|
// Delete removes all data in the backend.
|
||||||
|
@ -59,10 +59,6 @@ func (be *Backend) Hasher() hash.Hash {
|
||||||
return be.b.Hasher()
|
return be.b.Hasher()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *Backend) HasAtomicReplace() bool {
|
|
||||||
return be.b.HasAtomicReplace()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (be *Backend) IsNotExist(err error) bool {
|
func (be *Backend) IsNotExist(err error) bool {
|
||||||
return be.b.IsNotExist(err)
|
return be.b.IsNotExist(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,8 +186,11 @@ func (be *Backend) IsPermanentError(err error) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *Backend) Connections() uint {
|
func (be *Backend) Properties() backend.Properties {
|
||||||
return be.connections
|
return backend.Properties{
|
||||||
|
Connections: be.connections,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -195,11 +198,6 @@ func (be *Backend) Hasher() hash.Hash {
|
||||||
return md5.New()
|
return md5.New()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (be *Backend) HasAtomicReplace() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Path returns the path in the bucket that is used for this backend.
|
// Path returns the path in the bucket that is used for this backend.
|
||||||
func (be *Backend) Path() string {
|
func (be *Backend) Path() string {
|
||||||
return be.prefix
|
return be.prefix
|
||||||
|
|
|
@ -84,8 +84,11 @@ func Create(_ context.Context, cfg Config) (*Local, error) {
|
||||||
return be, nil
|
return be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Local) Connections() uint {
|
func (b *Local) Properties() backend.Properties {
|
||||||
return b.Config.Connections
|
return backend.Properties{
|
||||||
|
Connections: b.Config.Connections,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -93,11 +96,6 @@ func (b *Local) Hasher() hash.Hash {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (b *Local) HasAtomicReplace() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist returns true if the error is caused by a non existing file.
|
// IsNotExist returns true if the error is caused by a non existing file.
|
||||||
func (b *Local) IsNotExist(err error) bool {
|
func (b *Local) IsNotExist(err error) bool {
|
||||||
return errors.Is(err, os.ErrNotExist)
|
return errors.Is(err, os.ErrNotExist)
|
||||||
|
|
|
@ -218,8 +218,11 @@ func (be *MemoryBackend) List(ctx context.Context, t backend.FileType, fn func(b
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *MemoryBackend) Connections() uint {
|
func (be *MemoryBackend) Properties() backend.Properties {
|
||||||
return connectionCount
|
return backend.Properties{
|
||||||
|
Connections: connectionCount,
|
||||||
|
HasAtomicReplace: false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -227,11 +230,6 @@ func (be *MemoryBackend) Hasher() hash.Hash {
|
||||||
return xxhash.New()
|
return xxhash.New()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (be *MemoryBackend) HasAtomicReplace() bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete removes all data in the backend.
|
// Delete removes all data in the backend.
|
||||||
func (be *MemoryBackend) Delete(ctx context.Context) error {
|
func (be *MemoryBackend) Delete(ctx context.Context) error {
|
||||||
be.m.Lock()
|
be.m.Lock()
|
||||||
|
|
|
@ -22,9 +22,8 @@ type Backend struct {
|
||||||
DeleteFn func(ctx context.Context) error
|
DeleteFn func(ctx context.Context) error
|
||||||
WarmupFn func(ctx context.Context, h []backend.Handle) ([]backend.Handle, error)
|
WarmupFn func(ctx context.Context, h []backend.Handle) ([]backend.Handle, error)
|
||||||
WarmupWaitFn func(ctx context.Context, h []backend.Handle) error
|
WarmupWaitFn func(ctx context.Context, h []backend.Handle) error
|
||||||
ConnectionsFn func() uint
|
PropertiesFn func() backend.Properties
|
||||||
HasherFn func() hash.Hash
|
HasherFn func() hash.Hash
|
||||||
HasAtomicReplaceFn func() bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBackend returns new mock Backend instance
|
// NewBackend returns new mock Backend instance
|
||||||
|
@ -42,12 +41,15 @@ func (m *Backend) Close() error {
|
||||||
return m.CloseFn()
|
return m.CloseFn()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Backend) Connections() uint {
|
func (m *Backend) Properties() backend.Properties {
|
||||||
if m.ConnectionsFn == nil {
|
if m.PropertiesFn == nil {
|
||||||
return 2
|
return backend.Properties{
|
||||||
|
Connections: 2,
|
||||||
|
HasAtomicReplace: false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return m.ConnectionsFn()
|
return m.PropertiesFn()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -59,14 +61,6 @@ func (m *Backend) Hasher() hash.Hash {
|
||||||
return m.HasherFn()
|
return m.HasherFn()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (m *Backend) HasAtomicReplace() bool {
|
|
||||||
if m.HasAtomicReplaceFn == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return m.HasAtomicReplaceFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist returns true if the error is caused by a missing file.
|
// IsNotExist returns true if the error is caused by a missing file.
|
||||||
func (m *Backend) IsNotExist(err error) bool {
|
func (m *Backend) IsNotExist(err error) bool {
|
||||||
if m.IsNotExistFn == nil {
|
if m.IsNotExistFn == nil {
|
||||||
|
|
|
@ -341,8 +341,8 @@ func (be *Backend) Close() error {
|
||||||
return be.waitResult
|
return be.waitResult
|
||||||
}
|
}
|
||||||
|
|
||||||
// Warmup not implemented
|
func (be *Backend) Properties() backend.Properties {
|
||||||
func (be *Backend) Warmup(_ context.Context, _ []backend.Handle) ([]backend.Handle, error) {
|
properties := be.Backend.Properties()
|
||||||
return []backend.Handle{}, nil
|
properties.HasFlakyErrors = true
|
||||||
|
return properties
|
||||||
}
|
}
|
||||||
func (be *Backend) WarmupWait(_ context.Context, _ []backend.Handle) error { return nil }
|
|
||||||
|
|
|
@ -116,8 +116,12 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er
|
||||||
return be, nil
|
return be, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Backend) Connections() uint {
|
func (b *Backend) Properties() backend.Properties {
|
||||||
return b.connections
|
return backend.Properties{
|
||||||
|
Connections: b.connections,
|
||||||
|
// rest-server prevents overwriting
|
||||||
|
HasAtomicReplace: false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -125,12 +129,6 @@ func (b *Backend) Hasher() hash.Hash {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (b *Backend) HasAtomicReplace() bool {
|
|
||||||
// rest-server prevents overwriting
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save stores data in the backend at the handle.
|
// Save stores data in the backend at the handle.
|
||||||
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
|
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
|
@ -127,12 +127,20 @@ func (be *Backend) retry(ctx context.Context, msg string, f func() error) error
|
||||||
b = backoff.WithMaxRetries(b, 10)
|
b = backoff.WithMaxRetries(b, 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
permanentErrorAttempts := 1
|
||||||
|
if be.Backend.Properties().HasFlakyErrors {
|
||||||
|
permanentErrorAttempts = 5
|
||||||
|
}
|
||||||
|
|
||||||
err := retryNotifyErrorWithSuccess(
|
err := retryNotifyErrorWithSuccess(
|
||||||
func() error {
|
func() error {
|
||||||
err := f()
|
err := f()
|
||||||
// don't retry permanent errors as those very likely cannot be fixed by retrying
|
// don't retry permanent errors as those very likely cannot be fixed by retrying
|
||||||
// TODO remove IsNotExist(err) special cases when removing the feature flag
|
// TODO remove IsNotExist(err) special cases when removing the feature flag
|
||||||
if feature.Flag.Enabled(feature.BackendErrorRedesign) && !errors.Is(err, &backoff.PermanentError{}) && be.Backend.IsPermanentError(err) {
|
if feature.Flag.Enabled(feature.BackendErrorRedesign) && !errors.Is(err, &backoff.PermanentError{}) && be.Backend.IsPermanentError(err) {
|
||||||
|
permanentErrorAttempts--
|
||||||
|
}
|
||||||
|
if permanentErrorAttempts <= 0 {
|
||||||
return backoff.Permanent(err)
|
return backoff.Permanent(err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -166,7 +174,7 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if be.Backend.HasAtomicReplace() {
|
if be.Backend.Properties().HasAtomicReplace {
|
||||||
debug.Log("Save(%v) failed with error: %v", h, err)
|
debug.Log("Save(%v) failed with error: %v", h, err)
|
||||||
// there is no need to remove files from backends which can atomically replace files
|
// there is no need to remove files from backends which can atomically replace files
|
||||||
// in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file
|
// in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file
|
||||||
|
|
|
@ -69,7 +69,12 @@ func TestBackendSaveRetryAtomic(t *testing.T) {
|
||||||
calledRemove = true
|
calledRemove = true
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
HasAtomicReplaceFn: func() bool { return true },
|
PropertiesFn: func() backend.Properties {
|
||||||
|
return backend.Properties{
|
||||||
|
Connections: 2,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
TestFastRetries(t)
|
TestFastRetries(t)
|
||||||
|
@ -278,32 +283,52 @@ func TestBackendLoadRetry(t *testing.T) {
|
||||||
test.Equals(t, 2, attempt)
|
test.Equals(t, 2, attempt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBackendLoadNotExists(t *testing.T) {
|
func testBackendLoadNotExists(t *testing.T, hasFlakyErrors bool) {
|
||||||
// load should not retry if the error matches IsNotExist
|
// load should not retry if the error matches IsNotExist
|
||||||
notFound := errors.New("not found")
|
notFound := errors.New("not found")
|
||||||
attempt := 0
|
attempt := 0
|
||||||
|
expectedAttempts := 1
|
||||||
|
if hasFlakyErrors {
|
||||||
|
expectedAttempts = 5
|
||||||
|
}
|
||||||
|
|
||||||
be := mock.NewBackend()
|
be := mock.NewBackend()
|
||||||
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||||
attempt++
|
attempt++
|
||||||
if attempt > 1 {
|
if attempt > expectedAttempts {
|
||||||
t.Fail()
|
t.Fail()
|
||||||
return nil, errors.New("must not retry")
|
return nil, errors.New("must not retry")
|
||||||
}
|
}
|
||||||
return nil, notFound
|
return nil, notFound
|
||||||
}
|
}
|
||||||
|
be.PropertiesFn = func() backend.Properties {
|
||||||
|
return backend.Properties{
|
||||||
|
Connections: 2,
|
||||||
|
HasFlakyErrors: hasFlakyErrors,
|
||||||
|
}
|
||||||
|
}
|
||||||
be.IsPermanentErrorFn = func(err error) bool {
|
be.IsPermanentErrorFn = func(err error) bool {
|
||||||
return errors.Is(err, notFound)
|
return errors.Is(err, notFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
TestFastRetries(t)
|
TestFastRetries(t)
|
||||||
retryBackend := New(be, 10, nil, nil)
|
retryBackend := New(be, time.Second, nil, nil)
|
||||||
|
|
||||||
err := retryBackend.Load(context.TODO(), backend.Handle{}, 0, 0, func(rd io.Reader) (err error) {
|
err := retryBackend.Load(context.TODO(), backend.Handle{}, 0, 0, func(rd io.Reader) (err error) {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
test.Assert(t, be.IsPermanentErrorFn(err), "unexpected error %v", err)
|
test.Assert(t, be.IsPermanentErrorFn(err), "unexpected error %v", err)
|
||||||
test.Equals(t, 1, attempt)
|
test.Equals(t, expectedAttempts, attempt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackendLoadNotExists(t *testing.T) {
|
||||||
|
// Without HasFlakyErrors, should fail after 1 attempt
|
||||||
|
testBackendLoadNotExists(t, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackendLoadNotExistsFlakyErrors(t *testing.T) {
|
||||||
|
// With HasFlakyErrors, should fail after attempt number 5
|
||||||
|
testBackendLoadNotExists(t, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBackendLoadCircuitBreaker(t *testing.T) {
|
func TestBackendLoadCircuitBreaker(t *testing.T) {
|
||||||
|
|
|
@ -261,8 +261,11 @@ func (be *Backend) IsPermanentError(err error) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *Backend) Connections() uint {
|
func (be *Backend) Properties() backend.Properties {
|
||||||
return be.cfg.Connections
|
return backend.Properties{
|
||||||
|
Connections: be.cfg.Connections,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -270,11 +273,6 @@ func (be *Backend) Hasher() hash.Hash {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (be *Backend) HasAtomicReplace() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Path returns the path in the bucket that is used for this backend.
|
// Path returns the path in the bucket that is used for this backend.
|
||||||
func (be *Backend) Path() string {
|
func (be *Backend) Path() string {
|
||||||
return be.cfg.Prefix
|
return be.cfg.Prefix
|
||||||
|
|
|
@ -22,7 +22,7 @@ type connectionLimitedBackend struct {
|
||||||
|
|
||||||
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
|
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
|
||||||
func NewBackend(be backend.Backend) backend.Backend {
|
func NewBackend(be backend.Backend) backend.Backend {
|
||||||
sem, err := newSemaphore(be.Connections())
|
sem, err := newSemaphore(be.Properties().Connections)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,12 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b
|
||||||
|
|
||||||
m := mock.NewBackend()
|
m := mock.NewBackend()
|
||||||
setup(m)
|
setup(m)
|
||||||
m.ConnectionsFn = func() uint { return uint(expectBlocked) }
|
m.PropertiesFn = func() backend.Properties {
|
||||||
|
return backend.Properties{
|
||||||
|
Connections: uint(expectBlocked),
|
||||||
|
HasAtomicReplace: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
be := sema.NewBackend(m)
|
be := sema.NewBackend(m)
|
||||||
|
|
||||||
var wg errgroup.Group
|
var wg errgroup.Group
|
||||||
|
@ -206,7 +211,12 @@ func TestFreeze(t *testing.T) {
|
||||||
atomic.AddInt64(&counter, 1)
|
atomic.AddInt64(&counter, 1)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
m.ConnectionsFn = func() uint { return 2 }
|
m.PropertiesFn = func() backend.Properties {
|
||||||
|
return backend.Properties{
|
||||||
|
Connections: 2,
|
||||||
|
HasAtomicReplace: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
be := sema.NewBackend(m)
|
be := sema.NewBackend(m)
|
||||||
fb := be.(backend.FreezeBackend)
|
fb := be.(backend.FreezeBackend)
|
||||||
|
|
||||||
|
|
|
@ -264,8 +264,11 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) {
|
||||||
return open(sftp, cfg)
|
return open(sftp, cfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *SFTP) Connections() uint {
|
func (r *SFTP) Properties() backend.Properties {
|
||||||
return r.Config.Connections
|
return backend.Properties{
|
||||||
|
Connections: r.Config.Connections,
|
||||||
|
HasAtomicReplace: r.posixRename,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -273,11 +276,6 @@ func (r *SFTP) Hasher() hash.Hash {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (r *SFTP) HasAtomicReplace() bool {
|
|
||||||
return r.posixRename
|
|
||||||
}
|
|
||||||
|
|
||||||
// tempSuffix generates a random string suffix that should be sufficiently long
|
// tempSuffix generates a random string suffix that should be sufficiently long
|
||||||
// to avoid accidental conflicts
|
// to avoid accidental conflicts
|
||||||
func tempSuffix() string {
|
func tempSuffix() string {
|
||||||
|
|
|
@ -111,8 +111,11 @@ func (be *beSwift) createContainer(ctx context.Context, policy string) error {
|
||||||
return be.conn.ContainerCreate(ctx, be.container, h)
|
return be.conn.ContainerCreate(ctx, be.container, h)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (be *beSwift) Connections() uint {
|
func (be *beSwift) Properties() backend.Properties {
|
||||||
return be.connections
|
return backend.Properties{
|
||||||
|
Connections: be.connections,
|
||||||
|
HasAtomicReplace: true,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hasher may return a hash function for calculating a content hash for the backend
|
// Hasher may return a hash function for calculating a content hash for the backend
|
||||||
|
@ -120,11 +123,6 @@ func (be *beSwift) Hasher() hash.Hash {
|
||||||
return md5.New()
|
return md5.New()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAtomicReplace returns whether Save() can atomically replace files
|
|
||||||
func (be *beSwift) HasAtomicReplace() bool {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Load runs fn with a reader that yields the contents of the file at h at the
|
// Load runs fn with a reader that yields the contents of the file at h at the
|
||||||
// given offset.
|
// given offset.
|
||||||
func (be *beSwift) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
func (be *beSwift) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||||
|
|
|
@ -552,7 +552,7 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
|
||||||
|
|
||||||
innerWg, ctx := errgroup.WithContext(ctx)
|
innerWg, ctx := errgroup.WithContext(ctx)
|
||||||
r.packerWg = innerWg
|
r.packerWg = innerWg
|
||||||
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
|
r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections())
|
||||||
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
|
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
|
||||||
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
|
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
|
||||||
|
|
||||||
|
@ -587,7 +587,7 @@ func (r *Repository) flushPacks(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Repository) Connections() uint {
|
func (r *Repository) Connections() uint {
|
||||||
return r.be.Connections()
|
return r.be.Properties().Connections
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob {
|
func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob {
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (err *upgradeRepoV2Error) Unwrap() error {
|
||||||
func upgradeRepository(ctx context.Context, repo *Repository) error {
|
func upgradeRepository(ctx context.Context, repo *Repository) error {
|
||||||
h := backend.Handle{Type: backend.ConfigFile}
|
h := backend.Handle{Type: backend.ConfigFile}
|
||||||
|
|
||||||
if !repo.be.HasAtomicReplace() {
|
if !repo.be.Properties().HasAtomicReplace {
|
||||||
// remove the original file for backends which do not support atomic overwriting
|
// remove the original file for backends which do not support atomic overwriting
|
||||||
err := repo.be.Remove(ctx, h)
|
err := repo.be.Remove(ctx, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Add table
Reference in a new issue