From d8dd24444d9904c7e4b108567dfd885b5feda8c7 Mon Sep 17 00:00:00 2001 From: nickhill Date: Tue, 9 Jun 2020 08:41:29 -0700 Subject: [PATCH 1/2] stub: Have ClientCalls.ThreadlessExecutor reject Runnables after end of RPC Changes originally proposed as part of #7106. Fixes #3557 --- .../main/java/io/grpc/stub/ClientCalls.java | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 0266cb7d9af..8cee6019e81 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.locks.LockSupport; import java.util.logging.Level; import java.util.logging.Logger; @@ -161,6 +162,7 @@ public static RespT blockingUnaryCall( // Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). throw cancelThrow(call, e); } finally { + executor.shutdown(); if (interrupt) { Thread.currentThread().interrupt(); } @@ -626,6 +628,9 @@ private Object waitForNext() { // Now wait for onClose() to be called, so interceptors can clean up } } + if (next == this) { + threadless.shutdown(); + } return next; } } finally { @@ -712,6 +717,8 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue Date: Mon, 11 Jan 2021 17:19:53 -0800 Subject: [PATCH 2/2] Address @ejona86's comments --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 8cee6019e81..e8e29eddcc6 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -628,7 +628,7 @@ private Object waitForNext() { // Now wait for onClose() to be called, so interceptors can clean up } } - if (next == this) { + if (next == this || next instanceof StatusRuntimeException) { threadless.shutdown(); } return next; @@ -717,9 +717,10 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue