Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27271 BufferCallBeforeInitHandler should ignore the flush request #4676

Merged
merged 1 commit into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@InterfaceAudience.Private
class BufferCallBeforeInitHandler extends ChannelDuplexHandler {

static final String NAME = "BufferCall";

private enum BufferCallAction {
FLUSH,
FAIL
Expand Down Expand Up @@ -77,6 +79,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// do not flush anything out
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof BufferCallEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
Expand Down Expand Up @@ -156,14 +156,14 @@ public void cleanupConnection() {

private void established(Channel ch) throws IOException {
assert eventLoop.inEventLoop();
ChannelPipeline p = ch.pipeline();
String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name();
p.addBefore(addBeforeHandler, null,
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS));
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
p.addBefore(addBeforeHandler, null,
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
p.fireUserEventTriggered(BufferCallEvent.success());
ch.pipeline()
.addBefore(BufferCallBeforeInitHandler.NAME, null,
new IdleStateHandler(0, rpcClient.minIdleTimeBeforeClose, 0, TimeUnit.MILLISECONDS))
.addBefore(BufferCallBeforeInitHandler.NAME, null,
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4))
.addBefore(BufferCallBeforeInitHandler.NAME, null,
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor))
.fireUserEventTriggered(BufferCallEvent.success());
}

private boolean reloginInProgress;
Expand Down Expand Up @@ -217,8 +217,8 @@ private void saslNegotiate(final Channel ch) {
failInit(ch, e);
return;
}
ch.pipeline().addFirst("SaslDecoder", new SaslChallengeDecoder()).addAfter("SaslDecoder",
"SaslHandler", saslHandler);
ch.pipeline().addBefore(BufferCallBeforeInitHandler.NAME, null, new SaslChallengeDecoder())
.addBefore(BufferCallBeforeInitHandler.NAME, null, saslHandler);
NettyFutureUtils.addListener(saslPromise, new FutureListener<Boolean>() {

@Override
Expand All @@ -229,20 +229,22 @@ public void operationComplete(Future<Boolean> future) throws Exception {
if (saslHandler.isNeedProcessConnectionHeader()) {
Promise<Boolean> connectionHeaderPromise = ch.eventLoop().newPromise();
// create the handler to handle the connection header
ChannelHandler chHandler = new NettyHBaseRpcConnectionHeaderHandler(
connectionHeaderPromise, conf, connectionHeaderWithLength);
NettyHBaseRpcConnectionHeaderHandler chHandler =
new NettyHBaseRpcConnectionHeaderHandler(connectionHeaderPromise, conf,
connectionHeaderWithLength);

// add ReadTimeoutHandler to deal with server doesn't response connection header
// because of the different configuration in client side and server side
p.addFirst(
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS));
p.addLast(chHandler);
final String readTimeoutHandlerName = "ReadTimeout";
p.addBefore(BufferCallBeforeInitHandler.NAME, readTimeoutHandlerName,
new ReadTimeoutHandler(RpcClient.DEFAULT_SOCKET_TIMEOUT_READ, TimeUnit.MILLISECONDS))
.addBefore(BufferCallBeforeInitHandler.NAME, null, chHandler);
connectionHeaderPromise.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(ReadTimeoutHandler.class);
p.remove(readTimeoutHandlerName);
p.remove(NettyHBaseRpcConnectionHeaderHandler.class);
// don't send connection header, NettyHbaseRpcConnectionHeaderHandler
// sent it already
Expand Down Expand Up @@ -276,8 +278,15 @@ private void connect() throws UnknownHostException {
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
.remoteAddress(remoteAddr).connect().addListener(new ChannelFutureListener() {
.handler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(BufferCallBeforeInitHandler.NAME,
new BufferCallBeforeInitHandler());
}
}).localAddress(rpcClient.localAddr).remoteAddress(remoteAddr).connect()
.addListener(new ChannelFutureListener() {
Comment on lines +281 to +289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You don't need to do this change in this PR. This is only needed for the TLS stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to use this as I need to specify the handler name. The handler method for Bootstrap can not specify handler name.


@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand Down