diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java index 3dc48ce3e00e..6f8339895b5b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java @@ -33,6 +33,8 @@ @InterfaceAudience.Private class BufferCallBeforeInitHandler extends ChannelDuplexHandler { + static final String NAME = "BufferCall"; + private enum BufferCallAction { FLUSH, FAIL @@ -77,6 +79,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + // do not flush anything out + } + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof BufferCallEvent) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 81a61d772638..b1aeef725090 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -48,7 +48,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture; import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; @@ -152,14 +152,14 @@ public void cleanupConnection() { private void established(Channel ch) throws IOException { assert eventLoop.inEventLoop(); - ChannelPipeline p = ch.pipeline(); - String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name(); - p.addBefore(addBeforeHandler, null, - new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)); - p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); - p.addBefore(addBeforeHandler, null, - new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); - p.fireUserEventTriggered(BufferCallEvent.success()); + ch.pipeline() + .addBefore(BufferCallBeforeInitHandler.NAME, null, + new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS)) + .addBefore(BufferCallBeforeInitHandler.NAME, null, + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)) + .addBefore(BufferCallBeforeInitHandler.NAME, null, + new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)) + .fireUserEventTriggered(BufferCallEvent.success()); } private boolean reloginInProgress; @@ -212,7 +212,8 @@ private void saslNegotiate(final Channel ch) { failInit(ch, e); return; } - ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler); + ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder()) + .addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler); saslPromise.addListener(new FutureListener() { @Override @@ -226,20 +227,22 @@ public void operationComplete(Future future) throws Exception { if (saslHandler.isNeedProcessConnectionHeader()) { Promise connectionHeaderPromise = ch.eventLoop().newPromise(); // create the handler to handle the connection header - ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler( - connectionHeaderPromise, conf, connectionHeaderWithLength); + NettyHBaseRpcConnectionHeaderHandler chHandler = + new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf, + connectionHeaderWithLength); // add ReadTimeoutHandler to deal with server doesn't response connection header // because of the different configuration in client side and server side - p.addFirst( - new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)); - p.addLast(chHandler); + final String readTimeoutHandlerName = "ReadTimeout"; + p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName, + new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS)) + .addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler); connectionHeaderPromise.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { ChannelPipeline p = ch.pipeline(); - p.remove(ReadTimeoutHandler.class); + p.remove(readTimeoutHandlerName); p.remove(NettyHBaseRpcConnectionHeaderHandler.class); // don't send connection header, NettyHbaseRpcConnectionHeaderHandler // sent it already @@ -273,8 +276,15 @@ private void connect() { .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) - .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) - .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { + .handler(new ChannelInitializer() { + + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME, + new BufferCallBeforeInitHandler()); + } + }).localAddress(rpcClient.localAddr).remoteAddress(remoteId.address).connect() + .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception {