Skip to content

Commit

Permalink
Stop remote blob upload if upload is complete. (#14467)
Browse files Browse the repository at this point in the history
If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0 for uploads.

On bazel-6.0.0-pre.20211117.1, I observed:
```
java.lang.NullPointerException
        at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156)
        at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416)
        at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277)
        at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
        at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
```

Closes #14464.

PiperOrigin-RevId: 417795715
  • Loading branch information
benjaminp authored Dec 22, 2021
1 parent 7deb940 commit 90965b0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public ListenableFuture<Void> uploadBlobAsync(
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");

if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) {
return Futures.immediateFuture(null);
return immediateVoidFuture();
}

ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
Expand Down Expand Up @@ -410,7 +410,7 @@ ListenableFuture<Void> start() {
() ->
retrier.executeAsync(
() -> {
if (chunker.getSize() == 0) {
if (chunker.getSize() == committedOffset.get()) {
return immediateVoidFuture();
}
try {
Expand All @@ -426,7 +426,7 @@ ListenableFuture<Void> start() {
if (chunker.hasNext()) {
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
return immediateVoidFuture();
},
progressiveBackoff),
callCredentialsProvider);
Expand All @@ -448,7 +448,7 @@ ListenableFuture<Void> start() {
return Futures.immediateFailedFuture(new IOException(message));
}
}
return Futures.immediateFuture(null);
return immediateVoidFuture();
},
MoreExecutors.directExecutor());
}
Expand Down Expand Up @@ -536,7 +536,7 @@ private ListenableFuture<Void> query(
progressiveBackoff.reset();
}
committedOffset.set(committedSize);
return Futures.immediateFuture(null);
return immediateVoidFuture();
},
MoreExecutors.directExecutor());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,68 @@ public void onCompleted() {
blockUntilInternalStateConsistent(uploader);
}

@Test
public void failureAfterUploadCompletes() throws Exception {
AtomicInteger numUploads = new AtomicInteger();
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(
() -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
new ReferenceCountedChannel(channelConnectionFactory),
CallCredentialsProvider.NO_CREDENTIALS,
/* callTimeoutSecs= */ 60,
retrier);

byte[] blob = new byte[CHUNK_SIZE - 1];
new Random().nextBytes(blob);

serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
numUploads.incrementAndGet();
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest writeRequest) {}

@Override
public void onError(Throwable throwable) {
fail("onError should never be called.");
}

@Override
public void onCompleted() {
streamObserver.onNext(
WriteResponse.newBuilder().setCommittedSize(blob.length).build());
streamObserver.onError(Status.UNAVAILABLE.asException());
}
};
}

@Override
public void queryWriteStatus(
QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
response.onNext(
QueryWriteStatusResponse.newBuilder()
.setCommittedSize(blob.length)
.setComplete(true)
.build());
response.onCompleted();
}
});

Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());

uploader.uploadBlob(context, hash, chunker, true);

blockUntilInternalStateConsistent(uploader);

assertThat(numUploads.get()).isEqualTo(1);
}

@Test
public void testCompressedUploads() throws Exception {
RemoteRetrier retrier =
Expand Down

0 comments on commit 90965b0

Please sign in to comment.