Skip to content

Commit

Permalink
fix race conditions in DownloadBlobs tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cppforlife committed Oct 20, 2016
1 parent b7a3216 commit 88598e3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
42 changes: 21 additions & 21 deletions releasedir/fs_blobs_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,36 +109,24 @@ func (d FSBlobsDir) DownloadBlobs(numOfParallelWorkers int) error {
return err
}

results := make(chan error, len(blobs))
jobs := make(chan Blob, numOfParallelWorkers)

defer close(results)
defer close(jobs)

worker := func(blobs chan Blob, results chan<- error) {
for blob := range blobs {
if len(blob.BlobstoreID) > 0 {
err := d.downloadBlob(blob)
if err != nil {
results <- err
}
}
results <- nil
}
}
resultsCh := make(chan error, len(blobs))
defer close(resultsCh)

blobsCh := make(chan Blob, numOfParallelWorkers)
defer close(blobsCh)

for w := 0; w < numOfParallelWorkers; w++ {
go worker(jobs, results)
go d.downloadBlobsWorker(blobsCh, resultsCh)
}

for _, blob := range blobs {
jobs <- blob
blobsCh <- blob
}

var errs []error
for i := 0; i < len(blobs); i++ {
err := <-results

for i := 0; i < len(blobs); i++ {
err := <-resultsCh
if err != nil {
errs = append(errs, err)
}
Expand All @@ -151,6 +139,18 @@ func (d FSBlobsDir) DownloadBlobs(numOfParallelWorkers int) error {
return nil
}

func (d FSBlobsDir) downloadBlobsWorker(blobsCh chan Blob, resultsCh chan<- error) {
for blob := range blobsCh {
var err error

if len(blob.BlobstoreID) > 0 {
err = d.downloadBlob(blob)
}

resultsCh <- err
}
}

func (d FSBlobsDir) TrackBlob(path string, src io.ReadCloser) (Blob, error) {
tempFile, err := d.fs.TempFile("track-blob")
if err != nil {
Expand Down
32 changes: 28 additions & 4 deletions releasedir/fs_blobs_dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,28 @@ already-downloaded.tgz:

Context("Multiple workers used to download blobs", func() {
It("downloads all blobs without local blob copy, skipping non-uploaded blobs", func() {
// Order of blobstore.Get calls may be done in any order
blobstore := FakeConcurrentBlobstore{
GetCallback: func(blobID, fingerprint string) (string, error) {
if blobID == "blob1" && fingerprint == "blob1-sha" {
return "/blob1-tmp", nil
} else if blobID == "blob2" && fingerprint == "blob2-sha" {
return "/blob2-tmp", nil
} else {
panic("Received non-matching blobstore.Get call")
}
},
}

blobsDir = NewFSBlobsDir("/dir", reporter, blobstore, sha1calc, fs)

err := act(4)
Expect(err).ToNot(HaveOccurred())

Expect(blobstore.GetBlobIDs).To(Equal([]string{"blob1", "blob2"}))
Expect(blobstore.GetFingerprints).To(Equal([]string{"blob1-sha", "blob2-sha"}))

Expect(fs.FileExists("/dir/blobs/dir")).To(BeTrue())
Expect(fs.ReadFileString("/dir/blobs/dir/file-in-directory.tgz")).To(Equal("blob1-content"))
Expect(fs.ReadFileString("/dir/blobs/file-in-root.tgz")).To(Equal("blob2-content"))
})

})

Context("A single worker to download blobs", func() {
Expand Down Expand Up @@ -665,3 +676,16 @@ non-uploaded2.tgz:
})
})
})

type FakeConcurrentBlobstore struct {
GetCallback func(blobID, fingerprint string) (string, error)
}

func (bs FakeConcurrentBlobstore) Get(blobID, fingerprint string) (string, error) {
return bs.GetCallback(blobID, fingerprint)
}

func (bs FakeConcurrentBlobstore) CleanUp(fileName string) error { return nil }
func (bs FakeConcurrentBlobstore) Delete(blobId string) error { return nil }
func (bs FakeConcurrentBlobstore) Create(fileName string) (string, string, error) { return "", "", nil }
func (bs FakeConcurrentBlobstore) Validate() error { return nil }

0 comments on commit 88598e3

Please sign in to comment.