feat(storage/transfermanager): add SkipIfExists option #10893

Merged
merged 6 commits into from
Sep 24, 2024
Changes from 1 commit
13 changes: 10 additions & 3 deletions storage/transfermanager/downloader.go
@@ -160,15 +160,22 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec
 		}

 		// Check if the file exists.
-		// TODO: add skip option.
+		fileExists := false
+
 		filePath := filepath.Join(input.LocalDirectory, attrs.Name)
 		if _, err := os.Stat(filePath); err == nil {
-			return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
+			fileExists = true
+			if !d.config.skipIfExists {
+				return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, os.ErrExist)
+			}
 		} else if !errors.Is(err, os.ErrNotExist) {
 			// Encountered an error other than file does not exist.
 			return fmt.Errorf("transfermanager: failed to create file(%q): %w", filePath, err)
 		}

-		objectsToQueue = append(objectsToQueue, attrs.Name)
+		if !(d.config.skipIfExists && fileExists) {
+			objectsToQueue = append(objectsToQueue, attrs.Name)
+		}
 	}

 	outs := make(chan DownloadOutput, len(objectsToQueue))
70 changes: 70 additions & 0 deletions storage/transfermanager/integration_test.go
@@ -28,6 +28,7 @@ import (
 	"math/rand"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -295,6 +296,75 @@ func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
 				t.Errorf("unexpected file %q in dir", entry.Name())
 			}
 		}
+
+		// Now attempt to download the entire directory.
+		// The existing files should be skipped.
+		callbacks := make(chan bool)
Member commented:
It would help to clarify that this checks that SkipIfExists() does not attempt to re-download the files included in the previous directory download:

// In lex order we have:
// "dir/nested/again/obj1", -- included
// "dir/nested/objA", -- skipped
// "dir/file", -- included
// "dir/objA", -- skipped
// "dir/objB", -- skipped
// "dir/objC", -- included

Contributor Author replied:
I added a comment here and put those objects in a variable. Does that seem sufficient?

Member replied:
Yes, thank you.

+		d, err = NewDownloader(c, WithWorkers(2), SkipIfExists())
+		if err != nil {
+			t.Fatalf("NewDownloader: %v", err)
+		}
+
+		if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
+			Bucket:         tb.bucket,
+			LocalDirectory: localDir,
+			OnObjectDownload: func(got *DownloadOutput) {
+				callbacks <- true
+
+				if got.Err != nil {
+					t.Errorf("result.Err: %v", got.Err)
+				}
+
+				if strings.EqualFold(got.Object, "dir/objA") {
Member commented:
This only checks dir/objA. Do you want to check the other included objects as well?

Contributor Author replied:
Added.

+					t.Errorf("should have skipped download of object %s", got.Object)
+				}
+
+				if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got {
+					t.Errorf("expected object size %d, got %d", want, got)
+				}
+
+				path := filepath.Join(localDir, got.Object)
+				f, err := os.Open(path)
+				if err != nil {
+					t.Errorf("os.Open(%q): %v", path, err)
+				}
+				defer f.Close()
+
+				b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size))
+				if _, err := io.Copy(b, f); err != nil {
+					t.Errorf("io.Copy: %v", err)
+				}
+
+				if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC {
+					t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC)
+				}
+			},
+		}); err != nil {
+			t.Errorf("d.DownloadDirectory: %v", err)
+		}
+
+		gotCallbacks := 0
+		done := make(chan bool)
+		go func() {
+			for {
+				select {
+				case <-done:
+					// Use return, not break: a bare break exits only the select and leaves the loop running.
+					return
+				case <-callbacks:
+					gotCallbacks++
Member commented:
For my learning: gotCallbacks is synchronized here, so the directory download, which uses a mutex, could be rewritten the same way. Is that correct?

Contributor Author replied:
Correct. I rewrote that logic, PTAL.

+				}
+			}
+		}()
+
+		if _, err := d.WaitAndClose(); err != nil {
+			t.Errorf("d.WaitAndClose: %v", err)
+		}
+		done <- true
+
+		if want, got := len(tb.objects)-wantObjs, gotCallbacks; want != got {
+			t.Errorf("expected to receive %d callbacks, got %d", want, got)
BrennaEpp marked this conversation as resolved.
+		}
 	})
 }
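
The download callback in this test verifies each file by comparing a CRC32C of its bytes against a stored hash. The `crc32c` helper itself is not shown in this diff. The sketch below assumes it is a thin wrapper over the standard library's Castagnoli table, which may differ from the helper the test actually uses; `contentMatches` is a hypothetical name added for illustration.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// castagnoli is the CRC-32C polynomial table from the standard library.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// crc32c returns the CRC-32C (Castagnoli) checksum of b.
// Assumption: the test helper of the same name behaves like this.
func crc32c(b []byte) uint32 {
	return crc32.Checksum(b, castagnoli)
}

// contentMatches reports whether data hashes to want.
// Hypothetical helper, not taken from the PR.
func contentMatches(data []byte, want uint32) bool {
	return crc32c(data) == want
}

func main() {
	data := []byte("123456789")
	want := crc32c(data)
	fmt.Printf("checksum: %#x\n", want)
	fmt.Println("same content matches:", contentMatches(data, want))
}
```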

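
The callback counter in this test relies on a single goroutine owning the count, so no mutex is needed. Below is a minimal sketch of that pattern with illustrative naming: `countCallbacks`, `stop`, and the `result` channel are hypothetical and do not come from the PR. It leaves the loop with `return`, because in Go a bare `break` inside a `select` exits only the `select`, not the enclosing `for`.

```go
package main

import "fmt"

// countCallbacks starts a goroutine that counts values received on callbacks
// until it is told to stop. The returned function signals the goroutine to
// stop and returns the final count. All names here are illustrative.
func countCallbacks(callbacks <-chan bool) (stop func() int) {
	done := make(chan struct{})
	result := make(chan int)
	go func() {
		count := 0 // owned by this goroutine only, so no mutex is required
		for {
			select {
			case <-callbacks:
				count++
			case <-done:
				// Hand the count back and exit the goroutine.
				result <- count
				return
			}
		}
	}()
	return func() int {
		close(done)
		return <-result
	}
}

func main() {
	callbacks := make(chan bool)
	stop := countCallbacks(callbacks)
	for i := 0; i < 3; i++ {
		// Unbuffered channel: each send completes only once the counter
		// goroutine has received the value.
		callbacks <- true
	}
	fmt.Println("callbacks received:", stop())
}
```

Returning the count over a channel, rather than reading a shared variable, keeps the hand-off explicit; the test in the diff instead reads `gotCallbacks` after sending on `done`.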
19 changes: 19 additions & 0 deletions storage/transfermanager/option.go
@@ -91,6 +91,21 @@ func (wps withPartSize) apply(tm *transferManagerConfig) {
 	tm.partSize = wps.partSize
 }

+// SkipIfExists returns an Option that skips downloading files that already
+// exist in the local directory.
+//
+// By default, if a file already exists, the operation aborts and returns an error.
+func SkipIfExists() Option {
+	return &skipIfExists{}
+}
+
+type skipIfExists struct {
+}
+
+func (sie skipIfExists) apply(tm *transferManagerConfig) {
+	tm.skipIfExists = true
+}
+
 type transferManagerConfig struct {
 	// Workers in thread pool; default numCPU/2 based on previous benchmarks?
 	numWorkers int
@@ -107,6 +122,10 @@ type transferManagerConfig struct {
 	// If true, callbacks are used instead of returning results synchronously
 	// in a slice at the end.
 	asynchronous bool
+
+	// If true, files that already exist in the local directory will not be
+	// downloaded.
+	skipIfExists bool
 }

 func defaultTransferManagerConfig() *transferManagerConfig {
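
The option added in this file follows the functional-options pattern: each option is a small type whose `apply` method mutates a config struct. The sketch below is a self-contained illustration of that pattern using simplified stand-ins. `config`, `newConfig`, and the default of one worker are hypothetical; `Option`, `SkipIfExists`, and `WithWorkers` are redeclared locally to mirror the names in the diff and are not the package's real definitions.

```go
package main

import "fmt"

// config is a simplified stand-in for transferManagerConfig.
type config struct {
	numWorkers   int
	skipIfExists bool
}

// Option mutates a config. Local redeclaration for this sketch only.
type Option interface {
	apply(*config)
}

type skipIfExists struct{}

func (skipIfExists) apply(c *config) { c.skipIfExists = true }

// SkipIfExists mirrors the option in the diff: it flips a single flag.
func SkipIfExists() Option { return &skipIfExists{} }

type withWorkers struct{ n int }

func (w withWorkers) apply(c *config) { c.numWorkers = w.n }

// WithWorkers sets the worker count.
func WithWorkers(n int) Option { return &withWorkers{n: n} }

// newConfig applies options in order on top of defaults.
// The default of one worker is an arbitrary placeholder.
func newConfig(opts ...Option) *config {
	c := &config{numWorkers: 1}
	for _, o := range opts {
		o.apply(c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", *newConfig())
	fmt.Printf("%+v\n", *newConfig(WithWorkers(2), SkipIfExists()))
}
```

Returning `&skipIfExists{}` works even though `apply` has a value receiver, because a pointer type's method set includes the methods declared on the value type.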