From 05816f9fafd3132c371da37f3a879bb9e8e7e604 Mon Sep 17 00:00:00 2001 From: Brenna N Epp Date: Mon, 24 Jun 2024 18:22:42 -0700 Subject: [PATCH] feat(storage/transfermanager): automatically shard downloads (#10379) This is missing a few components that should be added in follow ups: - checksums - transcoding test --- storage/transfermanager/download_buffer.go | 71 +++ storage/transfermanager/downloader.go | 310 +++++++++--- .../transfermanager/downloader_buffer_test.go | 147 ++++++ storage/transfermanager/downloader_test.go | 455 ++++++++++++++++++ storage/transfermanager/integration_test.go | 140 ++---- storage/transfermanager/option.go | 25 +- storage/transfermanager/option_test.go | 2 + 7 files changed, 987 insertions(+), 163 deletions(-) create mode 100644 storage/transfermanager/download_buffer.go create mode 100644 storage/transfermanager/downloader_buffer_test.go diff --git a/storage/transfermanager/download_buffer.go b/storage/transfermanager/download_buffer.go new file mode 100644 index 000000000000..0152dcbd7af0 --- /dev/null +++ b/storage/transfermanager/download_buffer.go @@ -0,0 +1,71 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfermanager + +import ( + "sync" +) + +// NewDownloadBuffer initializes a DownloadBuffer using buf as the underlying +// buffer. Preferred way to create a DownloadBuffer as it does not need to grow +// the buffer if len(buf) is larger than or equal to the object length or range +// being downloaded to. +func NewDownloadBuffer(buf []byte) *DownloadBuffer { + return &DownloadBuffer{bytes: buf} +} + +// DownloadBuffer satisfies the io.WriterAt interface, allowing you to use it as +// a buffer to download to when using [Downloader]. DownloadBuffer is thread-safe +// as long as the ranges being written to do not overlap. +type DownloadBuffer struct { + bytes []byte + mu sync.Mutex +} + +// WriteAt writes len(p) bytes from p to the underlying buffer at offset off, +// growing the buffer if needed. It returns the number of bytes written from p +// and any error encountered that caused the write to stop early. +// WriteAt is thread-safe as long as the ranges being written to do not overlap. +// The supplied slice p is not retained. +func (db *DownloadBuffer) WriteAt(p []byte, off int64) (n int, err error) { + requiredLength := int64(len(p)) + off + + // Our buffer isn't big enough, let's grow it. + if int64(cap(db.bytes)) < requiredLength { + expandedBuff := make([]byte, requiredLength) + + db.mu.Lock() + copy(expandedBuff, db.bytes) + db.bytes = expandedBuff + } else { + db.mu.Lock() + } + + // Buffer should now have the capacity to hold the new bytes, if it didn't + // before, so we can copy directly to it. + copy(db.bytes[off:], p) + db.mu.Unlock() + + return len(p), nil +} + +// Bytes returns the slice of bytes written to DownloadBuffer. The slice aliases +// the buffer content at least until the next buffer modification, so +// immediate changes to the slice will affect the result of future reads. 
+func (db *DownloadBuffer) Bytes() []byte { + db.mu.Lock() + defer db.mu.Unlock() + return db.bytes +} diff --git a/storage/transfermanager/downloader.go b/storage/transfermanager/downloader.go index 6c647c1cbf34..95db47cd03fc 100644 --- a/storage/transfermanager/downloader.go +++ b/storage/transfermanager/downloader.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "io" + "math" "sync" "time" @@ -28,17 +29,18 @@ import ( // Downloader manages a set of parallelized downloads. type Downloader struct { - client *storage.Client - config *transferManagerConfig - inputs []DownloadObjectInput - results []DownloadOutput - errors []error - inputsMu *sync.Mutex - resultsMu *sync.Mutex - errorsMu *sync.Mutex - work chan *DownloadObjectInput // Piece of work to be executed. - done chan bool // Indicates to finish up work; expecting no more inputs. - workers *sync.WaitGroup // Keeps track of the workers that are currently running. + client *storage.Client + config *transferManagerConfig + inputs []DownloadObjectInput + results []DownloadOutput + errors []error + inputsMu sync.Mutex + resultsMu sync.Mutex + errorsMu sync.Mutex + work chan *DownloadObjectInput // Piece of work to be executed. + doneReceivingInputs chan bool // Indicates to finish up work; expecting no more inputs. + workers *sync.WaitGroup // Keeps track of the workers that are currently running. + downloadsInProgress *sync.WaitGroup // Keeps track of how many objects have completed at least 1 shard, but are waiting on more } // DownloadObject queues the download of a single object. This will initiate the @@ -57,7 +59,7 @@ func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectIn } select { - case <-d.done: + case <-d.doneReceivingInputs: return errors.New("transfermanager: WaitAndClose called before DownloadObject") default: } @@ -74,20 +76,22 @@ func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectIn // all errors that were encountered by the Downloader when downloading objects. // These errors are also returned in the respective DownloadOutput for the // failing download. The results are not guaranteed to be in any order. -// Results will be empty if using the [WithCallbacks] option. +// Results will be empty if using the [WithCallbacks] option. WaitAndClose will +// wait for all callbacks to finish. func (d *Downloader) WaitAndClose() ([]DownloadOutput, error) { errMsg := "transfermanager: at least one error encountered downloading objects:" select { - case <-d.done: // this allows users to call WaitAndClose various times + case <-d.doneReceivingInputs: // this allows users to call WaitAndClose various times var err error if len(d.errors) > 0 { err = fmt.Errorf("%s\n%w", errMsg, errors.Join(d.errors...)) } return d.results, err default: - d.done <- true + d.downloadsInProgress.Wait() + d.doneReceivingInputs <- true d.workers.Wait() - close(d.done) + close(d.doneReceivingInputs) if len(d.errors) > 0 { return d.results, fmt.Errorf("%s\n%w", errMsg, errors.Join(d.errors...)) @@ -96,14 +100,14 @@ func (d *Downloader) WaitAndClose() ([]DownloadOutput, error) { } } -// sendInputsToWorkChan listens continuously to the inputs slice until d.done. +// sendInputsToWorkChan polls the inputs slice until d.done. // It will send all items in inputs to the d.work chan. // Once it receives from d.done, it drains the remaining items in the inputs // (sending them to d.work) and then closes the d.work chan. 
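The exported pieces introduced so far (NewDownloadBuffer, NewDownloader, DownloadObject, WaitAndClose) compose roughly as in the sketch below. This is an illustrative usage example only, not code from this patch; the bucket and object names, the 8 workers, and the 16 MiB part size are placeholder assumptions.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Objects larger than the part size are downloaded as concurrent shards.
	d, err := transfermanager.NewDownloader(client,
		transfermanager.WithWorkers(8),             // assumed worker count
		transfermanager.WithPartSize(16*1024*1024)) // assumed 16 MiB shards
	if err != nil {
		log.Fatal(err)
	}

	// NewDownloadBuffer grows as needed; preallocating to the object size up
	// front avoids that growth.
	buf := transfermanager.NewDownloadBuffer(make([]byte, 0))
	if err := d.DownloadObject(ctx, &transfermanager.DownloadObjectInput{
		Bucket:      "my-bucket", // placeholder
		Object:      "my-object", // placeholder
		Destination: buf,
	}); err != nil {
		log.Fatal(err)
	}

	// Blocks until every queued object (and all of its shards) completes.
	results, err := d.WaitAndClose()
	if err != nil {
		log.Printf("at least one download failed: %v", err)
	}
	for _, r := range results {
		log.Printf("%s/%s finished, err=%v", r.Bucket, r.Object, r.Err)
	}
	_ = buf.Bytes() // downloaded content
}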
func (d *Downloader) sendInputsToWorkChan() { for { select { - case <-d.done: + case <-d.doneReceivingInputs: d.drainInput() close(d.work) return @@ -131,15 +135,30 @@ func (d *Downloader) drainInput() { } func (d *Downloader) addInput(input *DownloadObjectInput) { + if input.shard == 0 { + d.downloadsInProgress.Add(1) + } d.inputsMu.Lock() d.inputs = append(d.inputs, *input) d.inputsMu.Unlock() } -func (d *Downloader) addResult(result *DownloadOutput) { - d.resultsMu.Lock() - d.results = append(d.results, *result) - d.resultsMu.Unlock() +func (d *Downloader) addResult(input *DownloadObjectInput, result *DownloadOutput) { + // TODO: check checksum if full object + + if d.config.asynchronous { + input.Callback(result) + } else { + d.resultsMu.Lock() + d.results = append(d.results, *result) + d.resultsMu.Unlock() + } + + // Track all errors that occurred. + if result.Err != nil { + d.error(fmt.Errorf("downloading %q from bucket %q: %w", input.Object, input.Bucket, result.Err)) + } + d.downloadsInProgress.Done() } func (d *Downloader) error(err error) { @@ -156,46 +175,108 @@ func (d *Downloader) downloadWorker() { break // no more work; exit } - // TODO: break down the input into smaller pieces if necessary; maybe as follows: - // Only request partSize data to begin with. If no error and we haven't finished - // reading the object, enqueue the remaining pieces of work and mark in the - // out var the amount of shards to wait for. - out := input.downloadShard(d.client, d.config.perOperationTimeout) - - // Keep track of any error that occurred. - if out.Err != nil { - d.error(fmt.Errorf("downloading %q from bucket %q: %w", input.Object, input.Bucket, out.Err)) - } - - // Either execute the callback, or append to results. - if d.config.asynchronous { - input.Callback(out) + out := input.downloadShard(d.client, d.config.perOperationTimeout, d.config.partSize) + + if input.shard == 0 { + if out.Err != nil { + // Don't queue more shards if the first failed. + d.addResult(input, out) + } else { + numShards := numShards(out.Attrs, input.Range, d.config.partSize) + + if numShards <= 1 { + // Download completed with a single shard. + d.addResult(input, out) + } else { + // Queue more shards. + outs := d.queueShards(input, out.Attrs.Generation, numShards) + // Start a goroutine that gathers shards sent to the output + // channel and adds the result once it has received all shards. + go d.gatherShards(input, outs, numShards) + } + } } else { - d.addResult(out) + // If this isn't the first shard, send to the output channel specific to the object. + // This should never block since the channel is buffered to exactly the number of shards. + input.shardOutputs <- out } } d.workers.Done() } +// queueShards queues all subsequent shards of an object after the first. +// The results should be forwarded to the returned channel. +func (d *Downloader) queueShards(in *DownloadObjectInput, gen int64, shards int) <-chan *DownloadOutput { + // Create a channel that can be received from to compile the + // shard outputs. + outs := make(chan *DownloadOutput, shards) + in.shardOutputs = outs + + // Create a shared context that we can cancel if a shard fails. + in.ctx, in.cancelCtx = context.WithCancelCause(in.ctx) + + // Add generation in case the object changes between calls. + in.Generation = &gen + + // Queue remaining shards. 
+ for i := 1; i < shards; i++ { + newShard := in // this is fine, since the input should only differ in the shard num + newShard.shard = i + d.addInput(newShard) + } + + return outs +} + +var errCancelAllShards = errors.New("cancelled because another shard failed") + +// gatherShards receives from the given channel exactly (shards-1) times (since +// the first shard should already be complete). +// It will add the result to the Downloader once it has received all shards. +// gatherShards cancels remaining shards if any shard errored. +// It does not do any checking to verify that shards are for the same object. +func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *DownloadOutput, shards int) { + errs := []error{} + var shardOut *DownloadOutput + for i := 1; i < shards; i++ { + // Add monitoring here? This could hang if any individual piece does. + shardOut = <-outs + + // We can ignore errors that resulted from a previous error. + // Note that we may still get some cancel errors if they + // occurred while the operation was already in progress. + if shardOut.Err != nil && !(errors.Is(shardOut.Err, context.Canceled) && errors.Is(context.Cause(in.ctx), errCancelAllShards)) { + // If a shard errored, track the error and cancel the shared ctx. + errs = append(errs, shardOut.Err) + in.cancelCtx(errCancelAllShards) + } + } + + // All pieces gathered; return output. Any shard output will do. + shardOut.Range = in.Range + if len(errs) != 0 { + shardOut.Err = fmt.Errorf("download shard errors:\n%w", errors.Join(errs...)) + } + d.addResult(in, shardOut) +} + // NewDownloader creates a new Downloader to add operations to. // Choice of transport, etc is configured on the client that's passed in. // The returned Downloader can be shared across goroutines to initiate downloads. func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error) { d := &Downloader{ - client: c, - config: initTransferManagerConfig(opts...), - inputs: []DownloadObjectInput{}, - results: []DownloadOutput{}, - errors: []error{}, - inputsMu: &sync.Mutex{}, - resultsMu: &sync.Mutex{}, - errorsMu: &sync.Mutex{}, - work: make(chan *DownloadObjectInput), - done: make(chan bool), - workers: &sync.WaitGroup{}, - } - - // Start a listener to send work through. + client: c, + config: initTransferManagerConfig(opts...), + inputs: []DownloadObjectInput{}, + results: []DownloadOutput{}, + errors: []error{}, + work: make(chan *DownloadObjectInput), + doneReceivingInputs: make(chan bool), + workers: &sync.WaitGroup{}, + downloadsInProgress: &sync.WaitGroup{}, + } + + // Start a polling routine to send work through. go d.sendInputsToWorkChan() // Start workers. @@ -208,10 +289,15 @@ func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error) { } // DownloadRange specifies the object range. +// If the object's metadata property "Content-Encoding" is set to "gzip" or +// satisfies decompressive transcoding per https://cloud.google.com/storage/docs/transcoding +// that file will be served back whole, regardless of the requested range as +// Google Cloud Storage dictates. type DownloadRange struct { // Offset is the starting offset (inclusive) from with the object is read. - // If offset is negative, the object is read abs(offset) bytes from the end, - // and length must also be negative to indicate all remaining bytes will be read. 
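queueShards and gatherShards together form a fan-out/fan-in: the first shard's worker queues the remaining shards with a results channel buffered to the shard count, and the gatherer cancels the shared context with a sentinel cause as soon as any shard fails. The standalone sketch below shows the same coordination pattern in isolation; fetchShard and errCancelSiblings are stand-ins invented for the example, not names from this patch.

package main

import (
	"context"
	"errors"
	"fmt"
)

var errCancelSiblings = errors.New("cancelled because another shard failed")

// fetchShard stands in for downloading one shard; shard 2 always fails.
func fetchShard(ctx context.Context, shard int) error {
	if shard == 2 {
		return fmt.Errorf("shard %d: request failed", shard)
	}
	return ctx.Err() // a real download surfaces cancellation this way
}

func main() {
	const shards = 4
	ctx, cancel := context.WithCancelCause(context.Background())

	// Buffered to the shard count so no worker ever blocks on send.
	results := make(chan error, shards)
	for i := 0; i < shards; i++ {
		go func(i int) { results <- fetchShard(ctx, i) }(i)
	}

	var errs []error
	for i := 0; i < shards; i++ {
		err := <-results
		// Ignore cancellations that this gatherer caused itself.
		selfCancelled := errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errCancelSiblings)
		if err != nil && !selfCancelled {
			errs = append(errs, err)
			cancel(errCancelSiblings) // stop the remaining shards
		}
	}
	if len(errs) > 0 {
		fmt.Println("download shard errors:", errors.Join(errs...))
	}
}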
+ // If offset is negative, the object is not sharded and is read by a single + // worker abs(offset) bytes from the end, and length must also be negative + // to indicate all remaining bytes will be read. Offset int64 // Length is the number of bytes to read. // If length is negative or larger than the object size, the object is read @@ -235,15 +321,20 @@ type DownloadObjectInput struct { // Callback will be run once the object is finished downloading. It must be // set if and only if the [WithCallbacks] option is set; otherwise, it must // not be set. + // A worker will be used to execute the callback; therefore, it should not + // be a long-running function. WaitAndClose will wait for all callbacks to + // finish. Callback func(*DownloadOutput) - ctx context.Context + ctx context.Context + cancelCtx context.CancelCauseFunc + shard int // the piece of the object range that should be downloaded + shardOutputs chan<- *DownloadOutput } -// downloadShard will read a specific object into in.Destination. +// downloadShard will read a specific object piece into in.Destination. // If timeout is less than 0, no timeout is set. -// TODO: download a single shard instead of the entire object. -func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout time.Duration) (out *DownloadOutput) { +func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout time.Duration, partSize int64) (out *DownloadOutput) { out = &DownloadOutput{Bucket: in.Bucket, Object: in.Object, Range: in.Range} // Set timeout. @@ -254,8 +345,13 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim ctx = c } - // TODO: set to downloadSharded when sharded - ctx = setUsageMetricHeader(ctx, downloadMany) + // The first shard will be sent as download many, since we do not know yet + // if it will be sharded. + method := downloadMany + if in.shard != 0 { + method = downloadSharded + } + ctx = setUsageMetricHeader(ctx, method) // Set options on the object. o := client.Bucket(in.Bucket).Object(in.Object) @@ -270,22 +366,27 @@ func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout tim o = o.Key(in.EncryptionKey) } - var offset, length int64 = 0, -1 // get the entire object by default - - if in.Range != nil { - offset, length = in.Range.Offset, in.Range.Length - } + objRange := shardRange(in.Range, partSize, in.shard) // Read. - r, err := o.NewRangeReader(ctx, offset, length) + r, err := o.NewRangeReader(ctx, objRange.Offset, objRange.Length) if err != nil { out.Err = err return } - // TODO: write at a specific offset. - off := io.NewOffsetWriter(in.Destination, 0) - _, err = io.Copy(off, r) + // Determine the offset this shard should write to. + offset := objRange.Offset + if in.Range != nil { + if in.Range.Offset > 0 { + offset = objRange.Offset - in.Range.Offset + } else { + offset = 0 + } + } + + w := io.NewOffsetWriter(in.Destination, offset) + _, err = io.Copy(w, r) if err != nil { out.Err = err r.Close() @@ -312,6 +413,79 @@ type DownloadOutput struct { Attrs *storage.ReaderObjectAttrs // attributes of downloaded object, if successful } +// TODO: use built-in after go < 1.21 is dropped. +func min(a, b int64) int64 { + if a < b { + return a + } + return b +} + +// numShards calculates how many shards the given range should be divided into +// given the part size. +func numShards(attrs *storage.ReaderObjectAttrs, r *DownloadRange, partSize int64) int { + objectSize := attrs.Size + + // Transcoded objects do not support ranged reads. 
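DownloadRange, the Callback field, and the WithCallbacks option combine into an asynchronous mode where each object's result is delivered as soon as all of its shards finish. The sketch below is illustrative only; the bucket and object names, the 100 MiB range, and the assumption that the caller sizes the buffer to the range are not from this patch.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Asynchronous mode: every input must set Callback, and WaitAndClose
	// returns no results of its own.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithCallbacks())
	if err != nil {
		log.Fatal(err)
	}

	const rangeLen = 100 * 1024 * 1024 // placeholder: first 100 MiB
	buf := transfermanager.NewDownloadBuffer(make([]byte, rangeLen))

	err = d.DownloadObject(ctx, &transfermanager.DownloadObjectInput{
		Bucket:      "my-bucket", // placeholder
		Object:      "my-object", // placeholder
		Destination: buf,
		Range:       &transfermanager.DownloadRange{Offset: 0, Length: rangeLen},
		// Runs on a worker goroutine once all shards of this object finish,
		// so keep it short.
		Callback: func(out *transfermanager.DownloadOutput) {
			if out.Err != nil {
				log.Printf("%s failed: %v", out.Object, out.Err)
				return
			}
			log.Printf("%s done; object size %d", out.Object, out.Attrs.Size)
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Still required: waits for all callbacks to run before returning.
	if _, err := d.WaitAndClose(); err != nil {
		log.Printf("at least one download failed: %v", err)
	}
}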
+ if attrs.ContentEncoding == "gzip" { + return 1 + } + + if r == nil { + // Divide entire object into shards. + return int(math.Ceil(float64(objectSize) / float64(partSize))) + } + // Negative offset reads the whole object in one go. + if r.Offset < 0 { + return 1 + } + + firstByte := r.Offset + + // Read to the end of the object, or, if smaller, read only the amount + // of bytes requested. + lastByte := min(objectSize-1, r.Offset+r.Length-1) + + // If length is negative, read to the end. + if r.Length < 0 { + lastByte = objectSize - 1 + } + + totalBytes := lastByte - firstByte + + return int(totalBytes/partSize) + 1 +} + +// shardRange calculates the range this shard corresponds to given the +// requested range. Expects the shard to be valid given the range. +func shardRange(r *DownloadRange, partSize int64, shard int) DownloadRange { + if r == nil { + // Entire object + return DownloadRange{ + Offset: int64(shard) * partSize, + Length: partSize, + } + } + + // Negative offset reads the whole object in one go. + if r.Offset < 0 { + return *r + } + + shardOffset := int64(shard)*partSize + r.Offset + shardLength := partSize // it's ok if we go over the object size + + // If requested bytes end before partSize, length should be smaller. + if shardOffset+shardLength > r.Offset+r.Length { + shardLength = r.Offset + r.Length - shardOffset + } + + return DownloadRange{ + Offset: shardOffset, + Length: shardLength, + } +} + const ( xGoogHeaderKey = "x-goog-api-client" usageMetricKey = "gccl-gcs-cmd" diff --git a/storage/transfermanager/downloader_buffer_test.go b/storage/transfermanager/downloader_buffer_test.go new file mode 100644 index 000000000000..ab99b465a19b --- /dev/null +++ b/storage/transfermanager/downloader_buffer_test.go @@ -0,0 +1,147 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transfermanager + +import ( + "bytes" + crand "crypto/rand" + "io" + "math/rand" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +func TestDownloadBuffer(t *testing.T) { + t.Parallel() + + // Create without an underlying buffer. + b := &DownloadBuffer{} + + // Start at an offset. + firstWrite := []byte("the best of times") + var firstWriteOff int64 = 7 + + n, err := b.WriteAt(firstWrite, firstWriteOff) + if err != nil { + t.Fatalf("DonwloadBuffer.WriteAt(%d): %v", firstWriteOff, err) + } + if exp := len(firstWrite); exp != n { + t.Fatalf("expected to write %d, got %d", exp, n) + } + + if want, got := firstWrite, b.Bytes()[firstWriteOff:]; cmp.Diff(got, want) != "" { + t.Errorf("got: %q\nwant: %q", b.Bytes(), want) + } + + // Write the beginning. 
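As a concrete check on numShards and shardRange, using the same figures as one of the test cases below: a 2001-byte object requested with Offset=1000, Length=10000 and partSize=100 is clamped to bytes 1000 through 2000, which yields 11 shards; shard i asks for 100 bytes starting at offset 1000+100*i, and the server truncates the final shard at the end of the object. The snippet below re-derives those numbers on its own rather than calling the unexported helpers.

package main

import "fmt"

func main() {
	const (
		objectSize int64 = 2001
		offset     int64 = 1000
		length     int64 = 10000
		partSize   int64 = 100
	)

	// Clamp the requested range to the object.
	firstByte := offset
	lastByte := objectSize - 1
	if offset+length-1 < lastByte {
		lastByte = offset + length - 1
	}

	shards := (lastByte-firstByte)/partSize + 1 // 11
	fmt.Println("shards:", shards)

	for i := int64(0); i < shards; i++ {
		shardOffset := offset + i*partSize
		// The last shard requests 100 bytes at offset 2000; only 1 byte exists there.
		fmt.Printf("shard %d: offset %d, length %d\n", i, shardOffset, partSize)
	}
}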
+ secondWrite := []byte("It was") + + n, err = b.WriteAt(secondWrite, 0) + if err != nil { + t.Fatalf("DonwloadBuffer.WriteAt(0): %v", err) + } + if exp := len(secondWrite); exp != n { + t.Fatalf("expected to write %d, got %d", exp, n) + } + + if want, got := secondWrite, b.Bytes()[:len(secondWrite)]; cmp.Diff(got, want) != "" { + t.Errorf("got: %q\nwant: %q", b.Bytes(), want) + } + + // The ending should have stayed the same. + if want, got := firstWrite, b.Bytes()[firstWriteOff:]; cmp.Diff(got, want) != "" { + t.Errorf("got: %q\nwant: %q", b.Bytes(), want) + } + + // Test write in the middle. + n, err = b.WriteAt([]byte{' '}, int64(len(secondWrite))) + if err != nil { + t.Fatalf("DonwloadBuffer.WriteAt(%d): %v", len(secondWrite), err) + } + if exp := 1; exp != n { + t.Fatalf("expected to write %d, got %d", exp, n) + } + + // Check the full string. + if want, got := []byte("It was the best of times"), b.Bytes(); cmp.Diff(got, want) != "" { + t.Errorf("got: %q\nwant: %q", b.Bytes(), want) + } + + // Test given underlying buffer. + b = NewDownloadBuffer(b.Bytes()) + if want, got := []byte("It was the best of times"), b.Bytes(); cmp.Diff(got, want) != "" { + t.Errorf("got: %q\nwant: %q", b.Bytes(), want) + } + + // Test overwrite. + overwrite := []byte("worst of times") + var off int64 = 11 + + n, err = b.WriteAt(overwrite, off) + if err != nil { + t.Fatalf("DonwloadBuffer.WriteAt(%d): %v", off, err) + } + if exp := len(overwrite); exp != n { + t.Fatalf("expected to write %d, got %d", exp, n) + } + + if want, got := []byte("It was the worst of times"), b.Bytes(); cmp.Diff(got, want) != "" { + t.Errorf("got: %q\nwant: %q", b.Bytes(), want) + } +} + +func TestDownloadBufferParallel(t *testing.T) { + t.Parallel() + + // Set up buffer. + size := 50 + buf := &bytes.Buffer{} + if _, err := io.CopyN(buf, crand.Reader, int64(size)); err != nil { + t.Fatalf("io.CopyN: %v", err) + } + want := buf.Bytes() + + b := NewDownloadBuffer(make([]byte, size)) + + // Write using 10 parallel goroutines. + wg := sync.WaitGroup{} + step := size / 10 + + for i := 0; i < size; i = i + step { + // Schedule a routine. + i := i + wg.Add(1) + go func() { + time.Sleep(time.Microsecond * time.Duration(rand.Intn(500))) + n, err := b.WriteAt(want[i:i+step], int64(i)) + if err != nil { + t.Errorf("b.WriteAt: %v", err) + } + if n != 5 { + t.Errorf("expected to write 5 bytes, got %d", n) + } + wg.Done() + }() + } + + wg.Wait() + + if diff := cmp.Diff(b.Bytes(), want); diff != "" { + t.Errorf("diff got(-) vs. 
want(+): %v", diff) + } +} diff --git a/storage/transfermanager/downloader_test.go b/storage/transfermanager/downloader_test.go index 6521ae534249..672c59532304 100644 --- a/storage/transfermanager/downloader_test.go +++ b/storage/transfermanager/downloader_test.go @@ -16,11 +16,16 @@ package transfermanager import ( "context" + "errors" "strings" + "sync" "testing" + + "cloud.google.com/go/storage" ) func TestWaitAndClose(t *testing.T) { + t.Parallel() d, err := NewDownloader(nil) if err != nil { t.Fatalf("NewDownloader: %v", err) @@ -39,3 +44,453 @@ func TestWaitAndClose(t *testing.T) { t.Errorf("expected err %q, got: %v", expectedErr, err.Error()) } } + +func TestNumShards(t *testing.T) { + t.Parallel() + for _, test := range []struct { + desc string + objRange *DownloadRange + objSize int64 + partSize int64 + transcoded bool + want int + }{ + { + desc: "nil range", + objSize: 100, + partSize: 1000, + want: 1, + }, + { + desc: "nil - object equal to partSize", + objSize: 100, + partSize: 100, + want: 1, + }, + { + desc: "nil - object larger than partSize", + objSize: 100, + partSize: 10, + want: 10, + }, + { + desc: "full object smaller than partSize", + objRange: &DownloadRange{ + Length: 100, + }, + objSize: 100, + partSize: 101, + want: 1, + }, + { + desc: "full object equal to partSize", + objRange: &DownloadRange{ + Length: 100, + }, + objSize: 100, + partSize: 100, + want: 1, + }, + { + desc: "full object larger than partSize", + objRange: &DownloadRange{ + Length: 100, + }, + objSize: 100, + partSize: 99, + want: 2, + }, + { + desc: "partial object smaller than partSize", + objRange: &DownloadRange{ + Length: 50, + }, + objSize: 100, + partSize: 1000, + want: 1, + }, + { + desc: "full object larger than partSize", + objRange: &DownloadRange{ + Length: 5000, + }, + objSize: 5001, + partSize: 1000, + want: 5, + }, + { + desc: "full object larger than partSize - off by one check", + objRange: &DownloadRange{ + Length: 5001, + }, + objSize: 5001, + partSize: 1000, + want: 6, + }, + { + desc: "length larger than object size", + objRange: &DownloadRange{ + Length: 17000, + }, + objSize: 5000, + partSize: 1000, + want: 5, + }, + { + desc: "negative length", + objRange: &DownloadRange{ + Length: -1, + }, + objSize: 5000, + partSize: 1000, + want: 5, + }, + { + desc: "offset object smaller than partSize", + objRange: &DownloadRange{ + Offset: 50, + Length: 99, + }, + objSize: 100, + partSize: 1000, + want: 1, + }, + { + desc: "offset object larger than partSize", + objRange: &DownloadRange{ + Offset: 1000, + Length: 1999, + }, + objSize: 2000, + partSize: 100, + want: 10, + }, + { + desc: "offset object larger than partSize - length larger than objSize", + objRange: &DownloadRange{ + Offset: 1000, + Length: 10000, + }, + objSize: 2001, + partSize: 100, + want: 11, + }, + { + desc: "offset object larger than partSize - length larger than objSize", + objRange: &DownloadRange{ + Offset: 1000, + Length: 10000, + }, + objSize: 2001, + partSize: 100, + want: 11, + }, + { + desc: "negative offset smaller than partSize", + objRange: &DownloadRange{ + Offset: -5, + Length: -1, + }, + objSize: 1024 * 1024 * 1024 * 10, + partSize: 100, + want: 1, + }, + { + desc: "negative offset larger than partSize", + objRange: &DownloadRange{ + Offset: -1000, + Length: -1, + }, + objSize: 2000, + partSize: 100, + want: 1, + }, + { + desc: "transcoded", + objSize: 2000, + partSize: 100, + transcoded: true, + want: 1, + }, + } { + t.Run(test.desc, func(t *testing.T) { + attrs := &storage.ReaderObjectAttrs{ + Size: 
test.objSize, + } + + if test.transcoded { + attrs.ContentEncoding = "gzip" + } + + got := numShards(attrs, test.objRange, test.partSize) + + if got != test.want { + t.Errorf("numShards incorrect; expect object to be divided into %d shards, got %d", test.want, got) + } + }) + } +} + +func TestCalculateRange(t *testing.T) { + t.Parallel() + for _, test := range []struct { + desc string + objRange *DownloadRange + partSize int64 + shard int + want DownloadRange + }{ + { + desc: "nil range - first shard", + partSize: 1000, + shard: 0, + want: DownloadRange{ + Length: 1000, + }, + }, + { + desc: "nil range", + partSize: 1001, + shard: 3, + want: DownloadRange{ + Offset: 3003, + Length: 1001, + }, + }, + { + desc: "first shard length smaller than partSize", + objRange: &DownloadRange{ + Length: 99, + }, + partSize: 1000, + shard: 0, + want: DownloadRange{ + Length: 99, + }, + }, + { + desc: "second shard", + objRange: &DownloadRange{ + Length: 4999, + }, + partSize: 1000, + shard: 1, + want: DownloadRange{ + Offset: 1000, + Length: 1000, + }, + }, + { + desc: "last shard", + objRange: &DownloadRange{ + Length: 5000, + }, + partSize: 1000, + shard: 4, + want: DownloadRange{ + Offset: 4000, + Length: 1000, + }, + }, + { + desc: "last shard", + objRange: &DownloadRange{ + Length: 5001, + }, + partSize: 1000, + shard: 5, + want: DownloadRange{ + Offset: 5000, + Length: 1, + }, + }, + { + desc: "single shard with offset", + objRange: &DownloadRange{ + Offset: 10, + Length: 99, + }, + partSize: 1000, + shard: 0, + want: DownloadRange{ + Offset: 10, + Length: 99, + }, + }, + { + desc: "second shard with offset", + objRange: &DownloadRange{ + Offset: 100, + Length: 500, + }, + partSize: 100, + shard: 1, + want: DownloadRange{ + Offset: 200, + Length: 100, + }, + }, + { + desc: "off by one", + objRange: &DownloadRange{ + Offset: 101, + Length: 500, + }, + partSize: 100, + shard: 2, + want: DownloadRange{ + Offset: 301, + Length: 100, + }, + }, + { + desc: "last shard", + objRange: &DownloadRange{ + Offset: 1, + Length: 5000, + }, + partSize: 1000, + shard: 4, + want: DownloadRange{ + Offset: 4001, + Length: 1000, + }, + }, + { + desc: "large object", + objRange: &DownloadRange{ + Offset: 1024 * 1024 * 1024 * 1024 / 2, + Length: 1024 * 1024 * 1024 * 1024, // 1TiB + }, + partSize: 1024 * 1024 * 1024, // 1 Gib + shard: 1024/2 - 1, // last shard + want: DownloadRange{ + Offset: 1024*1024*1024*1024 - 1024*1024*1024, + Length: 1024 * 1024 * 1024, + }, + }, + } { + t.Run(test.desc, func(t *testing.T) { + got := shardRange(test.objRange, test.partSize, test.shard) + + if got != test.want { + t.Errorf("want %v got %v", test.want, got) + } + }) + } +} + +// This tests that gather shards works as expected and cancels other shards +// without error after it encounters an error. +func TestGatherShards(t *testing.T) { + ctx, cancelCtx := context.WithCancelCause(context.Background()) + + // Start a downloader. + d, err := NewDownloader(nil, WithWorkers(2), WithCallbacks()) + if err != nil { + t.Fatalf("NewDownloader: %v", err) + } + + // Test that gatherShards finishes without error. 
+ object := "obj1" + shards := 4 + downloadRange := &DownloadRange{ + Offset: 20, + Length: 120, + } + outChan := make(chan *DownloadOutput, shards) + outs := []*DownloadOutput{ + {Object: object, Range: &DownloadRange{Offset: 50, Length: 30}}, + {Object: object, Range: &DownloadRange{Offset: 80, Length: 30}}, + {Object: object, Range: &DownloadRange{Offset: 110, Length: 30}}, + } + + in := &DownloadObjectInput{ + Callback: func(o *DownloadOutput) { + if o.Err != nil { + t.Errorf("unexpected error in DownloadOutput: %v", o.Err) + } + if o.Range != downloadRange { + t.Errorf("mismatching download range, got: %v, want: %v", o.Range, downloadRange) + } + if o.Object != object { + t.Errorf("mismatching object names, got: %v, want: %v", o.Object, object) + } + }, + ctx: ctx, + cancelCtx: cancelCtx, + shardOutputs: outChan, + Range: downloadRange, + } + + var wg sync.WaitGroup + wg.Add(1) + d.downloadsInProgress.Add(1) + + go func() { + d.gatherShards(in, outChan, shards) + wg.Done() + }() + + for _, o := range outs { + outChan <- o + } + + wg.Wait() + + // Test that an error will cancel remaining pieces correctly. + shardErr := errors.New("some error") + + in.Callback = func(o *DownloadOutput) { + // Error returned should wrap the original error. + if !errors.Is(o.Err, shardErr) { + t.Errorf("error in DownloadOutput should wrap error %q; intead got: %v", shardErr, o.Err) + } + // Error returned should not wrap nor contain the sentinel error. + if errors.Is(o.Err, errCancelAllShards) || strings.Contains(o.Err.Error(), errCancelAllShards.Error()) { + t.Errorf("error in DownloadOutput should not contain error %q; got: %v", errCancelAllShards, o.Err) + } + if o.Range != downloadRange { + t.Errorf("mismatching download range, got: %v, want: %v", o.Range, downloadRange) + } + if o.Object != object { + t.Errorf("mismatching object names, got: %v, want: %v", o.Object, object) + } + } + + wg.Add(1) + d.downloadsInProgress.Add(1) + + go func() { + d.gatherShards(in, outChan, shards) + wg.Done() + }() + + // Send a successfull shard, an errored shard, and then a cancelled shard. + outs[1].Err = shardErr + outs[2].Err = context.Canceled + for _, o := range outs { + outChan <- o + } + + // Check that the context was cancelled with the sentinel error. + _, ok := <-in.ctx.Done() + if ok { + t.Error("context was not cancelled") + } + + if ctxErr := context.Cause(in.ctx); !errors.Is(ctxErr, errCancelAllShards) { + t.Errorf("context.Cause: error should wrap %q; intead got: %v", errCancelAllShards, ctxErr) + } + + wg.Wait() + + // Check that the overall error returned also wraps only the proper error. 
+ _, err = d.WaitAndClose() + if !errors.Is(err, shardErr) { + t.Errorf("error in DownloadOutput should wrap error %q; intead got: %v", shardErr, err) + } + if errors.Is(err, errCancelAllShards) || strings.Contains(err.Error(), errCancelAllShards.Error()) { + t.Errorf("error in DownloadOutput should not contain error %q; got: %v", errCancelAllShards, err) + } +} diff --git a/storage/transfermanager/integration_test.go b/storage/transfermanager/integration_test.go index c4a5cb45b5c8..a65119176e15 100644 --- a/storage/transfermanager/integration_test.go +++ b/storage/transfermanager/integration_test.go @@ -15,7 +15,6 @@ package transfermanager import ( - "bytes" "context" crand "crypto/rand" "errors" @@ -43,10 +42,11 @@ const ( testPrefix = "go-integration-test-tm" grpcTestPrefix = "golang-grpc-test-tm" bucketExpiryAge = 24 * time.Hour + maxObjectSize = 1024 * 1024 ) var ( - uidSpace = uid.NewSpace("", &uid.Options{Short: true}) + uidSpace = uid.NewSpace("", nil) // These buckets are shared amongst download tests. They are created, // populated with objects and cleaned up in TestMain. httpTestBucket = downloadTestBucket{} @@ -86,15 +86,15 @@ func TestIntegration_DownloaderSynchronous(t *testing.T) { // Start a downloader. Give it a smaller amount of workers than objects, // to make sure we aren't blocking anywhere. - d, err := NewDownloader(c, WithWorkers(2)) + d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2)) if err != nil { t.Fatalf("NewDownloader: %v", err) } // Download several objects. - writers := make([]*testWriter, len(objects)) + writers := make([]*DownloadBuffer, len(objects)) objToWriter := make(map[string]int) // so we can map the resulting content back to the correct object for i, obj := range objects { - writers[i] = &testWriter{} + writers[i] = NewDownloadBuffer(make([]byte, tb.objectSizes[obj])) objToWriter[obj] = i if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: tb.bucket, @@ -110,14 +110,6 @@ func TestIntegration_DownloaderSynchronous(t *testing.T) { t.Fatalf("d.WaitAndClose: %v", err) } - // Close the writers so we can check the contents. This should be fine, - // since the downloads should all be complete after WaitAndClose. - for i := range objects { - if err := writers[i].Close(); err != nil { - t.Fatalf("testWriter.Close: %v", err) - } - } - // Check the results. for _, got := range results { writerIdx := objToWriter[got.Object] @@ -127,12 +119,12 @@ func TestIntegration_DownloaderSynchronous(t *testing.T) { continue } - if want, got := tb.contentHashes[got.Object], writers[writerIdx].crc32c; got != want { + if want, got := tb.contentHashes[got.Object], crc32c(writers[writerIdx].Bytes()); got != want { t.Fatalf("content crc32c does not match; got: %v, expected: %v", got, want) } - if got.Attrs.Size != tb.objectSize { - t.Errorf("expected object size %d, got %d", tb.objectSize, got.Attrs.Size) + if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { + t.Errorf("expected object size %d, got %d", want, got) } } @@ -156,16 +148,16 @@ func TestIntegration_DownloaderErrorSync(t *testing.T) { objects = append([]string{nonexistentObject}, objects...) // Start a downloader. - d, err := NewDownloader(c, WithWorkers(2)) + d, err := NewDownloader(c, WithWorkers(2), WithPartSize(maxObjectSize/2)) if err != nil { t.Fatalf("NewDownloader: %v", err) } // Download objects. 
- writers := make([]*testWriter, len(objects)) + writers := make([]*DownloadBuffer, len(objects)) objToWriter := make(map[string]int) // so we can map the resulting content back to the correct object for i, obj := range objects { - writers[i] = &testWriter{} + writers[i] = NewDownloadBuffer(make([]byte, tb.objectSizes[obj])) objToWriter[obj] = i if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: tb.bucket, @@ -182,14 +174,6 @@ func TestIntegration_DownloaderErrorSync(t *testing.T) { t.Error("d.WaitAndClose should return an error, instead got nil") } - // Close the writers so we can check the contents. This should be fine, - // since the downloads should all be complete after WaitAndClose. - for i := range objects { - if err := writers[i].Close(); err != nil { - t.Fatalf("testWriter.Close: %v", err) - } - } - // Check the results. for _, got := range results { writerIdx := objToWriter[got.Object] @@ -208,12 +192,12 @@ func TestIntegration_DownloaderErrorSync(t *testing.T) { continue } - if want, got := tb.contentHashes[got.Object], writers[writerIdx].crc32c; got != want { + if want, got := tb.contentHashes[got.Object], crc32c(writers[writerIdx].Bytes()); got != want { t.Fatalf("content crc32c does not match; got: %v, expected: %v", got, want) } - if got.Attrs.Size != tb.objectSize { - t.Errorf("expected object size %d, got %d", tb.objectSize, got.Attrs.Size) + if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { + t.Errorf("expected object size %d, got %d", want, got) } } @@ -227,7 +211,7 @@ func TestIntegration_DownloaderAsynchronous(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) { objects := tb.objects - d, err := NewDownloader(c, WithWorkers(2), WithCallbacks()) + d, err := NewDownloader(c, WithWorkers(2), WithCallbacks(), WithPartSize(maxObjectSize/2)) if err != nil { t.Fatalf("NewDownloader: %v", err) } @@ -236,10 +220,10 @@ func TestIntegration_DownloaderAsynchronous(t *testing.T) { callbackMu := sync.Mutex{} // Download objects. - writers := make([]*testWriter, len(objects)) + writers := make([]*DownloadBuffer, len(objects)) for i, obj := range objects { i := i - writers[i] = &testWriter{} + writers[i] = NewDownloadBuffer(make([]byte, tb.objectSizes[obj])) if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: tb.bucket, Object: obj, @@ -253,17 +237,12 @@ func TestIntegration_DownloaderAsynchronous(t *testing.T) { t.Errorf("result.Err: %v", got.Err) } - // Close the writer so we can check the contents. 
- if err := writers[i].Close(); err != nil { - t.Fatalf("testWriter.Close: %v", err) - } - - if want, got := tb.contentHashes[got.Object], writers[i].crc32c; got != want { - t.Fatalf("content crc32c does not match; got: %v, expected: %v", got, want) + if want, got := tb.contentHashes[got.Object], crc32c(writers[i].Bytes()); got != want { + t.Errorf("content crc32c does not match; got: %v, expected: %v", got, want) } - if got.Attrs.Size != tb.objectSize { - t.Errorf("expected object size %d, got %d", tb.objectSize, got.Attrs.Size) + if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { + t.Errorf("expected object size %d, got %d", want, got) } }, }); err != nil { @@ -297,7 +276,7 @@ func TestIntegration_DownloaderErrorAsync(t *testing.T) { if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: tb.bucket, Object: tb.objects[0], - Destination: &testWriter{}, + Destination: &DownloadBuffer{}, Conditions: &storage.Conditions{ GenerationMatch: -10, }, @@ -318,10 +297,10 @@ func TestIntegration_DownloaderErrorAsync(t *testing.T) { } // Download existing objects. - writers := make([]*testWriter, len(tb.objects)) + writers := make([]*DownloadBuffer, len(tb.objects)) for i, obj := range tb.objects { i := i - writers[i] = &testWriter{} + writers[i] = NewDownloadBuffer(make([]byte, tb.objectSizes[obj])) if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: tb.bucket, Object: obj, @@ -335,17 +314,12 @@ func TestIntegration_DownloaderErrorAsync(t *testing.T) { t.Errorf("result.Err: %v", got.Err) } - // Close the writer so we can check the contents. - if err := writers[i].Close(); err != nil { - t.Fatalf("testWriter.Close: %v", err) + if want, got := tb.contentHashes[got.Object], crc32c(writers[i].Bytes()); got != want { + t.Errorf("content crc32c does not match; got: %v, expected: %v", got, want) } - if want, got := tb.contentHashes[got.Object], writers[i].crc32c; got != want { - t.Fatalf("content crc32c does not match; got: %v, expected: %v", got, want) - } - - if got.Attrs.Size != tb.objectSize { - t.Errorf("expected object size %d, got %d", tb.objectSize, got.Attrs.Size) + if got, want := got.Attrs.Size, tb.objectSizes[got.Object]; want != got { + t.Errorf("expected object size %d, got %d", want, got) } }, }); err != nil { @@ -357,7 +331,7 @@ func TestIntegration_DownloaderErrorAsync(t *testing.T) { if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: tb.bucket, Object: "not-written", - Destination: &testWriter{}, + Destination: &DownloadBuffer{}, Callback: func(got *DownloadOutput) { callbackMu.Lock() numCallbacks++ @@ -417,7 +391,7 @@ func TestIntegration_DownloaderTimeout(t *testing.T) { if err := d.DownloadObject(ctx, &DownloadObjectInput{ Bucket: httpTestBucket.bucket, Object: httpTestBucket.objects[0], - Destination: &testWriter{}, + Destination: &DownloadBuffer{}, }); err != nil { t.Errorf("d.DownloadObject: %v", err) } @@ -440,6 +414,7 @@ func TestIntegration_DownloaderTimeout(t *testing.T) { func TestIntegration_DownloadShard(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, c *storage.Client, tb downloadTestBucket) { objectName := tb.objects[0] + objectSize := tb.objectSizes[objectName] // Get expected Attrs. 
o := c.Bucket(tb.bucket).Object(objectName) @@ -477,7 +452,7 @@ func TestIntegration_DownloadShard(t *testing.T) { Bucket: tb.bucket, Object: objectName, Range: &DownloadRange{ - Offset: tb.objectSize - 5, + Offset: objectSize - 5, Length: -1, }, }, @@ -485,12 +460,12 @@ func TestIntegration_DownloadShard(t *testing.T) { Bucket: tb.bucket, Object: objectName, Range: &DownloadRange{ - Offset: tb.objectSize - 5, + Offset: objectSize - 5, Length: -1, }, Attrs: &storage.ReaderObjectAttrs{ - Size: tb.objectSize, - StartOffset: tb.objectSize - 5, + Size: objectSize, + StartOffset: objectSize - 5, ContentType: r.Attrs.ContentType, ContentEncoding: r.Attrs.ContentEncoding, CacheControl: r.Attrs.CacheControl, @@ -572,15 +547,14 @@ func TestIntegration_DownloadShard(t *testing.T) { }, } { t.Run(test.desc, func(t *testing.T) { - w := &testWriter{} - + w := &DownloadBuffer{} test.in.Destination = w if test.in.ctx == nil { test.in.ctx = ctx } - got := test.in.downloadShard(c, test.timeout) + got := test.in.downloadShard(c, test.timeout, 1024) if got.Bucket != test.want.Bucket || got.Object != test.want.Object { t.Errorf("wanted bucket %q, object %q, got: %q, %q", test.want.Bucket, test.want.Object, got.Bucket, got.Object) @@ -648,33 +622,6 @@ func randomInt64(min, max int64) int64 { return rand.Int63n(max-min+1) + min } -// TODO: once we provide a DownloaderBuffer that implements WriterAt in the -// library, we can use that instead. -type testWriter struct { - b []byte - crc32c uint32 - bufs [][]byte // temp bufs that will be joined on Close() -} - -// Close must be called to finalize the buffer -func (tw *testWriter) Close() error { - tw.b = bytes.Join(tw.bufs, nil) - crc := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - _, err := io.Copy(crc, bytes.NewReader(tw.b)) - tw.crc32c = crc.Sum32() - return err -} - -func (tw *testWriter) WriteAt(b []byte, offset int64) (n int, err error) { - // TODO: use the offset. This is fine for now since reads are not yet sharded. - copiedB := make([]byte, len(b)) - copy(copiedB, b) - tw.bufs = append(tw.bufs, copiedB) - - return len(b), nil -} - func deleteExpiredBuckets(prefix string) error { if testing.Short() { return nil @@ -739,7 +686,7 @@ type downloadTestBucket struct { bucket string objects []string contentHashes map[string]uint32 - objectSize int64 + objectSizes map[string]int64 } // Create initializes the downloadTestBucket, creating a bucket and populating @@ -752,7 +699,6 @@ func (tb *downloadTestBucket) Create(prefix string) error { ctx := context.Background() tb.bucket = prefix + uidSpace.New() - tb.objectSize = randomInt64(200, 1024*1024) tb.objects = []string{ "obj1", "obj2", @@ -762,6 +708,7 @@ func (tb *downloadTestBucket) Create(prefix string) error { "!#$&'()*+,/:;=,?@,[] and spaces", } tb.contentHashes = make(map[string]uint32) + tb.objectSizes = make(map[string]int64) client, err := storage.NewClient(ctx) if err != nil { @@ -776,12 +723,13 @@ func (tb *downloadTestBucket) Create(prefix string) error { // Write objects. 
for _, obj := range tb.objects { - crc, err := generateFileInGCS(ctx, b.Object(obj), tb.objectSize) + size := randomInt64(1000, maxObjectSize) + crc, err := generateFileInGCS(ctx, b.Object(obj), size) if err != nil { return fmt.Errorf("generateFileInGCS: %v", err) } tb.contentHashes[obj] = crc - + tb.objectSizes[obj] = size } return nil } @@ -857,3 +805,7 @@ func initTransportClients(ctx context.Context) (map[string]*storage.Client, erro "grpc": gc, }, nil } + +func crc32c(b []byte) uint32 { + return crc32.Checksum(b, crc32.MakeTable(crc32.Castagnoli)) +} diff --git a/storage/transfermanager/option.go b/storage/transfermanager/option.go index 50d0611547d9..1f81672dded4 100644 --- a/storage/transfermanager/option.go +++ b/storage/transfermanager/option.go @@ -69,10 +69,32 @@ func (wpt withPerOpTimeout) apply(tm *transferManagerConfig) { tm.perOperationTimeout = wpt.timeout } +// WithPartSize returns a TransferManagerOption that specifies the size of the +// shards to transfer; that is, if the object is larger than this size, it will +// be uploaded or downloaded in concurrent pieces. +// The default is 32 MiB for downloads. +// Note that files that support decompressive transcoding will be downloaded in +// a single piece regardless of the partSize set here. +func WithPartSize(partSize int64) Option { + return &withPartSize{partSize: partSize} +} + +type withPartSize struct { + partSize int64 +} + +func (wps withPartSize) apply(tm *transferManagerConfig) { + tm.partSize = wps.partSize +} + type transferManagerConfig struct { // Workers in thread pool; default numCPU/2 based on previous benchmarks? numWorkers int + // Size of shards to transfer; Python found 32 MiB to be good default for + // JSON downloads but gRPC may benefit from larger. + partSize int64 + // Timeout for a single operation (including all retries). Zero value means // no timeout. perOperationTimeout time.Duration @@ -85,7 +107,8 @@ type transferManagerConfig struct { func defaultTransferManagerConfig() *transferManagerConfig { return &transferManagerConfig{ numWorkers: runtime.NumCPU() / 2, - perOperationTimeout: 0, // no timeout + partSize: 32 * 1024 * 1024, // 32 MiB + perOperationTimeout: 0, // no timeout } } diff --git a/storage/transfermanager/option_test.go b/storage/transfermanager/option_test.go index 1f524069f60d..d97cf831ec31 100644 --- a/storage/transfermanager/option_test.go +++ b/storage/transfermanager/option_test.go @@ -26,6 +26,7 @@ func TestApply(t *testing.T) { WithWorkers(3), WithPerOpTimeout(time.Hour), WithCallbacks(), + WithPartSize(30), } var got transferManagerConfig for _, opt := range opts { @@ -35,6 +36,7 @@ func TestApply(t *testing.T) { numWorkers: 3, perOperationTimeout: time.Hour, asynchronous: true, + partSize: 30, } if got != want {
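Since checksum validation is deferred to a follow-up (per the commit message), a caller who needs integrity checking today can compare the downloaded bytes against the object's stored CRC32C, for example by calling a helper like the hedged sketch below with buf.Bytes() after WaitAndClose returns. verifyCRC32C is a hypothetical helper, not part of this patch, and it only applies to full-object, untranscoded downloads.

package example

import (
	"context"
	"fmt"
	"hash/crc32"

	"cloud.google.com/go/storage"
)

// verifyCRC32C fetches the object's metadata and compares its CRC32C against
// a checksum of the locally downloaded bytes. Only meaningful when the whole
// object was downloaded without a range and without decompressive transcoding.
func verifyCRC32C(ctx context.Context, client *storage.Client, bucket, object string, data []byte) error {
	attrs, err := client.Bucket(bucket).Object(object).Attrs(ctx)
	if err != nil {
		return fmt.Errorf("fetching attrs: %w", err)
	}
	got := crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli))
	if got != attrs.CRC32C {
		return fmt.Errorf("crc32c mismatch: got %d, want %d", got, attrs.CRC32C)
	}
	return nil
}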