Skip to content

Commit

Permalink
Fix flakiness in ResponseCancelTest (#954)
Browse files Browse the repository at this point in the history
__Motivation__

`ResponseCancelTest` is flaky due to a race condition that successful response or cancel will race to get to the connection filter and the result will either be success or failure.

__Modification__

- `cancel()` and `cancelAfterSuccessOnTransport()` tests were both doing same actions unintentionally. Modified the test to reflect the intentions.
- `cancelAfterSuccessOnTransport()` can either result in success or failure after the first request, modified test to verify the same.

__Result__

Fix tests.
  • Loading branch information
Nitesh Kant authored Mar 3, 2020
1 parent e035da6 commit 90058f2
Showing 1 changed file with 29 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ public void tearDown() throws Exception {
@Test
public void cancel() throws Throwable {
CountDownLatch latch1 = new CountDownLatch(1);
sendRequestAndCancel(latch1).onSuccess(client.httpResponseFactory().ok());
Cancellable cancellable = sendRequest(latch1);
// wait for server to receive request.
serverResponses.take();
assertThat("Unexpected connections count.", connectionCount.get(), is(1));
cancellable.cancel();
// wait for cancel to be observed but don't send cancel to the transport so that transport does not close the
// connection which will then be ambiguous.
delayedClientCancels.take();
// We do not let cancel propagate to the transport so the concurrency controller should close the connection
// and hence fail the response.
ClientTerminationSignal.resumeExpectFailure(delayedClientTermination, latch1,
Expand All @@ -132,32 +139,25 @@ public void cancel() throws Throwable {
@Test
public void cancelAfterSuccessOnTransport() throws Throwable {
CountDownLatch latch1 = new CountDownLatch(1);
Processor<HttpResponse, HttpResponse> serverResp = sendRequestAndCancel(latch1);
serverResp.onSuccess(client.httpResponseFactory().ok());
// We do not let cancel propagate to the transport so the concurrency controller should close the connection
// and hence fail the response.
ClientTerminationSignal.resumeExpectFailure(delayedClientTermination, latch1,
instanceOf(ClosedChannelException.class));

CountDownLatch latch2 = new CountDownLatch(1);
sendRequest(latch2);
serverResponses.take().onSuccess(client.httpResponseFactory().ok());
ClientTerminationSignal.resume(delayedClientTermination, latch2);
assertThat("Unexpected connections count.", connectionCount.get(), is(2));
}

private Processor<HttpResponse, HttpResponse> sendRequestAndCancel(CountDownLatch latch)
throws InterruptedException {
Cancellable cancellable = sendRequest(latch);
Cancellable cancellable = sendRequest(latch1);
// wait for server to receive request.
Processor<HttpResponse, HttpResponse> serverResp = serverResponses.take();

assertThat("Unexpected connections count.", connectionCount.get(), is(1));

serverResp.onSuccess(client.httpResponseFactory().ok());
cancellable.cancel();
// wait for cancel to be observed but don't send cancel to the transport so that transport does not close the
// connection which will then be ambiguous.
delayedClientCancels.take();
return serverResp;
// As there is a race between completion and cancellation, we may get a success or failure, so just wait for
// any termination.
delayedClientTermination.take().resume();

CountDownLatch latch2 = new CountDownLatch(1);
sendRequest(latch2);
serverResponses.take().onSuccess(client.httpResponseFactory().ok());
ClientTerminationSignal.resume(delayedClientTermination, latch2);
assertThat("Unexpected connections count.", connectionCount.get(), is(2));
}

private Cancellable sendRequest(final CountDownLatch latch) {
Expand Down Expand Up @@ -208,6 +208,15 @@ private static final class ClientTerminationSignal {
this.response = response;
}

@SuppressWarnings("unchecked")
void resume() {
if (err != null) {
subscriber.onError(err);
} else {
subscriber.onSuccess(response);
}
}

@SuppressWarnings("unchecked")
static void resume(BlockingQueue<ClientTerminationSignal> signals,
final CountDownLatch latch) throws Throwable {
Expand Down

0 comments on commit 90058f2

Please sign in to comment.