feat(storage/transfermanager): add DownloadDirectory
Prototype.

Follow-up PRs:
- Clean up on failure
- SkipIfExists + option to fail if exists?
- Strip prefix? (strip the prefix from local file paths)
- If the object is empty, create a directory instead?
- Possibly more integration tests
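
For context, a minimal usage sketch of the API this commit adds, modeled on the synchronous integration test below; the bucket name and local directory are placeholders and error handling is abbreviated.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()

	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Workers and part size are tunable via options; 8 workers mirrors the tests.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(8))
	if err != nil {
		log.Fatalf("NewDownloader: %v", err)
	}

	// Queue every object in the bucket for download; this call is non-blocking.
	if err := d.DownloadDirectory(ctx, &transfermanager.DownloadDirectoryInput{
		Bucket:         "my-bucket",      // placeholder
		LocalDirectory: "/tmp/downloads", // placeholder
	}); err != nil {
		log.Fatalf("DownloadDirectory: %v", err)
	}

	// Wait for all queued downloads to finish and collect per-object results.
	results, err := d.WaitAndClose()
	if err != nil {
		log.Fatalf("WaitAndClose: %v", err)
	}
	for _, out := range results {
		if out.Err != nil {
			log.Printf("download of %q failed: %v", out.Object, out.Err)
		}
	}
}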
BrennaEpp committed Jun 25, 2024
1 parent d809dbe commit 1fa820f
Showing 3 changed files with 289 additions and 5 deletions.
131 changes: 131 additions & 0 deletions storage/transfermanager/downloader.go
@@ -19,12 +19,16 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2/callctx"
"google.golang.org/api/iterator"
)

// Downloader manages a set of parallelized downloads.
@@ -69,6 +73,82 @@ func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectIn
return nil
}

// DownloadDirectory queues the download of a set of objects to a local path.
// This will initiate the download but is non-blocking; call Downloader.Results
// or use the callback to process the results. DownloadDirectory is thread-safe
// and can be called simultaneously from different goroutines.
// DownloadDirectory will resolve any filters on the input and create the needed
// directory structure locally as the operations progress.
// Note: DownloadDirectory overwrites existing files in the directory.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error {
if d.config.asynchronous && input.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
}
if !d.config.asynchronous && input.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
}

select {
case <-d.doneReceivingInputs:
return errors.New("transfermanager: WaitAndClose called before DownloadDirectory")
default:
}

objectsToQueue := []DownloadObjectInput{}
query := &storage.Query{
Prefix: input.Prefix,
StartOffset: input.StartOffset,
EndOffset: input.EndOffset,
MatchGlob: input.MatchGlob,
}
if err := query.SetAttrSelection([]string{"Name"}); err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory query.SetAttrSelection: %w", err)
}

it := d.client.Bucket(input.Bucket).Objects(ctx, query)

// TODO: Clean up any created directory structure on failure.

attrs, err := it.Next()
for err != iterator.Done {
if err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to list objects: %w", err)
}

// Add generation?

object := attrs.Name

// Make sure all directories in the object path exist.
objDirectory := filepath.Join(input.LocalDirectory, filepath.Dir(object))
err = os.MkdirAll(objDirectory, fs.ModeDir|fs.ModePerm)
if err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to make directory(%q): %w", objDirectory, err)
}

// Create file to download to.
filePath := filepath.Join(input.LocalDirectory, object)
f, fErr := os.Create(filePath)
if fErr != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to create file(%q): %w", filePath, fErr)
}

objectsToQueue = append(objectsToQueue, DownloadObjectInput{
Bucket: input.Bucket,
Object: attrs.Name,
Destination: f,
Callback: input.Callback,
ctx: ctx,
directory: true,
})

attrs, err = it.Next()
}

d.addNewInputs(objectsToQueue)
return nil
}

// WaitAndClose waits for all outstanding downloads to complete and closes the
// Downloader. Adding new downloads after this has been called will cause an error.
//
@@ -143,7 +223,23 @@ func (d *Downloader) addInput(input *DownloadObjectInput) {
d.inputsMu.Unlock()
}

// addNewInputs adds a slice of inputs to the downloader.
// This should only be used to queue new objects.
func (d *Downloader) addNewInputs(inputs []DownloadObjectInput) {
d.downloadsInProgress.Add(len(inputs))

d.inputsMu.Lock()
d.inputs = append(d.inputs, inputs...)
d.inputsMu.Unlock()
}

func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutput) {
if input.directory {
f := input.Destination.(*os.File)
if err := f.Close(); err != nil && result.Err == nil {
result.Err = fmt.Errorf("closing file(%q): %w", f.Name(), err)
}
}
// TODO: check checksum if full object

if d.config.asynchronous {
@@ -330,6 +426,7 @@ type DownloadObjectInput struct {
cancelCtx context.CancelCauseFunc
shard int // the piece of the object range that should be downloaded
shardOutputs chan<- *DownloadOutput
directory bool // input was queued by calling DownloadDirectory
}

// downloadShard will read a specific object piece into in.Destination.
@@ -402,6 +499,40 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
return
}

// DownloadDirectoryInput is the input for a directory of objects to download.
type DownloadDirectoryInput struct {
// Required fields.
Bucket string
LocalDirectory string

// Prefix is used to filter results to objects whose names begin with this
// prefix. Optional.
Prefix string

// StartOffset is used to filter results to objects whose names are
// lexicographically equal to or after startOffset. If endOffset is also set,
// the objects listed will have names between startOffset (inclusive) and
// endOffset (exclusive). Optional.
StartOffset string

// EndOffset is used to filter results to objects whose names are
// lexicographically before endOffset. If startOffset is also set, the
// objects listed will have names between startOffset (inclusive) and
// endOffset (exclusive). Optional.
EndOffset string

// MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
// https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
// for syntax details. Optional.
MatchGlob string

// Callback will run after each object in the directory (as selected by the
// provided filters) finishes downloading. It must be set if and only if the
// [WithCallbacks] option is set.
// WaitAndClose will wait for all callbacks to finish.
Callback func(*DownloadOutput)
}
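
As an illustration of how these filter fields and Callback combine, here is a sketch modeled on the asynchronous integration test in this commit; it continues from the earlier sketch (client and ctx as created there), and the bucket, directory, prefix, and glob values are placeholders.

	// Callbacks require the WithCallbacks option on the Downloader.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithCallbacks())
	if err != nil {
		log.Fatalf("NewDownloader: %v", err)
	}

	// Only objects under the "dir" prefix whose names match the glob are
	// downloaded; Callback runs once per object as it completes.
	input := &transfermanager.DownloadDirectoryInput{
		Bucket:         "my-bucket",      // placeholder
		LocalDirectory: "/tmp/downloads", // placeholder
		Prefix:         "dir",
		MatchGlob:      "**obj**",
		Callback: func(out *transfermanager.DownloadOutput) {
			if out.Err != nil {
				log.Printf("download of %q failed: %v", out.Object, out.Err)
			}
		},
	}
	if err := d.DownloadDirectory(ctx, input); err != nil {
		log.Fatalf("DownloadDirectory: %v", err)
	}

	// With callbacks enabled, results are delivered through Callback rather
	// than returned here; WaitAndClose waits for all callbacks to finish.
	if _, err := d.WaitAndClose(); err != nil {
		log.Fatalf("WaitAndClose: %v", err)
	}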

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
2 changes: 1 addition & 1 deletion storage/transfermanager/downloader_buffer_test.go
@@ -132,7 +132,7 @@ func TestDownloadBufferParallel(t *testing.T) {
if err != nil {
t.Errorf("b.WriteAt: %v", err)
}
if n != 5 {
if n != step {
t.Errorf("expected to write 5 bytes, got %d", n)
}
wg.Done()
161 changes: 157 additions & 4 deletions storage/transfermanager/integration_test.go
@@ -15,6 +15,7 @@
package transfermanager

import (
"bytes"
"context"
crand "crypto/rand"
"errors"
@@ -24,6 +25,8 @@ import (
"io"
"log"
"math/rand"
"os"
"path/filepath"
"sync"
"testing"
"time"
@@ -80,6 +83,151 @@ func TestMain(m *testing.M) {
}
}

// Downloads the entire contents of the bucket synchronously.
func TestIntegration_DownloadDirectory(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) {
localDir := t.TempDir()

d, err := NewDownloader(c, WithWorkers(8), WithPartSize(maxObjectSize/2))
if err != nil {
t.Fatalf("NewDownloader: %v", err)
}

if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
Bucket: tb.bucket,
LocalDirectory: localDir,
}); err != nil {
t.Errorf("d.DownloadDirectory: %v", err)
}

results, err := d.WaitAndClose()
if err != nil {
t.Fatalf("d.WaitAndClose: %v", err)
}

if len(results) != len(tb.objects) {
t.Errorf("expected to receive %d results, got %d results", len(tb.objects), len(results))
}

for _, got := range results {
if got.Err != nil {
t.Errorf("result.Err: %v", got.Err)
continue
}

if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got {
t.Errorf("expected object size %d, got %d", want, got)
}

path := filepath.Join(localDir, got.Object)
f, err := os.Open(path)
if err != nil {
t.Errorf("os.Open(%q): %v", path, err)
}
defer f.Close()

b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size))
if _, err := io.Copy(b, f); err != nil {
t.Errorf("io.Copy: %v", err)
}

if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC {
t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC)
}
}
})
}

// Downloads a subset of objects using callbacks.
func TestIntegration_DownloadDirectoryAsync(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) {
localDir := t.TempDir()

numCallbacks := 0
callbackMu := sync.Mutex{}

d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2), WithCallbacks())
if err != nil {
t.Fatalf("NewDownloader: %v", err)
}

// In lex order we have:
// "dir/file", -- excluded by StartOffset and MatchGlob
// "dir/nested/again/obj1", -- excluded by StartOffset
// "dir/nested/objA", -- included
// "dir/objA", -- included
// "dir/objB", -- included
// "dir/objC", -- excluded by EndOffset
wantObjs := 3

if err := d.DownloadDirectory(ctx, &DownloadDirectoryInput{
Bucket: tb.bucket,
LocalDirectory: localDir,
Prefix: "dir",
StartOffset: "dir/nested/o",
EndOffset: "dir/objC",
MatchGlob: "**obj**",
Callback: func(got *DownloadOutput) {
callbackMu.Lock()
numCallbacks++
callbackMu.Unlock()

if got.Err != nil {
t.Errorf("result.Err: %v", got.Err)
}

if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got {
t.Errorf("expected object size %d, got %d", want, got)
}

path := filepath.Join(localDir, got.Object)
f, err := os.Open(path)
if err != nil {
t.Errorf("os.Open(%q): %v", path, err)
}
defer f.Close()

b := bytes.NewBuffer(make([]byte, 0, got.Attrs.Size))
if _, err := io.Copy(b, f); err != nil {
t.Errorf("io.Copy: %v", err)
}

if wantCRC, gotCRC := tb.contentHashes[got.Object], crc32c(b.Bytes()); gotCRC != wantCRC {
t.Errorf("object(%q) at filepath(%q): content crc32c does not match; got: %v, expected: %v", got.Object, path, gotCRC, wantCRC)
}
},
}); err != nil {
t.Errorf("d.DownloadDirectory: %v", err)
}

if _, err = d.WaitAndClose(); err != nil {
t.Fatalf("d.WaitAndClose: %v", err)
}

if numCallbacks != wantObjs {
t.Errorf("expected to receive %d results, got %d callbacks", (wantObjs), numCallbacks)
}

entries, err := os.ReadDir(filepath.Join(localDir, "dir"))
if err != nil {
t.Fatalf("os.ReadDir: %v", err)
}

if len(entries) != wantObjs {
t.Errorf("expected %d entries in dir, got %d", (wantObjs), len(entries))
}

for _, entry := range entries {
if entry.IsDir() && entry.Name() != "nested" {
t.Errorf("unexpected subdirectory %q in dir", entry.Name())
}
if !entry.IsDir() && entry.Name() != "objA" && entry.Name() != "objB" {
t.Errorf("unexpected file %q in dir", entry.Name())
}
}
})
}

func TestIntegration_DownloaderSynchronous(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) {
objects := tb.objects
@@ -700,12 +848,17 @@ func (tb *downloadTestBucket) Create(prefix string) error {

tb.bucket = prefix + uidSpace.New()
tb.objects = []string{
"!#$&'()*+,:;=,?@,[] and spaces",
"./obj",
"obj1",
"obj2",
"obj/with/slashes",
"obj/",
"./obj",
"!#$&'()*+,/:;=,?@,[] and spaces",
"dir/file",
"dir/objA",
"dir/objB",
"dir/objC",
"dir/nested/objA",
"dir/nested/again/obj1",
"anotherDir/objC",
}
tb.contentHashes = make(map[string]uint32)
tb.objectSizes = make(map[string]int64)
