diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java index 1ddf3241fa3..f9b2bb4272a 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java @@ -34,11 +34,11 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; import java.util.List; @@ -59,6 +59,10 @@ public class PortUnificationServer { private static final Logger logger = LoggerFactory.getLogger(PortUnificationServer.class); private final List protocols; private final URL url; + + private final DefaultChannelGroup channels = new DefaultChannelGroup( + GlobalEventExecutor.INSTANCE); + private final int serverShutdownTimeoutMills; /** * netty server bootstrap. @@ -68,7 +72,6 @@ public class PortUnificationServer { * the boss channel that receive connections and dispatch these to worker channel. */ private Channel channel; - private DefaultChannelGroup channelGroup; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; @@ -127,15 +130,15 @@ protected void initChannel(SocketChannel ch) throws Exception { final PortUnificationServerHandler puHandler; if (enableSsl) { puHandler = new PortUnificationServerHandler(url, - SslContexts.buildServerSslContext(url), true, protocols); + SslContexts.buildServerSslContext(url), true, protocols, channels); } else { - puHandler = new PortUnificationServerHandler(url, null, false, protocols); + puHandler = new PortUnificationServerHandler(url, null, false, protocols, + channels); } p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)); p.addLast("negotiation-protocol", puHandler); - channelGroup = puHandler.getChannels(); } }); // bind @@ -161,10 +164,7 @@ protected void doClose() throws Throwable { channel = null; } - if (channelGroup != null) { - ChannelGroupFuture closeFuture = channelGroup.close(); - closeFuture.await(serverShutdownTimeoutMills); - } + channels.close().await(serverShutdownTimeoutMills); final long cost = System.currentTimeMillis() - st; logger.info("Port unification server closed. cost:" + cost); } catch (InterruptedException e) { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java index 1a28cb43f16..634b4063132 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java @@ -24,11 +24,10 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.GlobalEventExecutor; import java.util.List; import java.util.Set; @@ -38,23 +37,20 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder { private static final Logger LOGGER = LoggerFactory.getLogger( PortUnificationServerHandler.class); + private final ChannelGroup channels; + private final SslContext sslCtx; private final URL url; private final boolean detectSsl; private final List protocols; - private final DefaultChannelGroup channels = new DefaultChannelGroup( - GlobalEventExecutor.INSTANCE); - - public PortUnificationServerHandler(URL url, List protocols) { - this(url, null, false, protocols); - } public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl, - List protocols) { + List protocols, ChannelGroup channels) { this.url = url; this.sslCtx = sslCtx; this.protocols = protocols; this.detectSsl = detectSsl; + this.channels = channels; } @Override @@ -62,22 +58,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E LOGGER.error("Unexpected exception from downstream before protocol detected.", cause); } - public DefaultChannelGroup getChannels() { - return channels; - } - @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); channels.add(ctx.channel()); } - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - super.channelInactive(ctx); - channels.remove(ctx.channel()); - } - @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { @@ -124,7 +110,8 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) private void enableSsl(ChannelHandlerContext ctx) { ChannelPipeline p = ctx.pipeline(); p.addLast("ssl", sslCtx.newHandler(ctx.alloc())); - p.addLast("unificationA", new PortUnificationServerHandler(url, sslCtx, false, protocols)); + p.addLast("unificationA", + new PortUnificationServerHandler(url, sslCtx, false, protocols, channels)); p.remove(this); } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java index 43852a988ed..5e7090c6c82 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java @@ -48,14 +48,14 @@ public void gracefulShutdown() { Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil .writeAscii(ctx.alloc(), goAwayMessage)); goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE); - ctx.write(goAwayFrame); + ctx.writeAndFlush(goAwayFrame); pingFuture = ctx.executor().schedule( () -> secondGoAwayAndClose(ctx), GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS); Http2PingFrame pingFrame = new DefaultHttp2PingFrame(GRACEFUL_SHUTDOWN_PING, false); - ctx.write(pingFrame); + ctx.writeAndFlush(pingFrame); } void secondGoAwayAndClose(ChannelHandlerContext ctx) { @@ -69,8 +69,7 @@ void secondGoAwayAndClose(ChannelHandlerContext ctx) { try { Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage)); - ctx.write(goAwayFrame); - ctx.flush(); + ctx.writeAndFlush(goAwayFrame); //TODO support customize graceful shutdown timeout mills ctx.close(originPromise); } catch (Exception e) {