Skip to content

Commit 55a6d15

Browse files
authored
fix: update grpc finalize on close resumable uploads to validate ack'd object size (#2572)
* fix: update grpc finalize on close resumable uploads to validate ack'd object size Follow up to #2527 Add ITGapicUnbufferedFinalizeOnCloseResumableWritableByteChannelTest with failure scenarios * fix: update grpc finalize on close resumable uploads to validate ack'd object size Refactoring to clean up some request building lifecycle and variable scope * fix: update grpc finalize on close resumable uploads to validate ack'd object size Tests are passing
1 parent d193243 commit 55a6d15

File tree

4 files changed

+478
-110
lines changed

4 files changed

+478
-110
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel.java

+138-108
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020

2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
23+
import com.google.api.gax.rpc.ApiException;
2324
import com.google.api.gax.rpc.ApiStreamObserver;
2425
import com.google.api.gax.rpc.ClientStreamingCallable;
2526
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
2627
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2728
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
29+
import com.google.common.collect.ImmutableList;
2830
import com.google.protobuf.ByteString;
2931
import com.google.storage.v2.ChecksummedData;
3032
import com.google.storage.v2.ObjectChecksums;
@@ -33,11 +35,7 @@
3335
import java.io.IOException;
3436
import java.nio.ByteBuffer;
3537
import java.nio.channels.ClosedChannelException;
36-
import java.util.ArrayList;
37-
import java.util.List;
3838
import java.util.concurrent.ExecutionException;
39-
import java.util.function.Consumer;
40-
import java.util.function.LongConsumer;
4139
import org.checkerframework.checker.nullness.qual.NonNull;
4240

4341
final class GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel
@@ -55,22 +53,23 @@ final class GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel
5553
private boolean open = true;
5654
private boolean first = true;
5755
private boolean finished = false;
56+
private volatile WriteObjectRequest lastWrittenRequest;
5857

5958
GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
6059
SettableApiFuture<WriteObjectResponse> resultFuture,
6160
ChunkSegmenter chunkSegmenter,
6261
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write,
63-
ResumableWrite requestFactory) {
64-
String bucketName = requestFactory.bucketName();
62+
WriteCtx<ResumableWrite> writeCtx) {
63+
String bucketName = writeCtx.getRequestFactory().bucketName();
6564
this.resultFuture = resultFuture;
6665
this.chunkSegmenter = chunkSegmenter;
6766

6867
GrpcCallContext internalContext =
6968
contextWithBucketName(bucketName, GrpcCallContext.createDefault());
7069
this.write = write.withDefaultCallContext(internalContext);
7170

72-
this.writeCtx = new WriteCtx<>(requestFactory);
73-
this.responseObserver = new Observer(writeCtx.getConfirmedBytes()::set, resultFuture::set);
71+
this.writeCtx = writeCtx;
72+
this.responseObserver = new Observer(internalContext);
7473
}
7574

7675
@Override
@@ -92,27 +91,24 @@ public boolean isOpen() {
9291

9392
@Override
9493
public void close() throws IOException {
94+
if (!open) {
95+
return;
96+
}
97+
open = false;
9598
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
96-
if (!finished) {
97-
WriteObjectRequest message = finishMessage();
98-
try {
99+
try {
100+
if (!finished) {
101+
WriteObjectRequest message = finishMessage();
102+
lastWrittenRequest = message;
99103
openedStream.onNext(message);
100-
openedStream.onCompleted();
101104
finished = true;
102-
} catch (RuntimeException e) {
103-
resultFuture.setException(e);
104-
throw e;
105-
}
106-
} else {
107-
try {
108-
openedStream.onCompleted();
109-
} catch (RuntimeException e) {
110-
resultFuture.setException(e);
111-
throw e;
112105
}
106+
openedStream.onCompleted();
107+
responseObserver.await();
108+
} catch (RuntimeException e) {
109+
resultFuture.setException(e);
110+
throw e;
113111
}
114-
open = false;
115-
responseObserver.await();
116112
}
117113

118114
private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize)
@@ -122,51 +118,54 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
122118
}
123119

124120
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength);
125-
126-
List<WriteObjectRequest> messages = new ArrayList<>();
121+
if (data.length == 0) {
122+
return 0;
123+
}
127124

