Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage/transfermanager): add DownloadDirectory #10430

Merged
merged 9 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 198 additions & 14 deletions storage/transfermanager/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math"
"os"
"path/filepath"
"sync"
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2/callctx"
"google.golang.org/api/iterator"
)

// Downloader manages a set of parallelized downloads.
Expand All @@ -51,24 +55,96 @@ type Downloader struct {
// set on the ctx may time out before the download even starts. To set a timeout
// that starts with the download, use the [WithPerOpTimeout()] option.
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error {
if d.config.asynchronous && input.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
if d.closed() {
return errors.New("transfermanager: Downloader used after WaitAndClose was called")
}
if !d.config.asynchronous && input.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
}

select {
case <-d.doneReceivingInputs:
return errors.New("transfermanager: WaitAndClose called before DownloadObject")
default:
if err := d.validateObjectInput(input); err != nil {
return err
}

input.ctx = ctx
d.addInput(input)
return nil
}

// DownloadDirectory queues the download of a set of objects to a local path.
// This will initiate the download but is non-blocking; call Downloader.Results
// or use the callback to process the result. DownloadDirectory is thread-safe
// and can be called simultaneously from different goroutines.
// DownloadDirectory will resolve any filters on the input and create the needed
// directory structure locally as the operations progress.
// Note: DownloadDirectory overwrites existing files in the directory.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error {
if d.closed() {
return errors.New("transfermanager: Downloader used after WaitAndClose was called")
}
if err := d.validateDirectoryInput(input); err != nil {
return err
}

query := &storage.Query{
Prefix: input.Prefix,
StartOffset: input.StartOffset,
EndOffset: input.EndOffset,
MatchGlob: input.MatchGlob,
}
if err := query.SetAttrSelection([]string{"Name"}); err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory query.SetAttrSelection: %w", err)
}

// TODO: Clean up any created directory structure on failure.

objectsToQueue := []string{}
it := d.client.Bucket(input.Bucket).Objects(ctx, query)
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to list objects: %w", err)
}

objectsToQueue = append(objectsToQueue, attrs.Name)
}

outs := make(chan DownloadOutput, len(objectsToQueue))
inputs := make([]DownloadObjectInput, 0, len(objectsToQueue))

for _, object := range objectsToQueue {
objDirectory := filepath.Join(input.LocalDirectory, filepath.Dir(object))
filePath := filepath.Join(input.LocalDirectory, object)

// Make sure all directories in the object path exist.
err := os.MkdirAll(objDirectory, fs.ModeDir|fs.ModePerm)
if err != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to make directory(%q): %w", objDirectory, err)
}

// Create file to download to.
f, fErr := os.Create(filePath)
tritone marked this conversation as resolved.
Show resolved Hide resolved
if fErr != nil {
return fmt.Errorf("transfermanager: DownloadDirectory failed to create file(%q): %w", filePath, fErr)
}

inputs = append(inputs, DownloadObjectInput{
Bucket: input.Bucket,
Object: object,
Destination: f,
Callback: input.OnObjectDownload,
ctx: ctx,
directory: true,
directoryObjectOutputs: outs,
})
}

if d.config.asynchronous {
go input.gatherObjectOutputs(outs, len(inputs))
}
d.addNewInputs(inputs)
return nil
}

// WaitAndClose waits for all outstanding downloads to complete and closes the
// Downloader. Adding new downloads after this has been called will cause an error.
//
Expand Down Expand Up @@ -143,7 +219,27 @@ func (d *Downloader) addInput(input *DownloadObjectInput) {
d.inputsMu.Unlock()
}

// addNewInputs adds a slice of inputs to the downloader.
// This should only be used to queue new objects.
func (d *Downloader) addNewInputs(inputs []DownloadObjectInput) {
d.downloadsInProgress.Add(len(inputs))

d.inputsMu.Lock()
d.inputs = append(d.inputs, inputs...)
d.inputsMu.Unlock()
}

func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutput) {
if input.directory {
f := input.Destination.(*os.File)
if err := f.Close(); err != nil && result.Err == nil {
result.Err = fmt.Errorf("closing file(%q): %w", f.Name(), err)
}

if d.config.asynchronous {
input.directoryObjectOutputs <- *result
}
}
// TODO: check checksum if full object

if d.config.asynchronous {
Expand Down Expand Up @@ -260,6 +356,35 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download
d.addResult(in, shardOut)
}

func (d *Downloader) validateObjectInput(in *DownloadObjectInput) error {
if d.config.asynchronous && in.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
}
if !d.config.asynchronous && in.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
}
return nil
}

