Skip to content

Commit

Permalink
core: Delay client listener exception notification until normal close
Browse files Browse the repository at this point in the history
This should avoid messages being leaked when a Listener throws an exception and
the executor is shut down immediately after the call completes. This is related
to grpc#7105 but a different scenario and we aren't aware of any user having
observed the previous behavior.

Note also this does _not_ fix the similar case of reordering caused by
delayedCancelOnDeadlineExceeded().
  • Loading branch information
ejona86 authored and dfawley committed Jan 15, 2021
1 parent c09c3a1 commit 43b4f7f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 33 deletions.
75 changes: 42 additions & 33 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,24 @@ public String toString() {

private class ClientStreamListenerImpl implements ClientStreamListener {
private final Listener<RespT> observer;
private boolean closed;
private Status exceptionStatus;

public ClientStreamListenerImpl(Listener<RespT> observer) {
this.observer = checkNotNull(observer, "observer");
}

/**
* Cancels call and schedules onClose() notification. May only be called from the application
* thread.
*/
private void exceptionThrown(Status status) {
// Since each RPC can have its own executor, we can only call onClose() when we are sure there
// will be no further callbacks. We set the status here and overwrite the onClose() details
// when it arrives.
exceptionStatus = status;
stream.cancel(status);
}

@Override
public void headersRead(final Metadata headers) {
PerfMark.startTask("ClientStreamListener.headersRead", tag);
Expand All @@ -612,16 +624,14 @@ public void runInContext() {
}

private void runInternal() {
if (closed) {
if (exceptionStatus != null) {
return;
}
try {
observer.onHeaders(headers);
} catch (Throwable t) {
Status status =
Status.CANCELLED.withCause(t).withDescription("Failed to read headers");
stream.cancel(status);
close(status, new Metadata());
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read headers"));
}
}
}
Expand Down Expand Up @@ -655,7 +665,7 @@ public void runInContext() {
}

private void runInternal() {
if (closed) {
if (exceptionStatus != null) {
GrpcUtil.closeQuietly(producer);
return;
}
Expand All @@ -672,10 +682,8 @@ private void runInternal() {
}
} catch (Throwable t) {
GrpcUtil.closeQuietly(producer);
Status status =
Status.CANCELLED.withCause(t).withDescription("Failed to read message.");
stream.cancel(status);
close(status, new Metadata());
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to read message."));
}
}
}
Expand All @@ -687,20 +695,6 @@ private void runInternal() {
}
}

/**
* Must be called from application thread.
*/
private void close(Status status, Metadata trailers) {
closed = true;
cancelListenersShouldBeRemoved = true;
try {
closeObserver(observer, status, trailers);
} finally {
removeContextListenerAndCancelDeadlineFuture();
channelCallsTracer.reportCallEnded(status.isOk());
}
}

@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
Expand Down Expand Up @@ -752,11 +746,25 @@ public void runInContext() {
}

private void runInternal() {
if (closed) {
// We intentionally don't keep the status or metadata from the server.
return;
Status status = savedStatus;
Metadata trailers = savedTrailers;
if (exceptionStatus != null) {
// Ideally exceptionStatus == savedStatus, as exceptionStatus was passed to cancel().
// However the cancel is racy and this closed() may have already been queued when the
// cancellation occurred. Since other calls like onMessage() will throw away data if
// exceptionStatus != null, it is semantically essential that we _not_ use a status
// provided by the server.
status = exceptionStatus;
// Replace trailers to prevent mixing sources of status and trailers.
trailers = new Metadata();
}
cancelListenersShouldBeRemoved = true;
try {
closeObserver(observer, status, trailers);
} finally {
removeContextListenerAndCancelDeadlineFuture();
channelCallsTracer.reportCallEnded(status.isOk());
}
close(savedStatus, savedTrailers);
}
}

Expand Down Expand Up @@ -789,13 +797,14 @@ public void runInContext() {
}

private void runInternal() {
if (exceptionStatus != null) {
return;
}
try {
observer.onReady();
} catch (Throwable t) {
Status status =
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady.");
stream.cancel(status);
close(status, new Metadata());
exceptionThrown(
Status.CANCELLED.withCause(t).withDescription("Failed to call onReady."));
}
}
}
Expand Down
45 changes: 45 additions & 0 deletions core/src/test/java/io/grpc/internal/ClientCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,51 @@ public void exceptionInOnHeadersTakesPrecedenceOverServer() {
verify(stream).cancel(same(callListenerStatus));
}

@Test
public void exceptionInOnHeadersHasOnCloseQueuedLast() {
class PointOfNoReturnExecutor implements Executor {
boolean rejectNewRunnables;

@Override public void execute(Runnable command) {
assertThat(rejectNewRunnables).isFalse();
command.run();
}
}

final PointOfNoReturnExecutor executor = new PointOfNoReturnExecutor();
ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
executor,
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
/* retryEnabled= */ false);
callListener = new NoopClientCall.NoopClientCallListener<Void>() {
private final RuntimeException failure = new RuntimeException("bad");

@Override public void onHeaders(Metadata metadata) {
throw failure;
}

@Override public void onClose(Status status, Metadata metadata) {
verify(stream).cancel(same(status));
assertThat(status.getCode()).isEqualTo(Status.Code.CANCELLED);
assertThat(status.getCause()).isSameInstanceAs(failure);
// At the point onClose() is called the user may shut down the executor, so no further
// Runnables may be scheduled. The only thread-safe way of guaranteeing that is for
// onClose() to be queued last.
executor.rejectNewRunnables = true;
}
};
call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();

streamListener.headersRead(new Metadata());
streamListener.closed(Status.OK, new Metadata());
}

@Test
public void exceptionInOnReadyTakesPrecedenceOverServer() {
DelayedExecutor executor = new DelayedExecutor();
Expand Down

0 comments on commit 43b4f7f

Please sign in to comment.