Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.HashMap;
import java.util.Map;

class Netty4HttpRequest extends RestRequest {
public class Netty4HttpRequest extends RestRequest {

private final FullHttpRequest request;
private final Channel channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,12 +525,12 @@ public ChannelHandler configureServerChannelHandler() {
return new HttpChannelHandler(this, detailedErrorsEnabled, threadPool.getThreadContext());
}

static class HttpChannelHandler extends ChannelInitializer<SocketChannel> {
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

private final Netty4HttpServerTransport transport;
private final Netty4HttpRequestHandler requestHandler;

HttpChannelHandler(
protected HttpChannelHandler(
final Netty4HttpServerTransport transport,
final boolean detailedErrorsEnabled,
final ThreadContext threadContext) {
Expand All @@ -539,7 +539,7 @@ static class HttpChannelHandler extends ChannelInitializer<SocketChannel> {
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("openChannels", transport.serverOpenChannels);
final HttpRequestDecoder decoder = new HttpRequestDecoder(
Math.toIntExact(transport.maxInitialLineLength.bytes()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -199,16 +200,7 @@ private Bootstrap createBootstrap() {
bootstrap.channel(NioSocketChannel.class);
}

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
}

});
bootstrap.handler(getClientChannelInitializer());

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.millis()));
bootstrap.option(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
Expand Down Expand Up @@ -292,14 +284,7 @@ private void createServerBootstrap(String name, Settings settings) {
serverBootstrap.channel(NioServerSocketChannel.class);
}

serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
}
});
serverBootstrap.childHandler(getServerChannelInitializer(name, settings));

serverBootstrap.childOption(ChannelOption.TCP_NODELAY, TCP_NO_DELAY.get(settings));
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, TCP_KEEP_ALIVE.get(settings));
Expand All @@ -326,6 +311,14 @@ protected void initChannel(SocketChannel ch) throws Exception {
serverBootstraps.put(name, serverBootstrap);
}

protected ChannelHandler getServerChannelInitializer(String name, Settings settings) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason not to inline this method and the one below?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to enable extensions.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this with @jasontedor, he explained that this class can be extended and these methods need to be overridable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we document the fact that these methods can be extended?

return new ServerChannelInitializer(name, settings);
}

protected ChannelHandler getClientChannelInitializer() {
return new ClientChannelInitializer();
}

protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable t = unwrapped != null ? unwrapped : cause;
Expand All @@ -348,7 +341,9 @@ protected NodeChannels connectToChannelsLight(DiscoveryNode node) {
Channel[] channels = new Channel[1];
channels[0] = connect.channel();
channels[0].closeFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels, channels, channels);
NodeChannels nodeChannels = new NodeChannels(channels, channels, channels, channels, channels);
onAfterChannelsConnected(nodeChannels);
return nodeChannels;
}

protected NodeChannels connectToChannels(DiscoveryNode node) {
Expand Down Expand Up @@ -409,6 +404,7 @@ protected NodeChannels connectToChannels(DiscoveryNode node) {
}
throw e;
}
onAfterChannelsConnected(nodeChannels);
success = true;
} finally {
if (success == false) {
Expand All @@ -422,6 +418,14 @@ protected NodeChannels connectToChannels(DiscoveryNode node) {
return nodeChannels;
}

/**
* Allows for logic to be executed after a connection has been made on all channels. While this method is being executed, the node is
* not listed as being connected to.
* @param nodeChannels the {@link NodeChannels} that have been connected
*/
protected void onAfterChannelsConnected(NodeChannels nodeChannels) {
}

private class ChannelCloseListener implements ChannelFutureListener {

private final DiscoveryNode node;
Expand Down Expand Up @@ -503,4 +507,33 @@ protected void stopInternal() {
});
}

protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
}

}

protected class ServerChannelInitializer extends ChannelInitializer<Channel> {

protected final String name;
protected final Settings settings;

protected ServerChannelInitializer(String name, Settings settings) {
this.name = name;
this.settings = settings;
}

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("open_channels", Netty4Transport.this.serverOpenChannels);
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand Down Expand Up @@ -179,7 +180,7 @@ private class CustomHttpChannelHandler extends Netty4HttpServerTransport.HttpCha
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);
ch.pipeline().replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService));
}
Expand Down