diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java index 3625b7e1b5..0a69eabf70 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/LoadBalancedStreamingHttpClient.java @@ -100,26 +100,16 @@ public void onError(final Throwable throwable) { @Override public void cancel() { - // If the request gets cancelled, we pessimistically assume that the transport will - // close the connection since the Subscriber did not read the entire response and - // cancelled. This reduces the time window during which a connection is eligible for - // selection by the load balancer post cancel and the connection being closed by the - // transport. - // Transport MAY not close the connection if cancel raced with completion and completion - // was seen by the transport before cancel. We have no way of knowing at this layer - // if this indeed happen. - // // For H2 and above, connection are multiplexed and use virtual streams for each // request-response exchange. Because we don't have access to the stream at this level - // we cannot close it. Instead, we use a Runnable which will be registered for the - // stream and executed when it closes. However, cancellation can happen before transport - // created a stream. We check the ownership of the Runnable and if it was not owned by - // the transport, we mark request as finished immediately. - if (ownedRunnable == null) { - c.closeAsync().subscribe(); - } else if (ownedRunnable.own()) { + // we cannot cancel/close it immediately. Instead, we use an OwnedRunnable which will be + // registered for the stream and executed when the stream closes. However, cancellation + // can happen before transport created a stream. We check the ownership of the Runnable + // and if it was not owned by the transport, we mark request as finished immediately. + if (ownedRunnable != null && ownedRunnable.own()) { c.requestFinished(); } + // Cancellation of HTTP/1.x requests is handled inside PipelinedStreamingHttpConnection. } })) // shareContextOnSubscribe is used because otherwise the AsyncContext modified during response diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java index ef59918745..d6938dec44 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NonPipelinedStreamingHttpConnection.java @@ -39,6 +39,7 @@ final class NonPipelinedStreamingHttpConnection @Override protected Publisher writeAndRead(final Publisher requestStream, @Nullable final FlushStrategy flushStrategy) { + assert connectionContext().protocol().major() >= 2 : "Unexpected protocol version"; if (flushStrategy == null) { return connection.write(requestStream).mergeDelayError(connection.read()); } else { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java index 35304f9901..e57837fba4 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/PipelinedStreamingHttpConnection.java @@ -17,6 +17,7 @@ import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.TerminalSignalConsumer; import io.servicetalk.http.api.HttpExecutionContext; import io.servicetalk.http.api.StreamingHttpRequestResponseFactory; import io.servicetalk.transport.netty.internal.FlushStrategy; @@ -40,16 +41,40 @@ final class PipelinedStreamingHttpConnection @Override protected Publisher writeAndRead(Publisher requestStream, @Nullable final FlushStrategy flushStrategy) { - if (flushStrategy == null) { - return connection.write(requestStream); - } else { - // TODO(scott): if we can remove the flush state on the connection we can simplify the control flow here. - return Publisher.defer(() -> { + assert connectionContext().protocol().major() <= 1 : "Unexpected protocol version"; + return (flushStrategy == null ? connection.write(requestStream) : + Publisher.defer(() -> { final Cancellable resetFlushStrategy = connection.updateFlushStrategy( (prev, isOriginal) -> isOriginal ? flushStrategy : prev); return connection.write(requestStream, connection::defaultFlushStrategy, WriteDemandEstimators::newDefaultEstimator).afterFinally(resetFlushStrategy::cancel); + })).beforeFinally(new TerminalSignalConsumer() { + @Override + public void onComplete() { + // noop + } + + @Override + public void onError(final Throwable throwable) { + // noop + } + + @Override + public void cancel() { + // If the HTTP/1.x request gets cancelled, we pessimistically assume that the transport will close + // the connection since the Subscriber did not read the entire response and cancelled. This reduces + // the time window during which a connection is eligible for selection by the load balancer post + // cancel and the connection being closed by the transport. + // Transport MAY not close the connection if cancel raced with completion and completion was seen by + // the transport before cancel. We have no way of knowing at this layer if this indeed happen. + closeAsync().subscribe(); + // Not necessary to do anything for HTTP/2 at the similar level + // (NonPipelinedStreamingHttpConnection) because NettyChannelPublisher#cancel0 will be scheduled on + // the EventLoop prior marking the request as finished. Therefore, any new attempt to open a stream + // on the same h2-connection will see the current stream as already cancelled and won't result in + // "max-concurrent-streams" error. + // For all other cases, LoadBalancedStreamingHttpClient already has logic to handle streams. + } }); - } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseCancelTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseCancelTest.java index 5ed4c63296..eea44464ba 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseCancelTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ResponseCancelTest.java @@ -38,7 +38,6 @@ import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; -import java.nio.channels.ClosedChannelException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -54,9 +53,9 @@ import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +// @Timeout(3) class ResponseCancelTest { private final BlockingQueue> serverResponses; @@ -65,6 +64,7 @@ class ResponseCancelTest { private final ServerContext ctx; private final HttpClient client; private final AtomicInteger connectionCount = new AtomicInteger(); + private final CountDownLatch clientConnectionClosed = new CountDownLatch(1); ResponseCancelTest() throws Exception { serverResponses = new LinkedBlockingQueue<>(); @@ -100,7 +100,7 @@ public void onError(final Throwable t) { } }) .appendConnectionFactoryFilter(ConnectionFactoryFilter.withStrategy( - original -> new CountingConnectionFactory(original, connectionCount), + original -> new CountingConnectionFactory(original, connectionCount, clientConnectionClosed), HttpExecutionStrategies.offloadNone())) .build(); } @@ -119,13 +119,10 @@ void cancel() throws Throwable { serverResponses.take(); assertThat("Unexpected connections count.", connectionCount.get(), is(1)); cancellable.cancel(); - // wait for cancel to be observed but don't send cancel to the transport so that transport does not close the - // connection which will then be ambiguous. - delayedClientCancels.take(); - // We do not let cancel propagate to the transport so the concurrency controller should close the connection - // and hence fail the response. - ClientTerminationSignal.resumeExpectFailure(delayedClientTermination, latch1, - instanceOf(ClosedChannelException.class)); + // wait for cancel to be observed, then send it to the transport so that transport closes the connection. + delayedClientCancels.take().cancel(); + // The connection is expected to close. + clientConnectionClosed.await(); CountDownLatch latch2 = new CountDownLatch(1); sendRequest(latch2); @@ -168,12 +165,15 @@ private Cancellable sendRequest(final CountDownLatch latch) { private static class CountingConnectionFactory extends DelegatingConnectionFactory { private final AtomicInteger connectionCount; + private final CountDownLatch clientConnectionClosed; CountingConnectionFactory( final ConnectionFactory delegate, - final AtomicInteger connectionCount) { + final AtomicInteger connectionCount, + final CountDownLatch clientConnectionClosed) { super(delegate); this.connectionCount = connectionCount; + this.clientConnectionClosed = clientConnectionClosed; } @Override @@ -182,7 +182,9 @@ public Single newConnection(final InetSocketA @Nullable final TransportObserver observer) { return defer(() -> { connectionCount.incrementAndGet(); - return delegate().newConnection(inetSocketAddress, context, observer); + return delegate().newConnection(inetSocketAddress, context, observer).whenOnSuccess(c -> { + c.onClose().whenFinally(clientConnectionClosed::countDown).subscribe(); + }); }); } }