Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.0 Triple] Fix graceful shutdown not work #9938

Merged
merged 1 commit into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.net.InetSocketAddress;
import java.util.List;
Expand All @@ -59,6 +59,10 @@ public class PortUnificationServer {
private static final Logger logger = LoggerFactory.getLogger(PortUnificationServer.class);
private final List<WireProtocol> protocols;
private final URL url;

private final DefaultChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);

private final int serverShutdownTimeoutMills;
/**
* netty server bootstrap.
Expand All @@ -68,7 +72,6 @@ public class PortUnificationServer {
* the boss channel that receive connections and dispatch these to worker channel.
*/
private Channel channel;
private DefaultChannelGroup channelGroup;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;

Expand Down Expand Up @@ -127,15 +130,15 @@ protected void initChannel(SocketChannel ch) throws Exception {
final PortUnificationServerHandler puHandler;
if (enableSsl) {
puHandler = new PortUnificationServerHandler(url,
SslContexts.buildServerSslContext(url), true, protocols);
SslContexts.buildServerSslContext(url), true, protocols, channels);
} else {
puHandler = new PortUnificationServerHandler(url, null, false, protocols);
puHandler = new PortUnificationServerHandler(url, null, false, protocols,
channels);
}

p.addLast("server-idle-handler",
new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
p.addLast("negotiation-protocol", puHandler);
channelGroup = puHandler.getChannels();
}
});
// bind
Expand All @@ -161,10 +164,7 @@ protected void doClose() throws Throwable {
channel = null;
}

if (channelGroup != null) {
ChannelGroupFuture closeFuture = channelGroup.close();
closeFuture.await(serverShutdownTimeoutMills);
}
channels.close().await(serverShutdownTimeoutMills);
final long cost = System.currentTimeMillis() - st;
logger.info("Port unification server closed. cost:" + cost);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.List;
import java.util.Set;
Expand All @@ -38,46 +37,33 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(
PortUnificationServerHandler.class);

private final ChannelGroup channels;

private final SslContext sslCtx;
private final URL url;
private final boolean detectSsl;
private final List<WireProtocol> protocols;
private final DefaultChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);

public PortUnificationServerHandler(URL url, List<WireProtocol> protocols) {
this(url, null, false, protocols);
}

public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
List<WireProtocol> protocols) {
List<WireProtocol> protocols, ChannelGroup channels) {
this.url = url;
this.sslCtx = sslCtx;
this.protocols = protocols;
this.detectSsl = detectSsl;
this.channels = channels;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("Unexpected exception from downstream before protocol detected.", cause);
}

public DefaultChannelGroup getChannels() {
return channels;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channels.add(ctx.channel());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
channels.remove(ctx.channel());
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
Expand Down Expand Up @@ -124,7 +110,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
private void enableSsl(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
p.addLast("unificationA", new PortUnificationServerHandler(url, sslCtx, false, protocols));
p.addLast("unificationA",
new PortUnificationServerHandler(url, sslCtx, false, protocols, channels));
p.remove(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public void gracefulShutdown() {
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil
.writeAscii(ctx.alloc(), goAwayMessage));
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
ctx.write(goAwayFrame);
ctx.writeAndFlush(goAwayFrame);
pingFuture = ctx.executor().schedule(
() -> secondGoAwayAndClose(ctx),
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
TimeUnit.NANOSECONDS);

Http2PingFrame pingFrame = new DefaultHttp2PingFrame(GRACEFUL_SHUTDOWN_PING, false);
ctx.write(pingFrame);
ctx.writeAndFlush(pingFrame);
}

void secondGoAwayAndClose(ChannelHandlerContext ctx) {
Expand All @@ -69,8 +69,7 @@ void secondGoAwayAndClose(ChannelHandlerContext ctx) {
try {
Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR,
ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
ctx.write(goAwayFrame);
ctx.flush();
ctx.writeAndFlush(goAwayFrame);
//TODO support customize graceful shutdown timeout mills
ctx.close(originPromise);
} catch (Exception e) {
Expand Down