diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 7825e3ebe1c59..2e511d1562279 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.Map; -class Netty4HttpRequest extends RestRequest { +public class Netty4HttpRequest extends RestRequest { private final FullHttpRequest request; private final Channel channel; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 0d4a6ab5ee179..7472d87209e0d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -525,12 +525,12 @@ public ChannelHandler configureServerChannelHandler() { return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext()); } - static class HttpChannelHandler extends ChannelInitializer { + protected static class HttpChannelHandler extends ChannelInitializer { private final Netty4HttpServerTransport transport; private final Netty4HttpRequestHandler requestHandler; - HttpChannelHandler( + protected HttpChannelHandler( final Netty4HttpServerTransport transport, final boolean detailedErrorsEnabled, final ThreadContext threadContext) { @@ -539,7 +539,7 @@ static class HttpChannelHandler extends ChannelInitializer { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast("openChannels", transport.serverOpenChannels); final HttpRequestDecoder decoder = new HttpRequestDecoder( Math.toIntExact(transport.maxInitialLineLength.bytes()), diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index c1b2ef10211ee..d7631acd6b7a1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -25,6 +25,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -199,16 +200,7 @@ private Bootstrap createBootstrap() { bootstrap.channel(NioSocketChannel.class); } - bootstrap.handler(new ChannelInitializer() { - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - // using a dot as a prefix means this cannot come from any settings parsed - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); - } - - }); + bootstrap.handler(getClientChannelInitializer()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis())); bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); @@ -292,14 +284,7 @@ private void createServerBootstrap(String name, Settings settings) { serverBootstrap.channel(NioServerSocketChannel.class); } - serverBootstrap.childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); - ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); - ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); - } - }); + serverBootstrap.childHandler(getServerChannelInitializer(name, settings)); serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings)); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings)); @@ -326,6 +311,14 @@ protected void initChannel(SocketChannel ch) throws Exception { serverBootstraps.put(name, serverBootstrap); } + protected ChannelHandler getServerChannelInitializer(String name, Settings settings) { + return new ServerChannelInitializer(name, settings); + } + + protected ChannelHandler getClientChannelInitializer() { + return new ClientChannelInitializer(); + } + protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class); final Throwable t = unwrapped != null ? unwrapped : cause; @@ -348,7 +341,9 @@ protected NodeChannels connectToChannelsLight(DiscoveryNode node) { Channel[] channels = new Channel[1]; channels[0] = connect.channel(); channels[0].closeFuture().addListener(new ChannelCloseListener(node)); - return new NodeChannels(channels, channels, channels, channels, channels); + NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels); + onAfterChannelsConnected(nodeChannels); + return nodeChannels; } protected NodeChannels connectToChannels(DiscoveryNode node) { @@ -409,6 +404,7 @@ protected NodeChannels connectToChannels(DiscoveryNode node) { } throw e; } + onAfterChannelsConnected(nodeChannels); success = true; } finally { if (success == false) { @@ -422,6 +418,14 @@ protected NodeChannels connectToChannels(DiscoveryNode node) { return nodeChannels; } + /** + * Allows for logic to be executed after a connection has been made on all channels. While this method is being executed, the node is + * not listed as being connected to. + * @param nodeChannels the {@link NodeChannels} that have been connected + */ + protected void onAfterChannelsConnected(NodeChannels nodeChannels) { + } + private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; @@ -503,4 +507,33 @@ protected void stopInternal() { }); } + protected class ClientChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); + // using a dot as a prefix means this cannot come from any settings parsed + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client")); + } + + } + + protected class ServerChannelInitializer extends ChannelInitializer { + + protected final String name; + protected final Settings settings; + + protected ServerChannelInitializer(String name, Settings settings) { + this.name = name; + this.settings = settings; + } + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels); + ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder()); + ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name)); + } + } + } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 4d94dc2ccaf87..155bbe4bb5b08 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; @@ -179,7 +180,7 @@ private class CustomHttpChannelHandler extends Netty4HttpServerTransport.HttpCha } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(Channel ch) throws Exception { super.initChannel(ch); ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService)); }