restic/internal/archiver/blob_saver.go
Winfried Plappert 767d66474b changed logic for processing failing MaxCapacityExceeded data blobs
Intercept MaxCapacityExceeded errors for all blobs and convert them to "no error"
for all blob types except DataBlobs.

For DataBlobs, insert an extra data blob once, with the signature "MaxCapacityExceeded\n".

For all data blob instances with the above error, return a `saveBlobResponse` with an ID of
restic.Hash("MaxCapacityExceeded\n").

This is done to maintain the integrity of all newly saved blobs.
2025-03-02 13:01:14 +00:00
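
As an illustrative sketch (not part of the file below), a caller receiving a
saveBlobResponse could detect the sentinel by recomputing its ID; the names
maxCapacityMarker and isMaxCapacityMarker are hypothetical:

// hypothetical helper: recompute the sentinel ID once and compare it
// against the ID returned in a saveBlobResponse
var maxCapacityMarker = restic.Hash([]byte("MaxCapacityExceeded\n"))

func isMaxCapacityMarker(res saveBlobResponse) bool {
	return res.id == maxCapacityMarker
}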


package archiver

import (
	"context"
	"fmt"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/restic"

	"golang.org/x/sync/errgroup"
)

// saver allows saving a blob.
type saver interface {
	SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error)
}

// blobSaver concurrently saves incoming blobs to the repo.
type blobSaver struct {
	repo saver
	ch   chan<- saveBlobJob
}

// newBlobSaver returns a new blob saver. A worker pool is started; it is
// stopped when ctx is cancelled.
func newBlobSaver(ctx context.Context, wg *errgroup.Group, repo saver, workers uint) *blobSaver {
	ch := make(chan saveBlobJob)
	s := &blobSaver{
		repo: repo,
		ch:   ch,
	}

	for i := uint(0); i < workers; i++ {
		wg.Go(func() error {
			return s.worker(ctx, ch)
		})
	}

	return s
}

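// TriggerShutdown closes the job channel; the workers finish the remaining
// jobs and then exit.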
func (s *blobSaver) TriggerShutdown() {
	close(s.ch)
}

// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. It takes ownership of the buffer passed in.
func (s *blobSaver) Save(ctx context.Context, t restic.BlobType, buf *buffer, filename string, cb func(res saveBlobResponse)) {
	select {
	case s.ch <- saveBlobJob{BlobType: t, buf: buf, fn: filename, cb: cb}:
	case <-ctx.Done():
		debug.Log("not sending job, context is cancelled")
	}
}

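// saveBlobJob describes one queued blob: its type, the buffer holding the
// data, the file it came from (used in error messages), and the callback
// that receives the saveBlobResponse.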
type saveBlobJob struct {
	restic.BlobType
	buf *buffer
	fn  string
	cb  func(res saveBlobResponse)
}

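// saveBlobResponse is the result of saving a blob: its ID, the plaintext
// length, the size it occupies in the repository, and whether the blob was
// already known.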
type saveBlobResponse struct {
	id         restic.ID
	length     int
	sizeInRepo int
	known      bool
}

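// saveBlob stores a single blob in the repository, without storing
// duplicates. MaxCapacityExceeded errors from the repository are
// intercepted: non-data blobs report success, while data blobs are
// replaced by a sentinel blob (see the commit message above).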
func (s *blobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
	id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
	if err != nil && err.Error() == "MaxCapacityExceeded" {
		if t != restic.DataBlob {
			err = nil
		} else {
			// replace the data blob contents with a fixed sentinel so
			// that repository monitoring can detect the condition
			buf = []byte("MaxCapacityExceeded\n")
			id = restic.Hash(buf)
			// storeDuplicate is false, so the repository deduplicates the
			// sentinel: it is inserted only once, no matter how many data
			// blobs fail with MaxCapacityExceeded
			_, _, _, err = s.repo.SaveBlob(ctx, restic.DataBlob, buf, id, false)
			if err != nil && err.Error() != "MaxCapacityExceeded" {
				debug.Log("failed to save extra data blob: %v", err)
				return saveBlobResponse{}, err
			}
			return saveBlobResponse{id: id}, err
		}
	} else if err != nil {
		return saveBlobResponse{}, err
	}

	return saveBlobResponse{
		id:         id,
		length:     len(buf),
		sizeInRepo: sizeInRepo,
		known:      known,
	}, nil
}

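// worker receives jobs from the channel and saves each blob until the
// context is cancelled or TriggerShutdown closes the channel.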
func (s *blobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
	for {
		var job saveBlobJob
		var ok bool
		select {
		case <-ctx.Done():
			return nil
		case job, ok = <-jobs:
			if !ok {
				return nil
			}
		}

		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
		// MaxCapacityExceeded has already been handled by saveBlob and is
		// passed through unharmed for repository monitoring; treat it as
		// success here instead of aborting the worker
		if err != nil && err.Error() == "MaxCapacityExceeded" {
			err = nil
		}
		if err != nil {
			debug.Log("saveBlob returned error, exiting: %v", err)
			return fmt.Errorf("failed to save blob from file %q: %w", job.fn, err)
		}
		job.cb(res)
		job.buf.Release()
	}
}