@@ -136,7 +136,8 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
136
136
List <BidiWriteObjectRequest > messages = new ArrayList <>();
137
137
138
138
int bytesConsumed = 0 ;
139
- for (ChunkSegment datum : data ) {
139
+ for (int i = 0 ; i < data .length ; i ++) {
140
+ ChunkSegment datum = data [i ];
140
141
Crc32cLengthKnown crc32c = datum .getCrc32c ();
141
142
ByteString b = datum .getB ();
142
143
int contentSize = b .size ();
@@ -149,11 +150,14 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
149
150
if (crc32c != null ) {
150
151
checksummedData .setCrc32C (crc32c .getValue ());
151
152
}
152
- BidiWriteObjectRequest .Builder builder =
153
- writeCtx
154
- .newRequestBuilder ()
155
- .setWriteOffset (offset )
156
- .setChecksummedData (checksummedData .build ());
153
+ BidiWriteObjectRequest .Builder builder = writeCtx .newRequestBuilder ();
154
+ if (!first ) {
155
+ builder .clearUploadId ();
156
+ builder .clearObjectChecksums ();
157
+ } else {
158
+ first = false ;
159
+ }
160
+ builder .setWriteOffset (offset ).setChecksummedData (checksummedData .build ());
157
161
if (!datum .isOnlyFullBlocks ()) {
158
162
builder .setFinishWrite (true );
159
163
if (cumulative != null ) {
@@ -163,8 +167,11 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
163
167
finished = true ;
164
168
}
165
169
166
- BidiWriteObjectRequest build = possiblyPairDownBidiRequest (builder , first ).build ();
167
- first = false ;
170
+ if (i == data .length - 1 && !finished ) {
171
+ builder .setFlush (true ).setStateLookup (true );
172
+ }
173
+
174
+ BidiWriteObjectRequest build = builder .build ();
168
175
messages .add (build );
169
176
bytesConsumed += contentSize ;
170
177
}
@@ -224,11 +231,6 @@ private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
224
231
for (BidiWriteObjectRequest message : segments ) {
225
232
opened .onNext (message );
226
233
}
227
- if (!finished ) {
228
- BidiWriteObjectRequest message =
229
- BidiWriteObjectRequest .newBuilder ().setFlush (true ).setStateLookup (true ).build ();
230
- opened .onNext (message );
231
- }
232
234
responseObserver .await ();
233
235
return null ;
234
236
} catch (Exception e ) {
@@ -240,26 +242,6 @@ private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
240
242
Decoder .identity ());
241
243
}
242
244
243
- private static BidiWriteObjectRequest .Builder possiblyPairDownBidiRequest (
244
- BidiWriteObjectRequest .Builder b , boolean firstMessageOfStream ) {
245
- if (firstMessageOfStream && b .getWriteOffset () == 0 ) {
246
- return b ;
247
- }
248
-
249
- if (!firstMessageOfStream ) {
250
- b .clearUploadId ();
251
- }
252
-
253
- if (b .getWriteOffset () > 0 ) {
254
- b .clearWriteObjectSpec ();
255
- }
256
-
257
- if (b .getWriteOffset () > 0 && !b .getFinishWrite ()) {
258
- b .clearObjectChecksums ();
259
- }
260
- return b ;
261
- }
262
-
263
245
private class BidiObserver implements ApiStreamObserver <BidiWriteObjectResponse > {
264
246
265
247
private final Semaphore sem ;
0 commit comments