Commit ee92f700f2 by Winfried Plappert, 2025-03-06 16:43:47 +00:00, committed by GitHub
21 changed files with 421 additions and 33 deletions


@ -0,0 +1,29 @@
Enhancement: Limit repository size to a predefined maximum during backup

Restic backup can now limit the repository size to a given maximum value. The
limit is specified with the new option `--max-repo-size`, which accepts the
usual suffixes for size-related options.

During backup, the repository size is monitored by adding the size of every new
blob to the measured size of the existing repository. Once the defined limit is
exceeded, the backup stops creating new entries and finalizes the 'in progress'
files without adding large amounts of new data.

The limit is approximate and cannot be taken as a precise bound, since index and
snapshot files still have to be finalized. Because a backup runs highly in
parallel, the limit can be overshot by several times the pack file size. With
the current default pack file size of 16 MiB, the final repository size may end
up multiple megabytes beyond the specified limit when the backup process
terminates. The pack files written so far are kept and a new snapshot is still
created.

The implementation is currently incomplete. There are two indications of an
incomplete backup: the field `PartialSnapshot` exists and is set to true, and
the snapshot is tagged as `partial-snapshot`.
https://github.com/restic/restic/issues/4583
https://github.com/restic/restic/pull/5215
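
For illustration only, here is a minimal sketch of the accounting described above. It is not restic's actual code; the `sizeMonitor` type and its `add` method are made-up names. The monitoring amounts to starting from the measured repository size, adding the length of every blob written, and raising a flag once the sum passes the limit:

package sizelimit

import "sync"

// sizeMonitor is a hypothetical stand-in for the bookkeeping restic performs:
// it starts from the measured repository size and adds the length of every
// blob written during the backup.
type sizeMonitor struct {
	mu       sync.Mutex
	limit    uint64 // 0 means no limit
	current  uint64 // measured size plus bytes written so far
	exceeded bool
}

// add records n newly written bytes and reports whether the configured limit
// has been exceeded. Because pack files are written by many workers in
// parallel, several pack files (about 16 MiB each by default) can still be in
// flight when the flag is first raised, which is why the final size can
// overshoot the limit.
func (m *sizeMonitor) add(n uint64) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.limit == 0 {
		return false
	}
	m.current += n
	if m.current > m.limit {
		m.exceeded = true
	}
	return m.exceeded
}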


@ -41,6 +41,23 @@ func newBackupCommand() *cobra.Command {
The "backup" command creates a new snapshot and saves the files and directories
given as the arguments.
When the option --max-repo-size is used and the current backup is about to exceed the
defined limit, the snapshot is consistent in itself but unusable. It is safe to assume that
the data in this snapshot is NOT valid or usable for a restore or a dump command.
An indication of such a backup is the tag 'partial-snapshot'. In addition, the new snapshot
field 'partial_snapshot' is set to true, which is visible when you list the snapshot with
the command 'restic -r ... cat snapshot <snap-id>'.
The actual repository size can end up several megabytes beyond the specified limit, due to
the high parallelism of the 'restic backup' command.
Files whose backup had started but not yet completed when the repository size limit was
reached are truncated; their backed-up data will NOT be correct.
If, however, the current backup stays within the limit defined by --max-repo-size,
everything is fine and the backup can be used.
EXIT STATUS
===========
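
As a hedged sketch of the check this help text describes (the helper name `isPartialSnapshot` and its package are made up for this example and assume code living inside the restic module; the commit's own command-side helper is `checkPartialSnapshot`, shown further below), a snapshot is treated as partial if the new `PartialSnapshot` field is set or if it carries the `partial-snapshot` tag:

package partialcheck

import "github.com/restic/restic/internal/restic"

// isPartialSnapshot reports whether the snapshot was produced by a backup
// that hit the --max-repo-size limit: either the PartialSnapshot field is
// set, or the snapshot carries the "partial-snapshot" tag.
func isPartialSnapshot(sn *restic.Snapshot) bool {
	if sn.PartialSnapshot {
		return true
	}
	for _, tag := range sn.Tags {
		if tag == "partial-snapshot" {
			return true
		}
	}
	return false
}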
@ -103,6 +120,8 @@ type BackupOptions struct {
ReadConcurrency uint
NoScan bool
SkipIfUnchanged bool
RepoMaxSize string
}
func (opts *BackupOptions) AddFlags(f *pflag.FlagSet) {
@ -143,6 +162,7 @@ func (opts *BackupOptions) AddFlags(f *pflag.FlagSet) {
f.BoolVar(&opts.ExcludeCloudFiles, "exclude-cloud-files", false, "excludes online-only cloud files (such as OneDrive Files On-Demand)")
}
f.BoolVar(&opts.SkipIfUnchanged, "skip-if-unchanged", false, "skip snapshot creation if identical to parent snapshot")
f.StringVar(&opts.RepoMaxSize, "max-repo-size", "", "`limit` maximum size of repository - absolute value in bytes with suffixes m/M, g/G, t/T, default no limit")
// parse read concurrency from env, on error the default value will be used
readConcurrency, _ := strconv.ParseUint(os.Getenv("RESTIC_READ_CONCURRENCY"), 10, 32)
@ -506,6 +526,14 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
}
}
if len(opts.RepoMaxSize) > 0 {
size, err := ui.ParseBytes(opts.RepoMaxSize)
if err != nil {
return err
}
gopts.RepoSizeMax = uint64(size)
}
if gopts.verbosity >= 2 && !gopts.JSON {
Verbosef("open repository\n")
}
@ -516,6 +544,20 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
}
defer unlock()
// get current size of repository
if gopts.RepoSizeMax > 0 {
curRepoSize, err := repo.CurrentRepositorySize(ctx)
if err != nil {
return err
}
if !gopts.JSON {
Verbosef("Current repository size is %s\n", ui.FormatBytes(curRepoSize))
}
if curRepoSize >= gopts.RepoSizeMax {
return errors.Fatal("repository maximum size already exceeded")
}
}
var progressPrinter backup.ProgressPrinter
if gopts.JSON {
progressPrinter = backup.NewJSONProgress(term, gopts.verbosity)
@ -630,7 +672,8 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
wg.Go(func() error { return sc.Scan(cancelCtx, targets) })
}
arch := archiver.New(repo, targetFS, archiver.Options{ReadConcurrency: opts.ReadConcurrency})
arch := archiver.New(repo, targetFS, archiver.Options{ReadConcurrency: opts.ReadConcurrency,
RepoSizeMax: gopts.RepoSizeMax})
arch.SelectByName = selectByNameFilter
arch.Select = selectFilter
arch.WithAtime = opts.WithAtime
@ -685,6 +728,21 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
return errors.Fatalf("unable to save snapshot: %v", err)
}
if !gopts.JSON && repo.MaxCapacityExceeded() {
Printf("\n=========================================\n")
Printf("repository maximum size has been exceeded\n")
curRepoSize, err := repo.CurrentRepositorySize(ctx)
if err != nil {
return err
}
Printf("Current repository size is %s\n", ui.FormatBytes(curRepoSize))
Printf("=========================================\n\n")
}
if werr == nil && repo.MaxCapacityExceeded() {
werr = errors.Fatal("backup incomplete, repository capacity exceeded")
}
// Report finished execution
progressReporter.Finish(id, summary, opts.DryRun)
if !success {
@ -694,3 +752,33 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
// Return error if any
return werr
}
func checkPartialSnapshot(sn *restic.Snapshot, checkType string, subCommand string) error {
found := false
if sn.PartialSnapshot {
found = true
} else {
for _, tag := range sn.Tags {
if tag == "partial-snapshot" {
found = true
break
}
}
}
if !found {
return nil
}
switch checkType {
case "error", "fatal":
return errors.Fatalf("selected snapshot %s cannot be used with the command %q because it is a partial snapshot",
sn.ID().Str(), subCommand)
case "warn":
Warnf("be aware that command %s may create unexpected results because %s is a partial snapshot\n",
subCommand, sn.ID().Str())
return nil
default:
return errors.Errorf("type %s is invalid", checkType)
}
}


@ -2,6 +2,7 @@ package main
import (
"context"
"crypto/rand"
"fmt"
"io"
"os"
@ -10,6 +11,7 @@ import (
"testing"
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
@ -723,3 +725,55 @@ func TestBackupSkipIfUnchanged(t *testing.T) {
testRunCheck(t, env.gopts)
}
func TestBackupRepoSizeMonitorOnFirstBackup(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()
datafile := filepath.Join("testdata", "backup-data.tar.gz")
testRunInit(t, env.gopts)
rtest.SetupTarTestFixture(t, env.testdata, datafile)
opts := BackupOptions{RepoMaxSize: "50k"}
// use a backend that allows listing a file type more than once
oldHook := env.gopts.backendTestHook
env.gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { return newListMultipleBackend(r), nil }
defer func() {
env.gopts.backendTestHook = oldHook
}()
err := testRunBackupAssumeFailure(t, filepath.Dir(env.testdata), []string{env.testdata}, opts, env.gopts)
rtest.Assert(t, err != nil && err.Error() == "Fatal: backup incomplete, repository capacity exceeded",
"failed as %q", err)
}
func TestBackupRepoSizeMonitorOnSecondBackup(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()
datafile := filepath.Join("testdata", "backup-data.tar.gz")
testRunInit(t, env.gopts)
rtest.SetupTarTestFixture(t, env.testdata, datafile)
opts := BackupOptions{}
// backup #1
testRunBackup(t, filepath.Dir(env.testdata), []string{env.testdata}, opts, env.gopts)
testListSnapshots(t, env.gopts, 1)
// insert new file into backup structure (8k)
createRandomDataFile(t, filepath.Join(env.testdata, "0", "0", "9", "rand.data"))
// backup #2
opts = BackupOptions{RepoMaxSize: "1k"}
err := testRunBackupAssumeFailure(t, filepath.Dir(env.testdata), []string{env.testdata}, opts, env.gopts)
rtest.Assert(t, err != nil && err.Error() == "Fatal: repository maximum size already exceeded",
"failed as %q", err)
}
// make a random file, large enough to exceed backup size limit
func createRandomDataFile(t *testing.T, path string) {
randomData := make([]byte, 8*1024)
_, _ = rand.Read(randomData)
rtest.OK(t, os.WriteFile(path, randomData, 0600))
}


@ -129,6 +129,10 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
if sn.Original != nil {
srcOriginal = *sn.Original
}
err = checkPartialSnapshot(sn, "fatal", "copy")
if err != nil {
return err
}
if originalSns, ok := dstSnapshotByOriginal[srcOriginal]; ok {
isCopy := false


@ -74,6 +74,10 @@ func loadSnapshot(ctx context.Context, be restic.Lister, repo restic.LoaderUnpac
if err != nil {
return nil, "", errors.Fatal(err.Error())
}
err = checkPartialSnapshot(sn, "fatal", "diff")
if err != nil {
return sn, "", err
}
return sn, subfolder, err
}


@ -156,6 +156,10 @@ func runDump(ctx context.Context, opts DumpOptions, gopts GlobalOptions, args []
if err != nil {
return errors.Fatalf("failed to find snapshot: %v", err)
}
err = checkPartialSnapshot(sn, "fatal", "dump")
if err != nil {
return err
}
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
err = repo.LoadIndex(ctx, bar)


@ -29,6 +29,9 @@ func newPruneCommand() *cobra.Command {
The "prune" command checks the repository and removes data that is not
referenced and therefore not needed any more.
The "prune" command automatically eliminates pertial snapshots since they take
up space and cannot really be used to do some usefull work.
EXIT STATUS
===========
@ -162,6 +165,12 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term
}
defer unlock()
// check for partial snapshots - and remove them
err = findPartialSnapshots(ctx, repo, gopts, term)
if err != nil {
return err
}
if opts.UnsafeNoSpaceRecovery != "" {
repoID := repo.Config().ID
if opts.UnsafeNoSpaceRecovery != repoID {
@ -292,3 +301,45 @@ func getUsedBlobs(ctx context.Context, repo restic.Repository, usedBlobs restic.
return restic.FindUsedBlobs(ctx, repo, snapshotTrees, usedBlobs, bar)
}
// findPartialSnapshots finds all partial snapshots and 'forgets' them
func findPartialSnapshots(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, term *termstatus.Terminal) error {
snapshotLister, err := restic.MemorizeList(ctx, repo, restic.SnapshotFile)
if err != nil {
return err
}
selectedSnaps := restic.IDSet{}
err = (&restic.SnapshotFilter{Tags: restic.TagLists{restic.TagList{"partial-snapshot"}}}).FindAll(ctx, snapshotLister, repo, []string{}, func(_ string, sn *restic.Snapshot, err error) error {
if err != nil {
return err
}
selectedSnaps.Insert(*sn.ID())
return nil
})
if err != nil {
return err
} else if len(selectedSnaps) == 0 {
return nil
}
// run forget
verbosity := gopts.verbosity
if gopts.JSON {
verbosity = 0
}
printer := newTerminalProgressPrinter(verbosity, term)
bar := printer.NewCounter("partial snapshots deleted")
err = restic.ParallelRemove(ctx, repo, selectedSnaps, restic.WriteableSnapshotFile, func(id restic.ID, err error) error {
if err != nil {
printer.E("unable to remove partial snapshot %v/%v from the repository\n", restic.SnapshotFile, id)
} else {
printer.VV("removed partial snapshot %v/%v\n", restic.SnapshotFile, id)
}
return nil
}, bar)
bar.Done()
return err
}


@ -8,13 +8,14 @@ import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/repository"
//"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"github.com/restic/restic/internal/ui/termstatus"
)
func testRunPrune(t testing.TB, gopts GlobalOptions, opts PruneOptions) {
oldHook := gopts.backendTestHook
gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { return newListOnceBackend(r), nil }
gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { return newListMultipleBackend(r), nil }
defer func() {
gopts.backendTestHook = oldHook
}()
@ -142,7 +143,7 @@ func TestPruneWithDamagedRepository(t *testing.T) {
removePacksExcept(env.gopts, t, oldPacks, false)
oldHook := env.gopts.backendTestHook
env.gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { return newListOnceBackend(r), nil }
env.gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { return newListMultipleBackend(r), nil }
defer func() {
env.gopts.backendTestHook = oldHook
}()
@ -237,3 +238,28 @@ func testEdgeCaseRepo(t *testing.T, tarfile string, optionsCheck CheckOptions, o
"prune should have reported an error")
}
}
func TestPruneSizeMonitoring(t *testing.T) {
env, cleanup := withTestEnvironment(t)
defer cleanup()
datafile := filepath.Join("testdata", "backup-data.tar.gz")
testRunInit(t, env.gopts)
rtest.SetupTarTestFixture(t, env.testdata, datafile)
opts := BackupOptions{RepoMaxSize: "50k"}
// use a backend that allows listing a file type more than once
oldHook := env.gopts.backendTestHook
env.gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) { return newListMultipleBackend(r), nil }
defer func() {
env.gopts.backendTestHook = oldHook
}()
err := testRunBackupAssumeFailure(t, filepath.Dir(env.testdata), []string{env.testdata}, opts, env.gopts)
rtest.Assert(t, err != nil && err.Error() == "Fatal: backup incomplete, repository capacity exceeded",
"failed as %q", err)
testListSnapshots(t, env.gopts, 1)
testRunPrune(t, env.gopts, PruneOptions{MaxUnused: "0"})
testListSnapshots(t, env.gopts, 0)
}


@ -145,6 +145,10 @@ func runRestore(ctx context.Context, opts RestoreOptions, gopts GlobalOptions,
if err != nil {
return errors.Fatalf("failed to find snapshot: %v", err)
}
err = checkPartialSnapshot(sn, "error", "restore")
if err != nil {
return err
}
bar := newIndexTerminalProgress(gopts.Quiet, gopts.JSON, term)
err = repo.LoadIndex(ctx, bar)


@ -321,6 +321,11 @@ func runRewrite(ctx context.Context, opts RewriteOptions, gopts GlobalOptions, a
changedCount := 0
for sn := range FindFilteredSnapshots(ctx, snapshotLister, repo, &opts.SnapshotFilter, args) {
err = checkPartialSnapshot(sn, "fatal", "rewrite")
if err != nil {
return err
}
Verbosef("\n%v\n", sn)
changed, err := rewriteSnapshot(ctx, repo, sn, opts)
if err != nil {


@ -29,13 +29,13 @@ func newStatsCommand() *cobra.Command {
Short: "Scan the repository and show basic statistics",
Long: `
The "stats" command walks one or multiple snapshots in a repository
and accumulates statistics about the data stored therein. It reports
and accumulates statistics about the data stored therein. It reports
on the number of unique files and their sizes, according to one of
the counting modes as given by the --mode flag.
It operates on all snapshots matching the selection criteria or all
snapshots if nothing is specified. The special snapshot ID "latest"
is also supported. Some modes make more sense over
is also supported. Some modes make more sense over
just a single snapshot, while others are useful across all snapshots,
depending on what you are trying to calculate.
@ -130,6 +130,10 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args
}
for sn := range FindFilteredSnapshots(ctx, snapshotLister, repo, &opts.SnapshotFilter, args) {
err = checkPartialSnapshot(sn, "fatal", "stats")
if err != nil {
return err
}
err = statsWalkSnapshot(ctx, sn, repo, opts, stats)
if err != nil {
return fmt.Errorf("error walking snapshot: %v", err)


@ -157,6 +157,10 @@ func runTag(ctx context.Context, opts TagOptions, gopts GlobalOptions, term *ter
}
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {
err := checkPartialSnapshot(sn, "fatal", "tag")
if err != nil {
return err
}
changed, err := changeTags(ctx, repo, sn, opts.SetTags.Flatten(), opts.AddTags.Flatten(), opts.RemoveTags.Flatten(), printFunc)
if err != nil {
Warnf("unable to modify the tags for snapshot ID %q, ignoring: %v\n", sn.ID(), err)


@ -73,6 +73,7 @@ type GlobalOptions struct {
PackSize uint
NoExtraVerify bool
InsecureNoPassword bool
RepoSizeMax uint64
backend.TransportOptions
limiter.Limits
@ -485,6 +486,7 @@ func OpenRepository(ctx context.Context, opts GlobalOptions) (*repository.Reposi
Compression: opts.Compression,
PackSize: opts.PackSize * 1024 * 1024,
NoExtraVerify: opts.NoExtraVerify,
RepoSizeMax: opts.RepoSizeMax,
})
if err != nil {
return nil, errors.Fatal(err.Error())


@ -46,15 +46,20 @@ type listOnceBackend struct {
backend.Backend
listedFileType map[restic.FileType]bool
strictOrder bool
allowMultiple bool
}
func newListOnceBackend(be backend.Backend) *listOnceBackend {
// The linter complains here that newListOnceBackend is no longer used.
// The snapshot lister needs to be callable more than once to check for partial snapshots,
// and restic prune uses `getUsedBlobs`, which relies on `restic.ForAllSnapshots()`,
// to find all used blobs.
/*func newListOnceBackend(be backend.Backend) *listOnceBackend {
return &listOnceBackend{
Backend: be,
listedFileType: make(map[restic.FileType]bool),
strictOrder: false,
}
}
}*/
func newOrderedListOnceBackend(be backend.Backend) *listOnceBackend {
return &listOnceBackend{
@ -64,6 +69,15 @@ func newOrderedListOnceBackend(be backend.Backend) *listOnceBackend {
}
}
func newListMultipleBackend(be backend.Backend) *listOnceBackend {
return &listOnceBackend{
Backend: be,
listedFileType: make(map[restic.FileType]bool),
strictOrder: false,
allowMultiple: true,
}
}
func (be *listOnceBackend) List(ctx context.Context, t restic.FileType, fn func(backend.FileInfo) error) error {
if t != restic.LockFile && be.listedFileType[t] {
return errors.Errorf("tried listing type %v the second time", t)
@ -71,7 +85,9 @@ func (be *listOnceBackend) List(ctx context.Context, t restic.FileType, fn func(
if be.strictOrder && t == restic.SnapshotFile && be.listedFileType[restic.IndexFile] {
return errors.Errorf("tried listing type snapshots after index")
}
be.listedFileType[t] = true
if !be.allowMultiple {
be.listedFileType[t] = true
}
return be.Backend.List(ctx, t, fn)
}


@ -15,6 +15,7 @@ import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
@ -75,6 +76,7 @@ type archiverRepo interface {
restic.Loader
restic.BlobSaver
restic.SaverUnpacked[restic.WriteableFileType]
repository.CapacityChecker
Config() restic.Config
StartPackUploader(ctx context.Context, wg *errgroup.Group)
@ -154,6 +156,9 @@ type Options struct {
// SaveTreeConcurrency sets how many trees are marshalled and saved to the
// repo concurrently.
SaveTreeConcurrency uint
// RepoSizeMax > 0 signals repository size monitoring
RepoSizeMax uint64
}
// ApplyDefaults returns a copy of o with the default options set for all unset
@ -333,28 +338,30 @@ func (arch *Archiver) saveDir(ctx context.Context, snPath string, dir string, me
pathname := arch.FS.Join(dir, name)
oldNode := previous.Find(name)
snItem := join(snPath, name)
fn, excluded, err := arch.save(ctx, snItem, pathname, oldNode)
// don't save if we are in repository shutdown mode
if !arch.Repo.MaxCapacityExceeded() {
fn, excluded, err := arch.save(ctx, snItem, pathname, oldNode)
// return error early if possible
if err != nil {
err = arch.error(pathname, err)
if err == nil {
// ignore error
// return error early if possible
if err != nil {
err = arch.error(pathname, err)
if err == nil {
// ignore error
continue
}
return futureNode{}, err
}
if excluded {
continue
}
return futureNode{}, err
nodes = append(nodes, fn)
}
if excluded {
continue
}
nodes = append(nodes, fn)
}
fn := arch.treeSaver.Save(ctx, snPath, dir, treeNode, nodes, complete)
return fn, nil
}
@ -832,9 +839,10 @@ func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group) {
arch.fileSaver = newFileSaver(ctx, wg,
arch.blobSaver.Save,
arch.Repo.Config().ChunkerPolynomial,
arch.Options.ReadConcurrency, arch.Options.SaveBlobConcurrency)
arch.Options.ReadConcurrency, arch.Options.SaveBlobConcurrency, arch.Repo)
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
//arch.fileSaver.repo = arch.Repo
arch.treeSaver = newTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.blobSaver.Save, arch.Error)
}
@ -909,7 +917,6 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
debug.Log("error while saving tree: %v", err)
return err
}
return arch.Repo.Flush(ctx)
})
err = wgUp.Wait()
@ -924,6 +931,9 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
}
}
if arch.Repo.MaxCapacityExceeded() {
opts.Tags = append(opts.Tags, "partial-snapshot")
}
sn, err := restic.NewSnapshot(targets, opts.Tags, opts.Hostname, opts.Time)
if err != nil {
return nil, restic.ID{}, nil, err
@ -934,6 +944,10 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
if opts.ParentSnapshot != nil {
sn.Parent = opts.ParentSnapshot.ID()
}
if arch.Repo.MaxCapacityExceeded() {
sn.PartialSnapshot = true
}
sn.Tree = &rootTreeID
arch.summary.BackupEnd = time.Now()
sn.Summary = &restic.SnapshotSummary{


@ -68,8 +68,9 @@ type saveBlobResponse struct {
func (s *blobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
if err != nil {
if err != nil && err.Error() == "MaxCapacityExceeded" {
err = nil
} else if err != nil {
return saveBlobResponse{}, err
}


@ -29,11 +29,12 @@ type fileSaver struct {
CompleteBlob func(bytes uint64)
NodeFromFileInfo func(snPath, filename string, meta ToNoder, ignoreXattrListError bool) (*restic.Node, error)
repo archiverRepo
}
// newFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled.
func newFileSaver(ctx context.Context, wg *errgroup.Group, save saveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *fileSaver {
func newFileSaver(ctx context.Context, wg *errgroup.Group, save saveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint, repo archiverRepo) *fileSaver {
ch := make(chan saveFileJob)
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
@ -47,6 +48,7 @@ func newFileSaver(ctx context.Context, wg *errgroup.Group, save saveBlobFn, pol
ch: ch,
CompleteBlob: func(uint64) {},
repo: repo,
}
for i := uint(0); i < fileWorkers; i++ {
@ -126,11 +128,23 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
if isCompleted {
panic("completed twice")
}
for _, id := range fnr.node.Content {
if id.IsNull() {
fixEnd := false
firstIndex := 0
for i, id := range fnr.node.Content {
if s.repo.MaxCapacityExceeded() && id.IsNull() {
fixEnd = true
firstIndex = i
break
} else if id.IsNull() {
panic("completed file with null ID")
}
}
if fixEnd {
fnr.node.Content = fnr.node.Content[:firstIndex]
debug.Log("truncating file %q", fnr.snPath)
}
isCompleted = true
finish(fnr)
}
@ -202,6 +216,9 @@ func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
node.Content = append(node.Content, restic.ID{})
lock.Unlock()
if s.repo.MaxCapacityExceeded() {
break
}
s.saveBlob(ctx, restic.DataBlob, buf, target, func(sbr saveBlobResponse) {
lock.Lock()
if !sbr.known {


@ -10,6 +10,7 @@ import (
"github.com/restic/chunker"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
@ -48,7 +49,8 @@ func startFileSaver(ctx context.Context, t testing.TB, fsInst fs.FS) (*fileSaver
t.Fatal(err)
}
s := newFileSaver(ctx, wg, saveBlob, pol, workers, workers)
repo := repository.TestRepository(t)
s := newFileSaver(ctx, wg, saveBlob, pol, workers, workers, repo)
s.NodeFromFileInfo = func(snPath, filename string, meta ToNoder, ignoreXattrListError bool) (*restic.Node, error) {
return meta.ToNode(ignoreXattrListError)
}


@ -51,6 +51,9 @@ type Repository struct {
allocDec sync.Once
enc *zstd.Encoder
dec *zstd.Decoder
maxRepoCapReached bool
maxRepoMutex sync.Mutex
}
// internalRepository allows using SaveUnpacked and RemoveUnpacked with all FileTypes
@ -62,6 +65,8 @@ type Options struct {
Compression CompressionMode
PackSize uint
NoExtraVerify bool
RepoSizeMax uint64
repoCurSize uint64
}
// CompressionMode configures if data should be compressed.
@ -391,7 +396,57 @@ func (r *Repository) saveAndEncrypt(ctx context.Context, t restic.BlobType, data
panic(fmt.Sprintf("invalid type: %v", t))
}
return pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength)
length, err := pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength)
// maximum repository capacity exceeded?
r.maxRepoMutex.Lock()
defer r.maxRepoMutex.Unlock()
if r.opts.RepoSizeMax > 0 {
r.opts.repoCurSize += uint64(length)
if r.opts.repoCurSize > r.opts.RepoSizeMax {
if !r.maxRepoCapReached {
debug.Log("MaxCapacityExceeded")
r.maxRepoCapReached = true
}
return length, errors.New("MaxCapacityExceeded")
}
}
return length, err
}
// CurrentRepositorySize sums the sizes of all snapshot, index and pack files in the repository
func (r *Repository) CurrentRepositorySize(ctx context.Context) (uint64, error) {
curSize := uint64(0)
if r.opts.RepoSizeMax > 0 {
for _, ft := range []restic.FileType{restic.SnapshotFile, restic.IndexFile, restic.PackFile} {
err := r.List(ctx, ft, func(_ restic.ID, size int64) error {
curSize += uint64(size)
return nil
})
if err != nil {
return 0, err
}
}
r.opts.repoCurSize = curSize
return curSize, nil
}
return 0, errors.New("repository maximum size has not been set")
}
// MaxCapacityExceeded reports whether the repository has a size limit and whether that limit has been exceeded
func (r *Repository) MaxCapacityExceeded() bool {
r.maxRepoMutex.Lock()
defer r.maxRepoMutex.Unlock()
if r.opts.RepoSizeMax == 0 {
return false
}
return r.maxRepoCapReached
}
// CapacityChecker provides the MaxCapacityExceeded check required by the restic.Repository interface
type CapacityChecker interface {
MaxCapacityExceeded() bool
}
func (r *Repository) verifyCiphertext(buf []byte, uncompressedLength int, id restic.ID) error {


@ -63,6 +63,9 @@ type Repository interface {
// StartWarmup creates a new warmup job, requesting the backend to warmup the specified packs.
StartWarmup(ctx context.Context, packs IDSet) (WarmupJob, error)
// MaxCapacityExceeded checks if repository capacity has been exceeded
MaxCapacityExceeded() bool
}
type FileType = backend.FileType


@ -25,8 +25,9 @@ type Snapshot struct {
Tags []string `json:"tags,omitempty"`
Original *ID `json:"original,omitempty"`
ProgramVersion string `json:"program_version,omitempty"`
Summary *SnapshotSummary `json:"summary,omitempty"`
ProgramVersion string `json:"program_version,omitempty"`
PartialSnapshot bool `json:"partial_snapshot,omitempty"`
Summary *SnapshotSummary `json:"summary,omitempty"`
id *ID // plaintext ID, used during restore
}