diff --git a/archiver_test.go b/archiver_test.go
index 007ed411..75b1176c 100644
--- a/archiver_test.go
+++ b/archiver_test.go
@@ -5,12 +5,15 @@ import (
 	"crypto/sha256"
 	"io"
 	"testing"
+	"time"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic"
 	"github.com/restic/restic/backend"
+	"github.com/restic/restic/checker"
 	"github.com/restic/restic/crypto"
 	"github.com/restic/restic/pack"
+	"github.com/restic/restic/repository"
 	. "github.com/restic/restic/test"
 )
 
@@ -21,6 +24,11 @@ type Rdr interface {
 	io.ReaderAt
 }
 
+type chunkedData struct {
+	buf    []byte
+	chunks []*chunker.Chunk
+}
+
 func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) {
 	rd.Seek(0, 0)
 	ch := chunker.New(rd, testPol, sha256.New())
@@ -233,3 +241,102 @@ func BenchmarkLoadTree(t *testing.B) {
 		}
 	}
 }
+
+// Saves several identical chunks concurrently and later checks that there are no
+// unreferenced packs in the repository. See also #292 and #358.
+func TestParallelSaveWithDuplication(t *testing.T) {
+	for seed := 0; seed < 10; seed++ {
+		testParallelSaveWithDuplication(t, seed)
+	}
+}
+
+func testParallelSaveWithDuplication(t *testing.T, seed int) {
+	repo := SetupRepo()
+	defer TeardownRepo(repo)
+
+	dataSizeMb := 128
+	duplication := 7
+
+	arch := restic.NewArchiver(repo)
+	data, chunks := getRandomData(seed, dataSizeMb*1024*1024)
+	reader := bytes.NewReader(data)
+
+	errChannels := [](<-chan error){}
+
+	// interleaved processing of subsequent chunks
+	maxParallel := 2*duplication - 1
+	barrier := make(chan struct{}, maxParallel)
+
+	for _, c := range chunks {
+		for dupIdx := 0; dupIdx < duplication; dupIdx++ {
+			errChan := make(chan error)
+			errChannels = append(errChannels, errChan)
+
+			go func(reader *bytes.Reader, c *chunker.Chunk, errChan chan<- error) {
+				barrier <- struct{}{}
+
+				hash := c.Digest
+				id := backend.ID{}
+				copy(id[:], hash)
+
+				time.Sleep(time.Duration(hash[0]))
+				err := arch.Save(pack.Data, id, c.Length, c.Reader(reader))
+				<-barrier
+				errChan <- err
+			}(reader, c, errChan)
+		}
+	}
+
+	for _, errChan := range errChannels {
+		OK(t, <-errChan)
+	}
+
+	OK(t, repo.Flush())
+	OK(t, repo.SaveIndex())
+
+	chkr := createAndInitChecker(t, repo)
+	assertNoUnreferencedPacks(t, chkr)
+}
+
+func getRandomData(seed int, size int) ([]byte, []*chunker.Chunk) {
+	buf := Random(seed, size)
+	chunks := []*chunker.Chunk{}
+	chunker := chunker.New(bytes.NewReader(buf), testPol, sha256.New())
+
+	for {
+		c, err := chunker.Next()
+		if err == io.EOF {
+			break
+		}
+		chunks = append(chunks, c)
+	}
+
+	return buf, chunks
+}
+
+func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker {
+	chkr := checker.New(repo)
+
+	hints, errs := chkr.LoadIndex()
+	if len(errs) > 0 {
+		t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+	}
+
+	if len(hints) > 0 {
+		t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+	}
+
+	return chkr
+}
+
+func assertNoUnreferencedPacks(t *testing.T, chkr *checker.Checker) {
+	done := make(chan struct{})
+	defer close(done)
+
+	errChan := make(chan error)
+	go chkr.Packs(errChan, done)
+
+	for err := range errChan {
+		OK(t, err)
+	}
+}
diff --git a/repository/master_index.go b/repository/master_index.go
index a747d06e..2f63e82a 100644
--- a/repository/master_index.go
+++ b/repository/master_index.go
@@ -157,27 +157,47 @@ func (mi *MasterIndex) Current() *Index {
 // AddInFlight add the given ID to the list of in-flight IDs. An error is
 // returned when the ID is already in the list.
 func (mi *MasterIndex) AddInFlight(id backend.ID) error {
-	mi.inFlight.Lock()
-	defer mi.inFlight.Unlock()
+	// The index + inFlight store must be searched for a matching id in one
+	// atomic operation. This requires locking the inFlight store and the
+	// index together!
+	mi.inFlight.Lock()
+	defer mi.inFlight.Unlock()
 
-	debug.Log("MasterIndex.AddInFlight", "adding %v", id)
-	if mi.inFlight.Has(id) {
-		return fmt.Errorf("%v is already in flight", id)
-	}
+	// Note: mi.Has read locks the index again.
+	mi.idxMutex.RLock()
+	defer mi.idxMutex.RUnlock()
 
-	mi.inFlight.Insert(id)
-	return nil
+	debug.Log("MasterIndex.AddInFlight", "adding %v", id)
+	if mi.inFlight.Has(id) {
+		return fmt.Errorf("%v is already in flight", id)
+	}
+	if mi.Has(id) {
+		return fmt.Errorf("%v is already indexed (fully processed)", id)
+	}
+
+	mi.inFlight.Insert(id)
+	return nil
 }
 
 // IsInFlight returns true iff the id is contained in the list of in-flight IDs.
 func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
-	mi.inFlight.RLock()
-	defer mi.inFlight.RUnlock()
+	// The index + inFlight store must be searched for a matching id in one
+	// atomic operation. This requires locking the inFlight store and the
+	// index together!
+	mi.inFlight.RLock()
+	defer mi.inFlight.RUnlock()
 
-	inFlight := mi.inFlight.Has(id)
-	debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
+	// Note: mi.Has read locks the index again.
+	mi.idxMutex.RLock()
+	defer mi.idxMutex.RUnlock()
 
-	return inFlight
+	inFlight := mi.inFlight.Has(id)
+	debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
+
+	indexed := mi.Has(id)
+	debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed)
+
+	return inFlight
 }
 
 // RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
diff --git a/test/helpers.go b/test/helpers.go
index 7abb536b..69a75cd2 100644
--- a/test/helpers.go
+++ b/test/helpers.go
@@ -1,7 +1,6 @@
 package test_helper
 
 import (
-	"bytes"
 	"compress/bzip2"
 	"compress/gzip"
 	"crypto/rand"
@@ -86,10 +85,23 @@ func Random(seed, count int) []byte {
 	return buf
 }
 
+type rndReader struct {
+	src *mrand.Rand
+}
+
+func (r *rndReader) Read(p []byte) (int, error) {
+	for i := range p {
+		p[i] = byte(r.src.Uint32())
+	}
+
+	return len(p), nil
+}
+
 // RandomReader returns a reader that returns size bytes of pseudo-random data
 // derived from the seed.
-func RandomReader(seed, size int) *bytes.Reader {
-	return bytes.NewReader(Random(seed, size))
+func RandomReader(seed, size int) io.Reader {
+	r := &rndReader{src: mrand.New(mrand.NewSource(int64(seed)))}
+	return io.LimitReader(r, int64(size))
 }
 
 // GenRandom returns a []byte filled with up to 1000 random bytes.
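Note on the MasterIndex change above: AddInFlight now consults both the in-flight set and the index while holding both locks, so two goroutines saving the same blob cannot both decide to store it, which is what left unreferenced packs behind in #292/#358. The standalone Go sketch below (not part of the patch and not restic code; names such as dedupStore, startUpload and finishUpload are invented for illustration) shows the same check-both-sets-under-one-lock pattern with a single mutex.

package main

import (
	"fmt"
	"sync"
)

// id stands in for a blob identifier such as restic's backend.ID.
type id [32]byte

// dedupStore tracks which blobs are fully stored and which are being stored.
type dedupStore struct {
	mu       sync.Mutex
	indexed  map[id]struct{} // blobs already written and indexed
	inFlight map[id]struct{} // blobs currently being written
}

// startUpload reports whether the caller should upload the blob. Both sets
// are checked while holding one lock, so concurrent savers of the same blob
// cannot both decide to upload it.
func (s *dedupStore) startUpload(blob id) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, ok := s.indexed[blob]; ok {
		return false // already stored
	}
	if _, ok := s.inFlight[blob]; ok {
		return false // another goroutine is storing it right now
	}
	s.inFlight[blob] = struct{}{}
	return true
}

// finishUpload atomically moves the blob from in-flight to indexed.
func (s *dedupStore) finishUpload(blob id) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.inFlight, blob)
	s.indexed[blob] = struct{}{}
}

func main() {
	store := &dedupStore{
		indexed:  map[id]struct{}{},
		inFlight: map[id]struct{}{},
	}

	var blob id // all goroutines try to save the same (zero) ID
	var wg sync.WaitGroup
	var countMu sync.Mutex
	uploads := 0

	for i := 0; i < 7; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if store.startUpload(blob) {
				countMu.Lock()
				uploads++
				countMu.Unlock()
				store.finishUpload(blob)
			}
		}()
	}
	wg.Wait()

	// Because the check is atomic, exactly one goroutine uploads the blob.
	fmt.Println("uploads:", uploads)
}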