func (d *Downloader) validateDirectoryInput(in *DownloadDirectoryInput) error {
if d.config.asynchronous && in.Callback == nil {
return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set")
}
if !d.config.asynchronous && in.Callback != nil {
return errors.New("transfermanager: input.Callback must be nil unless the WithCallbacks option is set")
}
return nil
}

func (d *Downloader) closed() bool {
select {
case <-d.doneReceivingInputs:
return true
default:
return false
}
}

// NewDownloader creates a new Downloader to add operations to.
// Choice of transport, etc is configured on the client that's passed in.
// The returned Downloader can be shared across goroutines to initiate downloads.
Expand Down Expand Up @@ -326,10 +451,12 @@ type DownloadObjectInput struct {
// finish.
Callback func(*DownloadOutput)

ctx context.Context
cancelCtx context.CancelCauseFunc
shard int // the piece of the object range that should be downloaded
shardOutputs chan<- *DownloadOutput
ctx context.Context
cancelCtx context.CancelCauseFunc
shard int // the piece of the object range that should be downloaded
shardOutputs chan<- *DownloadOutput
directory bool // input was queued by calling DownloadDirectory
directoryObjectOutputs chan<- DownloadOutput
}

// downloadShard will read a specific object piece into in.Destination.
Expand Down Expand Up @@ -402,6 +529,63 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim
return
}

// DownloadDirectoryInput is the input for a directory to download.
type DownloadDirectoryInput struct {
// Bucket is the bucket in GCS to download from. Required.
Bucket string

// LocalDirectory specifies the directory to download the matched objects
// to. Relative paths are allowed. The directory structure and contents
// must not be modified while the download is in progress.
// The directory will be created if it does not already exist. Required.
LocalDirectory string
tritone marked this conversation as resolved.
Show resolved Hide resolved

// Prefix is the prefix filter to download objects whose names begin with this.
// Optional.
Prefix string

// StartOffset is used to filter results to objects whose names are
// lexicographically equal to or after startOffset. If endOffset is also
// set, the objects listed will have names between startOffset (inclusive)
// and endOffset (exclusive). Optional.
StartOffset string

// EndOffset is used to filter results to objects whose names are
// lexicographically before endOffset. If startOffset is also set, the
// objects listed will have names between startOffset (inclusive) and
// endOffset (exclusive). Optional.
EndOffset string

// MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
// https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
// for syntax details. Optional.
MatchGlob string

// Callback will run after all the objects in the directory as selected by
// the provided filters are finished downloading.
// It must be set if and only if the [WithCallbacks] option is set.
Callback func([]DownloadOutput)

// OnObjectDownload will run after every finished object download.
// It can only be set if the Downloader has the [WithCallbacks] option set.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could probably loosen this requirement? A user could want to run in sync mode and still use this to track/log progress.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

OnObjectDownload func(*DownloadOutput)
}

// gatherObjectOutputs receives from the given channel exactly numObjects times.
// It will call the callback once all object outputs are received.
// It does not do any verification on the outputs nor does it cancel other
// objects on error.
func (dirin *DownloadDirectoryInput) gatherObjectOutputs(gatherOuts <-chan DownloadOutput, numObjects int) {
outs := make([]DownloadOutput, 0, numObjects)
for i := 0; i < numObjects; i++ {
obj := <-gatherOuts
outs = append(outs, obj)
}

// All objects have been gathered; execute the callback.
dirin.Callback(outs)
}

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
Expand Down
2 changes: 1 addition & 1 deletion storage/transfermanager/downloader_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestDownloadBufferParallel(t *testing.T) {
if err != nil {
t.Errorf("b.WriteAt: %v", err)
}
if n != 5 {
if n != step {
t.Errorf("expected to write 5 bytes, got %d", n)
}
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion storage/transfermanager/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestWaitAndClose(t *testing.T) {
t.Fatalf("WaitAndClose: %v", err)
}

expectedErr := "transfermanager: WaitAndClose called before DownloadObject"
expectedErr := "transfermanager: Downloader used after WaitAndClose was called"
err = d.DownloadObject(context.Background(), &DownloadObjectInput{})
if err == nil {
t.Fatalf("d.DownloadObject err was nil, should be %q", expectedErr)
Expand Down
Loading
Loading