mirror of
https://github.com/restic/restic.git
synced 2025-03-30 00:00:14 +01:00

* feat(backends/s3): add warmup support before repacks and restores This commit introduces basic support for transitioning pack files stored in cold storage to hot storage on S3 and S3-compatible providers. To prevent unexpected behavior for existing users, the feature is gated behind new flags: - `s3.enable-restore`: opt-in flag (defaults to false) - `s3.restore-days`: number of days for the restored objects to remain in hot storage (defaults to `7`) - `s3.restore-timeout`: maximum time to wait for a single restoration (defaults to `1 day`) - `s3.restore-tier`: retrieval tier at which the restore will be processed. (defaults to `Standard`) As restoration times can be lengthy, this implementation preemptively restores selected packs to prevent incessant restore-delays during downloads. This is slightly sub-optimal as we could process packs out-of-order (as soon as they're transitioned), but this would really add too much complexity for a marginal gain in speed. To maintain simplicity and prevent resource exhaustion with lots of packs, no new concurrency mechanisms or goroutines were added. This just hooks gracefully into the existing routines. **Limitations:** - Tests against the backend were not written due to the lack of cold storage class support in MinIO. Testing was done manually on Scaleway's S3-compatible object storage. If necessary, we could explore testing with LocalStack or mocks, though this requires further discussion. - Currently, this feature only warms up before restores and repacks (prune/copy), as those are the two main use-cases I came across. Support for other commands may be added in future iterations, as long as affected packs can be calculated in advance. - The feature is gated behind a new alpha `s3-restore` feature flag to make it explicit that the feature is still wet behind the ears. - There is no explicit user notification for ongoing pack restorations.
While I think it is not necessary because of the opt-in flag, showing some notice may improve usability (but would probably require major refactoring in the progress bar which I didn't want to start). Another possibility would be to add a flag to send restore requests and fail early. See https://github.com/restic/restic/issues/3202 * ui: warn user when files are warming up from cold storage * refactor: remove the PacksWarmer struct It's easier to handle multiple handles in the backend directly, and it may open the door to reducing the number of requests made to the backend in the future.
403 lines
11 KiB
Go
403 lines
11 KiB
Go
package restorer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/feature"
|
|
"github.com/restic/restic/internal/repository"
|
|
"github.com/restic/restic/internal/restic"
|
|
"github.com/restic/restic/internal/ui/restore"
|
|
)
|
|
|
|
// largeFileBlobCount is the number of blobs above which a file is treated as
// "large": for such files restoreFiles replaces the flat blob list with a
// per-pack map of the blobs that still have to be restored.
const largeFileBlobCount = 25
|
|
|
|
// information about regular file being restored
|
|
type fileInfo struct {
|
|
lock sync.Mutex
|
|
inProgress bool
|
|
sparse bool
|
|
size int64
|
|
location string // file on local filesystem relative to restorer basedir
|
|
blobs interface{} // blobs of the file
|
|
state *fileState
|
|
}
|
|
|
|
type fileBlobInfo struct {
|
|
id restic.ID // the blob id
|
|
offset int64 // blob offset in the file
|
|
}
|
|
|
|
// information about a data pack required to restore one or more files
|
|
type packInfo struct {
|
|
id restic.ID // the pack id
|
|
files map[*fileInfo]struct{} // set of files that use blobs from this pack
|
|
}
|
|
|
|
// blobsLoaderFn loads the listed blobs from the pack packID and hands each
// result to handleBlobFn: the blob handle together with either the blob
// content or the error that occurred for that blob. An error returned by the
// loader itself is propagated to the caller.
type blobsLoaderFn func(
	ctx context.Context,
	packID restic.ID,
	blobs []restic.Blob,
	handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error,
) error
|
|
// startWarmupFn starts warming up the given set of packs and returns the
// resulting job, which the caller can query and wait on.
type startWarmupFn func(ctx context.Context, packs restic.IDSet) (restic.WarmupJob, error)
|
|
|
|
// fileRestorer restores set of files
|
|
type fileRestorer struct {
|
|
idx func(restic.BlobType, restic.ID) []restic.PackedBlob
|
|
blobsLoader blobsLoaderFn
|
|
|
|
startWarmup startWarmupFn
|
|
|
|
workerCount int
|
|
filesWriter *filesWriter
|
|
zeroChunk restic.ID
|
|
sparse bool
|
|
progress *restore.Progress
|
|
|
|
allowRecursiveDelete bool
|
|
|
|
dst string
|
|
files []*fileInfo
|
|
Error func(string, error) error
|
|
Info func(string)
|
|
}
|
|
|
|
func newFileRestorer(dst string,
|
|
blobsLoader blobsLoaderFn,
|
|
idx func(restic.BlobType, restic.ID) []restic.PackedBlob,
|
|
connections uint,
|
|
sparse bool,
|
|
allowRecursiveDelete bool,
|
|
startWarmup startWarmupFn,
|
|
progress *restore.Progress) *fileRestorer {
|
|
|
|
// as packs are streamed the concurrency is limited by IO
|
|
workerCount := int(connections)
|
|
|
|
return &fileRestorer{
|
|
idx: idx,
|
|
blobsLoader: blobsLoader,
|
|
startWarmup: startWarmup,
|
|
filesWriter: newFilesWriter(workerCount, allowRecursiveDelete),
|
|
zeroChunk: repository.ZeroChunk(),
|
|
sparse: sparse,
|
|
progress: progress,
|
|
allowRecursiveDelete: allowRecursiveDelete,
|
|
workerCount: workerCount,
|
|
dst: dst,
|
|
Error: restorerAbortOnAllErrors,
|
|
Info: func(_ string) {},
|
|
}
|
|
}
|
|
|
|
func (r *fileRestorer) addFile(location string, content restic.IDs, size int64, state *fileState) {
|
|
r.files = append(r.files, &fileInfo{location: location, blobs: content, size: size, state: state})
|
|
}
|
|
|
|
func (r *fileRestorer) targetPath(location string) string {
|
|
return filepath.Join(r.dst, location)
|
|
}
|
|
|
|
func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID, packBlob restic.Blob, idx int, fileOffset int64)) error {
|
|
if len(blobIDs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
fileOffset := int64(0)
|
|
for i, blobID := range blobIDs {
|
|
packs := r.idx(restic.DataBlob, blobID)
|
|
if len(packs) == 0 {
|
|
return errors.Errorf("Unknown blob %s", blobID.String())
|
|
}
|
|
pb := packs[0]
|
|
fn(pb.PackID, pb.Blob, i, fileOffset)
|
|
fileOffset += int64(pb.DataLength())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *fileRestorer) restoreFiles(ctx context.Context) error {
|
|
|
|
packs := make(map[restic.ID]*packInfo) // all packs
|
|
// Process packs in order of first access. While this cannot guarantee
|
|
// that file chunks are restored sequentially, it offers a good enough
|
|
// approximation to shorten restore times by up to 19% in some test.
|
|
var packOrder restic.IDs
|
|
|
|
// create packInfo from fileInfo
|
|
for _, file := range r.files {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
fileBlobs := file.blobs.(restic.IDs)
|
|
largeFile := len(fileBlobs) > largeFileBlobCount
|
|
var packsMap map[restic.ID][]fileBlobInfo
|
|
if largeFile {
|
|
packsMap = make(map[restic.ID][]fileBlobInfo)
|
|
file.blobs = packsMap
|
|
}
|
|
restoredBlobs := false
|
|
err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob, idx int, fileOffset int64) {
|
|
if !file.state.HasMatchingBlob(idx) {
|
|
if largeFile {
|
|
packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset})
|
|
}
|
|
restoredBlobs = true
|
|
} else {
|
|
r.reportBlobProgress(file, uint64(blob.DataLength()))
|
|
// completely ignore blob
|
|
return
|
|
}
|
|
pack, ok := packs[packID]
|
|
if !ok {
|
|
pack = &packInfo{
|
|
id: packID,
|
|
files: make(map[*fileInfo]struct{}),
|
|
}
|
|
packs[packID] = pack
|
|
packOrder = append(packOrder, packID)
|
|
}
|
|
pack.files[file] = struct{}{}
|
|
if blob.ID.Equal(r.zeroChunk) {
|
|
file.sparse = r.sparse
|
|
}
|
|
})
|
|
if err != nil {
|
|
// repository index is messed up, can't do anything
|
|
return err
|
|
}
|
|
|
|
if len(fileBlobs) == 1 {
|
|
// no need to preallocate files with a single block, thus we can always consider them to be sparse
|
|
// in addition, a short chunk will never match r.zeroChunk which would prevent sparseness for short files
|
|
file.sparse = r.sparse
|
|
}
|
|
if file.state != nil {
|
|
// The restorer currently cannot punch new holes into an existing files.
|
|
// Thus sections that contained data but should be sparse after restoring
|
|
// the snapshot would still contain the old data resulting in a corrupt restore.
|
|
file.sparse = false
|
|
}
|
|
|
|
// empty file or one with already uptodate content. Make sure that the file size is correct
|
|
if !restoredBlobs {
|
|
err := r.truncateFileToSize(file.location, file.size)
|
|
if errFile := r.sanitizeError(file, err); errFile != nil {
|
|
return errFile
|
|
}
|
|
|
|
// the progress events were already sent for non-zero size files
|
|
if file.size == 0 {
|
|
r.reportBlobProgress(file, 0)
|
|
}
|
|
}
|
|
}
|
|
// drop no longer necessary file list
|
|
r.files = nil
|
|
|
|
if feature.Flag.Enabled(feature.S3Restore) {
|
|
warmupJob, err := r.startWarmup(ctx, restic.NewIDSet(packOrder...))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if warmupJob.HandleCount() != 0 {
|
|
r.Info(fmt.Sprintf("warming up %d packs from cold storage, this may take a while...", warmupJob.HandleCount()))
|
|
if err := warmupJob.Wait(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
wg, ctx := errgroup.WithContext(ctx)
|
|
downloadCh := make(chan *packInfo)
|
|
|
|
worker := func() error {
|
|
for pack := range downloadCh {
|
|
if err := r.downloadPack(ctx, pack); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
for i := 0; i < r.workerCount; i++ {
|
|
wg.Go(worker)
|
|
}
|
|
|
|
// the main restore loop
|
|
wg.Go(func() error {
|
|
defer close(downloadCh)
|
|
for _, id := range packOrder {
|
|
pack := packs[id]
|
|
// allow garbage collection of packInfo
|
|
delete(packs, id)
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case downloadCh <- pack:
|
|
debug.Log("Scheduled download pack %s", pack.id.Str())
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return wg.Wait()
|
|
}
|
|
|
|
func (r *fileRestorer) truncateFileToSize(location string, size int64) error {
|
|
f, err := createFile(r.targetPath(location), size, false, r.allowRecursiveDelete)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return f.Close()
|
|
}
|
|
|
|
type blobToFileOffsetsMapping map[restic.ID]struct {
|
|
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
|
|
blob restic.Blob
|
|
}
|
|
|
|
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
|
// calculate blob->[]files->[]offsets mappings
|
|
blobs := make(blobToFileOffsetsMapping)
|
|
for file := range pack.files {
|
|
addBlob := func(blob restic.Blob, fileOffset int64) {
|
|
blobInfo, ok := blobs[blob.ID]
|
|
if !ok {
|
|
blobInfo.files = make(map[*fileInfo][]int64)
|
|
blobInfo.blob = blob
|
|
blobs[blob.ID] = blobInfo
|
|
}
|
|
blobInfo.files[file] = append(blobInfo.files[file], fileOffset)
|
|
}
|
|
if fileBlobs, ok := file.blobs.(restic.IDs); ok {
|
|
err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob, idx int, fileOffset int64) {
|
|
if packID.Equal(pack.id) && !file.state.HasMatchingBlob(idx) {
|
|
addBlob(blob, fileOffset)
|
|
}
|
|
})
|
|
if err != nil {
|
|
// restoreFiles should have caught this error before
|
|
panic(err)
|
|
}
|
|
} else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok {
|
|
for _, blob := range packsMap[pack.id] {
|
|
idxPacks := r.idx(restic.DataBlob, blob.id)
|
|
for _, idxPack := range idxPacks {
|
|
if idxPack.PackID.Equal(pack.id) {
|
|
addBlob(idxPack.Blob, blob.offset)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// track already processed blobs for precise error reporting
|
|
processedBlobs := restic.NewBlobSet()
|
|
err := r.downloadBlobs(ctx, pack.id, blobs, processedBlobs)
|
|
return r.reportError(blobs, processedBlobs, err)
|
|
}
|
|
|
|
func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error {
|
|
switch err {
|
|
case nil, context.Canceled, context.DeadlineExceeded:
|
|
// Context errors are permanent.
|
|
return err
|
|
default:
|
|
return r.Error(file.location, err)
|
|
}
|
|
}
|
|
|
|
func (r *fileRestorer) reportError(blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet, err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
// only report error for not yet processed blobs
|
|
affectedFiles := make(map[*fileInfo]struct{})
|
|
for _, entry := range blobs {
|
|
if processedBlobs.Has(entry.blob.BlobHandle) {
|
|
continue
|
|
}
|
|
for file := range entry.files {
|
|
affectedFiles[file] = struct{}{}
|
|
}
|
|
}
|
|
|
|
for file := range affectedFiles {
|
|
if errFile := r.sanitizeError(file, err); errFile != nil {
|
|
return errFile
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID,
|
|
blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error {
|
|
|
|
blobList := make([]restic.Blob, 0, len(blobs))
|
|
for _, entry := range blobs {
|
|
blobList = append(blobList, entry.blob)
|
|
}
|
|
return r.blobsLoader(ctx, packID, blobList,
|
|
func(h restic.BlobHandle, blobData []byte, err error) error {
|
|
processedBlobs.Insert(h)
|
|
blob := blobs[h.ID]
|
|
if err != nil {
|
|
for file := range blob.files {
|
|
if errFile := r.sanitizeError(file, err); errFile != nil {
|
|
return errFile
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
for file, offsets := range blob.files {
|
|
for _, offset := range offsets {
|
|
// avoid long cancelation delays for frequently used blobs
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
writeToFile := func() error {
|
|
// this looks overly complicated and needs explanation
|
|
// two competing requirements:
|
|
// - must create the file once and only once
|
|
// - should allow concurrent writes to the file
|
|
// so write the first blob while holding file lock
|
|
// write other blobs after releasing the lock
|
|
createSize := int64(-1)
|
|
file.lock.Lock()
|
|
if file.inProgress {
|
|
file.lock.Unlock()
|
|
} else {
|
|
defer file.lock.Unlock()
|
|
file.inProgress = true
|
|
createSize = file.size
|
|
}
|
|
writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse)
|
|
r.reportBlobProgress(file, uint64(len(blobData)))
|
|
return writeErr
|
|
}
|
|
err := r.sanitizeError(file, writeToFile())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (r *fileRestorer) reportBlobProgress(file *fileInfo, blobSize uint64) {
|
|
action := restore.ActionFileUpdated
|
|
if file.state == nil {
|
|
action = restore.ActionFileRestored
|
|
}
|
|
r.progress.AddProgress(file.location, action, uint64(blobSize), uint64(file.size))
|
|
}
|