From c889736d2c25b6b84ddb93984b5a4222499b9199 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Thu, 29 Aug 2024 13:11:23 -0400 Subject: [PATCH 1/8] There have been reports of batcher.close() hanging every once in awhile. Currently it is impossible to debug because we dont expose any internal state to analyze. This PR adds 2 additional methods that should help in diagnosing issues: 1. close(timeout) will try to close the batcher, but if any of the underlying batch operations fail, the exception message will contain a wealth of information describing the underlying state of operations as provided by #3140 2. cancelOutstanding this allows for remediation for close(timeout) throwing an exception. The intended usecase is dataflow connector's FinishBundle: try { batcher.close(Duration.ofMinutes(1)); } catch(BatchingException e) { batcher.cancelOutstanding(); batcher.close(Duration.ofMinutes(1)); } --- .../com/google/api/gax/batching/Batcher.java | 20 ++++- .../google/api/gax/batching/BatcherImpl.java | 85 ++++++++++++++++--- 2 files changed, 92 insertions(+), 13 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java index 1e069d53e0..82d33867c3 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java @@ -32,6 +32,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.InternalExtensionOnly; import com.google.api.gax.rpc.ApiCallContext; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * Represents a batching context where individual elements will be accumulated and flushed in a @@ -78,12 +80,26 @@ public interface Batcher extends AutoCloseable { void sendOutstanding(); /** - * Closes this Batcher by preventing new elements from being added, and then flushing the existing - * elements. + * Cancels all outstanding batch RPCs. + */ + void cancelOutstanding(); + + /** + * Closes this Batcher by preventing new elements from being added, then flushing the existing + * elements and waiting for all the outstanding work to be resolved. */ @Override void close() throws InterruptedException; + /** + * Closes this Batcher by preventing new elements from being added, then flushing the existing + * elements and waiting for all the outstanding work to be resolved. If all of the outstanding + * work has not been resolved, then a {@link BatchingException} will be thrown with details of the + * remaining work. The batcher will remain in a closed state and will not allow additional + * elements to be added. + */ + void close(@Nullable Duration timeout) throws InterruptedException; + /** * Closes this Batcher by preventing new elements from being added, and then sending outstanding * elements. The returned future will be resolved when the last element completes diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 8cb437a5e2..7763cafa23 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -42,6 +42,7 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; @@ -51,16 +52,20 @@ import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** * Queues up the elements until {@link #flush()} is called; once batching is over, returned future @@ -86,7 +91,7 @@ public class BatcherImpl private final BatcherReference currentBatcherReference; private Batch currentOpenBatch; - private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0); + private final ConcurrentMap, Boolean> outstandingBatches = new ConcurrentHashMap<>(); private final Object flushLock = new Object(); private final Object elementLock = new Object(); private final Future scheduledFuture; @@ -297,8 +302,10 @@ public void sendOutstanding() { } catch (Exception ex) { batchResponse = ApiFutures.immediateFailedFuture(ex); } + accumulatedBatch.setOperation(batchResponse); + + outstandingBatches.put(accumulatedBatch, Boolean.TRUE); - numOfOutstandingBatches.incrementAndGet(); ApiFutures.addCallback( batchResponse, new ApiFutureCallback() { @@ -310,7 +317,7 @@ public void onSuccess(ResponseT response) { accumulatedBatch.resource.getByteCount()); accumulatedBatch.onBatchSuccess(response); } finally { - onBatchCompletion(); + onBatchCompletion(accumulatedBatch); } } @@ -322,18 +329,19 @@ public void onFailure(Throwable throwable) { accumulatedBatch.resource.getByteCount()); accumulatedBatch.onBatchFailure(throwable); } finally { - onBatchCompletion(); + onBatchCompletion(accumulatedBatch); } } }, directExecutor()); } - private void onBatchCompletion() { + private void onBatchCompletion(Batch batch) { boolean shouldClose = false; synchronized (flushLock) { - if (numOfOutstandingBatches.decrementAndGet() == 0) { + outstandingBatches.remove(batch); + if (outstandingBatches.isEmpty()) { flushLock.notifyAll(); shouldClose = closeFuture != null; } @@ -349,10 +357,10 @@ private void onBatchCompletion() { } private void awaitAllOutstandingBatches() throws InterruptedException { - while (numOfOutstandingBatches.get() > 0) { + while (!outstandingBatches.isEmpty()) { synchronized (flushLock) { // Check again under lock to avoid racing with onBatchCompletion - if (numOfOutstandingBatches.get() == 0) { + if (outstandingBatches.isEmpty()) { break; } flushLock.wait(); @@ -360,11 +368,26 @@ private void awaitAllOutstandingBatches() throws InterruptedException { } } + @Override + public void cancelOutstanding() { + for (Batch batch : outstandingBatches.keySet()) { + batch.cancel(); + } + } /** {@inheritDoc} */ @Override public void close() throws InterruptedException { + close(null); + } + + @Override + public void close(@Nullable Duration timeout) throws InterruptedException { try { - closeAsync().get(); + if (timeout != null) { + closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } else { + closeAsync().get(); + } } catch (ExecutionException e) { // Original stacktrace of a batching exception is not useful, so rethrow the error with // the caller stacktrace @@ -374,6 +397,16 @@ public void close() throws InterruptedException { } else { throw new IllegalStateException("unexpected error closing the batcher", e.getCause()); } + } catch (TimeoutException e) { + StringJoiner batchesStr = new StringJoiner(","); + for (Batch batch : outstandingBatches.keySet()) { + batchesStr.add(batch.toString()); + } + String msg = "Timed out trying to close batcher after " + timeout + "."; + msg += " Batch request prototype: " + prototype + "."; + msg += " Outstanding batches: " + batchesStr; + + throw new BatchingException(msg); } } @@ -393,7 +426,7 @@ public ApiFuture closeAsync() { // prevent admission of new elements closeFuture = SettableApiFuture.create(); // check if we can close immediately - closeImmediately = numOfOutstandingBatches.get() == 0; + closeImmediately = outstandingBatches.isEmpty(); } // Clean up accounting @@ -435,6 +468,8 @@ private static class Batch { private long totalThrottledTimeMs = 0; private BatchResource resource; + private ApiFuture operation; + private Batch( RequestT prototype, BatchingDescriptor descriptor, @@ -457,6 +492,18 @@ void add( totalThrottledTimeMs += throttledTimeMs; } + void setOperation(@Nonnull ApiFuture operation) { + Preconditions.checkNotNull(operation); + this.operation = operation; + } + + void cancel() { + if (this.operation != null) { + this.operation.cancel(true); + } + } + + void onBatchSuccess(ResponseT response) { try { descriptor.splitResponse(response, entries); @@ -480,6 +527,22 @@ void onBatchFailure(Throwable throwable) { boolean isEmpty() { return resource.getElementCount() == 0; } + + @Override + public String toString() { + StringJoiner elementsStr = new StringJoiner(","); + for (BatchEntry entry : entries) { + elementsStr.add( + Optional.ofNullable(entry.getElement()) + .map(Object::toString) + .orElse("null") + ); + } + return MoreObjects.toStringHelper(this) + .add("operation", operation) + .add("elements", elementsStr) + .toString(); + } } /** From 323e5a16c84910035b3c18e1edd8a5c4d5c1562e Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Thu, 29 Aug 2024 14:47:52 -0400 Subject: [PATCH 2/8] format --- .../java/com/google/api/gax/batching/Batcher.java | 4 +--- .../com/google/api/gax/batching/BatcherImpl.java | 14 ++++++-------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java index 82d33867c3..3d8cad228b 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java @@ -79,9 +79,7 @@ public interface Batcher extends AutoCloseable { */ void sendOutstanding(); - /** - * Cancels all outstanding batch RPCs. - */ + /** Cancels all outstanding batch RPCs. */ void cancelOutstanding(); /** diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 7763cafa23..e30460a910 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -91,7 +91,8 @@ public class BatcherImpl private final BatcherReference currentBatcherReference; private Batch currentOpenBatch; - private final ConcurrentMap, Boolean> outstandingBatches = new ConcurrentHashMap<>(); + private final ConcurrentMap, Boolean> + outstandingBatches = new ConcurrentHashMap<>(); private final Object flushLock = new Object(); private final Object elementLock = new Object(); private final Future scheduledFuture; @@ -370,7 +371,7 @@ private void awaitAllOutstandingBatches() throws InterruptedException { @Override public void cancelOutstanding() { - for (Batch batch : outstandingBatches.keySet()) { + for (Batch batch : outstandingBatches.keySet()) { batch.cancel(); } } @@ -399,7 +400,8 @@ public void close(@Nullable Duration timeout) throws InterruptedException { } } catch (TimeoutException e) { StringJoiner batchesStr = new StringJoiner(","); - for (Batch batch : outstandingBatches.keySet()) { + for (Batch batch : + outstandingBatches.keySet()) { batchesStr.add(batch.toString()); } String msg = "Timed out trying to close batcher after " + timeout + "."; @@ -503,7 +505,6 @@ void cancel() { } } - void onBatchSuccess(ResponseT response) { try { descriptor.splitResponse(response, entries); @@ -533,10 +534,7 @@ public String toString() { StringJoiner elementsStr = new StringJoiner(","); for (BatchEntry entry : entries) { elementsStr.add( - Optional.ofNullable(entry.getElement()) - .map(Object::toString) - .orElse("null") - ); + Optional.ofNullable(entry.getElement()).map(Object::toString).orElse("null")); } return MoreObjects.toStringHelper(this) .add("operation", operation) From 17f4890217db8b06317ff95cacde6b24c73f47e6 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Thu, 29 Aug 2024 16:35:18 -0400 Subject: [PATCH 3/8] add tests --- .../api/gax/batching/BatcherImplTest.java | 127 ++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 3ebcc2c5d0..e12d09dcef 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -40,6 +40,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.core.AbstractApiFuture; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; @@ -60,6 +61,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -82,6 +84,7 @@ import org.junit.jupiter.api.Timeout; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.threeten.bp.Duration; class BatcherImplTest { @@ -249,6 +252,107 @@ public ApiFuture> futureCall( closeFuture.get(); } + @Test + void testCloseTimeout() throws ExecutionException, InterruptedException { + final String futureToStringMsg = "some descriptive message about this future"; + MySettableApiFuture> innerFuture = new MySettableApiFuture<>(futureToStringMsg); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + underTest.add(1); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + BatchingException closeException = + assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10))); + + // resolve the future to allow batcher to close + innerFuture.set(ImmutableList.of(1)); + + assertThat(stopwatch.elapsed()).isAtMost(java.time.Duration.ofSeconds(1)); + System.out.println(); + assertThat(closeException) + .hasMessageThat() + .matches(".*Outstanding batches.*" + futureToStringMsg + ".*elements=1.*"); + } + + @Test + void testCloseTimeoutPreventsAdd() throws ExecutionException, InterruptedException { + final String futureToStringMsg = "some descriptive message about this future"; + MySettableApiFuture> innerFuture = new MySettableApiFuture<>(futureToStringMsg); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + underTest.add(1); + + try { + underTest.close(Duration.ofMillis(10)); + } catch (BatchingException ignored) { + // ignored + } + + // Even though the close operation timed out, the batcher should be in a closed state + // and reject new additions + assertThrows(IllegalStateException.class, () -> underTest.add(2)); + + // resolve the future to allow batcher to close + innerFuture.set(ImmutableList.of(1)); + } + + @Test + void testCancelOutstanding() throws ExecutionException, InterruptedException { + SettableApiFuture> innerFuture = SettableApiFuture.create(); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + ApiFuture elementF = underTest.add(1); + + // Initial close will timeout + BatchingException firstCloseException = + assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10))); + assertThat(firstCloseException).hasMessageThat().contains("Timed out"); + + underTest.cancelOutstanding(); + + BatchingException finalCloseException = + assertThrows(BatchingException.class, () -> underTest.close(Duration.ofSeconds(1))); + assertThat(finalCloseException).hasMessageThat().contains("Batching finished"); + + // element future should resolve to a cancelled future + ExecutionException elementException = assertThrows(ExecutionException.class, elementF::get); + assertThat(elementException).hasCauseThat().isInstanceOf(CancellationException.class); + } + /** Verifies exception occurred at RPC is propagated to element results */ @Test void testResultFailureAfterRPCFailure() throws Exception { @@ -1102,4 +1206,27 @@ private BatcherImpl> createDefau EXECUTOR, flowController); } + + private static class MySettableApiFuture extends AbstractApiFuture { + private final String desc; + + MySettableApiFuture(String desc) { + this.desc = desc; + } + + @Override + public boolean set(T value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + + @Override + public String toString() { + return desc; + } + } } From 4b35a0ae9352d40723b35e05965e55e3d57bbd18 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Fri, 30 Aug 2024 11:48:52 -0400 Subject: [PATCH 4/8] ignore clirr on INternalExtensionOnly interface --- gax-java/gax/clirr-ignored-differences.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/gax-java/gax/clirr-ignored-differences.xml b/gax-java/gax/clirr-ignored-differences.xml index cab9fe4f8a..fc9a37be37 100644 --- a/gax-java/gax/clirr-ignored-differences.xml +++ b/gax-java/gax/clirr-ignored-differences.xml @@ -105,4 +105,10 @@ com/google/api/gax/tracing/MetricsTracer * + + + 7012 + com.google.api.gax.batching.Batcher + * + From 77603757bd68a167a00dfd06ad7e2ed722434a3f Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Sat, 31 Aug 2024 12:43:09 -0400 Subject: [PATCH 5/8] Update gax-java/gax/clirr-ignored-differences.xml Co-authored-by: Blake Li --- gax-java/gax/clirr-ignored-differences.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gax-java/gax/clirr-ignored-differences.xml b/gax-java/gax/clirr-ignored-differences.xml index fc9a37be37..cbaed47eb5 100644 --- a/gax-java/gax/clirr-ignored-differences.xml +++ b/gax-java/gax/clirr-ignored-differences.xml @@ -108,7 +108,7 @@ 7012 - com.google.api.gax.batching.Batcher + com/google/api/gax/batching/Batcher * From 9be3743dce707e4d679c3160381a270c7e1fbb5b Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 9 Sep 2024 12:56:46 -0400 Subject: [PATCH 6/8] address feedback --- .../com/google/api/gax/batching/Batcher.java | 5 +++-- .../google/api/gax/batching/BatcherImpl.java | 22 ++++++++++++------- .../api/gax/batching/BatcherImplTest.java | 12 +++++----- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java index 3d8cad228b..6f9f878905 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/Batcher.java @@ -32,8 +32,9 @@ import com.google.api.core.ApiFuture; import com.google.api.core.InternalExtensionOnly; import com.google.api.gax.rpc.ApiCallContext; +import java.time.Duration; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** * Represents a batching context where individual elements will be accumulated and flushed in a @@ -96,7 +97,7 @@ public interface Batcher extends AutoCloseable { * remaining work. The batcher will remain in a closed state and will not allow additional * elements to be added. */ - void close(@Nullable Duration timeout) throws InterruptedException; + void close(@Nullable Duration timeout) throws InterruptedException, TimeoutException; /** * Closes this Batcher by preventing new elements from being added, and then sending outstanding diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index e30460a910..9c782b03da 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -50,6 +50,7 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -65,7 +66,6 @@ import java.util.logging.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.threeten.bp.Duration; /** * Queues up the elements until {@link #flush()} is called; once batching is over, returned future @@ -303,7 +303,7 @@ public void sendOutstanding() { } catch (Exception ex) { batchResponse = ApiFutures.immediateFailedFuture(ex); } - accumulatedBatch.setOperation(batchResponse); + accumulatedBatch.setResponseFuture(batchResponse); outstandingBatches.put(accumulatedBatch, Boolean.TRUE); @@ -378,11 +378,17 @@ public void cancelOutstanding() { /** {@inheritDoc} */ @Override public void close() throws InterruptedException { - close(null); + try { + close(null); + } catch (TimeoutException e) { + // should never happen with a null timeout + throw new IllegalStateException( + "Unexpected timeout exception when trying to close the batcher without a timeout", e); + } } @Override - public void close(@Nullable Duration timeout) throws InterruptedException { + public void close(@Nullable Duration timeout) throws InterruptedException, TimeoutException { try { if (timeout != null) { closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS); @@ -408,7 +414,7 @@ public void close(@Nullable Duration timeout) throws InterruptedException { msg += " Batch request prototype: " + prototype + "."; msg += " Outstanding batches: " + batchesStr; - throw new BatchingException(msg); + throw new TimeoutException(msg); } } @@ -494,9 +500,9 @@ void add( totalThrottledTimeMs += throttledTimeMs; } - void setOperation(@Nonnull ApiFuture operation) { - Preconditions.checkNotNull(operation); - this.operation = operation; + void setResponseFuture(@Nonnull ApiFuture responseFuture) { + Preconditions.checkNotNull(responseFuture); + this.operation = responseFuture; } void cancel() { diff --git a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index e12d09dcef..7f8957a4b2 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -56,6 +56,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -84,7 +85,6 @@ import org.junit.jupiter.api.Timeout; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import org.threeten.bp.Duration; class BatcherImplTest { @@ -273,8 +273,8 @@ public ApiFuture> futureCall( Stopwatch stopwatch = Stopwatch.createStarted(); - BatchingException closeException = - assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10))); + TimeoutException closeException = + assertThrows(TimeoutException.class, () -> underTest.close(Duration.ofMillis(10))); // resolve the future to allow batcher to close innerFuture.set(ImmutableList.of(1)); @@ -307,7 +307,7 @@ public ApiFuture> futureCall( try { underTest.close(Duration.ofMillis(10)); - } catch (BatchingException ignored) { + } catch (TimeoutException ignored) { // ignored } @@ -338,8 +338,8 @@ public ApiFuture> futureCall( ApiFuture elementF = underTest.add(1); // Initial close will timeout - BatchingException firstCloseException = - assertThrows(BatchingException.class, () -> underTest.close(Duration.ofMillis(10))); + TimeoutException firstCloseException = + assertThrows(TimeoutException.class, () -> underTest.close(Duration.ofMillis(10))); assertThat(firstCloseException).hasMessageThat().contains("Timed out"); underTest.cancelOutstanding(); From f2fdac1ca5fd97e5815582ff7f1751be1932b6c0 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 9 Sep 2024 13:05:37 -0400 Subject: [PATCH 7/8] missing rename --- .../java/com/google/api/gax/batching/BatcherImpl.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 9c782b03da..d404bd4367 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -476,7 +476,7 @@ private static class Batch { private long totalThrottledTimeMs = 0; private BatchResource resource; - private ApiFuture operation; + private ApiFuture responseFuture; private Batch( RequestT prototype, @@ -502,12 +502,12 @@ void add( void setResponseFuture(@Nonnull ApiFuture responseFuture) { Preconditions.checkNotNull(responseFuture); - this.operation = responseFuture; + this.responseFuture = responseFuture; } void cancel() { - if (this.operation != null) { - this.operation.cancel(true); + if (this.responseFuture != null) { + this.responseFuture.cancel(true); } } @@ -543,7 +543,7 @@ public String toString() { Optional.ofNullable(entry.getElement()).map(Object::toString).orElse("null")); } return MoreObjects.toStringHelper(this) - .add("operation", operation) + .add("responseFuture", responseFuture) .add("elements", elementsStr) .toString(); } From b20824fbd9c000e92ea6db45e94dc9eb38a6f946 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 9 Sep 2024 13:32:22 -0400 Subject: [PATCH 8/8] make responseFuture volatile because it can be written by the autoflush thread and read by the user thread during cancel() --- .../src/main/java/com/google/api/gax/batching/BatcherImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index d404bd4367..51549f70b3 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -476,7 +476,7 @@ private static class Batch { private long totalThrottledTimeMs = 0; private BatchResource resource; - private ApiFuture responseFuture; + private volatile ApiFuture responseFuture; private Batch( RequestT prototype,