diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 2118ae24b5..e46f61babd 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -276,8 +276,13 @@ public void sendOutstanding() { callContextWithOption = callContext.withOption(THROTTLED_TIME_KEY, accumulatedBatch.totalThrottledTimeMs); } - final ApiFuture batchResponse = - unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption); + ApiFuture batchResponse; + try { + batchResponse = + unaryCallable.futureCall(accumulatedBatch.builder.build(), callContextWithOption); + } catch (Exception ex) { + batchResponse = ApiFutures.immediateFailedFuture(ex); + } numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index b503e28da8..1f009c2276 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -46,6 +46,7 @@ import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.api.gax.rpc.testing.FakeCallContext; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; @@ -931,6 +932,82 @@ public void testThrottlingNonBlocking() throws Exception { } } + /** + * If the batcher's unary callable throws an exception when obtaining a response, then the + * response .get() should throw the exception + */ + @Test + public void testAddDoesNotHangIfExceptionThrowStartingACall() { + BatchingDescriptor batchingDescriptor = + new BatchingDescriptor() { + @Override + public BatchingRequestBuilder newRequestBuilder(Object o) { + return new BatchingRequestBuilder() { + @Override + public void add(Object o) {} + + @Override + public Object build() { + return new Object(); + } + }; + } + + @Override + public void splitResponse(Object o, List> list) { + for (BatchEntry e : list) { + e.getResultFuture().set(new Object()); + } + } + + @Override + public void splitException(Throwable throwable, List> list) { + for (BatchEntry e : list) { + e.getResultFuture().setException(new RuntimeException("fake")); + } + } + + @Override + public long countBytes(Object o) { + return 1; + } + }; + + UnaryCallable unaryCallable = + new UnaryCallable() { + @Override + public ApiFuture futureCall(Object o, ApiCallContext apiCallContext) { + throw new RuntimeException("this should bubble up"); + } + }; + Object prototype = new Object(); + BatchingSettings batchingSettings = + BatchingSettings.newBuilder() + .setDelayThreshold(Duration.ofSeconds(1)) + .setElementCountThreshold(100L) + .setRequestByteThreshold(100L) + .setFlowControlSettings(FlowControlSettings.getDefaultInstance()) + .build(); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); + FlowController flowController = new FlowController(batchingSettings.getFlowControlSettings()); + ApiCallContext callContext = FakeCallContext.createDefault(); + + BatcherImpl batcher = + new BatcherImpl<>( + batchingDescriptor, + unaryCallable, + prototype, + batchingSettings, + executor, + flowController, + callContext); + + ApiFuture f = batcher.add(new Object()); + Assert.assertThrows(ExecutionException.class, f::get); + // bubbles up + Assert.assertThrows(RuntimeException.class, batcher::close); + } + private void testElementTriggers(BatchingSettings settings) throws Exception { underTest = createDefaultBatcherImpl(settings, null); Future result = underTest.add(4);