Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Batcher#close(timeout) and Batcher#cancelOutstanding #3141

Merged
merged 13 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gax-java/gax/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,10 @@
<className>com/google/api/gax/tracing/MetricsTracer</className>
<field>*</field>
</difference>
<!-- Ignore method additions to an InternalExtensionOnly interface-->
<difference>
<differenceType>7012</differenceType>
<className>com/google/api/gax/batching/Batcher</className>
<method>*</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.rpc.ApiCallContext;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand Down Expand Up @@ -77,13 +79,25 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
*/
void sendOutstanding();

/** Cancels all outstanding batch RPCs. */
void cancelOutstanding();

/**
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
* elements.
* Closes this Batcher by preventing new elements from being added, then flushing the existing
* elements and waiting for all the outstanding work to be resolved.
*/
@Override
void close() throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added, then flushing the existing
* elements and waiting for all the outstanding work to be resolved. If all of the outstanding
* work has not been resolved, then a {@link BatchingException} will be thrown with details of the
* remaining work. The batcher will remain in a closed state and will not allow additional
* elements to be added.
*/
void close(@Nullable Duration timeout) throws InterruptedException;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
* elements. The returned future will be resolved when the last element completes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Futures;
Expand All @@ -51,16 +52,20 @@
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
Expand All @@ -86,7 +91,8 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final BatcherReference currentBatcherReference;

private Batch<ElementT, ElementResultT, RequestT, ResponseT> currentOpenBatch;
private final AtomicInteger numOfOutstandingBatches = new AtomicInteger(0);
private final ConcurrentMap<Batch<ElementT, ElementResultT, RequestT, ResponseT>, Boolean>
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
outstandingBatches = new ConcurrentHashMap<>();
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
Expand Down Expand Up @@ -297,8 +303,10 @@ public void sendOutstanding() {
} catch (Exception ex) {
batchResponse = ApiFutures.immediateFailedFuture(ex);
}
accumulatedBatch.setOperation(batchResponse);

outstandingBatches.put(accumulatedBatch, Boolean.TRUE);

numOfOutstandingBatches.incrementAndGet();
ApiFutures.addCallback(
batchResponse,
new ApiFutureCallback<ResponseT>() {
Expand All @@ -310,7 +318,7 @@ public void onSuccess(ResponseT response) {
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchSuccess(response);
} finally {
onBatchCompletion();
onBatchCompletion(accumulatedBatch);
}
}

Expand All @@ -322,18 +330,19 @@ public void onFailure(Throwable throwable) {
accumulatedBatch.resource.getByteCount());
accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
onBatchCompletion(accumulatedBatch);
}
}
},
directExecutor());
}

private void onBatchCompletion() {
private void onBatchCompletion(Batch<ElementT, ElementResultT, RequestT, ResponseT> batch) {
boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
outstandingBatches.remove(batch);
if (outstandingBatches.isEmpty()) {
flushLock.notifyAll();
shouldClose = closeFuture != null;
}
Expand All @@ -349,22 +358,37 @@ private void onBatchCompletion() {
}

private void awaitAllOutstandingBatches() throws InterruptedException {
while (numOfOutstandingBatches.get() > 0) {
while (!outstandingBatches.isEmpty()) {
synchronized (flushLock) {
// Check again under lock to avoid racing with onBatchCompletion
if (numOfOutstandingBatches.get() == 0) {
if (outstandingBatches.isEmpty()) {
break;
}
flushLock.wait();
}
}
}

@Override
public void cancelOutstanding() {
for (Batch<?, ?, ?, ?> batch : outstandingBatches.keySet()) {
batch.cancel();
}
}
/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
close(null);
}

@Override
public void close(@Nullable Duration timeout) throws InterruptedException {
try {
closeAsync().get();
if (timeout != null) {
closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} else {
closeAsync().get();
}
} catch (ExecutionException e) {
// Original stacktrace of a batching exception is not useful, so rethrow the error with
// the caller stacktrace
Expand All @@ -374,6 +398,17 @@ public void close() throws InterruptedException {
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
}
} catch (TimeoutException e) {
StringJoiner batchesStr = new StringJoiner(",");
for (Batch<ElementT, ElementResultT, RequestT, ResponseT> batch :
outstandingBatches.keySet()) {
batchesStr.add(batch.toString());
}
String msg = "Timed out trying to close batcher after " + timeout + ".";
msg += " Batch request prototype: " + prototype + ".";
msg += " Outstanding batches: " + batchesStr;

throw new BatchingException(msg);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -393,7 +428,7 @@ public ApiFuture<Void> closeAsync() {
// prevent admission of new elements
closeFuture = SettableApiFuture.create();
// check if we can close immediately
closeImmediately = numOfOutstandingBatches.get() == 0;
closeImmediately = outstandingBatches.isEmpty();
}

// Clean up accounting
Expand Down Expand Up @@ -435,6 +470,8 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private long totalThrottledTimeMs = 0;
private BatchResource resource;

private ApiFuture<ResponseT> operation;

private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
Expand All @@ -457,6 +494,17 @@ void add(
totalThrottledTimeMs += throttledTimeMs;
}

void setOperation(@Nonnull ApiFuture<ResponseT> operation) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
Preconditions.checkNotNull(operation);
this.operation = operation;
}

void cancel() {
if (this.operation != null) {
this.operation.cancel(true);
}
}

void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, entries);
Expand All @@ -480,6 +528,19 @@ void onBatchFailure(Throwable throwable) {
boolean isEmpty() {
return resource.getElementCount() == 0;
}

@Override
public String toString() {
StringJoiner elementsStr = new StringJoiner(",");
for (BatchEntry<ElementT, ElementResultT> entry : entries) {
elementsStr.add(
Optional.ofNullable(entry.getElement()).map(Object::toString).orElse("null"));
}
return MoreObjects.toStringHelper(this)
.add("operation", operation)
.add("elements", elementsStr)
.toString();
}
}

/**
Expand Down
Loading
Loading