1
0
Fork 0
mirror of https://github.com/restic/restic.git synced 2025-03-09 00:00:02 +01:00
This commit is contained in:
Michael Eischer 2025-02-23 22:00:05 +01:00 committed by GitHub
commit 74c8c7828e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 138 additions and 105 deletions

View file

@ -0,0 +1,9 @@
Enhancement: Improve handling of flaky rclone backends
Since restic 0.17.0, the backend retry mechanisms relies on backends correctly
reporting when a file does not exist. This is not always the case for some rclone
backends, causing restic to stop retrying after the first failure.
For rclone, failed requests are now retried up to 5 times before giving up.
https://github.com/restic/restic/pull/5251

View file

@ -217,8 +217,11 @@ func (be *Backend) IsPermanentError(err error) bool {
return false
}
func (be *Backend) Connections() uint {
return be.connections
func (be *Backend) Properties() backend.Properties {
return backend.Properties{
Connections: be.connections,
HasAtomicReplace: true,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -226,11 +229,6 @@ func (be *Backend) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string {
return be.prefix

View file

@ -154,8 +154,11 @@ func (be *b2Backend) SetListMaxItems(i int) {
be.listMaxItems = i
}
func (be *b2Backend) Connections() uint {
return be.cfg.Connections
func (be *b2Backend) Properties() backend.Properties {
return backend.Properties{
Connections: be.cfg.Connections,
HasAtomicReplace: true,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -163,11 +166,6 @@ func (be *b2Backend) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *b2Backend) HasAtomicReplace() bool {
return true
}
// IsNotExist returns true if the error is caused by a non-existing file.
func (be *b2Backend) IsNotExist(err error) bool {
// blazer/b2 does not export its error types and values,

View file

@ -17,15 +17,12 @@ var ErrNoRepository = fmt.Errorf("repository does not exist")
// the context package need not be wrapped, as context cancellation is checked
// separately by the retrying logic.
type Backend interface {
// Connections returns the maximum number of concurrent backend operations.
Connections() uint
// Properties returns information about the backend
Properties() Properties
// Hasher may return a hash function for calculating a content hash for the backend
Hasher() hash.Hash
// HasAtomicReplace returns whether Save() can atomically replace files
HasAtomicReplace() bool
// Remove removes a File described by h.
Remove(ctx context.Context, h Handle) error
@ -92,6 +89,18 @@ type Backend interface {
WarmupWait(ctx context.Context, h []Handle) error
}
type Properties struct {
// Connections states the maximum number of concurrent backend operations.
Connections uint
// HasAtomicReplace states whether Save() can atomically replace files
HasAtomicReplace bool
// HasFlakyErrors states whether the backend may temporarily return errors
// that are considered as permanent for existing files.
HasFlakyErrors bool
}
type Unwrapper interface {
// Unwrap returns the underlying backend or nil if there is none.
Unwrap() Backend

View file

@ -42,8 +42,8 @@ func (be *Backend) Remove(_ context.Context, _ backend.Handle) error {
return nil
}
func (be *Backend) Connections() uint {
return be.b.Connections()
func (be *Backend) Properties() backend.Properties {
return be.b.Properties()
}
// Delete removes all data in the backend.
@ -59,10 +59,6 @@ func (be *Backend) Hasher() hash.Hash {
return be.b.Hasher()
}
func (be *Backend) HasAtomicReplace() bool {
return be.b.HasAtomicReplace()
}
func (be *Backend) IsNotExist(err error) bool {
return be.b.IsNotExist(err)
}

View file

@ -186,8 +186,11 @@ func (be *Backend) IsPermanentError(err error) bool {
return false
}
func (be *Backend) Connections() uint {
return be.connections
func (be *Backend) Properties() backend.Properties {
return backend.Properties{
Connections: be.connections,
HasAtomicReplace: true,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -195,11 +198,6 @@ func (be *Backend) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string {
return be.prefix

View file

@ -84,8 +84,11 @@ func Create(_ context.Context, cfg Config) (*Local, error) {
return be, nil
}
func (b *Local) Connections() uint {
return b.Config.Connections
func (b *Local) Properties() backend.Properties {
return backend.Properties{
Connections: b.Config.Connections,
HasAtomicReplace: true,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -93,11 +96,6 @@ func (b *Local) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (b *Local) HasAtomicReplace() bool {
return true
}
// IsNotExist returns true if the error is caused by a non existing file.
func (b *Local) IsNotExist(err error) bool {
return errors.Is(err, os.ErrNotExist)

View file

@ -218,8 +218,11 @@ func (be *MemoryBackend) List(ctx context.Context, t backend.FileType, fn func(b
return ctx.Err()
}
func (be *MemoryBackend) Connections() uint {
return connectionCount
func (be *MemoryBackend) Properties() backend.Properties {
return backend.Properties{
Connections: connectionCount,
HasAtomicReplace: false,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -227,11 +230,6 @@ func (be *MemoryBackend) Hasher() hash.Hash {
return xxhash.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *MemoryBackend) HasAtomicReplace() bool {
return false
}
// Delete removes all data in the backend.
func (be *MemoryBackend) Delete(ctx context.Context) error {
be.m.Lock()

View file

@ -22,9 +22,8 @@ type Backend struct {
DeleteFn func(ctx context.Context) error
WarmupFn func(ctx context.Context, h []backend.Handle) ([]backend.Handle, error)
WarmupWaitFn func(ctx context.Context, h []backend.Handle) error
ConnectionsFn func() uint
PropertiesFn func() backend.Properties
HasherFn func() hash.Hash
HasAtomicReplaceFn func() bool
}
// NewBackend returns new mock Backend instance
@ -42,12 +41,15 @@ func (m *Backend) Close() error {
return m.CloseFn()
}
func (m *Backend) Connections() uint {
if m.ConnectionsFn == nil {
return 2
func (m *Backend) Properties() backend.Properties {
if m.PropertiesFn == nil {
return backend.Properties{
Connections: 2,
HasAtomicReplace: false,
}
}
return m.ConnectionsFn()
return m.PropertiesFn()
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -59,14 +61,6 @@ func (m *Backend) Hasher() hash.Hash {
return m.HasherFn()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (m *Backend) HasAtomicReplace() bool {
if m.HasAtomicReplaceFn == nil {
return false
}
return m.HasAtomicReplaceFn()
}
// IsNotExist returns true if the error is caused by a missing file.
func (m *Backend) IsNotExist(err error) bool {
if m.IsNotExistFn == nil {

View file

@ -341,8 +341,8 @@ func (be *Backend) Close() error {
return be.waitResult
}
// Warmup not implemented
func (be *Backend) Warmup(_ context.Context, _ []backend.Handle) ([]backend.Handle, error) {
return []backend.Handle{}, nil
func (be *Backend) Properties() backend.Properties {
properties := be.Backend.Properties()
properties.HasFlakyErrors = true
return properties
}
func (be *Backend) WarmupWait(_ context.Context, _ []backend.Handle) error { return nil }

View file

@ -116,8 +116,12 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, er
return be, nil
}
func (b *Backend) Connections() uint {
return b.connections
func (b *Backend) Properties() backend.Properties {
return backend.Properties{
Connections: b.connections,
// rest-server prevents overwriting
HasAtomicReplace: false,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -125,12 +129,6 @@ func (b *Backend) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (b *Backend) HasAtomicReplace() bool {
// rest-server prevents overwriting
return false
}
// Save stores data in the backend at the handle.
func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
ctx, cancel := context.WithCancel(ctx)

View file

@ -127,12 +127,20 @@ func (be *Backend) retry(ctx context.Context, msg string, f func() error) error
b = backoff.WithMaxRetries(b, 10)
}
permanentErrorAttempts := 1
if be.Backend.Properties().HasFlakyErrors {
permanentErrorAttempts = 5
}
err := retryNotifyErrorWithSuccess(
func() error {
err := f()
// don't retry permanent errors as those very likely cannot be fixed by retrying
// TODO remove IsNotExist(err) special cases when removing the feature flag
if feature.Flag.Enabled(feature.BackendErrorRedesign) && !errors.Is(err, &backoff.PermanentError{}) && be.Backend.IsPermanentError(err) {
permanentErrorAttempts--
}
if permanentErrorAttempts <= 0 {
return backoff.Permanent(err)
}
return err
@ -166,7 +174,7 @@ func (be *Backend) Save(ctx context.Context, h backend.Handle, rd backend.Rewind
return nil
}
if be.Backend.HasAtomicReplace() {
if be.Backend.Properties().HasAtomicReplace {
debug.Log("Save(%v) failed with error: %v", h, err)
// there is no need to remove files from backends which can atomically replace files
// in fact if something goes wrong at the backend side the delete operation might delete the wrong instance of the file

View file

@ -69,7 +69,12 @@ func TestBackendSaveRetryAtomic(t *testing.T) {
calledRemove = true
return nil
},
HasAtomicReplaceFn: func() bool { return true },
PropertiesFn: func() backend.Properties {
return backend.Properties{
Connections: 2,
HasAtomicReplace: true,
}
},
}
TestFastRetries(t)
@ -278,32 +283,52 @@ func TestBackendLoadRetry(t *testing.T) {
test.Equals(t, 2, attempt)
}
func TestBackendLoadNotExists(t *testing.T) {
func testBackendLoadNotExists(t *testing.T, hasFlakyErrors bool) {
// load should not retry if the error matches IsNotExist
notFound := errors.New("not found")
attempt := 0
expectedAttempts := 1
if hasFlakyErrors {
expectedAttempts = 5
}
be := mock.NewBackend()
be.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
attempt++
if attempt > 1 {
if attempt > expectedAttempts {
t.Fail()
return nil, errors.New("must not retry")
}
return nil, notFound
}
be.PropertiesFn = func() backend.Properties {
return backend.Properties{
Connections: 2,
HasFlakyErrors: hasFlakyErrors,
}
}
be.IsPermanentErrorFn = func(err error) bool {
return errors.Is(err, notFound)
}
TestFastRetries(t)
retryBackend := New(be, 10, nil, nil)
retryBackend := New(be, time.Second, nil, nil)
err := retryBackend.Load(context.TODO(), backend.Handle{}, 0, 0, func(rd io.Reader) (err error) {
return nil
})
test.Assert(t, be.IsPermanentErrorFn(err), "unexpected error %v", err)
test.Equals(t, 1, attempt)
test.Equals(t, expectedAttempts, attempt)
}
func TestBackendLoadNotExists(t *testing.T) {
// Without HasFlakyErrors, should fail after 1 attempt
testBackendLoadNotExists(t, false)
}
func TestBackendLoadNotExistsFlakyErrors(t *testing.T) {
// With HasFlakyErrors, should fail after attempt number 5
testBackendLoadNotExists(t, true)
}
func TestBackendLoadCircuitBreaker(t *testing.T) {

View file

@ -261,8 +261,11 @@ func (be *Backend) IsPermanentError(err error) bool {
return false
}
func (be *Backend) Connections() uint {
return be.cfg.Connections
func (be *Backend) Properties() backend.Properties {
return backend.Properties{
Connections: be.cfg.Connections,
HasAtomicReplace: true,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -270,11 +273,6 @@ func (be *Backend) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *Backend) HasAtomicReplace() bool {
return true
}
// Path returns the path in the bucket that is used for this backend.
func (be *Backend) Path() string {
return be.cfg.Prefix

View file

@ -22,7 +22,7 @@ type connectionLimitedBackend struct {
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
func NewBackend(be backend.Backend) backend.Backend {
sem, err := newSemaphore(be.Connections())
sem, err := newSemaphore(be.Properties().Connections)
if err != nil {
panic(err)
}

View file

@ -106,7 +106,12 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b
m := mock.NewBackend()
setup(m)
m.ConnectionsFn = func() uint { return uint(expectBlocked) }
m.PropertiesFn = func() backend.Properties {
return backend.Properties{
Connections: uint(expectBlocked),
HasAtomicReplace: false,
}
}
be := sema.NewBackend(m)
var wg errgroup.Group
@ -206,7 +211,12 @@ func TestFreeze(t *testing.T) {
atomic.AddInt64(&counter, 1)
return nil
}
m.ConnectionsFn = func() uint { return 2 }
m.PropertiesFn = func() backend.Properties {
return backend.Properties{
Connections: 2,
HasAtomicReplace: false,
}
}
be := sema.NewBackend(m)
fb := be.(backend.FreezeBackend)

View file

@ -264,8 +264,11 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) {
return open(sftp, cfg)
}
func (r *SFTP) Connections() uint {
return r.Config.Connections
func (r *SFTP) Properties() backend.Properties {
return backend.Properties{
Connections: r.Config.Connections,
HasAtomicReplace: r.posixRename,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -273,11 +276,6 @@ func (r *SFTP) Hasher() hash.Hash {
return nil
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (r *SFTP) HasAtomicReplace() bool {
return r.posixRename
}
// tempSuffix generates a random string suffix that should be sufficiently long
// to avoid accidental conflicts
func tempSuffix() string {

View file

@ -111,8 +111,11 @@ func (be *beSwift) createContainer(ctx context.Context, policy string) error {
return be.conn.ContainerCreate(ctx, be.container, h)
}
func (be *beSwift) Connections() uint {
return be.connections
func (be *beSwift) Properties() backend.Properties {
return backend.Properties{
Connections: be.connections,
HasAtomicReplace: true,
}
}
// Hasher may return a hash function for calculating a content hash for the backend
@ -120,11 +123,6 @@ func (be *beSwift) Hasher() hash.Hash {
return md5.New()
}
// HasAtomicReplace returns whether Save() can atomically replace files
func (be *beSwift) HasAtomicReplace() bool {
return true
}
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
func (be *beSwift) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {

View file

@ -552,7 +552,7 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
innerWg, ctx := errgroup.WithContext(ctx)
r.packerWg = innerWg
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
r.uploader = newPackerUploader(ctx, innerWg, r, r.Connections())
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
@ -587,7 +587,7 @@ func (r *Repository) flushPacks(ctx context.Context) error {
}
func (r *Repository) Connections() uint {
return r.be.Connections()
return r.be.Properties().Connections
}
func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob {

View file

@ -33,7 +33,7 @@ func (err *upgradeRepoV2Error) Unwrap() error {
func upgradeRepository(ctx context.Context, repo *Repository) error {
h := backend.Handle{Type: backend.ConfigFile}
if !repo.be.HasAtomicReplace() {
if !repo.be.Properties().HasAtomicReplace {
// remove the original file for backends which do not support atomic overwriting
err := repo.be.Remove(ctx, h)
if err != nil {