From 8d2d50d095767bbe91e80ad4d85b55732d0dfafc Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 22 Mar 2025 21:46:51 +0100 Subject: [PATCH] repository: merge small pack files before flushing This prevents chunk size leaks when a backup only consists of a small file which is split in two parts, which end up in two individual pack files. --- internal/repository/pack/pack.go | 19 +++++++ internal/repository/packer_manager.go | 59 +++++++++++++++++++--- internal/repository/packer_manager_test.go | 2 +- internal/repository/repository_test.go | 53 +++++++++++++++++++ 4 files changed, 124 insertions(+), 9 deletions(-) diff --git a/internal/repository/pack/pack.go b/internal/repository/pack/pack.go index 9dfd1004d..e9c0ab77b 100644 --- a/internal/repository/pack/pack.go +++ b/internal/repository/pack/pack.go @@ -163,6 +163,25 @@ func makeHeader(blobs []restic.Blob) ([]byte, error) { return buf, nil } +func (p *Packer) Merge(other *Packer, otherData io.Reader) error { + other.m.Lock() + defer other.m.Unlock() + + for _, blob := range other.blobs { + data := make([]byte, blob.Length) + _, err := io.ReadFull(otherData, data) + if err != nil { + return err + } + + if _, err := p.Add(blob.Type, blob.ID, data, int(blob.UncompressedLength)); err != nil { + return err + } + } + + return nil +} + // Size returns the number of bytes written so far. 
func (p *Packer) Size() uint { p.m.Lock() diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index 6ffec8352..c21d4b150 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -57,19 +57,62 @@ func (r *packerManager) Flush(ctx context.Context) error { r.pm.Lock() defer r.pm.Unlock() - for i, packer := range r.packers { - if packer != nil { - debug.Log("manually flushing pending pack") - err := r.queueFn(ctx, r.tpe, packer) - if err != nil { - return err - } - r.packers[i] = nil + pendingPackers, err := r.mergePackers() + if err != nil { + return err + } + + for _, packer := range pendingPackers { + debug.Log("manually flushing pending pack") + err := r.queueFn(ctx, r.tpe, packer) + if err != nil { + return err } } return nil } +// mergePackers merges small pack files before those are uploaded by Flush(). The main +// purpose of this method is to reduce information leaks if a small file is backed up +// and the blobs end up in separate pack files. If the file only consists of two blobs +// this would leak the size of the individual blobs. 
+func (r *packerManager) mergePackers() ([]*packer, error) { + pendingPackers := []*packer{} + var p *packer + for i, packer := range r.packers { + if packer == nil { + continue + } + + r.packers[i] = nil + if p == nil { + p = packer + } else if p.Size()+packer.Size() < r.packSize { + // merge if the result stays below the target pack size + err := packer.bufWr.Flush() + if err != nil { + return nil, err + } + _, err = packer.tmpfile.Seek(0, io.SeekStart) + if err != nil { + return nil, err + } + + err = p.Merge(packer.Packer, packer.tmpfile) + if err != nil { + return nil, err + } + } else { + pendingPackers = append(pendingPackers, p) + p = packer + } + } + if p != nil { + pendingPackers = append(pendingPackers, p) + } + return pendingPackers, nil +} + func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) { r.pm.Lock() defer r.pm.Unlock() diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 3dd6a079d..7f3d34f1b 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -97,7 +97,7 @@ func TestPackerManagerWithOversizeBlob(t *testing.T) { test.OK(t, pm.Flush(context.TODO())) // oversized blob must be stored in a separate packfile - test.Assert(t, packFiles == 2 || packFiles == 3, "unexpected number of packfiles %v, expected 2 or 3", packFiles) + test.Assert(t, packFiles == 2, "unexpected number of packfiles %v, expected 2", packFiles) } func BenchmarkPackerManager(t *testing.B) { diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 1b0d47c8f..573ab557b 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -16,6 +16,7 @@ import ( "github.com/restic/restic/internal/backend/cache" "github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/mem" + 
"github.com/restic/restic/internal/checker" "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" @@ -81,6 +82,58 @@ func testSave(t *testing.T, version uint, calculateID bool) { } } +func TestSavePackMerging(t *testing.T) { + t.Run("75%", func(t *testing.T) { + testSavePackMerging(t, 75, 1) + }) + t.Run("150%", func(t *testing.T) { + testSavePackMerging(t, 175, 2) + }) + t.Run("250%", func(t *testing.T) { + testSavePackMerging(t, 275, 3) + }) +} + +func testSavePackMerging(t *testing.T, targetPercentage int, expectedPacks int) { + repo, _ := repository.TestRepositoryWithBackend(t, nil, 0, repository.Options{ + // minimum pack size to speed up test + PackSize: repository.MinPackSize, + }) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + + var ids restic.IDs + // add blobs with size targetPercentage / 100 * repo.PackSize to the repository + blobSize := repository.MinPackSize / 100 + for range targetPercentage { + data := make([]byte, blobSize) + _, err := io.ReadFull(rnd, data) + rtest.OK(t, err) + + sid, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false) + rtest.OK(t, err) + ids = append(ids, sid) + } + + rtest.OK(t, repo.Flush(context.Background())) + + // check that all blobs are readable + for _, id := range ids { + _, err := repo.LoadBlob(context.TODO(), restic.DataBlob, id, nil) + rtest.OK(t, err) + } + + // check for correct number of pack files + packs := 0 + rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, _ int64) error { + packs++ + return nil + })) + rtest.Equals(t, expectedPacks, packs, "unexpected number of pack files") + + checker.TestCheckRepo(t, repo, true) +} + func BenchmarkSaveAndEncrypt(t *testing.B) { repository.BenchmarkAllVersions(t, benchmarkSaveAndEncrypt) }