23
23
import com .google .api .gax .retrying .ResultRetryAlgorithm ;
24
24
import com .google .api .gax .rpc .ApiStreamObserver ;
25
25
import com .google .api .gax .rpc .ClientStreamingCallable ;
26
+ import com .google .api .gax .rpc .OutOfRangeException ;
26
27
import com .google .cloud .storage .ChunkSegmenter .ChunkSegment ;
27
28
import com .google .cloud .storage .Conversions .Decoder ;
28
29
import com .google .cloud .storage .Crc32cValue .Crc32cLengthKnown ;
41
42
import java .util .ArrayList ;
42
43
import java .util .List ;
43
44
import java .util .concurrent .ExecutionException ;
44
- import java .util .function .Consumer ;
45
- import java .util .function .LongConsumer ;
46
45
import java .util .function .Supplier ;
47
46
import org .checkerframework .checker .nullness .qual .NonNull ;
47
+ import org .checkerframework .checker .nullness .qual .Nullable ;
48
48
49
49
final class GapicUnbufferedChunkedResumableWritableByteChannel
50
50
implements UnbufferedWritableByteChannel {
@@ -58,30 +58,26 @@ final class GapicUnbufferedChunkedResumableWritableByteChannel
58
58
private final RetryingDependencies deps ;
59
59
private final ResultRetryAlgorithm <?> alg ;
60
60
private final Supplier <GrpcCallContext > baseContextSupplier ;
61
- private final LongConsumer sizeCallback ;
62
- private final Consumer <WriteObjectResponse > completeCallback ;
63
61
64
- private boolean open = true ;
62
+ private volatile boolean open = true ;
65
63
private boolean finished = false ;
66
64
67
65
GapicUnbufferedChunkedResumableWritableByteChannel (
68
66
SettableApiFuture <WriteObjectResponse > resultFuture ,
69
67
@ NonNull ChunkSegmenter chunkSegmenter ,
70
68
ClientStreamingCallable <WriteObjectRequest , WriteObjectResponse > write ,
71
- ResumableWrite requestFactory ,
69
+ WriteCtx < ResumableWrite > writeCtx ,
72
70
RetryingDependencies deps ,
73
71
ResultRetryAlgorithm <?> alg ,
74
72
Supplier <GrpcCallContext > baseContextSupplier ) {
75
73
this .resultFuture = resultFuture ;
76
74
this .chunkSegmenter = chunkSegmenter ;
77
75
this .write = write ;
78
- this .bucketName = requestFactory .bucketName ();
79
- this .writeCtx = new WriteCtx <>( requestFactory ) ;
76
+ this .bucketName = writeCtx . getRequestFactory () .bucketName ();
77
+ this .writeCtx = writeCtx ;
80
78
this .deps = deps ;
81
79
this .alg = alg ;
82
80
this .baseContextSupplier = baseContextSupplier ;
83
- this .sizeCallback = writeCtx .getConfirmedBytes ()::set ;
84
- this .completeCallback = resultFuture ::set ;
85
81
}
86
82
87
83
@ Override
@@ -106,7 +102,7 @@ public void close() throws IOException {
106
102
if (open && !finished ) {
107
103
WriteObjectRequest message = finishMessage (true );
108
104
try {
109
- flush (ImmutableList .of (message ));
105
+ flush (ImmutableList .of (message ), null , true );
110
106
finished = true ;
111
107
} catch (RuntimeException e ) {
112
108
resultFuture .setException (e );
@@ -122,12 +118,13 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
122
118
throw new ClosedChannelException ();
123
119
}
124
120
121
+ long begin = writeCtx .getConfirmedBytes ().get ();
122
+ RewindableContent content = RewindableContent .of (srcs , srcsOffset , srcsLength );
125
123
ChunkSegment [] data = chunkSegmenter .segmentBuffers (srcs , srcsOffset , srcsLength );
126
124
127
125
List <WriteObjectRequest > messages = new ArrayList <>();
128
126
129
127
boolean first = true ;
130
- int bytesConsumed = 0 ;
131
128
for (ChunkSegment datum : data ) {
132
129
Crc32cLengthKnown crc32c = datum .getCrc32c ();
133
130
ByteString b = datum .getB ();
@@ -144,8 +141,13 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
144
141
WriteObjectRequest .Builder builder =
145
142
writeCtx
146
143
.newRequestBuilder ()
144
+ .clearWriteObjectSpec ()
145
+ .clearObjectChecksums ()
147
146
.setWriteOffset (offset )
148
147
.setChecksummedData (checksummedData .build ());
148
+ if (!first ) {
149
+ builder .clearUploadId ();
150
+ }
149
151
if (!datum .isOnlyFullBlocks ()) {
150
152
builder .setFinishWrite (true );
151
153
if (cumulative != null ) {
@@ -155,23 +157,25 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
155
157
finished = true ;
156
158
}
157
159
158
- WriteObjectRequest build = possiblyPairDownRequest ( builder , first ) .build ();
160
+ WriteObjectRequest build = builder .build ();
159
161
first = false ;
160
162
messages .add (build );
161
- bytesConsumed += contentSize ;
162
163
}
163
164
if (finalize && !finished ) {
164
165
messages .add (finishMessage (first ));
165
166
finished = true ;
166
167
}
167
168
168
169
try {
169
- flush (messages );
170
+ flush (messages , content , finalize );
170
171
} catch (RuntimeException e ) {
171
172
resultFuture .setException (e );
172
173
throw e ;
173
174
}
174
175
176
+ long end = writeCtx .getConfirmedBytes ().get ();
177
+
178
+ long bytesConsumed = end - begin ;
175
179
return bytesConsumed ;
176
180
}
177
181
@@ -182,14 +186,20 @@ private WriteObjectRequest finishMessage(boolean first) {
182
186
183
187
WriteObjectRequest .Builder b =
184
188
writeCtx .newRequestBuilder ().setFinishWrite (true ).setWriteOffset (offset );
189
+ if (!first ) {
190
+ b .clearUploadId ();
191
+ }
185
192
if (crc32cValue != null ) {
186
193
b .setObjectChecksums (ObjectChecksums .newBuilder ().setCrc32C (crc32cValue .getValue ()).build ());
187
194
}
188
- WriteObjectRequest message = possiblyPairDownRequest ( b , first ) .build ();
195
+ WriteObjectRequest message = b .build ();
189
196
return message ;
190
197
}
191
198
192
- private void flush (@ NonNull List <WriteObjectRequest > segments ) {
199
+ private void flush (
200
+ @ NonNull List <WriteObjectRequest > segments ,
201
+ @ Nullable RewindableContent content ,
202
+ boolean finalizing ) {
193
203
GrpcCallContext internalContext = contextWithBucketName (bucketName , baseContextSupplier .get ());
194
204
ClientStreamingCallable <WriteObjectRequest , WriteObjectResponse > callable =
195
205
write .withDefaultCallContext (internalContext );
@@ -198,7 +208,7 @@ private void flush(@NonNull List<WriteObjectRequest> segments) {
198
208
deps ,
199
209
alg ,
200
210
() -> {
201
- Observer observer = new Observer (sizeCallback , completeCallback );
211
+ Observer observer = new Observer (content , finalizing );
202
212
ApiStreamObserver <WriteObjectRequest > write = callable .clientStreamingCall (observer );
203
213
204
214
for (WriteObjectRequest message : segments ) {
@@ -211,81 +221,93 @@ private void flush(@NonNull List<WriteObjectRequest> segments) {
211
221
Decoder .identity ());
212
222
}
213
223
214
- /**
215
- * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs,
216
- * this utility method centralizes the logic necessary to clear those fields for use by subsequent
217
- * messages.
218
- */
219
- private static WriteObjectRequest .Builder possiblyPairDownRequest (
220
- WriteObjectRequest .Builder b , boolean firstMessageOfStream ) {
221
- if (firstMessageOfStream && b .getWriteOffset () == 0 ) {
222
- return b ;
223
- }
224
-
225
- if (!firstMessageOfStream ) {
226
- b .clearUploadId ();
227
- }
228
-
229
- if (b .getWriteOffset () > 0 ) {
230
- b .clearWriteObjectSpec ();
231
- }
232
-
233
- if (b .getWriteOffset () > 0 && !b .getFinishWrite ()) {
234
- b .clearObjectChecksums ();
235
- }
236
- return b ;
237
- }
238
-
239
224
@ VisibleForTesting
240
225
WriteCtx <?> getWriteCtx () {
241
226
return writeCtx ;
242
227
}
243
228
244
- static class Observer implements ApiStreamObserver <WriteObjectResponse > {
229
+ class Observer implements ApiStreamObserver <WriteObjectResponse > {
245
230
246
- private final LongConsumer sizeCallback ;
247
- private final Consumer < WriteObjectResponse > completeCallback ;
231
+ private final RewindableContent content ;
232
+ private final boolean finalizing ;
248
233
249
234
private final SettableApiFuture <Void > invocationHandle ;
250
235
private volatile WriteObjectResponse last ;
251
236
252
- Observer (LongConsumer sizeCallback , Consumer < WriteObjectResponse > completeCallback ) {
253
- this .sizeCallback = sizeCallback ;
254
- this .completeCallback = completeCallback ;
237
+ Observer (@ Nullable RewindableContent content , boolean finalizing ) {
238
+ this .content = content ;
239
+ this .finalizing = finalizing ;
255
240
this .invocationHandle = SettableApiFuture .create ();
256
241
}
257
242
258
243
@ Override
259
244
public void onNext (WriteObjectResponse value ) {
260
- // incremental update
261
- if (value .hasPersistedSize ()) {
262
- sizeCallback .accept (value .getPersistedSize ());
263
- } else if (value .hasResource ()) {
264
- sizeCallback .accept (value .getResource ().getSize ());
265
- }
266
245
last = value ;
267
246
}
268
247
269
- /**
270
- * observed exceptions so far
271
- *
272
- * <ol>
273
- * <li>{@link com.google.api.gax.rpc.OutOfRangeException}
274
- * <li>{@link com.google.api.gax.rpc.AlreadyExistsException}
275
- * <li>{@link io.grpc.StatusRuntimeException}
276
- * </ol>
277
- */
278
248
@ Override
279
249
public void onError (Throwable t ) {
280
- invocationHandle .setException (t );
250
+ if (t instanceof OutOfRangeException ) {
251
+ OutOfRangeException oore = (OutOfRangeException ) t ;
252
+ open = false ;
253
+ invocationHandle .setException (
254
+ ResumableSessionFailureScenario .SCENARIO_5 .toStorageException ());
255
+ } else {
256
+ invocationHandle .setException (t );
257
+ }
281
258
}
282
259
283
260
@ Override
284
261
public void onCompleted () {
285
- if (last != null && last .hasResource ()) {
286
- completeCallback .accept (last );
262
+ try {
263
+ if (last == null ) {
264
+ throw new StorageException (
265
+ 0 , "onComplete without preceding onNext, unable to determine success." );
266
+ } else if (!finalizing && last .hasPersistedSize ()) { // incremental
267
+ long totalSentBytes = writeCtx .getTotalSentBytes ().get ();
268
+ long persistedSize = last .getPersistedSize ();
269
+
270
+ if (totalSentBytes == persistedSize ) {
271
+ writeCtx .getConfirmedBytes ().set (persistedSize );
272
+ } else if (persistedSize < totalSentBytes ) {
273
+ long delta = totalSentBytes - persistedSize ;
274
+ // rewind our content and any state that my have run ahead of the actual ack'd bytes
275
+ content .rewindTo (delta );
276
+ writeCtx .getTotalSentBytes ().set (persistedSize );
277
+ writeCtx .getConfirmedBytes ().set (persistedSize );
278
+ } else {
279
+ throw ResumableSessionFailureScenario .SCENARIO_7 .toStorageException ();
280
+ }
281
+ } else if (finalizing && last .hasResource ()) {
282
+ long totalSentBytes = writeCtx .getTotalSentBytes ().get ();
283
+ long finalSize = last .getResource ().getSize ();
284
+ if (totalSentBytes == finalSize ) {
285
+ writeCtx .getConfirmedBytes ().set (finalSize );
286
+ resultFuture .set (last );
287
+ } else if (finalSize < totalSentBytes ) {
288
+ throw ResumableSessionFailureScenario .SCENARIO_4_1 .toStorageException ();
289
+ } else {
290
+ throw ResumableSessionFailureScenario .SCENARIO_4_2 .toStorageException ();
291
+ }
292
+ } else if (!finalizing && last .hasResource ()) {
293
+ throw ResumableSessionFailureScenario .SCENARIO_1 .toStorageException ();
294
+ } else if (finalizing && last .hasPersistedSize ()) {
295
+ long totalSentBytes = writeCtx .getTotalSentBytes ().get ();
296
+ long persistedSize = last .getPersistedSize ();
297
+ if (persistedSize < totalSentBytes ) {
298
+ throw ResumableSessionFailureScenario .SCENARIO_3 .toStorageException ();
299
+ } else {
300
+ throw ResumableSessionFailureScenario .SCENARIO_2 .toStorageException ();
301
+ }
302
+ } else {
303
+ throw ResumableSessionFailureScenario .SCENARIO_0 .toStorageException ();
304
+ }
305
+ } catch (Throwable se ) {
306
+ open = false ;
307
+ invocationHandle .setException (se );
308
+ } finally {
309
+ invocationHandle .set (null );
287
310
}
288
- invocationHandle .set (null );
289
311
}
290
312
291
313
void await () {
0 commit comments