diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 4dc69d1eadb9..7aec4aa29170 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -1044,11 +1044,13 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage } // The chunk buffer is full, but there is no end in sight. This - // means that a resumable upload will need to be used to send + // means that either: + // 1. A resumable upload will need to be used to send // multiple chunks, until we are done reading data. Start a // resumable upload if it has not already been started. - // Otherwise, all data will be sent over a single gRPC stream. - if !doneReading && gw.upid == "" { + // 2. ChunkSize of zero may also have a full buffer, but a resumable + // session should not be initiated in this case. + if !doneReading && gw.upid == "" && params.chunkSize != 0 { err = gw.startResumableUpload() if err != nil { err = checkCanceled(err) @@ -1065,11 +1067,15 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage pr.CloseWithError(err) return } + // At this point, the current buffer has been uploaded. For resumable - // uploads, capture the committed offset here in case the upload was not - // finalized and another chunk is to be uploaded. - if gw.upid != "" { + // uploads and chunkSize = 0, capture the committed offset here in case + // the upload was not finalized and another chunk is to be uploaded. Call + // the progress function for resumable uploads only. + if gw.upid != "" || gw.chunkSize == 0 { offset = off + } + if gw.upid != "" { progress(offset) } @@ -1485,14 +1491,11 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) size += googleapi.MinUploadChunkSize - (size % googleapi.MinUploadChunkSize) } + // A completely bufferless upload is not possible as it is in JSON because + // the buffer must be provided to the message. However use the minimum size + // possible in this case. if params.chunkSize == 0 { - // TODO: Should we actually use the minimum of 256 KB here when the user - // indicates they want minimal memory usage? We cannot do a zero-copy, - // bufferless upload like HTTP/JSON can. - // TODO: We need to determine if we can avoid starting a - // resumable upload when the user *plans* to send more than bufSize but - // with a bufferless upload. - size = maxPerMessageWriteSize + size = googleapi.MinUploadChunkSize } return &gRPCWriter{ @@ -1505,6 +1508,7 @@ func newGRPCWriter(c *grpcStorageClient, params *openWriterParams, r io.Reader) conds: params.conds, encryptionKey: params.encryptionKey, sendCRC32C: params.sendCRC32C, + chunkSize: params.chunkSize, } } @@ -1524,6 +1528,7 @@ type gRPCWriter struct { settings *settings sendCRC32C bool + chunkSize int // The gRPC client-stream used for sending buffers. stream storagepb.Storage_WriteObjectClient @@ -1589,7 +1594,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st offset := start toWrite := w.buf[:recvd] for { - first := sent == 0 // This indicates that this is the last message and the remaining // data fits in one message. belowLimit := recvd-sent <= limit @@ -1612,10 +1616,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st FinishWrite: finishWrite, } - // Open a new stream and set the first_message field on the request. - // The first message on the WriteObject stream must either be the - // Object or the Resumable Upload ID. - if first { + // Open a new stream if necessary and set the first_message field on + // the request. The first message on the WriteObject stream must either + // be the Object or the Resumable Upload ID. + if w.stream == nil { ctx := gapic.InsertMetadata(w.ctx, metadata.Pairs("x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket)))) w.stream, err = w.c.raw.WriteObject(ctx) if err != nil { @@ -1678,6 +1682,13 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st continue } + // The buffer has been uploaded and there is still more data to be + // uploaded, but this is not a resumable upload session. Therefore + // keep the stream open and don't commit yet. + if !finishWrite && w.chunkSize == 0 { + return nil, offset, false, nil + } + // Done sending data. Close the stream to "commit" the data sent. resp, finalized, err := w.commit() // Retriable errors mean we should start over and attempt to diff --git a/storage/integration_test.go b/storage/integration_test.go index bca8951c5980..947581ca9318 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2393,6 +2393,15 @@ func TestIntegration_WriterChunksize(t *testing.T) { if callbacks != test.wantCallbacks { t.Errorf("ProgressFunc was called %d times, expected %d", callbacks, test.wantCallbacks) } + + // Confirm all bytes were uploaded. + attrs, err := obj.Attrs(ctx) + if err != nil { + t.Fatalf("obj.Attrs: %v", err) + } + if attrs.Size != int64(objSize) { + t.Errorf("incorrect number of bytes written; got %v, want %v", attrs.Size, objSize) + } }) } })