From b802c8ceda93fce52e35b58e94eeec2cd039a433 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Wed, 16 Mar 2022 14:56:26 -0700 Subject: [PATCH] Expose `SslConfig` through `ConnectionInfo` Motivation: `SSLSession` represents the result of SSL negotiation. However, it's useful to know what `SslConfig` was used to establish the session for observability or runtime checks. Modifications: - Add `ConnectionInfo#sslConfig()` method; - Propagate `SslConfig` to all `ConnectionInfo` implementations; - Adjust tests; Result: Users have access to `SslConfig` from service or connection factory/filter. --- .../NettyPipelinedConnectionBenchmark.java | 7 ++ .../grpc/api/DefaultGrpcServiceContext.java | 7 ++ .../grpc/customtransport/Utils.java | 7 ++ .../api/DelegatingHttpServiceContext.java | 7 ++ .../http/api/TestHttpServiceContext.java | 7 ++ .../netty/AlpnLBHttpConnectionFactory.java | 3 +- .../H2ClientParentConnectionContext.java | 14 ++- .../http/netty/H2LBHttpConnectionFactory.java | 2 +- .../http/netty/H2ParentConnectionContext.java | 11 +++ .../H2ServerParentConnectionContext.java | 7 +- .../http/netty/NettyHttpServer.java | 12 ++- .../http/netty/NettyPipelinedConnection.java | 7 ++ .../netty/StreamingConnectionFactory.java | 2 +- .../http/netty/AlpnClientAndServerTest.java | 2 + .../http/netty/HttpRequestEncoderTest.java | 6 +- .../netty/NettyPipelinedConnectionTest.java | 2 +- .../io/servicetalk/http/netty/Tls13Test.java | 8 ++ .../tcp/netty/internal/AbstractTcpConfig.java | 8 +- .../internal/ReadOnlyTcpClientConfig.java | 5 + .../internal/ReadOnlyTcpServerConfig.java | 19 +++- .../tcp/netty/internal/TcpConnectorTest.java | 2 +- .../tcp/netty/internal/TcpClient.java | 1 + .../tcp/netty/internal/TcpServer.java | 1 + .../transport/api/ConnectionInfo.java | 10 ++ .../api/DelegatingConnectionContext.java | 6 ++ .../internal/DefaultNettyConnection.java | 94 +++++++++++++++++-- ...stractSslCloseNotifyAlertHandlingTest.java | 2 +- .../internal/DefaultNettyConnectionTest.java | 3 +- .../NettyChannelPublisherRefCountTest.java | 2 +- .../internal/NettyChannelPublisherTest.java | 4 +- 30 files changed, 241 insertions(+), 27 deletions(-) diff --git a/servicetalk-benchmarks/src/jmh/java/io/servicetalk/http/netty/NettyPipelinedConnectionBenchmark.java b/servicetalk-benchmarks/src/jmh/java/io/servicetalk/http/netty/NettyPipelinedConnectionBenchmark.java index b97c6811b2..0ff314a16b 100644 --- a/servicetalk-benchmarks/src/jmh/java/io/servicetalk/http/netty/NettyPipelinedConnectionBenchmark.java +++ b/servicetalk-benchmarks/src/jmh/java/io/servicetalk/http/netty/NettyPipelinedConnectionBenchmark.java @@ -24,6 +24,7 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.http.api.HttpProtocolVersion; import io.servicetalk.transport.api.ExecutionContext; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.FlushStrategies; import io.servicetalk.transport.netty.internal.FlushStrategy; import io.servicetalk.transport.netty.internal.GlobalExecutionContext; @@ -232,6 +233,12 @@ public SocketAddress remoteAddress() { return new InetSocketAddress(0); } + @Nullable + @Override + public SslConfig sslConfig() { + return null; + } + @Nullable @Override public SSLSession sslSession() { diff --git a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java index dd21930d3f..6a08035ca8 100644 --- a/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java +++ b/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/DefaultGrpcServiceContext.java @@ -20,6 +20,7 @@ import io.servicetalk.http.api.HttpProtocolVersion; import io.servicetalk.http.api.HttpServiceContext; import io.servicetalk.transport.api.ConnectionContext; +import io.servicetalk.transport.api.SslConfig; import java.net.SocketAddress; import java.net.SocketOption; @@ -56,6 +57,12 @@ public SocketAddress remoteAddress() { return connectionContext.remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return connectionContext.sslConfig(); + } + @Override @Nullable public SSLSession sslSession() { diff --git a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java index e7b84945f6..11dd0284ba 100644 --- a/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java +++ b/servicetalk-grpc-netty/src/test/java/io/servicetalk/grpc/customtransport/Utils.java @@ -38,6 +38,7 @@ import io.servicetalk.serializer.api.StreamingSerializer; import io.servicetalk.serializer.utils.FramedDeserializerOperator; import io.servicetalk.transport.api.IoExecutor; +import io.servicetalk.transport.api.SslConfig; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -165,6 +166,12 @@ public SocketAddress remoteAddress() { return InMemorySocketAddress.INSTANCE; } + @Nullable + @Override + public SslConfig sslConfig() { + return null; + } + @Nullable @Override public SSLSession sslSession() { diff --git a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java index e9e77dbdd6..98b51fa171 100644 --- a/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java +++ b/servicetalk-http-api/src/main/java/io/servicetalk/http/api/DelegatingHttpServiceContext.java @@ -16,6 +16,7 @@ package io.servicetalk.http.api; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.transport.api.SslConfig; import java.net.SocketAddress; import java.net.SocketOption; @@ -64,6 +65,12 @@ public SocketAddress remoteAddress() { return delegate.remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return delegate.sslConfig(); + } + @Override @Nullable public SSLSession sslSession() { diff --git a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java index ed1e75d874..42d5b139f7 100644 --- a/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java +++ b/servicetalk-http-api/src/testFixtures/java/io/servicetalk/http/api/TestHttpServiceContext.java @@ -16,6 +16,7 @@ package io.servicetalk.http.api; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.AddressUtils; import java.net.SocketAddress; @@ -71,6 +72,12 @@ public SocketAddress remoteAddress() { return remoteAddress; } + @Nullable + @Override + public SslConfig sslConfig() { + return null; + } + @Nullable @Override public SSLSession sslSession() { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java index daec9eecc2..c956ee9d37 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/AlpnLBHttpConnectionFactory.java @@ -93,7 +93,8 @@ private Single createConnection( assert h2Config != null; return H2ClientParentConnectionContext.initChannel(channel, executionContext, h2Config, reqRespFactoryFunc.apply(HttpProtocolVersion.HTTP_2_0), tcpConfig.flushStrategy(), - tcpConfig.idleTimeoutMs(), new H2ClientParentChannelInitializer(h2Config), + tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(), + new H2ClientParentChannelInitializer(h2Config), connectionObserver, config.allowDropTrailersReadFromTransport()); default: return failed(new IllegalStateException("Unknown ALPN protocol negotiated: " + protocol)); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java index 4171069c1c..cc358a00e3 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ClientParentConnectionContext.java @@ -44,6 +44,7 @@ import io.servicetalk.transport.api.ConnectionObserver.MultiplexedObserver; import io.servicetalk.transport.api.ConnectionObserver.StreamObserver; import io.servicetalk.transport.api.IoThreadFactory; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.ChannelInitializer; import io.servicetalk.transport.netty.internal.CloseHandler; import io.servicetalk.transport.netty.internal.DefaultNettyConnection; @@ -93,8 +94,9 @@ final class H2ClientParentConnectionContext extends H2ParentConnectionContext { private H2ClientParentConnectionContext(Channel channel, HttpExecutionContext executionContext, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, + @Nullable final SslConfig sslConfig, final KeepAliveManager keepAliveManager) { - super(channel, executionContext, flushStrategy, idleTimeoutMs, keepAliveManager); + super(channel, executionContext, flushStrategy, idleTimeoutMs, sslConfig, keepAliveManager); } interface H2ClientParentConnection extends FilterableStreamingHttpConnection, NettyConnectionContext { @@ -105,6 +107,7 @@ static Single initChannel(Channel channel, HttpExecuti StreamingHttpRequestResponseFactory reqRespFactory, FlushStrategy parentFlushStrategy, @Nullable Long idleTimeoutMs, + @Nullable SslConfig sslConfig, ChannelInitializer initializer, ConnectionObserver observer, boolean allowDropTrailersReadFromTransport) { @@ -118,7 +121,7 @@ protected void handleSubscribe(final Subscriber future, parentContext.flushStrategyHolder.currentStrategy(), parentContext.idleTimeoutMs, HTTP_2_0, + parentContext.sslConfig(), parentContext.sslSession(), parentContext.nettyChannel().config(), streamObserver, @@ -412,6 +416,12 @@ public SocketAddress remoteAddress() { return parentContext.remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return parentContext.sslConfig(); + } + @Nullable @Override public SSLSession sslSession() { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java index 8934e37e92..468206bf58 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2LBHttpConnectionFactory.java @@ -64,7 +64,7 @@ Single newFilterableConnection( return TcpConnector.connect(null, resolvedAddress, tcpConfig, true, executionContext, (channel, connectionObserver) -> H2ClientParentConnectionContext.initChannel(channel, executionContext, config.h2Config(), reqRespFactoryFunc.apply(HTTP_2_0), - tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), + tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(), new TcpClientChannelInitializer(tcpConfig, connectionObserver).andThen( new H2ClientParentChannelInitializer(config.h2Config())), connectionObserver, config.allowDropTrailersReadFromTransport()), observer); diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java index b5a0a56235..008a0c14f6 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ParentConnectionContext.java @@ -26,6 +26,7 @@ import io.servicetalk.http.api.HttpExecutionContext; import io.servicetalk.http.api.HttpProtocolVersion; import io.servicetalk.transport.api.ConnectionObserver; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.FlushStrategy; import io.servicetalk.transport.netty.internal.FlushStrategyHolder; import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable; @@ -69,18 +70,22 @@ class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable imp private final Processor onClosing = newCompletableProcessor(); private final KeepAliveManager keepAliveManager; @Nullable + private final SslConfig sslConfig; + @Nullable final Long idleTimeoutMs; @Nullable private SSLSession sslSession; H2ParentConnectionContext(final Channel channel, final HttpExecutionContext executionContext, final FlushStrategy flushStrategy, @Nullable final Long idleTimeoutMs, + @Nullable final SslConfig sslConfig, final KeepAliveManager keepAliveManager) { super(channel, executionContext.executor()); this.executionContext = new DefaultHttpExecutionContext(executionContext.bufferAllocator(), fromNettyEventLoop(channel.eventLoop(), executionContext.ioExecutor().isIoThreadSupported()), executionContext.executor(), executionContext.executionStrategy()); this.flushStrategyHolder = new FlushStrategyHolder(flushStrategy); + this.sslConfig = sslConfig; this.idleTimeoutMs = idleTimeoutMs; this.keepAliveManager = keepAliveManager; // Just in case the channel abruptly closes, we should complete the onClosing Completable. @@ -117,6 +122,12 @@ public final SocketAddress remoteAddress() { return channel().remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return sslConfig; + } + @Nullable @Override public final SSLSession sslSession() { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java index 75961c341d..6fb04da165 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/H2ServerParentConnectionContext.java @@ -30,6 +30,7 @@ import io.servicetalk.transport.api.ConnectionObserver.MultiplexedObserver; import io.servicetalk.transport.api.ConnectionObserver.StreamObserver; import io.servicetalk.transport.api.ServerContext; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.ChannelCloseUtils; import io.servicetalk.transport.netty.internal.ChannelInitializer; import io.servicetalk.transport.netty.internal.CloseHandler; @@ -64,9 +65,10 @@ final class H2ServerParentConnectionContext extends H2ParentConnectionContext im private H2ServerParentConnectionContext(final Channel channel, final HttpExecutionContext executionContext, final FlushStrategy flushStrategy, @Nullable final Long idleTimeoutMs, + @Nullable final SslConfig sslConfig, final SocketAddress listenAddress, final KeepAliveManager keepAliveManager) { - super(channel, executionContext, flushStrategy, idleTimeoutMs, keepAliveManager); + super(channel, executionContext, flushStrategy, idleTimeoutMs, sslConfig, keepAliveManager); this.listenAddress = requireNonNull(listenAddress); } @@ -129,7 +131,7 @@ protected void handleSubscribe(final Subscriber initChannel(final Channel chann if (h1Config == null) { return failed(newH1ConfigException()); } + final ReadOnlyTcpServerConfig tcpConfig = config.tcpConfig(); return showPipeline(DefaultNettyConnection.initChannel(channel, httpExecutionContext.bufferAllocator(), httpExecutionContext.executor(), - httpExecutionContext.ioExecutor(), closeHandler, config.tcpConfig().flushStrategy(), - config.tcpConfig().idleTimeoutMs(), + httpExecutionContext.ioExecutor(), closeHandler, tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), + tcpConfig.sslConfig(), initializer.andThen(getChannelInitializer(getByteBufAllocator(httpExecutionContext.bufferAllocator()), h1Config, closeHandler)), httpExecutionContext.executionStrategy(), HTTP_1_1, observer, false, __ -> false) @@ -480,6 +482,12 @@ public SocketAddress remoteAddress() { return connection.remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return connection.sslConfig(); + } + @Nullable @Override public SSLSession sslSession() { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java index 6bf4e9a248..7918b973d9 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/NettyPipelinedConnection.java @@ -23,6 +23,7 @@ import io.servicetalk.concurrent.api.Single; import io.servicetalk.concurrent.internal.ConcurrentUtils; import io.servicetalk.transport.api.ExecutionContext; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.FlushStrategy; import io.servicetalk.transport.netty.internal.NettyConnection; import io.servicetalk.transport.netty.internal.NettyConnectionContext; @@ -137,6 +138,12 @@ public SocketAddress remoteAddress() { return connection.remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return connection.sslConfig(); + } + @Override @Nullable public SSLSession sslSession() { diff --git a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java index c0fd50780d..e0a3f523b5 100644 --- a/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java +++ b/servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/StreamingConnectionFactory.java @@ -67,7 +67,7 @@ static Single> createConnection final CloseHandler closeHandler = forPipelinedRequestResponse(true, channel.config()); return showPipeline(DefaultNettyConnection.initChannel(channel, executionContext.bufferAllocator(), executionContext.executor(), executionContext.ioExecutor(), closeHandler, - tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), + tcpConfig.flushStrategy(), tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(), initializer.andThen(new HttpClientChannelInitializer( getByteBufAllocator(executionContext.bufferAllocator()), h1Config, closeHandler)), executionContext.executionStrategy(), HTTP_1_1, connectionObserver, true, OBJ_EXPECT_CONTINUE), diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AlpnClientAndServerTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AlpnClientAndServerTest.java index 1415878d28..ea4d7ceded 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AlpnClientAndServerTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/AlpnClientAndServerTest.java @@ -182,6 +182,7 @@ void testAlpnConnection(List serverSideProtocols, try (ReservedBlockingHttpConnection connection = client.reserveConnection(client.get("/"))) { assertThat(connection.connectionContext().protocol(), is(expectedProtocol)); + assertThat(connection.connectionContext().sslConfig(), is(notNullValue())); assertThat(connection.connectionContext().sslSession(), is(notNullValue())); assertResponseAndServiceContext(connection.request(client.get("/"))); @@ -212,6 +213,7 @@ private void assertResponseAndServiceContext(HttpResponse response) throws Excep HttpServiceContext serviceCtx = serviceContext.take(); assertThat(serviceCtx.protocol(), is(expectedProtocol)); assertThat(serviceCtx.sslSession(), is(notNullValue())); + assertThat(serviceCtx.sslSession(), is(notNullValue())); assertThat(requestVersion.take(), is(expectedProtocol)); assertThat(serviceContext, is(empty())); diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java index e5c21d170b..676cf31cbf 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/HttpRequestEncoderTest.java @@ -423,6 +423,7 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception (channel, observer) -> DefaultNettyConnection.initChannel(channel, SEC.bufferAllocator(), SEC.executor(), SEC.ioExecutor(), forPipelinedRequestResponse(false, channel.config()), defaultFlushStrategy(), null, + null, new TcpServerChannelInitializer(sConfig, observer).andThen( channel2 -> { serverChannelRef.compareAndSet(null, channel2); @@ -439,8 +440,9 @@ void protocolPayloadEndOutboundShouldNotTriggerOnFailedFlush() throws Exception closeHandlerRef.compareAndSet(null, closeHandler); return DefaultNettyConnection.initChannel(channel, CEC.bufferAllocator(), CEC.executor(), CEC.ioExecutor(), - closeHandler, defaultFlushStrategy(), - null, new TcpClientChannelInitializer(cConfig.tcpConfig(), + closeHandler, defaultFlushStrategy(), null, + cConfig.tcpConfig().sslConfig(), + new TcpClientChannelInitializer(cConfig.tcpConfig(), connectionObserver) .andThen(new HttpClientChannelInitializer( getByteBufAllocator(CEC.bufferAllocator()), diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java index 38dc941c02..d5937847f1 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/NettyPipelinedConnectionTest.java @@ -101,7 +101,7 @@ void setUp() throws Exception { CloseHandler closeHandler = UNSUPPORTED_PROTOCOL_CLOSE_HANDLER; final DefaultNettyConnection connection = DefaultNettyConnection.initChannel(channel, DEFAULT_ALLOCATOR, - immediate(), null, closeHandler, defaultFlushStrategy(), null, channel2 -> { + immediate(), null, closeHandler, defaultFlushStrategy(), null, null, channel2 -> { channel2.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { diff --git a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/Tls13Test.java b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/Tls13Test.java index 987912d4f4..6fa9b9a6ad 100644 --- a/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/Tls13Test.java +++ b/servicetalk-http-netty/src/test/java/io/servicetalk/http/netty/Tls13Test.java @@ -22,6 +22,7 @@ import io.servicetalk.transport.api.ClientSslConfigBuilder; import io.servicetalk.transport.api.ServerContext; import io.servicetalk.transport.api.ServerSslConfigBuilder; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.api.SslProvider; import io.servicetalk.transport.netty.internal.ExecutionContextExtension; @@ -48,6 +49,7 @@ import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort; import static java.util.Collections.singletonList; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -98,6 +100,9 @@ void requiredCipher(SslProvider serverSslProvider, SslProvider clientSslProvider .sslConfig(serverSslBuilder.build()) .listenBlockingAndAwait((ctx, request, responseFactory) -> { assertThat(request.payloadBody(textSerializerUtf8()), equalTo("request-payload-body")); + SslConfig sslConfig = ctx.sslConfig(); + assertThat(sslConfig, is(notNullValue())); + assertThat(sslConfig.sslProtocols(), contains(TLS1_3)); SSLSession sslSession = ctx.sslSession(); assertThat(sslSession, is(notNullValue())); return responseFactory.ok().payloadBody(sslSession.getProtocol(), textSerializerUtf8()); @@ -117,6 +122,9 @@ void requiredCipher(SslProvider serverSslProvider, SslProvider clientSslProvider .sslConfig(clientSslBuilder.build()).buildBlocking(); BlockingHttpConnection connection = client.reserveConnection(client.get("/"))) { + SslConfig sslConfig = connection.connectionContext().sslConfig(); + assertThat(sslConfig, is(notNullValue())); + assertThat(sslConfig.sslProtocols(), contains(TLS1_3)); SSLSession sslSession = connection.connectionContext().sslSession(); assertThat(sslSession, is(notNullValue())); assertThat(sslSession.getProtocol(), equalTo(TLS1_3)); diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java index 1430a7303b..6b13a586d3 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/AbstractTcpConfig.java @@ -18,7 +18,6 @@ import io.servicetalk.logging.api.LogLevel; import io.servicetalk.logging.api.UserDataLoggerConfig; import io.servicetalk.logging.slf4j.internal.DefaultUserDataLoggerConfig; -import io.servicetalk.transport.api.ServerSslConfig; import io.servicetalk.transport.api.ServiceTalkSocketOptions; import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.FlushStrategy; @@ -85,6 +84,11 @@ final UserDataLoggerConfig wireLoggerConfig() { return wireLoggerConfig; } + /** + * Get the {@link SslConfigType}. + * + * @return the {@link SslConfigType}, or {@code null} if SSL/TLS is not configured. + */ @Nullable public final SslConfigType sslConfig() { return sslConfig; @@ -139,7 +143,7 @@ public final void enableWireLogging(final String loggerName, /** * Add SSL/TLS related config. * - * @param sslConfig the {@link ServerSslConfig}. + * @param sslConfig the {@link SslConfigType}. */ public final void sslConfig(final SslConfigType sslConfig) { this.sslConfig = requireNonNull(sslConfig); diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpClientConfig.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpClientConfig.java index 0969af6545..578e64e830 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpClientConfig.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpClientConfig.java @@ -51,6 +51,11 @@ private ReadOnlyTcpClientConfig(final ReadOnlyTcpClientConfig config, final Stri sslContext = config.sslContext; } + /** + * Returns the {@link SslContext}. + * + * @return {@link SslContext}, {@code null} if none specified + */ @Nullable @Override public SslContext sslContext() { diff --git a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpServerConfig.java b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpServerConfig.java index b7e272cc3d..39f766501d 100644 --- a/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpServerConfig.java +++ b/servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/ReadOnlyTcpServerConfig.java @@ -40,6 +40,8 @@ public final class ReadOnlyTcpServerConfig extends AbstractReadOnlyTcpConfig listenOptions; private final TransportObserver transportObserver; @Nullable + private final ServerSslConfig sslConfig; + @Nullable private final SslContext sslContext; @Nullable private final Mapping sniMapping; @@ -51,7 +53,7 @@ public final class ReadOnlyTcpServerConfig extends AbstractReadOnlyTcpConfig sniMap = from.sniConfig(); if (sniMap != null) { if (sslConfig == null) { @@ -98,6 +100,21 @@ public TransportObserver transportObserver() { return transportObserver; } + /** + * Get the {@link ServerSslConfig}. + * + * @return the {@link ServerSslConfig}, or {@code null} if SSL/TLS is not configured. + */ + @Nullable + public ServerSslConfig sslConfig() { + return sslConfig; + } + + /** + * Returns the {@link SslContext}. + * + * @return {@link SslContext}, {@code null} if none specified + */ @Nullable @Override public SslContext sslContext() { diff --git a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpConnectorTest.java b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpConnectorTest.java index c77586d66a..3aa5c7f493 100644 --- a/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpConnectorTest.java +++ b/servicetalk-tcp-netty-internal/src/test/java/io/servicetalk/tcp/netty/internal/TcpConnectorTest.java @@ -103,7 +103,7 @@ void testRegisteredAndActiveEventsFired() throws Exception { serverContext.listenAddress(), new TcpClientConfig().asReadOnly(), false, CLIENT_CTX, (channel, connectionObserver) -> DefaultNettyConnection.initChannel(channel, CLIENT_CTX.bufferAllocator(), CLIENT_CTX.executor(), CLIENT_CTX.ioExecutor(), closeHandler, - defaultFlushStrategy(), null, channel2 -> + defaultFlushStrategy(), null, null, channel2 -> channel2.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRegistered(ChannelHandlerContext ctx) { diff --git a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java index b9ffba41cd..b3a6420682 100644 --- a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java +++ b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpClient.java @@ -95,6 +95,7 @@ public Single> connect(ExecutionContext execu (channel, connectionObserver) -> initChannel(channel, executionContext.bufferAllocator(), executionContext.executor(), executionContext.ioExecutor(), UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, config.flushStrategy(), config.idleTimeoutMs(), + config.sslConfig(), new TcpClientChannelInitializer(config, connectionObserver).andThen( channel2 -> channel2.pipeline().addLast(BufferHandler.INSTANCE)), executionContext.executionStrategy(), TCP, connectionObserver, true, __ -> false), diff --git a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java index fb768154ea..8071c7bdfc 100644 --- a/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java +++ b/servicetalk-tcp-netty-internal/src/testFixtures/java/io/servicetalk/tcp/netty/internal/TcpServer.java @@ -86,6 +86,7 @@ public ServerContext bind(ExecutionContext executionContext, int port, (channel, connectionObserver) -> DefaultNettyConnection.initChannel(channel, executionContext.bufferAllocator(), executionContext.executor(), executionContext.ioExecutor(), UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, config.flushStrategy(), config.idleTimeoutMs(), + config.sslConfig(), new TcpServerChannelInitializer(config, connectionObserver) .andThen(getChannelInitializer(service, executionContext)), executionStrategy, TCP, connectionObserver, false, __ -> false), diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionInfo.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionInfo.java index 2a8ad49474..0ac3dd6f9f 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionInfo.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/ConnectionInfo.java @@ -39,6 +39,16 @@ public interface ConnectionInfo { */ SocketAddress remoteAddress(); + /** + * Get the {@link SslConfig} for this connection. + * + * @return The {@link SslConfig} if SSL/TLS is configured, or {@code null} otherwise. + */ + @Nullable + default SslConfig sslConfig() { // FIXME: 0.43 - consider removing default impl + throw new UnsupportedOperationException(getClass().getSimpleName() + "does not implement sslConfig()"); + } + /** * Get the {@link SSLSession} for this connection. * diff --git a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java index 999a571aa5..c991548d5f 100644 --- a/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java +++ b/servicetalk-transport-api/src/main/java/io/servicetalk/transport/api/DelegatingConnectionContext.java @@ -60,6 +60,12 @@ public SocketAddress remoteAddress() { return delegate.remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return delegate.sslConfig(); + } + @Nullable @Override public SSLSession sslSession() { diff --git a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java index 5e27b94220..200efaed0c 100644 --- a/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java +++ b/servicetalk-transport-netty-internal/src/main/java/io/servicetalk/transport/netty/internal/DefaultNettyConnection.java @@ -41,6 +41,7 @@ import io.servicetalk.transport.api.IoExecutor; import io.servicetalk.transport.api.RetryableException; import io.servicetalk.transport.api.ServiceTalkSocketOptions; +import io.servicetalk.transport.api.SslConfig; import io.servicetalk.transport.netty.internal.CloseHandler.AbortWritesEvent; import io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent; import io.servicetalk.transport.netty.internal.CloseHandler.CloseEventObservedException; @@ -116,6 +117,8 @@ public final class DefaultNettyConnection extends NettyChannelListe @Nullable private final Long idleTimeoutMs; private final Protocol protocol; + @Nullable + private final SslConfig sslConfig; private volatile ChannelOutboundListener channelOutboundListener = NoopChannelOutboundListener.INSTANCE; /** * Potentially contains more information when a protocol or channel level close event was observed. @@ -167,7 +170,8 @@ private void cleanupOnWriteTerminated() { private DefaultNettyConnection( Channel channel, ExecutionContext executionContext, CloseHandler closeHandler, FlushStrategy flushStrategy, - @Nullable Long idleTimeoutMs, Protocol protocol, @Nullable SSLSession sslSession, + @Nullable Long idleTimeoutMs, Protocol protocol, + @Nullable SslConfig sslConfig, @Nullable SSLSession sslSession, @Nullable ChannelConfig parentChannelConfig, DataObserver dataObserver, boolean isClient, Predicate shouldWait, UnaryOperator enrichProtocolError) { super(channel, @@ -204,6 +208,7 @@ private DefaultNettyConnection( } else { onClosing = null; } + this.sslConfig = sslConfig; this.sslSession = sslSession; this.parentChannelConfig = parentChannelConfig; this.protocol = requireNonNull(protocol); @@ -232,7 +237,7 @@ private DefaultNettyConnection( * @return A {@link Single} that completes with a {@link DefaultNettyConnection} after the channel is activated and * ready to use. * @deprecated Use {@code #initChildChannel(Channel, ExecutionContext, CloseHandler, FlushStrategy, Long, Protocol, - * SSLSession, ChannelConfig, StreamObserver, boolean, Predicate, UnaryOperator)}. + * SslConfig, SSLSession, ChannelConfig, StreamObserver, boolean, Predicate, UnaryOperator)}. */ @Deprecated // FIXME: 0.43 - remove deprecated method public static DefaultNettyConnection initChildChannel( @@ -264,19 +269,54 @@ public static DefaultNettyConnection initChildChannel * @param Type of objects written to the {@link NettyConnection}. * @return A {@link Single} that completes with a {@link DefaultNettyConnection} after the channel is activated and * ready to use. + * @deprecated Use {@code #initChildChannel(Channel, ExecutionContext, CloseHandler, FlushStrategy, Long, Protocol, + * SslConfig, SSLSession, ChannelConfig, StreamObserver, boolean, Predicate, UnaryOperator)}. */ + @Deprecated // FIXME: 0.43 - remove deprecated method public static DefaultNettyConnection initChildChannel( Channel channel, ExecutionContext executionContext, CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, Protocol protocol, @Nullable SSLSession sslSession, @Nullable ChannelConfig parentChannelConfig, StreamObserver streamObserver, boolean isClient, Predicate shouldWait, UnaryOperator enrichProtocolError) { + return initChildChannel(channel, executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, null, + sslSession, parentChannelConfig, streamObserver, isClient, shouldWait, enrichProtocolError); + } + + /** + * Given a {@link Channel} this will initialize the {@link ChannelPipeline} just to create a + * {@link DefaultNettyConnection}. It is assumed this is a child channel and all TLS handshaking is completed. + * @param channel A newly created {@link Channel}. + * @param executionContext Used to derive the {@link #executionContext()}. + * @param closeHandler Manages the half closure of the {@link DefaultNettyConnection}. + * @param flushStrategy Manages flushing of data for the {@link DefaultNettyConnection}. + * @param idleTimeoutMs Value for {@link ServiceTalkSocketOptions#IDLE_TIMEOUT IDLE_TIMEOUT} socket option. + * @param protocol {@link Protocol} for the returned {@link DefaultNettyConnection}. + * @param sslConfig The {@link SslConfig} to use for the {@link DefaultNettyConnection}. + * @param sslSession Provides access to the {@link SSLSession} associated with this connection. + * @param parentChannelConfig {@link ChannelConfig} of the parent {@link Channel} to query {@link SocketOption}s. + * @param streamObserver {@link StreamObserver} to report internal events. + * @param isClient tells if this {@link Channel} is for the client. + * @param enrichProtocolError enriches protocol-specific {@link Throwable}s. + * @param shouldWait predicate that tells when request payload body should wait for continuation signal. + * @param Type of objects read from the {@link NettyConnection}. + * @param Type of objects written to the {@link NettyConnection}. + * @return A {@link Single} that completes with a {@link DefaultNettyConnection} after the channel is activated and + * ready to use. + */ + public static DefaultNettyConnection initChildChannel( + Channel channel, ExecutionContext executionContext, + CloseHandler closeHandler, FlushStrategy flushStrategy, + @Nullable Long idleTimeoutMs, Protocol protocol, + @Nullable SslConfig sslConfig, @Nullable SSLSession sslSession, + @Nullable ChannelConfig parentChannelConfig, StreamObserver streamObserver, boolean isClient, + Predicate shouldWait, UnaryOperator enrichProtocolError) { DefaultExecutionContext childExecutionContext = new DefaultExecutionContext<>( executionContext.bufferAllocator(), fromNettyEventLoop(channel.eventLoop(), executionContext.ioExecutor().isIoThreadSupported()), executionContext.executor(), executionContext.executionStrategy()); DefaultNettyConnection connection = new DefaultNettyConnection<>(channel, childExecutionContext, - closeHandler, flushStrategy, idleTimeoutMs, protocol, sslSession, parentChannelConfig, + closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig, sslSession, parentChannelConfig, streamObserver.streamEstablished(), isClient, shouldWait, enrichProtocolError); channel.pipeline().addLast(new NettyToStChannelInboundHandler<>(connection, null, null, false, NoopConnectionObserver.INSTANCE)); @@ -304,7 +344,7 @@ public static DefaultNettyConnection initChildChannel * @return A {@link Single} that completes with a {@link DefaultNettyConnection} after the channel is activated and * ready to use. * @deprecated Use {@code #initChannel(Channel, BufferAllocator, Executor, IoExecutor, CloseHandler, FlushStrategy, - * Long, ChannelInitializer, ExecutionStrategy, Protocol, ConnectionObserver, boolean, Predicate)}. + * Long, SslConfig, ChannelInitializer, ExecutionStrategy, Protocol, ConnectionObserver, boolean, Predicate)}. */ @Deprecated // FIXME: 0.43 - remove deprecated method public static Single> initChannel( @@ -337,10 +377,46 @@ public static Single> initChan * @param Type of objects written to the {@link NettyConnection}. * @return A {@link Single} that completes with a {@link DefaultNettyConnection} after the channel is activated and * ready to use. + * @deprecated Use {@code #initChannel(Channel, BufferAllocator, Executor, IoExecutor, CloseHandler, FlushStrategy, + * Long, SslConfig, ChannelInitializer, ExecutionStrategy, Protocol, ConnectionObserver, boolean, Predicate)}. + */ + @Deprecated // FIXME: 0.43 - remove deprecated method + public static Single> initChannel( + Channel channel, BufferAllocator allocator, Executor executor, @Nullable IoExecutor ioExecutor, + CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, + ChannelInitializer initializer, ExecutionStrategy executionStrategy, Protocol protocol, + ConnectionObserver observer, boolean isClient, Predicate shouldWait) { + return initChannel(channel, allocator, executor, ioExecutor, closeHandler, flushStrategy, idleTimeoutMs, null, + initializer, executionStrategy, protocol, observer, isClient, shouldWait); + } + + /** + * Given a {@link Channel} this will initialize the {@link ChannelPipeline} and create a + * {@link DefaultNettyConnection}. The resulting single will complete after the TLS handshake has completed + * (if applicable) or otherwise after the channel is active and ready to use. + * @param channel A newly created {@link Channel}. + * @param allocator The {@link BufferAllocator} to use for the {@link DefaultNettyConnection}. + * @param executor The {@link Executor} to use for the {@link DefaultNettyConnection}. + * @param ioExecutor The {@link IoExecutor} to use for the {@link DefaultNettyConnection}. + * @param closeHandler Manages the half closure of the {@link DefaultNettyConnection}. + * @param flushStrategy Manages flushing of data for the {@link DefaultNettyConnection}. + * @param idleTimeoutMs Value for {@link ServiceTalkSocketOptions#IDLE_TIMEOUT IDLE_TIMEOUT} socket option. + * @param sslConfig The {@link SslConfig} to use for the {@link DefaultNettyConnection}. + * @param initializer Synchronously initializes the pipeline upon subscribe. + * @param executionStrategy {@link ExecutionStrategy} to use for this connection. + * @param protocol {@link Protocol} for the returned {@link DefaultNettyConnection}. + * @param observer {@link ConnectionObserver} to report network events. + * @param isClient tells if this {@link Channel} is for the client. + * @param shouldWait predicate that tells when request payload body should wait for continuation signal. + * @param Type of objects read from the {@link NettyConnection}. + * @param Type of objects written to the {@link NettyConnection}. + * @return A {@link Single} that completes with a {@link DefaultNettyConnection} after the channel is activated and + * ready to use. */ public static Single> initChannel( Channel channel, BufferAllocator allocator, Executor executor, @Nullable IoExecutor ioExecutor, CloseHandler closeHandler, FlushStrategy flushStrategy, @Nullable Long idleTimeoutMs, + @Nullable SslConfig sslConfig, ChannelInitializer initializer, ExecutionStrategy executionStrategy, Protocol protocol, ConnectionObserver observer, boolean isClient, Predicate shouldWait) { return new SubscribableSingle>() { @@ -355,8 +431,8 @@ protected void handleSubscribe( DefaultExecutionContext executionContext = new DefaultExecutionContext<>(allocator, fromNettyEventLoop(channel.eventLoop(), supportsIoThread), executor, executionStrategy); DefaultNettyConnection connection = new DefaultNettyConnection<>(channel, - executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, null, null, - NoopDataObserver.INSTANCE, isClient, shouldWait, identity()); + executionContext, closeHandler, flushStrategy, idleTimeoutMs, protocol, sslConfig, null, + null, NoopDataObserver.INSTANCE, isClient, shouldWait, identity()); channel.attr(CHANNEL_CLOSEABLE_KEY).set(connection); // We need the NettyToStChannelInboundHandler to be last in the pipeline. We accomplish that by // calling the ChannelInitializer before we do addLast for the NettyToStChannelInboundHandler. @@ -521,6 +597,12 @@ public SocketAddress remoteAddress() { return channel().remoteAddress(); } + @Nullable + @Override + public SslConfig sslConfig() { + return sslConfig; + } + @Override public SSLSession sslSession() { return sslSession; diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java index fc226893d2..e59648fd36 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/AbstractSslCloseNotifyAlertHandlingTest.java @@ -53,7 +53,7 @@ abstract class AbstractSslCloseNotifyAlertHandlingTest { channel = new EmbeddedDuplexChannel(false); final CloseHandler closeHandler = forPipelinedRequestResponse(isClient, channel.config()); conn = DefaultNettyConnection.initChannel(channel, DEFAULT_ALLOCATOR, immediate(), - null, closeHandler, defaultFlushStrategy(), null, + null, closeHandler, defaultFlushStrategy(), null, null, WIRE_LOGGING_INITIALIZER.andThen(ch -> ch.pipeline().addLast(new ChannelDuplexHandler() { @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java index f3d02b7d4e..55376a7f47 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/DefaultNettyConnectionTest.java @@ -113,7 +113,8 @@ private void setupWithCloseHandler(Function close when(demandEstimator.estimateRequestN(anyLong())).then(invocation1 -> (long) requestNext); CloseHandler closeHandler = closeHandlerFactory.apply(channel); conn = DefaultNettyConnection.initChannel(channel, allocator, executor, - null, closeHandler, defaultFlushStrategy(), null, trailerProtocolEndEventEmitter(closeHandler), + null, closeHandler, defaultFlushStrategy(), null, null, + trailerProtocolEndEventEmitter(closeHandler), offloadAll(), mock(Protocol.class), NoopConnectionObserver.INSTANCE, true, __ -> false) .toFuture().get(); publisher = new TestPublisher<>(); diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherRefCountTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherRefCountTest.java index 86234f5fb8..35283f3956 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherRefCountTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherRefCountTest.java @@ -50,7 +50,7 @@ public void setUp() throws Exception { channel = new EmbeddedDuplexChannel(false); publisher = DefaultNettyConnection.initChannel(channel, DEFAULT_ALLOCATOR, immediate(), null, UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, - defaultFlushStrategy(), null, channel2 -> { }, offloadAll(), mock(Protocol.class), + defaultFlushStrategy(), null, null, channel2 -> { }, offloadAll(), mock(Protocol.class), NoopConnectionObserver.INSTANCE, true, __ -> false).toFuture().get() .read(); } diff --git a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherTest.java b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherTest.java index 8925cf0924..9cc5b29f7a 100644 --- a/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherTest.java +++ b/servicetalk-transport-netty-internal/src/test/java/io/servicetalk/transport/netty/internal/NettyChannelPublisherTest.java @@ -92,7 +92,7 @@ public void setUp(Predicate terminalPredicate) throws Exception { CloseHandler closeHandler = UNSUPPORTED_PROTOCOL_CLOSE_HANDLER; NettyConnection connection = DefaultNettyConnection.initChannel(channel, DEFAULT_ALLOCATOR, - immediate(), null, closeHandler, defaultFlushStrategy(), null, channel -> + immediate(), null, closeHandler, defaultFlushStrategy(), null, null, channel -> channel.pipeline().addLast(new ChannelOutboundHandlerAdapter() { @Override public void read(ChannelHandlerContext ctx) throws Exception { @@ -127,7 +127,7 @@ private void setupFireReadOnCloseEvents() throws Exception { channel = new EmbeddedDuplexChannel(false); NettyConnection connection = DefaultNettyConnection.initChannel(channel, DEFAULT_ALLOCATOR, immediate(), null, UNSUPPORTED_PROTOCOL_CLOSE_HANDLER, defaultFlushStrategy(), null, - channel -> { + null, channel -> { channel.pipeline().addLast(new ChannelDuplexHandler() { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {