From 90caf3941d71fe1b77725a41262feb0e149e05f5 Mon Sep 17 00:00:00 2001 From: Larry Safran Date: Mon, 16 Sep 2024 15:03:41 -0700 Subject: [PATCH] Fixed hang from not getting interrupted. Added some 'final' to class definitions. --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 16 ++++++++++++---- .../test/java/io/grpc/stub/ClientCallsTest.java | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 79fab9425bb..2bc5e1f785f 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -235,7 +235,7 @@ public static Iterator blockingServerStreamingCall( * @throws StatusException if the write to the server failed */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10918") - public static BlockingClientCall blockingV2ServerStreamingCall( + public static BlockingClientCall blockingV2ServerStreamingCall( Channel channel, MethodDescriptor method, CallOptions callOptions, ReqT req) throws InterruptedException, StatusException { BlockingClientCall call = @@ -436,7 +436,7 @@ private abstract static class StartableListener extends ClientCall.Listener + private static final class CallToStreamObserverAdapter extends ClientCallStreamObserver { private boolean frozen; private final ClientCall call; @@ -787,7 +787,7 @@ void onStart() { } @SuppressWarnings("serial") - static final class ThreadlessExecutor extends ConcurrentLinkedQueue + private static final class ThreadlessExecutor extends ConcurrentLinkedQueue implements Executor { private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName()); @@ -804,12 +804,14 @@ static final class ThreadlessExecutor extends ConcurrentLinkedQueue * Must only be called by one thread at a time. */ public void waitAndDrain() throws InterruptedException { + throwIfInterrupted(); Runnable runnable = poll(); if (runnable == null) { waiter = Thread.currentThread(); try { while ((runnable = poll()) == null) { LockSupport.park(this); + throwIfInterrupted(); } } finally { waiter = null; @@ -820,6 +822,12 @@ public void waitAndDrain() throws InterruptedException { } while ((runnable = poll()) != null); } + private static void throwIfInterrupted() throws InterruptedException { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + } + /** * Called after final call to {@link #waitAndDrain()}, from same thread. */ @@ -857,7 +865,7 @@ static final class ThreadSafeThreadlessExecutor extends ConcurrentLinkedQueue ClientCall interceptCall( } @Override public void halfClose() { - Thread.currentThread().interrupt(); super.halfClose(); + Thread.currentThread().interrupt(); } }; }