From c3ff7bf27f461d6a138f48c784e3ceab8678bccb Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Mon, 12 Oct 2020 23:25:21 -0700 Subject: [PATCH] Avoid `Connection reset by peer` error when server closes the connection (#1141) Motivation: When users close the server gracefully or when the server adds a `Connection: close` header to the response, the `Channel` will be closed as soon as server read the request and wrote the response. Because a client is not aware that the server intends to close the connection, it may send a following request on the same connection before it reads the response. In this case, TCP stack on the server-side will respond with RST frame (because the `Channel` is already closed) that may erase data on the connection that were delivered to the client but not acknowledged. See https://tools.ietf.org/html/rfc7230#section-6.6 for more information. Modifications: - When the request is read swap the `HttpRequestDecoder` with a handler that discards all new incoming requests; - When response is written half-close the output side of the connection; - When the FIN is received from the client, close the `Channel`; - For SSL connections send `close_notify` before `shutdownOutput()`; - Save the original `CloseEvent` and use it later to produce more accurate logs for connection closure; - Update `RequestResponseCloseHandlerTest` to account for a new state machine; - Enhance `GracefulConnectionClosureHandlingTest` to test the same scenarios when graceful closure is initiated on the server-side; - Add `ServerGracefulConnectionClosureHandlingTest` that reproduces `Connection reset by peer` issue described in `Motivation` section; - Minor improvements for `ConnectionCloseHeaderHandlingTest` to align it with other tests; Result: Server does not cause `Connection reset by peer` exception when it closes gracefully or adds `Connection: close` header. 
--- .../http/netty/HttpObjectDecoder.java | 22 ++ ...GracefulConnectionClosureHandlingTest.java | 206 ++++++++++++++---- .../servicetalk/http/netty/ProxyTunnel.java | 1 + ...GracefulConnectionClosureHandlingTest.java | 136 ++++++++++++ .../netty/ServerRespondsOnClosingTest.java | 166 +++++++++----- .../netty/internal/CloseHandler.java | 35 ++- .../internal/DefaultNettyConnection.java | 2 +- .../internal/RequestResponseCloseHandler.java | 144 +++++++++--- .../netty/internal/WriteStreamSubscriber.java | 4 +- .../RequestResponseCloseHandlerTest.java | 110 +++++++--- 10 files changed, 656 insertions(+), 170 deletions(-) create mode 100644 servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java index 9e37fcd509..504d7edc43 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/HttpObjectDecoder.java @@ -40,10 +40,13 @@ import io.servicetalk.http.api.HttpResponseMetaData; import io.servicetalk.transport.netty.internal.ByteToMessageDecoder; import io.servicetalk.transport.netty.internal.CloseHandler; +import io.servicetalk.transport.netty.internal.CloseHandler.DiscardFurtherInboundEvent; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.PrematureChannelClosureException; import io.netty.handler.codec.TooLongFrameException; @@ -502,6 +505,11 @@ public final void userEventTriggered(final ChannelHandlerContext ctx, final Obje default: break; } + } else if (evt instanceof 
DiscardFurtherInboundEvent) { + resetNow(); + ctx.pipeline().replace(HttpObjectDecoder.this, DiscardInboundHandler.INSTANCE.toString(), + DiscardInboundHandler.INSTANCE); + ctx.channel().config().setAutoRead(true); } super.userEventTriggered(ctx, evt); } @@ -844,4 +852,18 @@ private static boolean isVCHAR(final byte value) { private static boolean isObsText(final byte value) { return value >= (byte) 0xA0 && value <= (byte) 0xFF; // xA0-xFF } + + @Sharable + private static final class DiscardInboundHandler extends SimpleChannelInboundHandler { + static final ChannelInboundHandler INSTANCE = new DiscardInboundHandler(); + + private DiscardInboundHandler() { + super(/* autoRelease */ true); + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) { + // noop + } + } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java index 23d426ade4..ad321b18b0 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/GracefulConnectionClosureHandlingTest.java @@ -16,9 +16,14 @@ package io.servicetalk.http.netty; import io.servicetalk.buffer.api.Buffer; +import io.servicetalk.client.api.ConnectionFactory; +import io.servicetalk.client.api.DelegatingConnectionFactory; import io.servicetalk.concurrent.BlockingIterator; +import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; +import io.servicetalk.http.api.FilterableStreamingHttpConnection; import io.servicetalk.http.api.HttpPayloadWriter; import io.servicetalk.http.api.HttpServerBuilder; import 
io.servicetalk.http.api.ReservedStreamingHttpConnection; @@ -30,22 +35,30 @@ import io.servicetalk.transport.api.DelegatingConnectionAcceptor; import io.servicetalk.transport.api.HostAndPort; import io.servicetalk.transport.api.ServerContext; +import io.servicetalk.transport.api.TransportObserver; +import io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent; +import io.servicetalk.transport.netty.internal.CloseHandler.CloseEventObservedException; import io.servicetalk.transport.netty.internal.ExecutionContextRule; +import io.servicetalk.transport.netty.internal.NettyConnectionContext; import org.junit.After; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; +import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import java.io.IOException; +import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Collection; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; @@ -60,17 +73,24 @@ import static io.servicetalk.http.api.Matchers.contentEqualTo; import static io.servicetalk.http.netty.HttpsProxyTest.safeClose; import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; +import static io.servicetalk.transport.netty.internal.AddressUtils.newSocketAddress; import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; +import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND; +import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.GRACEFUL_USER_CLOSING; import static 
io.servicetalk.transport.netty.internal.ExecutionContextRule.cached; import static io.servicetalk.utils.internal.PlatformDependent.throwException; import static java.lang.Integer.parseInt; import static java.lang.String.valueOf; import static java.util.Arrays.asList; +import static java.util.Objects.requireNonNull; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThrows; +import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; @RunWith(Parameterized.class) public class GracefulConnectionClosureHandlingTest { @@ -86,6 +106,10 @@ public class GracefulConnectionClosureHandlingTest { @Rule public final ServiceTalkTestTimeout timeout = new ServiceTalkTestTimeout(); + private final boolean initiateClosureFromClient; + private final AsyncCloseable toClose; + private final CountDownLatch onClosing = new CountDownLatch(1); + @Nullable private final ProxyTunnel proxyTunnel; private final ServerContext serverContext; @@ -94,21 +118,38 @@ public class GracefulConnectionClosureHandlingTest { private final CountDownLatch clientConnectionClosed = new CountDownLatch(1); private final CountDownLatch serverConnectionClosed = new CountDownLatch(1); + private final CountDownLatch serverContextClosed = new CountDownLatch(1); private final CountDownLatch serverReceivedRequest = new CountDownLatch(1); private final BlockingQueue serverReceivedRequestPayload = new ArrayBlockingQueue<>(2); private final CountDownLatch serverSendResponse = new CountDownLatch(1); private final CountDownLatch serverSendResponsePayload = new CountDownLatch(1); - private final BlockingQueue responses = new ArrayBlockingQueue<>(2); + public GracefulConnectionClosureHandlingTest(boolean initiateClosureFromClient, boolean useUds, + boolean viaProxy) throws 
Exception { + this.initiateClosureFromClient = initiateClosureFromClient; + + if (useUds) { + assumeTrue("Server's IoExecutor does not support UnixDomainSocket", + SERVER_CTX.ioExecutor().isUnixDomainSocketSupported()); + assumeTrue("Client's IoExecutor does not support UnixDomainSocket", + CLIENT_CTX.ioExecutor().isUnixDomainSocketSupported()); + assumeFalse("UDS cannot be used via proxy", viaProxy); + } - public GracefulConnectionClosureHandlingTest(boolean viaProxy) throws Exception { - HttpServerBuilder serverBuilder = HttpServers.forAddress(localAddress(0)) + HttpServerBuilder serverBuilder = (useUds ? + HttpServers.forAddress(newSocketAddress()) : + HttpServers.forAddress(localAddress(0))) .ioExecutor(SERVER_CTX.ioExecutor()) .executionStrategy(defaultStrategy(SERVER_CTX.executor())) + .enableWireLogging("servicetalk-tests-server-wire-logger") .appendConnectionAcceptorFilter(original -> new DelegatingConnectionAcceptor(original) { @Override public Completable accept(final ConnectionContext context) { + if (!initiateClosureFromClient) { + ((NettyHttpServer.NettyHttpServerConnection) context).onClosing() + .whenFinally(onClosing::countDown).subscribe(); + } context.onClose().whenFinally(serverConnectionClosed::countDown).subscribe(); return completed(); } @@ -119,7 +160,9 @@ public Completable accept(final ConnectionContext context) { // Dummy proxy helps to emulate old intermediate systems that do not support half-closed TCP connections proxyTunnel = new ProxyTunnel(); proxyAddress = proxyTunnel.startProxy(); - serverBuilder.secure().commit(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey); + serverBuilder.secure() + .protocols("TLSv1.2") + .commit(DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey); } else { proxyTunnel = null; } @@ -146,23 +189,35 @@ public Completable accept(final ConnectionContext context) { writer.write(RESPONSE_CONTENT); } }); + serverContext.onClose().whenFinally(serverContextClosed::countDown).subscribe(); 
- HostAndPort serverAddress = serverHostAndPort(serverContext); - client = (viaProxy ? HttpClients.forSingleAddressViaProxy(serverAddress, proxyAddress) + client = (viaProxy ? HttpClients.forSingleAddressViaProxy(serverHostAndPort(serverContext), proxyAddress) .secure().disableHostnameVerification() + .protocols("TLSv1.2") .trustManager(DefaultTestCerts::loadMutualAuthCaPem) .commit() : - HttpClients.forSingleAddress(serverAddress)) + HttpClients.forResolvedAddress(serverContext.listenAddress())) .ioExecutor(CLIENT_CTX.ioExecutor()) .executionStrategy(defaultStrategy(CLIENT_CTX.executor())) + .enableWireLogging("servicetalk-tests-client-wire-logger") + .appendConnectionFactoryFilter(cf -> initiateClosureFromClient ? + new OnClosingConnectionFactoryFilter<>(cf, onClosing) : cf) .buildStreaming(); connection = client.reserveConnection(client.get("/")).toFuture().get(); connection.onClose().whenFinally(clientConnectionClosed::countDown).subscribe(); + + toClose = initiateClosureFromClient ? 
connection : serverContext; } - @Parameters(name = "viaProxy={0}") - public static Collection data() { - return asList(false, true); + @Parameters(name = "initiateClosureFromClient={0} useUds={1} viaProxy={2}") + public static Collection data() { + return asList( + new Boolean[] {false, false, false}, + new Boolean[] {false, false, true}, + new Boolean[] {false, true, false}, + new Boolean[] {true, false, false}, + new Boolean[] {true, false, true}, + new Boolean[] {true, true, false}); } @After @@ -178,7 +233,7 @@ public void tearDown() throws Exception { @Test public void closeIdleBeforeExchange() throws Exception { - connection.closeGracefully(); + triggerGracefulClosure(); awaitConnectionClosed(); } @@ -187,12 +242,11 @@ public void closeIdleAfterExchange() throws Exception { serverSendResponse.countDown(); serverSendResponsePayload.countDown(); - connection.request(newRequest("/first")).subscribe(responses::add); - StreamingHttpResponse response = responses.take(); + StreamingHttpResponse response = connection.request(newRequest("/first")).toFuture().get(); assertResponse(response); assertResponsePayloadBody(response); - connection.closeGracefully(); + triggerGracefulClosure(); awaitConnectionClosed(); } @@ -200,13 +254,13 @@ public void closeIdleAfterExchange() throws Exception { public void closeAfterRequestMetaDataSentNoResponseReceived() throws Exception { CountDownLatch clientSendRequestPayload = new CountDownLatch(1); StreamingHttpRequest request = newRequest("/first", clientSendRequestPayload); - connection.request(request).subscribe(responses::add); + Future responseFuture = connection.request(request).toFuture(); serverReceivedRequest.await(); - connection.closeAsyncGracefully().subscribe(); + triggerGracefulClosure(); serverSendResponse.countDown(); - StreamingHttpResponse response = responses.take(); + StreamingHttpResponse response = responseFuture.get(); assertResponse(response); clientSendRequestPayload.countDown(); @@ -221,13 +275,13 @@ public 
void closeAfterRequestMetaDataSentNoResponseReceived() throws Exception { @Test public void closeAfterFullRequestSentNoResponseReceived() throws Exception { StreamingHttpRequest request = newRequest("/first"); - connection.request(request).subscribe(responses::add); + Future responseFuture = connection.request(request).toFuture(); serverReceivedRequest.await(); - connection.closeAsyncGracefully().subscribe(); + triggerGracefulClosure(); serverSendResponse.countDown(); - StreamingHttpResponse response = responses.take(); + StreamingHttpResponse response = responseFuture.get(); assertResponse(response); assertRequestPayloadBody(request); @@ -242,13 +296,13 @@ public void closeAfterFullRequestSentNoResponseReceived() throws Exception { public void closeAfterRequestMetaDataSentResponseMetaDataReceived() throws Exception { CountDownLatch clientSendRequestPayload = new CountDownLatch(1); StreamingHttpRequest request = newRequest("/first", clientSendRequestPayload); - connection.request(request).subscribe(responses::add); + Future responseFuture = connection.request(request).toFuture(); serverSendResponse.countDown(); - StreamingHttpResponse response = responses.take(); + StreamingHttpResponse response = responseFuture.get(); assertResponse(response); - connection.closeAsyncGracefully().subscribe(); + triggerGracefulClosure(); clientSendRequestPayload.countDown(); serverSendResponsePayload.countDown(); @@ -262,13 +316,13 @@ public void closeAfterRequestMetaDataSentResponseMetaDataReceived() throws Excep @Test public void closeAfterFullRequestSentResponseMetaDataReceived() throws Exception { StreamingHttpRequest request = newRequest("/first"); - connection.request(request).subscribe(responses::add); + Future responseFuture = connection.request(request).toFuture(); serverSendResponse.countDown(); - StreamingHttpResponse response = responses.take(); + StreamingHttpResponse response = responseFuture.get(); assertResponse(response); - 
connection.closeAsyncGracefully().subscribe(); + triggerGracefulClosure(); serverSendResponsePayload.countDown(); assertRequestPayloadBody(request); @@ -282,10 +336,10 @@ public void closeAfterFullRequestSentResponseMetaDataReceived() throws Exception public void closeAfterRequestMetaDataSentFullResponseReceived() throws Exception { CountDownLatch clientSendRequestPayload = new CountDownLatch(1); StreamingHttpRequest request = newRequest("/first", clientSendRequestPayload); - connection.request(request).subscribe(responses::add); + Future responseFuture = connection.request(request).toFuture(); serverSendResponse.countDown(); - StreamingHttpResponse response = responses.take(); + StreamingHttpResponse response = responseFuture.get(); assertResponse(response); serverSendResponsePayload.countDown(); @@ -302,7 +356,7 @@ public void closeAfterRequestMetaDataSentFullResponseReceived() throws Exception }).ignoreElements().subscribe(responsePayloadComplete::countDown); responsePayloadReceived.await(); - connection.closeAsyncGracefully().subscribe(); + triggerGracefulClosure(); clientSendRequestPayload.countDown(); responsePayloadComplete.await(); @@ -314,31 +368,52 @@ public void closeAfterRequestMetaDataSentFullResponseReceived() throws Exception @Test public void closePipelinedAfterTwoRequestsSentBeforeAnyResponseReceived() throws Exception { - StreamingHttpRequest zeroRequest = newRequest("/zero"); - connection.request(zeroRequest).subscribe(responses::add); + StreamingHttpRequest firstRequest = newRequest("/first"); + Future firstResponseFuture = connection.request(firstRequest).toFuture(); serverReceivedRequest.await(); - CountDownLatch firstRequestSent = new CountDownLatch(1); - StreamingHttpRequest firstRequest = newRequest("/first") - .transformPayloadBody(payload -> payload.whenOnComplete(firstRequestSent::countDown)); - connection.request(firstRequest).subscribe(responses::add); - firstRequestSent.await(); + CountDownLatch secondRequestSent = new 
CountDownLatch(1); + StreamingHttpRequest secondRequest = newRequest("/second") + .transformPayloadBody(payload -> payload.whenOnComplete(secondRequestSent::countDown)); + Future secondResponseFuture = connection.request(secondRequest).toFuture(); + secondRequestSent.await(); - connection.closeAsyncGracefully().subscribe(); + triggerGracefulClosure(); serverSendResponse.countDown(); serverSendResponsePayload.countDown(); - StreamingHttpResponse zeroResponse = responses.take(); - assertResponse(zeroResponse); - assertResponsePayloadBody(zeroResponse); - assertRequestPayloadBody(zeroRequest); - - StreamingHttpResponse firstResponse = responses.take(); + StreamingHttpResponse firstResponse = firstResponseFuture.get(); assertResponse(firstResponse); assertResponsePayloadBody(firstResponse); assertRequestPayloadBody(firstRequest); + if (initiateClosureFromClient) { + StreamingHttpResponse secondResponse = secondResponseFuture.get(); + assertResponse(secondResponse); + assertResponsePayloadBody(secondResponse); + assertRequestPayloadBody(secondRequest); + } else { + // In case of server graceful closure the second response may complete successfully if the second request + // reached the server before the closure was triggered, or may fail if it's not. 
+ StreamingHttpResponse secondResponse = null; + try { + secondResponse = secondResponseFuture.get(); + } catch (ExecutionException e) { + if (proxyTunnel != null) { + assertThat(e.getCause(), anyOf(instanceOf(ClosedChannelException.class), + instanceOf(IOException.class))); + } else { + assertClosedChannelException(secondResponseFuture::get, CHANNEL_CLOSED_INBOUND); + } + } + if (secondResponse != null) { + assertResponse(secondResponse); + assertResponsePayloadBody(secondResponse); + assertRequestPayloadBody(secondRequest); + } + } + awaitConnectionClosed(); assertNextRequestFails(); } @@ -381,14 +456,53 @@ private void assertRequestPayloadBody(StreamingHttpRequest request) throws Excep assertThat(valueOf(actualContentLength), contentEqualTo(contentLengthHeader)); } + private void triggerGracefulClosure() throws Exception { + toClose.closeAsyncGracefully().subscribe(); + onClosing.await(); + } + private void awaitConnectionClosed() throws Exception { clientConnectionClosed.await(); serverConnectionClosed.await(); + if (!initiateClosureFromClient) { + serverContextClosed.await(); + } } private void assertNextRequestFails() { - Exception e = assertThrows(ExecutionException.class, - () -> connection.request(connection.get("/next").addHeader(CONTENT_LENGTH, ZERO)).toFuture().get()); - assertThat(e.getCause(), instanceOf(ClosedChannelException.class)); + assertClosedChannelException( + () -> connection.request(connection.get("/next").addHeader(CONTENT_LENGTH, ZERO)).toFuture().get(), + initiateClosureFromClient ? 
GRACEFUL_USER_CLOSING : CHANNEL_CLOSED_INBOUND); + } + + private void assertClosedChannelException(ThrowingRunnable runnable, CloseEvent expectedCloseEvent) { + Exception e = assertThrows(ExecutionException.class, runnable); + Throwable cause = e.getCause(); + assertThat(cause, instanceOf(ClosedChannelException.class)); + while (cause != null && !(cause instanceof CloseEventObservedException)) { + cause = cause.getCause(); + } + assertThat("Exception is not enhanced with CloseEvent", cause, is(notNullValue())); + assertThat(((CloseEventObservedException) cause).event(), is(expectedCloseEvent)); + } + + private static class OnClosingConnectionFactoryFilter + extends DelegatingConnectionFactory { + + private final CountDownLatch onClosing; + + OnClosingConnectionFactoryFilter(ConnectionFactory cf, + CountDownLatch onClosing) { + super(cf); + this.onClosing = requireNonNull(onClosing); + } + + @Override + public Single newConnection(ResolvedAddress address, + @Nullable final TransportObserver observer) { + return delegate().newConnection(address, observer).whenOnSuccess(connection -> + ((NettyConnectionContext) connection.connectionContext()).onClosing() + .whenFinally(onClosing::countDown).subscribe()); + } } } diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ProxyTunnel.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ProxyTunnel.java index 759eea548a..938aae245e 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ProxyTunnel.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ProxyTunnel.java @@ -144,6 +144,7 @@ private static void copyStream(final OutputStream out, final InputStream cin) { while ((b = cin.read()) >= 0) { out.write(b); } + out.flush(); } catch (IOException e) { LOGGER.error("Proxy exception", e); } finally { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java 
b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java new file mode 100644 index 0000000000..93e1175fee --- /dev/null +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerGracefulConnectionClosureHandlingTest.java @@ -0,0 +1,136 @@ +/* + * Copyright © 2020 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.http.netty; + +import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; +import io.servicetalk.transport.api.ConnectionContext; +import io.servicetalk.transport.api.DelegatingConnectionAcceptor; +import io.servicetalk.transport.api.ServerContext; +import io.servicetalk.transport.netty.internal.ExecutionContextRule; + +import org.junit.After; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static io.servicetalk.concurrent.api.Completable.completed; +import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; +import static 
io.servicetalk.http.api.HttpExecutionStrategies.noOffloadsStrategy; +import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH; +import static io.servicetalk.http.api.HttpSerializationProviders.textSerializer; +import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; +import static io.servicetalk.transport.netty.internal.ExecutionContextRule.cached; +import static java.lang.String.valueOf; +import static java.nio.charset.StandardCharsets.US_ASCII; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +public class ServerGracefulConnectionClosureHandlingTest { + + @ClassRule + public static final ExecutionContextRule SERVER_CTX = cached("server-io", "server-executor"); + + private static final String REQUEST_CONTENT = "request_content"; + private static final String RESPONSE_CONTENT = "response_content"; + + @Rule + public final ServiceTalkTestTimeout timeout = new ServiceTalkTestTimeout(); + + private final CountDownLatch serverConnectionClosing = new CountDownLatch(1); + private final CountDownLatch serverConnectionClosed = new CountDownLatch(1); + private final CountDownLatch serverContextClosed = new CountDownLatch(1); + + private final ServerContext serverContext; + private final InetSocketAddress serverAddress; + + public ServerGracefulConnectionClosureHandlingTest() throws Exception { + AtomicReference serverClose = new AtomicReference<>(); + serverContext = HttpServers.forAddress(localAddress(0)) + .ioExecutor(SERVER_CTX.ioExecutor()) + .executionStrategy(defaultStrategy(SERVER_CTX.executor())) + .executionStrategy(noOffloadsStrategy()) + .appendConnectionAcceptorFilter(original -> new DelegatingConnectionAcceptor(original) { + @Override + public Completable accept(final ConnectionContext context) { + ((NettyHttpServer.NettyHttpServerConnection) context).onClosing() + .whenFinally(serverConnectionClosing::countDown).subscribe(); + 
context.onClose().whenFinally(serverConnectionClosed::countDown).subscribe(); + return completed(); + } + }).listenStreamingAndAwait((ctx, request, responseFactory) -> succeeded(responseFactory.ok() + .addHeader(CONTENT_LENGTH, valueOf(RESPONSE_CONTENT.length())) + .payloadBody(request.payloadBody().ignoreElements().concat(from(RESPONSE_CONTENT)), + textSerializer()) + // Close ServerContext after response is complete + .transformRawPayloadBody(payload -> payload.whenFinally(serverClose.get())))); + serverContext.onClose().whenFinally(serverContextClosed::countDown).subscribe(); + serverClose.set(() -> serverContext.closeAsyncGracefully().subscribe()); + + serverAddress = (InetSocketAddress) serverContext.listenAddress(); + } + + @After + public void tearDown() throws Exception { + serverContext.close(); + } + + @Test + public void test() throws Exception { + try (Socket clientSocket = new Socket(serverAddress.getAddress(), serverAddress.getPort()); + OutputStream out = clientSocket.getOutputStream(); + InputStream in = clientSocket.getInputStream()) { + + out.write(newRequestAsBytes("/first")); + out.flush(); + + serverConnectionClosing.await(); + + out.write(newRequestAsBytes("/second")); + out.flush(); + + int total = 0; + while (in.read() >= 0) { + total++; + } + assertThat(total, is(96)); + } + + awaitServerConnectionClosed(); + } + + private byte[] newRequestAsBytes(String path) { + return ("POST " + path + " HTTP/1.1\r\n" + + "host: localhost\r\n" + + "content-type: text/plain\r\n" + + "content-length: " + REQUEST_CONTENT.length() + "\r\n\r\n" + + REQUEST_CONTENT).getBytes(US_ASCII); + } + + private void awaitServerConnectionClosed() throws Exception { + serverConnectionClosed.await(); + serverContextClosed.await(); + } +} diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java index 351f265b79..419cfac3bc 
100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/ServerRespondsOnClosingTest.java @@ -15,81 +15,80 @@ */ package io.servicetalk.http.netty; -import io.servicetalk.concurrent.api.Executor; -import io.servicetalk.concurrent.api.ExecutorRule; +import io.servicetalk.concurrent.SingleSource.Processor; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; -import io.servicetalk.http.api.BlockingHttpService; import io.servicetalk.http.api.DefaultHttpExecutionContext; +import io.servicetalk.http.api.DefaultHttpHeadersFactory; +import io.servicetalk.http.api.HttpRequest; import io.servicetalk.http.api.HttpResponse; -import io.servicetalk.http.netty.FlushStrategyOnServerTest.OutboundWriteEventsInterceptor; +import io.servicetalk.http.api.HttpResponseFactory; +import io.servicetalk.http.api.HttpService; import io.servicetalk.http.netty.NettyHttpServer.NettyHttpServerConnection; import io.servicetalk.tcp.netty.internal.TcpServerChannelInitializer; import io.servicetalk.transport.api.ConnectionObserver; import io.servicetalk.transport.netty.internal.EmbeddedDuplexChannel; import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver; +import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.junit.After; -import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; -import java.util.concurrent.CountDownLatch; +import java.util.ArrayDeque; +import java.util.Queue; import static io.netty.buffer.ByteBufUtil.writeAscii; import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR; -import static io.servicetalk.concurrent.api.ExecutorRule.newRule; +import static io.servicetalk.concurrent.api.Executors.immediate; +import static io.servicetalk.concurrent.api.Processors.newSingleProcessor; +import static 
io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.http.api.HttpApiConversions.toStreamingHttpService; -import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy; -import static io.servicetalk.http.api.HttpExecutionStrategyInfluencer.defaultStreamingInfluencer; +import static io.servicetalk.http.api.HttpExecutionStrategies.noOffloadsStrategy; import static io.servicetalk.http.api.HttpHeaderNames.CONNECTION; import static io.servicetalk.http.api.HttpHeaderValues.CLOSE; import static io.servicetalk.http.api.HttpSerializationProviders.textSerializer; import static io.servicetalk.http.netty.NettyHttpServer.initChannel; import static io.servicetalk.transport.netty.internal.NettyIoExecutors.fromNettyEventLoop; +import static java.nio.charset.StandardCharsets.US_ASCII; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; public class ServerRespondsOnClosingTest { - @ClassRule - public static final ExecutorRule EXECUTOR_RULE = newRule(); + private static final HttpResponseFactory RESPONSE_FACTORY = new DefaultHttpResponseFactory( + DefaultHttpHeadersFactory.INSTANCE, DEFAULT_ALLOCATOR); + private static final String RESPONSE_PAYLOAD_BODY = "Hello World"; @Rule public final Timeout timeout = new ServiceTalkTestTimeout(); - private final OutboundWriteEventsInterceptor interceptor; private final EmbeddedDuplexChannel channel; private final NettyHttpServerConnection serverConnection; - - private final CountDownLatch serverConnectionClosed = new CountDownLatch(1); - private final CountDownLatch releaseResponse = new CountDownLatch(1); + private final Queue requests = new ArrayDeque<>(); public ServerRespondsOnClosingTest() throws Exception { - interceptor = new 
OutboundWriteEventsInterceptor(); - channel = new EmbeddedDuplexChannel(false, interceptor); - + channel = new EmbeddedDuplexChannel(false); DefaultHttpExecutionContext httpExecutionContext = new DefaultHttpExecutionContext(DEFAULT_ALLOCATOR, - fromNettyEventLoop(channel.eventLoop()), EXECUTOR_RULE.executor(), defaultStrategy()); + fromNettyEventLoop(channel.eventLoop()), immediate(), noOffloadsStrategy()); final HttpServerConfig httpServerConfig = new HttpServerConfig(); httpServerConfig.tcpConfig().enableWireLogging("servicetalk-tests-server-wire-logger"); ReadOnlyHttpServerConfig config = httpServerConfig.asReadOnly(); ConnectionObserver connectionObserver = NoopConnectionObserver.INSTANCE; - BlockingHttpService service = (ctx, request, responseFactory) -> { - releaseResponse.await(); - final HttpResponse response = responseFactory.ok().payloadBody("Hello World", textSerializer()); - if (request.hasQueryParameter("serverShouldClose")) { - response.addHeader(CONNECTION, CLOSE); - } - return response; + HttpService service = (ctx, request, responseFactory) -> { + Processor responseProcessor = newSingleProcessor(); + requests.add(new Exchange(request, responseProcessor)); + return fromSource(responseProcessor); }; serverConnection = initChannel(channel, httpExecutionContext, config, new TcpServerChannelInitializer( config.tcpConfig(), connectionObserver), - toStreamingHttpService(service, defaultStreamingInfluencer()).adaptor(), true, + toStreamingHttpService(service, strategy -> strategy).adaptor(), true, connectionObserver).toFuture().get(); - serverConnection.onClose().whenFinally(serverConnectionClosed::countDown).subscribe(); serverConnection.process(true); } @@ -98,6 +97,7 @@ public void tearDown() throws Exception { try { serverConnection.closeAsyncGracefully().toFuture().get(); } finally { + channel.finishAndReleaseAll(); channel.close().syncUninterruptibly(); } } @@ -105,10 +105,11 @@ public void tearDown() throws Exception { @Test public void 
protocolClosingInboundPipelinedFirstInitiatesClosure() throws Exception { sendRequest("/first", true); + // The following request after "Connection: close" header violates the spec, but we want to verify that server + // discards those requests and do not respond to them: sendRequest("/second", false); - releaseResponse.countDown(); - // Verify that the server responded: - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // only first + handleRequests(); + verifyResponse("/first"); assertServerConnectionClosed(); } @@ -116,10 +117,9 @@ public void protocolClosingInboundPipelinedFirstInitiatesClosure() throws Except public void protocolClosingInboundPipelinedSecondInitiatesClosure() throws Exception { sendRequest("/first", false); sendRequest("/second", true); - releaseResponse.countDown(); - // Verify that the server responded: - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // first - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // second + handleRequests(); + verifyResponse("/first"); + verifyResponse("/second"); assertServerConnectionClosed(); } @@ -127,9 +127,10 @@ public void protocolClosingInboundPipelinedSecondInitiatesClosure() throws Excep public void protocolClosingOutboundPipelinedFirstInitiatesClosure() throws Exception { sendRequest("/first?serverShouldClose=true", false); sendRequest("/second", false); - releaseResponse.countDown(); - // Verify that the server responded: - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // only first + handleRequests(); + verifyResponse("/first"); + // Second request is discarded + respondWithFIN(); assertServerConnectionClosed(); } @@ -137,10 +138,10 @@ public void protocolClosingOutboundPipelinedFirstInitiatesClosure() throws Excep public void protocolClosingOutboundPipelinedSecondInitiatesClosure() throws Exception { sendRequest("/first", false); 
sendRequest("/second?serverShouldClose=true", false); - releaseResponse.countDown(); - // Verify that the server responded: - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // first - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // second + handleRequests(); + verifyResponse("/first"); + verifyResponse("/second"); + respondWithFIN(); assertServerConnectionClosed(); } @@ -150,12 +151,37 @@ public void gracefulClosurePipelined() throws Exception { sendRequest("/second", false); serverConnection.closeAsyncGracefully().subscribe(); serverConnection.onClosing().toFuture().get(); - releaseResponse.countDown(); - // Verify that the server responded: - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // first - assertThat("Unexpected writes", interceptor.takeWritesTillFlush(), hasSize(3)); // second - channel.awaitOutputShutdown(); - channel.shutdownInput(); // simulate FIN from the client + sendRequest("/third", false); // should be discarded + handleRequests(); + verifyResponse("/first"); + verifyResponse("/second"); + respondWithFIN(); + assertServerConnectionClosed(); + } + + @Test + public void gracefulClosurePipelinedDiscardPartialRequest() throws Exception { + sendRequest("/first", false); + // Send only initial line with CRLF that should hang in ByteToMessage cumulation buffer and will be discarded: + channel.writeInbound(writeAscii(PooledByteBufAllocator.DEFAULT, "GET /second HTTP/1.1")); + serverConnection.closeAsyncGracefully().subscribe(); + serverConnection.onClosing().toFuture().get(); + handleRequests(); + verifyResponse("/first"); + respondWithFIN(); + assertServerConnectionClosed(); + } + + @Test + public void gracefulClosurePipelinedFirstResponseClosesConnection() throws Exception { + sendRequest("/first?serverShouldClose=true", false); // PROTOCOL_CLOSING_OUTBOUND + sendRequest("/second", false); + serverConnection.closeAsyncGracefully().subscribe(); + 
serverConnection.onClosing().toFuture().get(); + sendRequest("/third", false); // should be discarded + handleRequests(); + verifyResponse("/first"); + respondWithFIN(); assertServerConnectionClosed(); } @@ -167,9 +193,49 @@ private void sendRequest(String requestTarget, boolean addCloseHeader) { "\r\n")); } + private void handleRequests() { + Exchange exchange; + while ((exchange = requests.poll()) != null) { + HttpRequest request = exchange.request; + HttpResponse response = RESPONSE_FACTORY.ok() + .setHeader("Request-Path", request.path()) + .payloadBody(RESPONSE_PAYLOAD_BODY, textSerializer()); + if (request.hasQueryParameter("serverShouldClose")) { + response.setHeader(CONNECTION, CLOSE); + } + exchange.responseProcessor.onSuccess(response); + } + } + + private void verifyResponse(String requestPath) { + // 3 items expected: meta-data, payload body, trailers + assertThat("Not a full response was written", channel.outboundMessages(), hasSize(greaterThanOrEqualTo(3))); + ByteBuf metaData = channel.readOutbound(); + assertThat("Unexpected response meta-data", metaData.toString(US_ASCII), containsString(requestPath)); + ByteBuf payloadBody = channel.readOutbound(); + assertThat("Unexpected response payload body", payloadBody.toString(US_ASCII), equalTo(RESPONSE_PAYLOAD_BODY)); + ByteBuf trailers = channel.readOutbound(); + assertThat("Unexpected response trailers object", trailers.readableBytes(), is(0)); + } + + private void respondWithFIN() { + assertThat("Server did not shutdown output", channel.isOutputShutdown(), is(true)); + channel.shutdownInput(); // simulate FIN from the client + } + private void assertServerConnectionClosed() throws Exception { - serverConnectionClosed.await(); - assertThat("Unexpected writes", interceptor.pendingEvents(), is(0)); + serverConnection.onClose().toFuture().get(); + assertThat("Unexpected writes", channel.outboundMessages(), hasSize(0)); assertThat("Channel is not closed", channel.isOpen(), is(false)); } + + private static 
final class Exchange { + final HttpRequest request; + final Processor responseProcessor; + + Exchange(HttpRequest request, Processor responseProcessor) { + this.request = request; + this.responseProcessor = responseProcessor; + } + } } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CloseHandler.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CloseHandler.java index 237fafb8fc..d69d2f0628 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CloseHandler.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/CloseHandler.java @@ -1,5 +1,5 @@ /* - * Copyright © 2018 Apple Inc. and the ServiceTalk project authors + * Copyright © 2018, 2020 Apple Inc. and the ServiceTalk project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,6 +100,8 @@ public static CloseHandler forPipelinedRequestResponse(boolean client, ChannelCo public abstract void protocolClosingOutbound(ChannelHandlerContext ctx); /** + * Registers a handler for {@link CloseEvent}. + * * @param channel the {@link Channel} for which this event handler is registering * @param eventHandler receives {@link CloseEvent}, to be emitted from the {@link EventLoop} for the {@link Channel} */ @@ -144,7 +146,7 @@ public static CloseHandler forPipelinedRequestResponse(boolean client, ChannelCo * * @param channel {@link Channel} */ - abstract void userClosing(Channel channel); + abstract void gracefulUserClosing(Channel channel); /** * These events indicate an event was observed from the protocol or {@link Channel} that indicates the end of the @@ -164,7 +166,7 @@ public enum CloseEvent { /** * User initiated close command, depends on the implementation but usually resembles outbound protocol close. 
*/ - USER_CLOSING("The close* method was called in the local application."), + GRACEFUL_USER_CLOSING("The graceful close* method was called in the local application."), /** * Outbound {@link SocketChannel} shutdown observed. */ @@ -250,7 +252,7 @@ void closeChannelOutbound(final Channel channel) { } @Override - void userClosing(final Channel channel) { + void gracefulUserClosing(final Channel channel) { channel.close(); } @@ -308,7 +310,7 @@ void closeChannelOutbound(final Channel channel) { } @Override - void userClosing(final Channel channel) { + void gracefulUserClosing(final Channel channel) { channel.close(); } @@ -342,10 +344,18 @@ public void protocolClosingOutbound(final ChannelHandlerContext ctx) { } } + private abstract static class NettyUserEvent { + + @Override + public String toString() { + return this.getClass().getName(); + } + } + /** * Netty UserEvent to indicate the end of a outbound data was observed at the transport. */ - static final class OutboundDataEndEvent { + static final class OutboundDataEndEvent extends NettyUserEvent { /** * Netty UserEvent instance to indicate an outbound end of data. */ @@ -359,11 +369,22 @@ private OutboundDataEndEvent() { /** * Netty UserEvent to indicate the output writes should be aborted because the channel is closing. */ - static final class AbortWritesEvent { + static final class AbortWritesEvent extends NettyUserEvent { static final AbortWritesEvent INSTANCE = new AbortWritesEvent(); private AbortWritesEvent() { // No instances. } } + + /** + * Netty UserEvent to indicate the further inbound data should be discarded. + */ + public static final class DiscardFurtherInboundEvent extends NettyUserEvent { + static final DiscardFurtherInboundEvent INSTANCE = new DiscardFurtherInboundEvent(); + + private DiscardFurtherInboundEvent() { + // No instances. 
+ } + } } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java index ddc7d0d2b4..1bba964bf6 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java @@ -442,7 +442,7 @@ public Protocol protocol() { } private void invokeUserCloseHandler() { - closeHandler.userClosing(channel()); + closeHandler.gracefulUserClosing(channel()); } @Override diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandler.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandler.java index 7ebe5b7252..a87ccc6ed0 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandler.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,18 +31,20 @@ import static io.netty.channel.ChannelOption.ALLOW_HALF_CLOSURE; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_OUTBOUND; +import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.GRACEFUL_USER_CLOSING; import static 
io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.PROTOCOL_CLOSING_INBOUND; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.PROTOCOL_CLOSING_OUTBOUND; -import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.USER_CLOSING; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.ALL_CLOSED; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.CLOSED; -import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.CLOSING; +import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.CLOSING_SERVER_GRACEFULLY; +import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.DISCARDING_SERVER_INPUT; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.IN_CLOSED; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.IN_OUT_CLOSED; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.OUT_CLOSED; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.READ; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.WRITE; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.has; +import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.hasAny; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.idle; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.set; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandler.State.unset; @@ -75,13 +78,20 @@ class RequestResponseCloseHandler extends CloseHandler { */ private int pending; + /** + * Original {@link CloseEvent} that initiated closing. 
+ */ + @Nullable + private CloseEvent closeEvent; + protected interface State { byte READ = 0x01; byte WRITE = 0x02; - byte CLOSING = 0x04; - byte IN_CLOSED = 0x08; - byte OUT_CLOSED = 0x10; - byte CLOSED = 0x20; + byte DISCARDING_SERVER_INPUT = 0x04; + byte CLOSING_SERVER_GRACEFULLY = 0x08; + byte IN_CLOSED = 0x10; + byte OUT_CLOSED = 0x20; + byte CLOSED = 0x40; byte ALL_CLOSED = CLOSED | IN_CLOSED | OUT_CLOSED; byte IN_OUT_CLOSED = IN_CLOSED | OUT_CLOSED; @@ -95,6 +105,10 @@ static boolean has(byte state, byte mask) { return (state & mask) == mask; } + static boolean hasAny(byte state, byte flag1, byte flag2) { + return (state & (flag1 | flag2)) != 0; + } + static byte set(byte state, byte flags) { return (byte) (state | flags); } @@ -134,7 +148,9 @@ void registerEventHandler(final Channel channel, Consumer eventHandl private void storeCloseRequestAndEmit(final CloseEvent event) { eventHandler.accept(event); - state = set(state, CLOSING); + if (this.closeEvent == null) { + this.closeEvent = event; + } } @Override @@ -148,8 +164,9 @@ public void protocolPayloadBeginInbound(final ChannelHandlerContext ctx) { public void protocolPayloadEndInbound(final ChannelHandlerContext ctx) { assert ctx.executor().inEventLoop(); state = unset(state, READ); - if (has(state, CLOSING)) { - closeChannelHalfOrFullyOnPayloadEnd(ctx.channel(), PROTOCOL_CLOSING_INBOUND); + final CloseEvent evt = this.closeEvent; + if (evt != null) { + closeChannelHalfOrFullyOnPayloadEnd(ctx.channel(), evt, true); } } @@ -162,7 +179,7 @@ public void protocolPayloadBeginOutbound(final ChannelHandlerContext ctx) { @Override public void protocolPayloadEndOutbound(final ChannelHandlerContext ctx) { - if (isClient || (has(state, CLOSING) && pending == 0)) { + if (isClient || (closeEvent != null && pending == 0)) { ctx.pipeline().fireUserEventTriggered(OutboundDataEndEvent.INSTANCE); } } @@ -171,8 +188,9 @@ public void protocolPayloadEndOutbound(final ChannelHandlerContext ctx) { public void 
protocolPayloadEndOutboundSuccess(final ChannelHandlerContext ctx) { assert ctx.executor().inEventLoop(); state = unset(state, WRITE); - if (has(state, CLOSING)) { - closeChannelHalfOrFullyOnPayloadEnd(ctx.channel(), PROTOCOL_CLOSING_OUTBOUND); + final CloseEvent evt = this.closeEvent; + if (evt != null) { + closeChannelHalfOrFullyOnPayloadEnd(ctx.channel(), evt, false); } } @@ -194,8 +212,11 @@ public void protocolClosingOutbound(final ChannelHandlerContext ctx) { void channelClosedInbound(final ChannelHandlerContext ctx) { assert ctx.executor().inEventLoop(); state = set(state, IN_CLOSED); - storeCloseRequestAndEmit(CHANNEL_CLOSED_INBOUND); - maybeCloseChannelOnHalfClosed(ctx.channel(), CHANNEL_CLOSED_INBOUND); + // Use the actual event that initiated graceful closure: + final CloseEvent evt = has(state, CLOSING_SERVER_GRACEFULLY) ? closeEvent : CHANNEL_CLOSED_INBOUND; + assert evt != null; + storeCloseRequestAndEmit(evt); + maybeCloseChannelOnHalfClosed(ctx.channel(), evt); state = unset(state, READ); } @@ -204,13 +225,18 @@ void channelClosedOutbound(final ChannelHandlerContext ctx) { assert ctx.executor().inEventLoop(); state = set(state, OUT_CLOSED); storeCloseRequestAndEmit(CHANNEL_CLOSED_OUTBOUND); - maybeCloseChannelOnHalfClosed(ctx.channel(), CHANNEL_CLOSED_OUTBOUND); + if (!has(state, CLOSING_SERVER_GRACEFULLY)) { + // Only try to close when we are not closing server gracefully + maybeCloseChannelOnHalfClosed(ctx.channel(), CHANNEL_CLOSED_OUTBOUND); + } state = unset(state, WRITE); } @Override void closeChannelInbound(final Channel channel) { - if (!has(state, IN_CLOSED)) { + // Do not reset INBOUND when server is closing gracefully. This event is triggered during processing of + // ChannelOutputShutdownEvent if the USER_CLOSE was initiated after response was written. 
+ if (!hasAny(state, IN_CLOSED, CLOSING_SERVER_GRACEFULLY)) { LOGGER.debug("{} Half-Closing INBOUND (reset)", channel); setSocketResetOnClose(channel); ((DuplexChannel) channel).shutdownInput().addListener((ChannelFutureListener) this::onHalfClosed); @@ -222,25 +248,31 @@ void closeChannelOutbound(final Channel channel) { if (!has(state, OUT_CLOSED)) { LOGGER.debug("{} Half-Closing OUTBOUND (reset)", channel); setSocketResetOnClose(channel); - ((DuplexChannel) channel).shutdownOutput().addListener((ChannelFutureListener) this::onHalfClosed); + halfCloseOutbound(channel, true); } } @Override - void userClosing(final Channel channel) { + void gracefulUserClosing(final Channel channel) { assert channel.eventLoop().inEventLoop(); - storeCloseRequestAndEmit(USER_CLOSING); - maybeCloseChannelHalfOrFullyOnClosing(channel, USER_CLOSING); + storeCloseRequestAndEmit(GRACEFUL_USER_CLOSING); + maybeCloseChannelHalfOrFullyOnClosing(channel, GRACEFUL_USER_CLOSING); } // This closes the channel either completely when there are no more outstanding requests to drain or half-closes // when a deferred request was encountered. 
- private void closeChannelHalfOrFullyOnPayloadEnd(final Channel channel, final CloseEvent evt) { + private void closeChannelHalfOrFullyOnPayloadEnd(final Channel channel, final CloseEvent evt, + final boolean endInbound) { + if (idle(pending, state)) { - // close when all pending requests drained - closeChannel(channel, evt); - } else if (!isClient && evt == PROTOCOL_CLOSING_INBOUND) { - // deferred half close after current request is done + if (isClient || has(state, IN_CLOSED) || + (evt != GRACEFUL_USER_CLOSING && evt != PROTOCOL_CLOSING_OUTBOUND)) { + closeChannel(channel, evt); + } else { + serverCloseGracefully(channel); + } + } else if (!isClient && endInbound) { + // current request is complete, discard further inbound serverHalfCloseInbound(channel); } // do not perform half-closure on the client to prevent a server from premature connection closure @@ -248,8 +280,13 @@ private void closeChannelHalfOrFullyOnPayloadEnd(final Channel channel, final Cl // Eagerly close on a closing event rather than deferring private void maybeCloseChannelHalfOrFullyOnClosing(final Channel channel, final CloseEvent evt) { - if (idle(pending, state)) { - closeChannel(channel, evt); + if (idle(pending, state)) { // Only GRACEFUL_USER_CLOSING + assert evt == GRACEFUL_USER_CLOSING; + if (isClient) { + closeChannel(channel, evt); + } else { + serverCloseGracefully(channel); + } } else if (isClient) { if (evt == PROTOCOL_CLOSING_INBOUND && pending != 0) { // Protocol inbound closing for a client is when a response is read, which decrements the pending @@ -270,7 +307,8 @@ private void maybeCloseChannelHalfOrFullyOnClosing(final Channel channel, final } // discards extra pending requests when closing, ensures an eventual "idle" state pending = 0; - } else if (!has(state, READ)) { // Server && USER_CLOSING - Don't abort any request + } else if (!has(state, READ)) { // Server && GRACEFUL_USER_CLOSING - Don't abort any request + assert evt == GRACEFUL_USER_CLOSING; 
serverHalfCloseInbound(channel); } } @@ -295,6 +333,7 @@ private void maybeCloseChannelOnHalfClosed(final Channel channel, final CloseEve } } } else if (has(state, WRITE)) { // evt == CHANNEL_CLOSED_OUTBOUND + assert evt == CHANNEL_CLOSED_OUTBOUND; // ensure we finish reading pending responses, abort others setSocketResetOnClose(channel); if (pending <= 1 && !has(state, READ)) { @@ -314,9 +353,11 @@ private void maybeCloseChannelOnHalfClosed(final Channel channel, final CloseEve } } } else if (pending != 0) { // Server && CHANNEL_CLOSED_OUTBOUND + assert evt == CHANNEL_CLOSED_OUTBOUND; // pending > 0 => ensures we finish reading current request, abort others we can't respond to anyway closeAndResetChannel(channel, evt); } else if (!has(state, READ)) { // Server && CHANNEL_CLOSED_OUTBOUND && pending == 0 + assert evt == CHANNEL_CLOSED_OUTBOUND; // last response, we are not reading and OUTBOUND is closed, so just close the channel. closeChannel(channel, evt); } @@ -359,12 +400,53 @@ private void setSocketResetOnClose(final Channel channel) { } } + private void serverCloseGracefully(final Channel channel) { + // Perform half-closure as described in https://tools.ietf.org/html/rfc7230#section-6.6 + serverHalfCloseInbound(channel); + serverHalfCloseOutbound(channel); + } + private void serverHalfCloseInbound(final Channel channel) { assert !isClient; - if (!has(state, IN_CLOSED) && channel instanceof DuplexChannel) { - LOGGER.debug("{} Half-Closing INBOUND", channel); + if (!hasAny(state, DISCARDING_SERVER_INPUT, IN_CLOSED)) { + // Instead of actual half-closure via DuplexChannel.shutdownInput() we request the pipeline to discard all + // further inbound data until the FIN is received. Incoming FIN from the client-side + // (ChannelInputShutdownReadComplete event) notifies server that client received the last response and is + // also closing the connection. Therefore, we can complete graceful closure and close server's connection. 
+ // DuplexChannel.shutdownInput() silently discards all incoming data at OS level and does not notify netty + // when the FIN is received. + LOGGER.debug("{} Discarding further INBOUND", channel); state = unset(state, READ); - ((DuplexChannel) channel).shutdownInput().addListener((ChannelFutureListener) this::onHalfClosed); + channel.pipeline().fireUserEventTriggered(DiscardFurtherInboundEvent.INSTANCE); + state = set(state, DISCARDING_SERVER_INPUT); + } + } + + private void serverHalfCloseOutbound(final Channel channel) { + assert !isClient && idle(pending, state); + if (!has(state, OUT_CLOSED)) { + state = set(state, CLOSING_SERVER_GRACEFULLY); + LOGGER.debug("{} Half-Closing OUTBOUND", channel); + halfCloseOutbound(channel, false); + // Final channel.close() will happen when FIN (ChannelInputShutdownReadComplete) is received + } + } + + private void halfCloseOutbound(final Channel channel, final boolean registerOnHalfClosed) { + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + if (sslHandler != null) { + // send close_notify: https://tools.ietf.org/html/rfc5246#section-7.2.1 + sslHandler.closeOutbound().addListener(f -> { + final ChannelFuture cf = ((DuplexChannel) channel).shutdownOutput(); + if (registerOnHalfClosed) { + cf.addListener((ChannelFutureListener) this::onHalfClosed); + } + }); + } else { + final ChannelFuture cf = ((DuplexChannel) channel).shutdownOutput(); + if (registerOnHalfClosed) { + cf.addListener((ChannelFutureListener) this::onHalfClosed); + } } } diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java index a8374249a8..3c19398322 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java +++ 
b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/WriteStreamSubscriber.java @@ -463,7 +463,6 @@ private boolean nettySharedPromiseTryStatus() { } private void terminateSubscriber(@Nullable Throwable cause) { - notifyAllListeners(cause); if (cause == null) { try { observer.writeComplete(); @@ -487,6 +486,9 @@ private void terminateSubscriber(@Nullable Throwable cause) { ChannelCloseUtils.close(channel, cause); } } + // Notify listeners after the subscriber is terminated. Otherwise, WriteStreamSubscriber#channelClosed may + // be invoked that leads to the Subscription cancellation. + notifyAllListeners(cause); } private void notifyAllListeners(@Nullable Throwable cause) { diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java index 51ffea2490..411d0d7b38 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/RequestResponseCloseHandlerTest.java @@ -19,6 +19,7 @@ import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout; import io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent; +import io.servicetalk.transport.netty.internal.CloseHandler.DiscardFurtherInboundEvent; import io.servicetalk.transport.netty.internal.CloseHandler.OutboundDataEndEvent; import io.netty.bootstrap.Bootstrap; @@ -78,30 +79,32 @@ import static io.servicetalk.transport.netty.internal.BuilderUtils.socketChannel; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_INBOUND; import static 
io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.CHANNEL_CLOSED_OUTBOUND; +import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.GRACEFUL_USER_CLOSING; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.PROTOCOL_CLOSING_INBOUND; import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.PROTOCOL_CLOSING_OUTBOUND; -import static io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent.USER_CLOSING; import static io.servicetalk.transport.netty.internal.CloseHandler.forPipelinedRequestResponse; import static io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor; import static io.servicetalk.transport.netty.internal.NettyIoExecutors.createIoExecutor; +import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.CI; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.FC; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IB; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IC; +import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.ID; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IE; -import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IH; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IS; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OB; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OC; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OE; +import static 
io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OH; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OS; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.SR; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.UC; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.CCI; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.CCO; +import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.GUC; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.NIL; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.PCI; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.PCO; -import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.ExpectEvent.UCO; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Mode.C; import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Mode.S; import static java.lang.Boolean.TRUE; @@ -148,6 +151,7 @@ public static class Scenarios { private ChannelHandlerContext ctx; private SocketChannel channel; + private ChannelPipeline pipeline; private RequestResponseCloseHandler h; @Nullable private CloseEvent observedEvent; @@ -177,13 +181,15 @@ protected enum Events { SR, // validate Socket TCP RST -> SO_LINGER=0 UC, // emit User Closing IH, OH, FC, // validate Input/Output Half-close, Full-Close + ID, // validate Input discarding + CI, CO, // emit Inbound/Outbound close (read cancellation, write subscriber termination) } 
protected enum ExpectEvent { NIL(null), // No event, not closed PCO(PROTOCOL_CLOSING_OUTBOUND), PCI(PROTOCOL_CLOSING_INBOUND), - UCO(USER_CLOSING), + GUC(GRACEFUL_USER_CLOSING), CCO(CHANNEL_CLOSED_OUTBOUND), CCI(CHANNEL_CLOSED_INBOUND); @@ -210,13 +216,13 @@ public static Collection data() { // If inserting lines here, adjust t {C, e(OB, IB, OE, IC, IE, FC), PCI, "pipelined, closing inbound"}, {C, e(OB, IB, IC, IE, OE, FC), PCI, "pipelined, full dup, closing inbound"}, {C, e(OB, OE, IB, IC, IE, FC), PCI, "sequential, closing inbound"}, - {C, e(OB, UC, OE, IB, IE, FC), UCO, "sequential, user close"}, - {C, e(OB, IB, OE, OB, UC, OE, IE, IB, IE, FC), UCO, "pipelined req graceful close"}, - {C, e(OB, IB, UC, OE, IE, FC), UCO, "interleaved, user close"}, - {C, e(OB, OE, IB, IE, UC, FC), UCO, "sequential, idle, user close"}, - {C, e(OB, IB, OE, IE, UC, FC), UCO, "interleaved, idle, user close"}, - {C, e(OB, IB, IE, OE, UC, FC), UCO, "interleaved full dup, idle, user close"}, - {C, e(OB, IB, UC, IE, OE, FC), UCO, "interleaved full dup, user close"}, + {C, e(OB, UC, OE, IB, IE, FC), GUC, "sequential, user close"}, + {C, e(OB, IB, OE, OB, UC, OE, IE, IB, IE, FC), GUC, "pipelined req graceful close"}, + {C, e(OB, IB, UC, OE, IE, FC), GUC, "interleaved, user close"}, + {C, e(OB, OE, IB, IE, UC, FC), GUC, "sequential, idle, user close"}, + {C, e(OB, IB, OE, IE, UC, FC), GUC, "interleaved, idle, user close"}, + {C, e(OB, IB, IE, OE, UC, FC), GUC, "interleaved full dup, idle, user close"}, + {C, e(OB, IB, UC, IE, OE, FC), GUC, "interleaved full dup, user close"}, {C, e(OB, OE, IS, FC), CCI, "abrupt input close after complete write, resp abort"}, {C, e(IS, FC), CCI, "idle, inbound closed"}, {C, e(OB, IS, SR, FC), CCI, "req abort, inbound closed"}, @@ -240,27 +246,43 @@ public static Collection data() { // If inserting lines here, adjust t {S, e(OS, FC), CCO, "idle, outbound closed"}, {S, e(IB, OS, SR, FC), CCO, "req aborted, outbound closed"}, {S, e(IB, OB, OS, IE, FC), CCO, 
"continue req, outbound shutdown, no reset"}, + {S, e(IB, OB, OS, IS, FC), CCO, "outbound shutdown, inbound shutdown, no reset"}, {S, e(IB, OB, OE, OS, IE, FC), CCO, "resp completed, complete req, outbound closed"}, {S, e(IB, OB, IE, IB, OS, SR, FC), CCO, "new req abort, resp abort, outbound closed"}, {S, e(IB, OB, OE, IE, IB, OS, SR, FC), CCO, "new req abort, complete resp, outbound closed"}, {S, e(IB, IE, OB, OE), NIL, "sequential, no close"}, {S, e(IB, IE, OB, IB, IE, OE, OB, OE), NIL, "pipelined, no close"}, - {S, e(IB, IE, IB, OB, OC, IH, OE, FC), PCO, "pipelined, closing outbound"}, - {S, e(IB, IE, IB, IE, OB, OC, IH, OE, FC), PCO, "pipelined, closing outbound, drop pending!"}, - {S, e(IB, IE, OB, OC, IH, OE, FC), PCO, "sequential, closing outbound"}, - {S, e(IB, OB, IE, IB, IC, OE, OB, IE, IH, OE, FC), PCI, "pipelined, closing inbound, drain"}, - {S, e(IB, IE, OB, IB, IC, IE, IH, OE, OB, OE, FC), PCI, "pipelined, closing inbound"}, - {S, e(IB, IE, OB, IB, IE, UC, IH, OE, OB, OE, FC), UCO, "pipelined, user closing, drain"}, + {S, e(IB, IE, IB, OB, OC, ID, OE, OH, IS, FC), PCO, "pipelined, closing outbound"}, + {S, e(IB, IE, IB, IE, OB, OC, ID, OE, OH, IS, FC), PCO, "pipelined, closing outbound, drop pending!"}, + {S, e(IB, IE, OB, OC, ID, OE, OH, IS, FC), PCO, "sequential, closing outbound"}, + {S, e(IB, OB, OC, IE, ID, OE, OH, IS, FC), PCO, "interleaved, closing outbound"}, + {S, e(IB, OB, OC, OE, IE, ID, OH, IS, FC), PCO, "interleaved full dup, closing outbound"}, + {S, e(IB, OB, OC, IE, ID, IS, OE, FC), PCO, "interleaved, input shutdowns, closing outbound"}, + {S, e(IB, OB, IE, IB, IC, OE, OB, IE, ID, OE, FC), PCI, "pipelined, closing inbound, drain"}, + {S, e(IB, IE, OB, IB, IC, IE, ID, OE, OB, OE, FC), PCI, "pipelined, closing inbound"}, + {S, e(IB, IE, OB, IB, IE, UC, ID, OE, OB, OE, OH, IS, FC), GUC, "pipelined, user closing, drain"}, {S, e(IB, IC, OB, OE, IE, FC), PCI, "pipelined full dup, closing inbound"}, - {S, e(IB, OB, IE, IB, IC, IE, IH, OE, OB, 
OE, FC), PCI, "pipelined, closing inbound"}, + {S, e(IB, OB, IE, IB, IC, IE, ID, OE, OB, OE, FC), PCI, "pipelined, closing inbound"}, {S, e(IB, OB, IC, OE, IE, FC), PCI, "pipelined, full dup, closing inbound"}, - {S, e(IB, IC, IE, IH, OB, OE, FC), PCI, "sequential, closing inbound"}, - {S, e(IB, UC, IE, IH, OB, OE, FC), UCO, "sequential, user close"}, - {S, e(IB, OB, UC, IE, IH, OE, FC), UCO, "interleaved, user close"}, - {S, e(IB, IE, OB, OE, UC, FC), UCO, "sequential, idle, user close"}, - {S, e(IB, OB, IE, OE, UC, FC), UCO, "interleaved, idle, user close"}, - {S, e(IB, OB, OE, IE, UC, FC), UCO, "interleaved full dup, idle, user close"}, - {S, e(IB, OB, UC, OE, IE, FC), UCO, "interleaved full dup, user close"}, + {S, e(IB, IC, IE, ID, OB, OE, FC), PCI, "sequential, closing inbound"}, + {S, e(UC, ID, OH, IS, FC), GUC, "recently open connection, idle, user close"}, + {S, e(IB, UC, IE, ID, OB, OE, OH, IS, FC), GUC, "sequential, during req, user close"}, + {S, e(IB, IE, UC, ID, OB, OE, OH, IS, FC), GUC, "sequential, user close"}, + {S, e(IB, IE, UC, ID, IS, OB, OE, FC), GUC, "sequential, input shutdown before resp, user close"}, + {S, e(IB, IE, UC, ID, OB, IS, OE, FC), GUC, "sequential, input shutdown after resp, user close"}, + {S, e(IB, IE, OB, UC, ID, OE, OH, IS, FC), GUC, "sequential, during resp, user close"}, + {S, e(IB, IE, OB, OE, UC, ID, OH, IS, FC), GUC, "sequential, idle, user close"}, + {S, e(IB, IE, OB, OE, UC, ID, OH, CI, IS, FC), GUC, "sequential, idle, read cancelled, user close"}, + {S, e(IB, UC, OB, IE, ID, OE, OH, IS, FC), GUC, "interleaved, before resp, user close"}, + {S, e(IB, OB, UC, IE, ID, OE, OH, IS, FC), GUC, "interleaved, user close"}, + {S, e(IB, OB, IE, UC, ID, OE, OH, IS, FC), GUC, "interleaved, after req, user close"}, + {S, e(IB, OB, IE, OE, UC, ID, OH, IS, FC), GUC, "interleaved, idle, user close"}, + {S, e(IB, UC, OB, OE, IE, ID, OH, IS, FC), GUC, "interleaved full dup, before resp, user close"}, + {S, e(IB, OB, UC, OE, IE, ID, OH, 
IS, FC), GUC, "interleaved full dup, user close"}, + {S, e(IB, OB, UC, OE, IE, ID, OH, CI, IS, FC), GUC, "interleaved full dup, read cancelled, user close"}, + {S, e(IB, OB, OE, UC, IE, ID, OH, IS, FC), GUC, "interleaved full dup, after resp, user close"}, + {S, e(IB, OB, OE, IE, UC, ID, OH, IS, FC), GUC, "interleaved full dup, idle, user close"}, + {S, e(IB, IE, OB, OE, IS, FC), CCI, "sequential, idle, inbound closed"}, {S, e(IB, OB, IS, SR, OE, FC), CCI, "inbound closed while reading no pipeline"}, {S, e(IB, IS, SR, OB, OE, FC), CCI, "inbound closed while reading delay close until response"}, {S, e(IB, IE, IB, IS, SR, OB, OE, OB, OE, FC), CCI, "inbound closed while not writing pipelined, 2 pending"}, @@ -310,7 +332,7 @@ public void setup() { when(channel.eventLoop()).thenReturn(loop); when(loop.inEventLoop()).thenReturn(true); when(scc.getOption(ALLOW_HALF_CLOSURE)).thenReturn(TRUE); - ChannelPipeline pipeline = mock(ChannelPipeline.class); + pipeline = mock(ChannelPipeline.class); when(channel.pipeline()).thenReturn(pipeline); when(channel.isOutputShutdown()).then(__ -> outputShutdown.get()); @@ -320,13 +342,13 @@ public void setup() { when(channel.shutdownInput()).then(__ -> { inputShutdown.set(true); LOGGER.debug("channel.shutdownInput()"); - h.channelClosedInbound(ctx); // OutputShutDownEvent observed from transport + h.channelClosedInbound(ctx); // ChannelInputShutdownReadComplete observed from transport return future; }); when(channel.shutdownOutput()).then(__ -> { outputShutdown.set(true); LOGGER.debug("channel.shutdownOutput()"); - h.channelClosedOutbound(ctx); // InputShutDownReadComplete observed from transport + h.channelClosedOutbound(ctx); // ChannelOutputShutdownEvent observed from transport return future; }); when(channel.close()).then(__ -> { @@ -363,7 +385,7 @@ private void assertCanWrite() { public void simulate() { LOGGER.debug("Test.Params: ({})", location); // Intellij jump to parameter format, don't change! 
LOGGER.debug("[{}] {} - {} = {}", desc, mode, events, expectEvent); - InOrder order = inOrder(h, channel, scc); + InOrder order = inOrder(h, channel, pipeline, scc); verify(h).registerEventHandler(eq(channel), any()); for (Events event : events) { LOGGER.debug("{}", event); @@ -410,8 +432,8 @@ public void simulate() { order.verify(scc).setSoLinger(0); break; case UC: - h.userClosing(channel); - order.verify(h).userClosing(channel); + h.gracefulUserClosing(channel); + order.verify(h).gracefulUserClosing(channel); break; case IH: order.verify(channel).shutdownInput(); @@ -422,6 +444,17 @@ public void simulate() { case FC: order.verify(channel).close(); break; + case ID: + order.verify(pipeline).fireUserEventTriggered(DiscardFurtherInboundEvent.INSTANCE); + break; + case CI: + h.closeChannelInbound(channel); + order.verify(h).closeChannelInbound(channel); + break; + case CO: + h.closeChannelOutbound(channel); + order.verify(h).closeChannelOutbound(channel); + break; default: throw new IllegalArgumentException("Unknown: " + event); } @@ -456,7 +489,7 @@ public void simulate() { verify(scc, never()).setSoLinger(0); break; case UC: - verify(h, never()).userClosing(channel); + verify(h, never()).gracefulUserClosing(channel); break; case FC: verify(channel, never()).close(); @@ -467,6 +500,15 @@ public void simulate() { case OH: verify(channel, never()).shutdownOutput(); break; + case ID: + verify(pipeline, never()).fireUserEventTriggered(DiscardFurtherInboundEvent.INSTANCE); + break; + case CI: + verify(h, never()).closeChannelInbound(channel); + break; + case CO: + verify(h, never()).closeChannelOutbound(channel); + break; default: throw new IllegalArgumentException("Unknown: " + c); } @@ -540,7 +582,7 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt // Request #2 channel.eventLoop().execute(() -> ch.protocolPayloadBeginInbound(ctx)); channel.eventLoop().execute(() -> ch.protocolPayloadEndInbound(ctx)); - channel.eventLoop().execute(() 
-> ch.userClosing(channel)); + channel.eventLoop().execute(() -> ch.gracefulUserClosing(channel)); // Response #1 channel.eventLoop().execute(() -> ch.protocolPayloadBeginOutbound(ctx)); channel.eventLoop().execute(() -> ch.protocolPayloadEndOutbound(ctx)); @@ -566,7 +608,7 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt } }); final RequestResponseCloseHandler ch = new RequestResponseCloseHandler(false); - channel.eventLoop().execute(() -> ch.userClosing(channel)); + channel.eventLoop().execute(() -> ch.gracefulUserClosing(channel)); channel.eventLoop().execute(() -> ch.protocolPayloadEndOutbound(channel.pipeline().firstContext())); channel.close().syncUninterruptibly(); assertThat("OutboundDataEndEvent not fired", ab.get(), is(true));