Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stub: Have ClientCalls.ThreadlessExecutor reject Runnables after end of RPC #7798

Merged
merged 2 commits into from
Feb 23, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -161,6 +162,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
executor.shutdown();
if (interrupt) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -626,6 +628,9 @@ private Object waitForNext() {
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
}
return next;
}
} finally {
Expand Down Expand Up @@ -712,7 +717,10 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());

private volatile Thread waiter;
private static final Object SHUTDOWN = new Object(); // sentinel

// Set to the calling thread while it's parked, SHUTDOWN on RPC completion
private volatile Object waiter;

// Non private to avoid synthetic class
ThreadlessExecutor() {}
Expand All @@ -736,14 +744,29 @@ public void waitAndDrain() throws InterruptedException {
}
}
do {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
runQuietly(runnable);
} while ((runnable = poll()) != null);
}

/**
* Called after final call to {@link #waitAndDrain()}, from same thread.
*/
public void shutdown() {
waiter = SHUTDOWN;
Runnable runnable;
while ((runnable = poll()) != null) {
runQuietly(runnable);
}
}

private static void runQuietly(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
}

private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -753,7 +776,12 @@ private static void throwIfInterrupted() throws InterruptedException {
@Override
public void execute(Runnable runnable) {
add(runnable);
LockSupport.unpark(waiter); // no-op if null
Object waiter = this.waiter;
if (waiter != SHUTDOWN) {
LockSupport.unpark((Thread) waiter); // no-op if null
} else if (remove(runnable)) {
throw new RejectedExecutionException();
}
}
}

Expand Down