Skip to content

Commit

Permalink
core: Fix NPE race during hedging
Browse files Browse the repository at this point in the history
The problem was one hedge was committed before another had drained
start(). This was not testable because HedgingRunnable checks whether
scheduledHedgingRef is cancelled, which is racy, but there's no way to
deterministically trigger either race.

The same problem couldn't be triggered with retries because only one
attempt will be draining at a time. Retries with cancellation also
couldn't trigger it, for the surprising reason that the noop stream used
in cancel() wasn't considered drained.

This commit marks the noop stream as drained with cancel(), which allows
memory to be garbage collected sooner and exposes the race for tests.
That then showed the stream as hanging, because inFlightSubStreams
wasn't being decremented.

Fixes #9185
  • Loading branch information
ejona86 committed Apr 13, 2023
1 parent 92b4fae commit 4b01c90
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 20 deletions.
39 changes: 22 additions & 17 deletions core/src/main/java/io/grpc/internal/RetriableStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,12 @@ private void drain(Substream substream) {

synchronized (lock) {
savedState = state;
if (streamStarted) {
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
if (index == savedState.buffer.size()) { // I'm drained
state = savedState.substreamDrained(substream);
Expand Down Expand Up @@ -326,15 +324,13 @@ public void run() {
if (bufferEntry instanceof RetriableStream.StartEntry) {
streamStarted = true;
}
if (streamStarted) {
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
savedState = state;
if (savedState.winningSubstream != null && savedState.winningSubstream != substream) {
// committed but not me, to be cancelled
break;
}
if (savedState.cancelled) {
break;
}
}
}
Expand All @@ -344,6 +340,10 @@ public void run() {
return;
}

if (!streamStarted) {
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
substream.stream.start(new Sublistener(substream));
}
substream.stream.cancel(
state.winningSubstream == substream ? cancellationStatus : CANCELLED_BECAUSE_COMMITTED);
}
Expand Down Expand Up @@ -484,6 +484,8 @@ public void run() {
}

if (cancelled) {
// Start stream so inFlightSubStreams is decremented in Sublistener.closed()
newSubstream.stream.start(new Sublistener(newSubstream));
newSubstream.stream.cancel(Status.CANCELLED.withDescription("Unneeded hedging"));
return;
}
Expand All @@ -507,6 +509,9 @@ public final void cancel(final Status reason) {
Runnable runnable = commit(noopSubstream);

if (runnable != null) {
synchronized (lock) {
state = state.substreamDrained(noopSubstream);
}
runnable.run();
safeCloseMasterListener(reason, RpcProgress.PROCESSED, new Metadata());
return;
Expand Down
47 changes: 44 additions & 3 deletions core/src/test/java/io/grpc/internal/RetriableStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,21 @@ Status prestart() {
}
}

private final RetriableStream<String> retriableStream =
private RetriableStream<String> retriableStream =
newThrottledRetriableStream(null /* throttle */);
private final RetriableStream<String> hedgingStream =
newThrottledHedgingStream(null /* throttle */);

private ClientStreamTracer bufferSizeTracer;

private RetriableStream<String> newThrottledRetriableStream(Throttle throttle) {
return newThrottledRetriableStream(throttle, MoreExecutors.directExecutor());
}

private RetriableStream<String> newThrottledRetriableStream(Throttle throttle, Executor drainer) {
return new RecordedRetriableStream(
method, new Metadata(), channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), RETRY_POLICY,
null, throttle);
drainer, fakeClock.getScheduledExecutorService(), RETRY_POLICY, null, throttle);
}

private RetriableStream<String> newThrottledHedgingStream(Throttle throttle) {
Expand Down Expand Up @@ -598,6 +601,44 @@ public void retry_cancel_closed() {
inOrder.verify(retriableStreamRecorder, never()).postCommit();
}

@Test
public void transparentRetry_cancel_race() {
FakeClock drainer = new FakeClock();
retriableStream = newThrottledRetriableStream(null, drainer.getScheduledExecutorService());
ClientStream mockStream1 = mock(ClientStream.class);
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
InOrder inOrder = inOrder(retriableStreamRecorder);

retriableStream.start(masterListener);

ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());

// retry, but don't drain
ClientStream mockStream2 = mock(ClientStream.class);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(0);
sublistenerCaptor1.getValue().closed(
Status.fromCode(NON_RETRIABLE_STATUS_CODE), MISCARRIED, new Metadata());
assertEquals(1, drainer.numPendingTasks());

// cancel
retriableStream.cancel(Status.CANCELLED);
// drain transparent retry
drainer.runDueTasks();
inOrder.verify(retriableStreamRecorder).postCommit();

ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockStream2).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
sublistenerCaptor2.getValue().closed(statusCaptor.getValue(), PROCESSED, new Metadata());
verify(masterListener).closed(same(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
}

@Test
public void unretriableClosed_cancel() {
ClientStream mockStream1 = mock(ClientStream.class);
Expand Down

0 comments on commit 4b01c90

Please sign in to comment.