Skip to content

Commit

Permalink
Remove ThreadlessExecutor from BlockingServerStream (#10496)
Browse files Browse the repository at this point in the history
* Remove ThreadlessExecutor from BlockingServerStream

fixes #10490
  • Loading branch information
larry-safran authored Aug 18, 2023
1 parent eb18cba commit 55c5040
Showing 1 changed file with 14 additions and 49 deletions.
63 changes: 14 additions & 49 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@ public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try {
return getUnchecked(futureUnaryCall(call, req));
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
Expand Down Expand Up @@ -167,10 +165,7 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
}
executor.shutdown();
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
Expand Down Expand Up @@ -206,14 +201,12 @@ public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
*
* @return an iterator over the response stream.
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method,
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));

BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}
Expand Down Expand Up @@ -288,8 +281,7 @@ private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable e) {
assert e instanceof RuntimeException || e instanceof Error;
} catch (RuntimeException | Error e) {
logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
}
if (t instanceof RuntimeException) {
Expand Down Expand Up @@ -320,9 +312,7 @@ private static <ReqT, RespT> void asyncUnaryRequestCall(
try {
call.sendMessage(req);
call.halfClose();
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
Expand Down Expand Up @@ -597,20 +587,12 @@ private static final class BlockingResponseStream<T> implements Iterator<T> {
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
// Only accessed when iterating.
private Object last;

// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call) {
this(call, null);
}

// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call;
this.threadless = threadless;
}

StartableListener<T> listener() {
Expand All @@ -620,31 +602,14 @@ StartableListener<T> listener() {
private Object waitForNext() {
boolean interrupt = false;
try {
if (threadless == null) {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
return next;
}
} finally {
if (interrupt) {
Expand Down

0 comments on commit 55c5040

Please sign in to comment.