Handle HTTP/1.1 response cancellation the same way on all levels
Motivation:

When a response cancel signal is generated at the client level,
`LoadBalancedStreamingHttpClient` has logic in place to close the
connection to mitigate a race between connection availability and
closure. However, this logic is skipped if the cancel signal is
generated from a connection-level filter.

Modifications:

- Move connection closure from `LoadBalancedStreamingHttpClient` to
`PipelinedStreamingHttpConnection`;
- Adjust tests and comments;

Result:

The HTTP/1.1 connection is always closed in case of cancellation. We do that
from the caller thread to proactively set the `onClosing` state.
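
For context, the pattern this commit moves into `PipelinedStreamingHttpConnection` is a `TerminalSignalConsumer` attached via `beforeFinally` that closes the connection only on cancel. Below is a minimal sketch of that pattern, assuming ServiceTalk's public `Publisher`, `TerminalSignalConsumer`, and `AsyncCloseable` APIs; the helper class and method names are illustrative and not part of the commit.

import io.servicetalk.concurrent.api.AsyncCloseable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;

final class CloseOnCancelSketch {

    // Hypothetical helper: decorates a response publisher so that only a cancel signal
    // (not completion or error) proactively closes the underlying HTTP/1.1 connection.
    static <T> Publisher<T> closeConnectionOnCancel(final Publisher<T> response,
                                                    final AsyncCloseable connection) {
        return response.beforeFinally(new TerminalSignalConsumer() {
            @Override
            public void onComplete() {
                // The full response was read: the connection stays usable for the next request.
            }

            @Override
            public void onError(final Throwable t) {
                // Error handling is left to other layers in this sketch.
            }

            @Override
            public void cancel() {
                // Pessimistically assume the transport will close the connection anyway and close
                // it eagerly, so the load balancer cannot select it in the meantime.
                connection.closeAsync().subscribe();
            }
        });
    }
}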
idelpivnitskiy committed Jul 11, 2022
1 parent 8cc2f32 commit efb153f
Showing 4 changed files with 52 additions and 34 deletions.
LoadBalancedStreamingHttpClient.java
@@ -100,26 +100,16 @@ public void onError(final Throwable throwable) {

@Override
public void cancel() {
// If the request gets cancelled, we pessimistically assume that the transport will
// close the connection since the Subscriber did not read the entire response and
// cancelled. This reduces the time window during which a connection is eligible for
// selection by the load balancer post cancel and the connection being closed by the
// transport.
// Transport MAY not close the connection if cancel raced with completion and completion
// was seen by the transport before cancel. We have no way of knowing at this layer
// if this indeed happened.
//
// For H2 and above, connections are multiplexed and use virtual streams for each
// request-response exchange. Because we don't have access to the stream at this level
// we cannot close it. Instead, we use a Runnable which will be registered for the
// stream and executed when it closes. However, cancellation can happen before transport
// created a stream. We check the ownership of the Runnable and if it was not owned by
// the transport, we mark the request as finished immediately.
if (ownedRunnable == null) {
c.closeAsync().subscribe();
} else if (ownedRunnable.own()) {
// we cannot cancel/close it immediately. Instead, we use an OwnedRunnable which will be
// registered for the stream and executed when the stream closes. However, cancellation
// can happen before transport created a stream. We check the ownership of the Runnable
// and if it was not owned by the transport, we mark the request as finished immediately.
if (ownedRunnable != null && ownedRunnable.own()) {
c.requestFinished();
}
// Cancellation of HTTP/1.x requests is handled inside PipelinedStreamingHttpConnection.
}
}))
// shareContextOnSubscribe is used because otherwise the AsyncContext modified during response
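The cancel() logic in the hunk above hinges on the ownership hand-off of `OwnedRunnable`: whichever side, the transport or the cancelling subscriber, claims the runnable first becomes responsible for the cleanup. The sketch below illustrates that race with a compare-and-set flag; it is only an assumption about the shape of ServiceTalk's internal `OwnedRunnable`, and `OwnedRunnableSketch` and its `cleanup` task are hypothetical names.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the ownership race described above; the real OwnedRunnable in
// ServiceTalk internals may differ in detail.
final class OwnedRunnableSketch implements Runnable {
    private final AtomicBoolean owned = new AtomicBoolean();
    private final Runnable cleanup;

    OwnedRunnableSketch(final Runnable cleanup) {
        this.cleanup = cleanup;
    }

    // Returns true if the caller won the race and is now responsible for the cleanup.
    boolean own() {
        return owned.compareAndSet(false, true);
    }

    @Override
    public void run() {
        // Invoked by the transport when the stream closes; performs the cleanup only if the
        // cancelling side did not already take ownership and handle it.
        if (own()) {
            cleanup.run();
        }
    }
}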
NonPipelinedStreamingHttpConnection.java
@@ -39,6 +39,7 @@ final class NonPipelinedStreamingHttpConnection
@Override
protected Publisher<Object> writeAndRead(final Publisher<Object> requestStream,
@Nullable final FlushStrategy flushStrategy) {
assert connectionContext().protocol().major() >= 2 : "Unexpected protocol version";
if (flushStrategy == null) {
return connection.write(requestStream).mergeDelayError(connection.read());
} else {
PipelinedStreamingHttpConnection.java
@@ -17,6 +17,7 @@

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.transport.netty.internal.FlushStrategy;
@@ -40,16 +41,40 @@ final class PipelinedStreamingHttpConnection
@Override
protected Publisher<Object> writeAndRead(Publisher<Object> requestStream,
@Nullable final FlushStrategy flushStrategy) {
if (flushStrategy == null) {
return connection.write(requestStream);
} else {
// TODO(scott): if we can remove the flush state on the connection we can simplify the control flow here.
return Publisher.defer(() -> {
assert connectionContext().protocol().major() <= 1 : "Unexpected protocol version";
return (flushStrategy == null ? connection.write(requestStream) :
Publisher.defer(() -> {
final Cancellable resetFlushStrategy = connection.updateFlushStrategy(
(prev, isOriginal) -> isOriginal ? flushStrategy : prev);
return connection.write(requestStream, connection::defaultFlushStrategy,
WriteDemandEstimators::newDefaultEstimator).afterFinally(resetFlushStrategy::cancel);
})).beforeFinally(new TerminalSignalConsumer() {
@Override
public void onComplete() {
// noop
}

@Override
public void onError(final Throwable throwable) {
// noop
}

@Override
public void cancel() {
// If the HTTP/1.x request gets cancelled, we pessimistically assume that the transport will close
// the connection since the Subscriber did not read the entire response and cancelled. This reduces
// the time window during which a connection is eligible for selection by the load balancer post
// cancel and the connection being closed by the transport.
// Transport MAY not close the connection if cancel raced with completion and completion was seen by
// the transport before cancel. We have no way of knowing at this layer if this indeed happened.
closeAsync().subscribe();
// It is not necessary to do anything for HTTP/2 at a similar level
// (NonPipelinedStreamingHttpConnection) because NettyChannelPublisher#cancel0 will be scheduled on
// the EventLoop prior to marking the request as finished. Therefore, any new attempt to open a stream
// on the same h2-connection will see the current stream as already cancelled and won't result in
// a "max-concurrent-streams" error.
// For all other cases, LoadBalancedStreamingHttpClient already has logic to handle streams.
}
});
}
}
}
ResponseCancelTest.java
@@ -38,7 +38,6 @@
import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -54,9 +53,9 @@
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

// @Timeout(3)
class ResponseCancelTest {

private final BlockingQueue<Processor<HttpResponse, HttpResponse>> serverResponses;
@@ -65,6 +64,7 @@ class ResponseCancelTest {
private final ServerContext ctx;
private final HttpClient client;
private final AtomicInteger connectionCount = new AtomicInteger();
private final CountDownLatch clientConnectionClosed = new CountDownLatch(1);

ResponseCancelTest() throws Exception {
serverResponses = new LinkedBlockingQueue<>();
@@ -100,7 +100,7 @@ public void onError(final Throwable t) {
}
})
.appendConnectionFactoryFilter(ConnectionFactoryFilter.withStrategy(
original -> new CountingConnectionFactory(original, connectionCount),
original -> new CountingConnectionFactory(original, connectionCount, clientConnectionClosed),
HttpExecutionStrategies.offloadNone()))
.build();
}
@@ -119,13 +119,10 @@ void cancel() throws Throwable {
serverResponses.take();
assertThat("Unexpected connections count.", connectionCount.get(), is(1));
cancellable.cancel();
// wait for cancel to be observed but don't send cancel to the transport so that transport does not close the
// connection which will then be ambiguous.
delayedClientCancels.take();
// We do not let cancel propagate to the transport so the concurrency controller should close the connection
// and hence fail the response.
ClientTerminationSignal.resumeExpectFailure(delayedClientTermination, latch1,
instanceOf(ClosedChannelException.class));
// wait for cancel to be observed, then send it to the transport so that the transport closes the connection.
delayedClientCancels.take().cancel();
// The connection is expected to close.
clientConnectionClosed.await();

CountDownLatch latch2 = new CountDownLatch(1);
sendRequest(latch2);
@@ -168,12 +165,15 @@ private Cancellable sendRequest(final CountDownLatch latch) {
private static class CountingConnectionFactory
extends DelegatingConnectionFactory<InetSocketAddress, FilterableStreamingHttpConnection> {
private final AtomicInteger connectionCount;
private final CountDownLatch clientConnectionClosed;

CountingConnectionFactory(
final ConnectionFactory<InetSocketAddress, FilterableStreamingHttpConnection> delegate,
final AtomicInteger connectionCount) {
final AtomicInteger connectionCount,
final CountDownLatch clientConnectionClosed) {
super(delegate);
this.connectionCount = connectionCount;
this.clientConnectionClosed = clientConnectionClosed;
}

@Override
@@ -182,7 +182,9 @@ public Single<FilterableStreamingHttpConnection> newConnection(final InetSocketA
@Nullable final TransportObserver observer) {
return defer(() -> {
connectionCount.incrementAndGet();
return delegate().newConnection(inetSocketAddress, context, observer);
return delegate().newConnection(inetSocketAddress, context, observer).whenOnSuccess(c -> {
c.onClose().whenFinally(clientConnectionClosed::countDown).subscribe();
});
});
}
}
