From 2ea1326ea496087c35c7009021fb1b2097c05224 Mon Sep 17 00:00:00 2001 From: Benjamin Peterson Date: Tue, 21 Dec 2021 12:23:18 -0600 Subject: [PATCH] Stop remote blob upload if upload is complete. If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0136a83d303d4d88373d6e510f85a81fbb for uploads. On bazel-6.0.0-pre.20211117.1, I observed: ``` java.lang.NullPointerException at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156) at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416) at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277) at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ``` --- .../build/lib/remote/ByteStreamUploader.java | 6 +- .../lib/remote/ByteStreamUploaderTest.java | 63 +++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index e409fdfdcf648c..b8002a3ab74af4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -424,7 +424,7 @@ ListenableFuture start() { () -> retrier.executeAsync( () -> { - if (chunker.getSize() == 0) { + if (chunker.getSize() == committedOffset.get()) { return immediateVoidFuture(); } try { @@ -452,7 +452,7 @@ ListenableFuture start() { if (chunker.hasNext()) { return callAndQueryOnFailure(committedOffset, progressiveBackoff); } - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, progressiveBackoff), callCredentialsProvider); @@ -564,7 +564,7 @@ private ListenableFuture query( progressiveBackoff.reset(); } committedOffset.set(committedSize); - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, MoreExecutors.directExecutor()); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 4865af29d735d9..2623921f4a9e15 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -1605,6 +1605,69 @@ public void onCompleted() { blockUntilInternalStateConsistent(uploader); } + @Test + public void failureAfterUploadCompletes() throws Exception { + AtomicInteger numUploads = new AtomicInteger(); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier( + () -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService); + ByteStreamUploader uploader = + new ByteStreamUploader( + INSTANCE_NAME, + new ReferenceCountedChannel(channelConnectionFactory), + CallCredentialsProvider.NO_CREDENTIALS, + /* callTimeoutSecs= */ 60, + retrier, + -1); + + byte[] blob = new byte[CHUNK_SIZE - 1]; + new Random().nextBytes(blob); + + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + numUploads.incrementAndGet(); + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) {} + + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } + + @Override + public void onCompleted() { + streamObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(blob.length).build()); + streamObserver.onError(Status.UNAVAILABLE.asException()); + } + }; + } + + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, StreamObserver response) { + response.onNext( + QueryWriteStatusResponse.newBuilder() + .setCommittedSize(blob.length) + .setComplete(true) + .build()); + response.onCompleted(); + } + }); + + Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); + HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); + + uploader.uploadBlob(context, hash, chunker, true); + + blockUntilInternalStateConsistent(uploader); + + assertThat(numUploads.get()).isEqualTo(1); + } + @Test public void testCompressedUploads() throws Exception { RemoteRetrier retrier =