Skip to content

Commit

Permalink
HBASE-27271 BufferCallBeforeInitHandler should ignore the flush reque…
Browse files Browse the repository at this point in the history
…st (apache#4676)

Signed-off-by: Balazs Meszaros <meszibalu@apache.org>
(cherry picked from commit fb529e2)

Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
(cherry picked from commit 1094b15)
Change-Id: Iff314c026f3842cceedb5a11505dcc586747835c
  • Loading branch information
Apache9 committed Aug 8, 2022
1 parent b0a083c commit d0bf233
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@InterfaceAudience.Private
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {

static final String NAME = "BufferCall";

private enum BufferCallAction {
FLUSH,
FAIL
Expand Down Expand Up @@ -77,6 +79,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// do not flush anything out
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof BufferCallEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
Expand Down Expand Up @@ -152,14 +152,14 @@ public void cleanupConnection() {

private void established(Channel ch) throws IOException {
assert eventLoop.inEventLoop();
ChannelPipeline p = ch.pipeline();
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
p.addBefore(addBeforeHandler, null,
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
p.addBefore(addBeforeHandler, null,
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
p.fireUserEventTriggered(BufferCallEvent.success());
ch.pipeline()
.addBefore(BufferCallBeforeInitHandler.NAME, null,
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
.addBefore(BufferCallBeforeInitHandler.NAME, null,
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
.addBefore(BufferCallBeforeInitHandler.NAME, null,
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
.fireUserEventTriggered(BufferCallEvent.success());
}

private boolean reloginInProgress;
Expand Down Expand Up @@ -212,7 +212,8 @@ private void saslNegotiate(final Channel ch) {
failInit(ch, e);
return;
}
ch.pipeline().addFirst(new SaslChallengeDecoder(), saslHandler);
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
saslPromise.addListener(new FutureListener<Boolean>() {

@Override
Expand All @@ -226,20 +227,22 @@ public void operationComplete(Future<Boolean> future) throws Exception {
if (saslHandler.isNeedProcessConnectionHeader()) {
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
// create the handler to handle the connection header
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
connectionHeaderPromise, conf, connectionHeaderWithLength);
NettyHBaseRpcConnectionHeaderHandler chHandler =
new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
connectionHeaderWithLength);

// add ReadTimeoutHandler to deal with server doesn't response connection header
// because of the different configuration in client side and server side
p.addFirst(
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
p.addLast(chHandler);
final String readTimeoutHandlerName = "ReadTimeout";
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(ReadTimeoutHandler.class);
p.remove(readTimeoutHandlerName);
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
// sent it already
Expand Down Expand Up @@ -273,8 +276,15 @@ private void connect() {
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {
.handler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
new BufferCallBeforeInitHandler());
}
}).localAddress(rpcClient.localAddr).remoteAddress(remoteId.address).connect()
.addListener(new ChannelFutureListener() {

@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand Down

0 comments on commit d0bf233

Please sign in to comment.