128125
ApiStreamObserver<WriteObjectRequest> openedStream = openedStream();
129126
int bytesConsumed = 0;
130-
for (ChunkSegment datum : data) {
131-
Crc32cLengthKnown crc32c = datum.getCrc32c();
132-
ByteString b = datum.getB();
133-
int contentSize = b.size();
134-
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
135-
Crc32cLengthKnown cumulative =
136-
writeCtx
137-
.getCumulativeCrc32c()
138-
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
139-
ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
140-
if (crc32c != null) {
141-
checksummedData.setCrc32C(crc32c.getValue());
142-
}
143-
WriteObjectRequest.Builder builder =
144-
writeCtx
145-
.newRequestBuilder()
146-
.setWriteOffset(offset)
147-
.setChecksummedData(checksummedData.build());
148-
if (!datum.isOnlyFullBlocks()) {
149-
builder.setFinishWrite(true);
150-
if (cumulative != null) {
151-
builder.setObjectChecksums(
152-
ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build());
127+
try {
128+
for (int i = 0; i < data.length; i++) {
129+
ChunkSegment datum = data[i];
130+
Crc32cLengthKnown crc32c = datum.getCrc32c();
131+
ByteString b = datum.getB();
132+
int contentSize = b.size();
133+
long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
134+
Crc32cLengthKnown cumulative =
135+
writeCtx
136+
.getCumulativeCrc32c()
137+
.accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
138+
ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
139+
if (crc32c != null) {
140+
checksummedData.setCrc32C(crc32c.getValue());
141+
}
142+
WriteObjectRequest.Builder builder = writeCtx.newRequestBuilder();
143+
if (!first) {
144+
builder.clearUploadId();
145+
builder.clearWriteObjectSpec();
146+
builder.clearObjectChecksums();
147+
}
148+
builder.setWriteOffset(offset).setChecksummedData(checksummedData.build());
149+
if (!datum.isOnlyFullBlocks() || (finalize && i + 1 == data.length)) {
150+
builder.setFinishWrite(true);
151+
if (cumulative != null) {
152+
builder.setObjectChecksums(
153+
ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build());
154+
}
155+
finished = true;
153156
}
154-
finished = true;
155-
}
156-
157-
WriteObjectRequest build = possiblyPairDownRequest(builder, first).build();
158-
first = false;
159-
messages.add(build);
160-
bytesConsumed += contentSize;
161-
}
162-
if (finalize && !finished) {
163-
messages.add(finishMessage());
164-
finished = true;
165-
}
166157

167-
try {
168-
for (WriteObjectRequest message : messages) {
169-
openedStream.onNext(message);
158+
WriteObjectRequest build = builder.build();
159+
first = false;
160+
lastWrittenRequest = build;
161+
openedStream.onNext(build);
162+
bytesConsumed += contentSize;
163+
}
164+
if (finalize && !finished) {
165+
WriteObjectRequest finishMessage = finishMessage();
166+
lastWrittenRequest = finishMessage;
167+
openedStream.onNext(finishMessage);
168+
finished = true;
170169
}
171170
} catch (RuntimeException e) {
172171
resultFuture.setException(e);
@@ -201,73 +200,104 @@ private ApiStreamObserver<WriteObjectRequest> openedStream() {
201200
return stream;
202201
}
203202

204-
/**
205-
* Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
206-
* this utility method centralizes the logic necessary to clear those fields for use by subsequent
207-
* messages.
208-
*/
209-
private static WriteObjectRequest.Builder possiblyPairDownRequest(
210-
WriteObjectRequest.Builder b, boolean firstMessageOfStream) {
211-
if (firstMessageOfStream && b.getWriteOffset() == 0) {
212-
return b;
213-
}
214-
if (b.getWriteOffset() > 0) {
215-
b.clearWriteObjectSpec();
216-
}
217-
218-
if (b.getWriteOffset() > 0 && !b.getFinishWrite()) {
219-
b.clearObjectChecksums();
220-
}
221-
return b;
222-
}
223-
224-
static class Observer implements ApiStreamObserver<WriteObjectResponse> {
203+
class Observer implements ApiStreamObserver<WriteObjectResponse> {
225204

226-
private final LongConsumer sizeCallback;
227-
private final Consumer<WriteObjectResponse> completeCallback;
205+
private final GrpcCallContext context;
228206

229207
private final SettableApiFuture<Void> invocationHandle;
230208
private volatile WriteObjectResponse last;
231209

232-
Observer(LongConsumer sizeCallback, Consumer<WriteObjectResponse> completeCallback) {
233-
this.sizeCallback = sizeCallback;
234-
this.completeCallback = completeCallback;
210+
Observer(GrpcCallContext context) {
211+
this.context = context;
235212
this.invocationHandle = SettableApiFuture.create();
236213
}
237214

238215
@Override
239216
public void onNext(WriteObjectResponse value) {
240-
// incremental update
241-
if (value.hasPersistedSize()) {
242-
sizeCallback.accept(value.getPersistedSize());
243-
} else if (value.hasResource()) {
244-
sizeCallback.accept(value.getResource().getSize());
245-
}
246217
last = value;
247218
}
248219

249-
/**
250-
* observed exceptions so far
251-
*
252-
* <ol>
253-
* <li>{@link com.google.api.gax.rpc.OutOfRangeException}
254-
* <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
255-
* <li>{@link io.grpc.StatusRuntimeException}
256-
* </ol>
257-
*/
258220
@Override
259221
public void onError(Throwable t) {
260-
invocationHandle.setException(t);
222+
if (t instanceof ApiException) {
223+
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
224+
// things fall in line with our retry handlers.
225+
// This is suboptimal, as it will initialize a second exception, however this is the
226+
// unusual case, and it should not cause a significant overhead given its rarity.
227+
StorageException tmp = StorageException.asStorageException((ApiException) t);
228+
StorageException storageException =
229+
ResumableSessionFailureScenario.toStorageException(
230+
tmp.getCode(),
231+
tmp.getMessage(),
232+
tmp.getReason(),
233+
ImmutableList.of(lastWrittenRequest),
234+
null,
235+
context,
236+
t);
237+
resultFuture.setException(storageException);
238+
invocationHandle.setException(storageException);
239+
} else {
240+
resultFuture.setException(t);
241+
invocationHandle.setException(t);
242+
}
261243
}
262244

263245
@Override
264246
public void onCompleted() {
265-
if (last != null && last.hasResource()) {
266-
completeCallback.accept(last);
247+
boolean finalizing = lastWrittenRequest.getFinishWrite();
248+
if (last == null) {
249+
clientDetectedError(
250+
ResumableSessionFailureScenario.toStorageException(
251+
0,
252+
"onComplete without preceding onNext, unable to determine success.",
253+
"invalid",
254+
ImmutableList.of(lastWrittenRequest),
255+
null,
256+
context,
257+
null));
258+
} else if (last.hasResource() /* && finalizing*/) {
259+
long totalSentBytes = writeCtx.getTotalSentBytes().get();
260+
long finalSize = last.getResource().getSize();
261+
if (totalSentBytes == finalSize) {
262+
ok(finalSize);
263+
} else if (finalSize < totalSentBytes) {
264+
clientDetectedError(
265+
ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
266+
ImmutableList.of(lastWrittenRequest), last, context, null));
267+
} else {
268+
clientDetectedError(
269+
ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(
270+
ImmutableList.of(lastWrittenRequest), last, context, null));
271+
}
272+
} else if (!finalizing || last.hasPersistedSize()) { // unexpected incremental response
273+
clientDetectedError(
274+
ResumableSessionFailureScenario.toStorageException(
275+
0,
276+
"Unexpected incremental response for finalizing request.",
277+
"invalid",
278+
ImmutableList.of(lastWrittenRequest),
279+
last,
280+
context,
281+
null));
282+
} else {
283+
clientDetectedError(
284+
ResumableSessionFailureScenario.SCENARIO_0.toStorageException(
285+
ImmutableList.of(lastWrittenRequest), last, context, null));
267286
}
287+
}
288+
289+
private void ok(long persistedSize) {
290+
writeCtx.getConfirmedBytes().set(persistedSize);
291+
resultFuture.set(last);
268292
invocationHandle.set(null);
269293
}
270294

295+
private void clientDetectedError(StorageException storageException) {
296+
open = false;
297+
resultFuture.setException(storageException);
298+
invocationHandle.setException(storageException);
299+
}
300+
271301
void await() {
272302
try {
273303
invocationHandle.get();

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
305305
Retrying::newCallContext);
306306
} else {
307307
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
308-
result, getChunkSegmenter(), write, start);
308+
result, getChunkSegmenter(), write, new WriteCtx<>(start));
309309
}
310310
})
311311
.andThen(StorageByteChannels.writable()::createSynchronized));
@@ -346,7 +346,7 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
346346
Retrying::newCallContext);
347347
} else {
348348
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
349-
result, getChunkSegmenter(), write, start);
349+
result, getChunkSegmenter(), write, new WriteCtx<>(start));
350350
}
351351
})
352352
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))

0 commit comments

Comments
 (0)