diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java index 10e62d8d9bcc..38f00fb4fdc9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java @@ -18,10 +18,16 @@ package org.apache.dolphinscheduler.remote; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; + import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.Command; @@ -38,6 +44,8 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** - * remoting netty client + * remoting netty client */ public class NettyRemotingClient { @@ -59,7 +67,7 @@ public class NettyRemotingClient { private final Bootstrap bootstrap = new Bootstrap(); /** - * encoder + * encoder */ private final NettyEncoder encoder = new NettyEncoder(); @@ -69,57 +77,69 @@ public class NettyRemotingClient { private final ConcurrentHashMap channels = new ConcurrentHashMap(128); /** - * started flag + * started flag */ private final AtomicBoolean isStarted = new AtomicBoolean(false); /** - * worker group + * worker group */ - private final NioEventLoopGroup workerGroup; + private final EventLoopGroup workerGroup; /** - * client config + * client config */ private final NettyClientConfig clientConfig; /** - * saync semaphore + * saync semaphore */ private final Semaphore asyncSemaphore = new Semaphore(200, true); /** - * callback thread executor + * callback thread executor */ private final ExecutorService callbackExecutor; /** - * client handler + * client handler */ private final NettyClientHandler clientHandler; /** - * response future executor + * response future executor */ private final ScheduledExecutorService responseFutureExecutor; /** - * client init + * client init + * * @param clientConfig client config */ - public NettyRemotingClient(final NettyClientConfig clientConfig){ + public NettyRemotingClient(final NettyClientConfig clientConfig) { this.clientConfig = clientConfig; - this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + if (NettyUtils.useEpoll()) { + this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); - } - }); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); + } + }); + } else { + this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); + } + }); + } this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, - new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), - new CallerThreadExecutePolicy()); + new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10), + new CallerThreadExecutePolicy()); this.clientHandler = new NettyClientHandler(this, callbackExecutor); this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor")); @@ -128,26 +148,27 @@ public Thread newThread(Runnable r) { } /** - * start + * start */ - private void start(){ + private void start() { this.bootstrap - .group(this.workerGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) - .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) - .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) - .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new NettyDecoder(), - clientHandler, - encoder); - } - }); + .group(this.workerGroup) + .channel(NettyUtils.getSocketChannelClass()) + .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) + .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) + .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) + .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis()) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new NettyDecoder(), + clientHandler, + encoder); + } + }); this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -159,10 +180,11 @@ public void run() { } /** - * async send - * @param host host - * @param command command - * @param timeoutMillis timeoutMillis + * async send + * + * @param host host + * @param command command + * @param timeoutMillis timeoutMillis * @param invokeCallback callback function * @throws InterruptedException * @throws RemotingException @@ -182,22 +204,22 @@ public void sendAsync(final Host host, final Command command, * control concurrency number */ boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); - if(acquired){ + if (acquired) { final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore); /** * response future */ final ResponseFuture responseFuture = new ResponseFuture(opaque, - timeoutMillis, - invokeCallback, - releaseSemaphore); + timeoutMillis, + invokeCallback, + releaseSemaphore); try { - channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ + channel.writeAndFlush(command).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ + if (future.isSuccess()) { responseFuture.setSendOk(true); return; } else { @@ -207,28 +229,29 @@ public void operationComplete(ChannelFuture future) throws Exception { responseFuture.putResponse(null); try { responseFuture.executeInvokeCallback(); - } catch (Throwable ex){ + } catch (Throwable ex) { logger.error("execute callback error", ex); - } finally{ + } finally { responseFuture.release(); } } }); - } catch (Throwable ex){ + } catch (Throwable ex) { responseFuture.release(); throw new RemotingException(String.format("send command to host: %s failed", host), ex); } - } else{ + } else { String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d", - timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); + timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits()); throw new RemotingTooMuchRequestException(message); } } /** * sync send - * @param host host - * @param command command + * + * @param host host + * @param command command * @param timeoutMillis timeoutMillis * @return command * @throws InterruptedException @@ -244,7 +267,7 @@ public Command sendSync(final Host host, final Command command, final long timeo channel.writeAndFlush(command).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ + if (future.isSuccess()) { responseFuture.setSendOk(true); return; } else { @@ -259,10 +282,10 @@ public void operationComplete(ChannelFuture future) throws Exception { * sync wait for result */ Command result = responseFuture.waitResponse(); - if(result == null){ - if(responseFuture.isSendOK()){ + if (result == null) { + if (responseFuture.isSendOK()) { throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause()); - } else{ + } else { throw new RemotingException(host.toString(), responseFuture.getCause()); } } @@ -270,8 +293,9 @@ public void operationComplete(ChannelFuture future) throws Exception { } /** - * send task - * @param host host + * send task + * + * @param host host * @param command command * @throws RemotingException */ @@ -296,33 +320,35 @@ public void send(final Host host, final Command command) throws RemotingExceptio } /** - * register processor + * register processor + * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.clientHandler.registerProcessor(commandType, processor, executor); } /** - * get channel + * get channel + * * @param host * @return */ public Channel getChannel(Host host) { Channel channel = channels.get(host); - if(channel != null && channel.isActive()){ + if (channel != null && channel.isActive()) { return channel; } return createChannel(host, true); @@ -330,17 +356,18 @@ public Channel getChannel(Host host) { /** * create channel - * @param host host + * + * @param host host * @param isSync sync flag * @return channel */ public Channel createChannel(Host host, boolean isSync) { ChannelFuture future; try { - synchronized (bootstrap){ + synchronized (bootstrap) { future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort())); } - if(isSync){ + if (isSync) { future.sync(); } if (future.isSuccess()) { @@ -358,16 +385,16 @@ public Channel createChannel(Host host, boolean isSync) { * close */ public void close() { - if(isStarted.compareAndSet(true, false)){ + if (isStarted.compareAndSet(true, false)) { try { closeChannels(); - if(workerGroup != null){ + if (workerGroup != null) { this.workerGroup.shutdownGracefully(); } - if(callbackExecutor != null){ + if (callbackExecutor != null) { this.callbackExecutor.shutdownNow(); } - if(this.responseFutureExecutor != null){ + if (this.responseFutureExecutor != null) { this.responseFutureExecutor.shutdownNow(); } } catch (Exception ex) { @@ -378,9 +405,9 @@ public void close() { } /** - * close channels + * close channels */ - private void closeChannels(){ + private void closeChannels() { for (Channel channel : this.channels.values()) { channel.close(); } @@ -389,11 +416,12 @@ private void closeChannels(){ /** * close channel + * * @param host host */ - public void closeChannel(Host host){ + public void closeChannel(Host host) { Channel channel = this.channels.remove(host); - if(channel != null){ + if (channel != null) { channel.close(); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index dbeb318f2d80..c3bdc97551cf 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -17,14 +17,6 @@ package org.apache.dolphinscheduler.remote; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -32,8 +24,7 @@ import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Constants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.dolphinscheduler.remote.utils.NettyUtils; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,45 +32,58 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; + /** - * remoting netty server + * remoting netty server */ public class NettyRemotingServer { private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); /** - * server bootstrap + * server bootstrap */ private final ServerBootstrap serverBootstrap = new ServerBootstrap(); /** - * encoder + * encoder */ private final NettyEncoder encoder = new NettyEncoder(); /** - * default executor + * default executor */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); /** * boss group */ - private final NioEventLoopGroup bossGroup; + private final EventLoopGroup bossGroup; /** - * worker group + * worker group */ - private final NioEventLoopGroup workGroup; + private final EventLoopGroup workGroup; /** - * server config + * server config */ private final NettyServerConfig serverConfig; /** - * server handler + * server handler */ private final NettyServerHandler serverHandler = new NettyServerHandler(this); @@ -89,82 +93,96 @@ public class NettyRemotingServer { private final AtomicBoolean isStarted = new AtomicBoolean(false); /** - * server init + * server init * * @param serverConfig server config */ - public NettyRemotingServer(final NettyServerConfig serverConfig){ + public NettyRemotingServer(final NettyServerConfig serverConfig) { this.serverConfig = serverConfig; + if (NettyUtils.useEpoll()) { + this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); + } + }); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); - } - }); + this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); + } + }); + } else { + this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); - } - }); - } + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); + } + }); - /** - * server start - */ - public void start(){ + this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); - if(this.isStarted.get()){ - return; + @Override + public Thread newThread(Runnable r) { + return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); + } + }); } + } - this.serverBootstrap + /** + * server start + */ + public void start() { + if (isStarted.compareAndSet(false, true)) { + this.serverBootstrap .group(this.bossGroup, this.workGroup) - .channel(NioServerSocketChannel.class) + .channel(NettyUtils.getServerSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) - .childHandler(new ChannelInitializer() { + .childHandler(new ChannelInitializer() { @Override - protected void initChannel(NioSocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch) throws Exception { initNettyChannel(ch); } }); - ChannelFuture future; - try { - future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); - } catch (Exception e) { - logger.error("NettyRemotingServer bind fail {}, exit",e.getMessage(), e); - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); - } - if (future.isSuccess()) { - logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); - } else if (future.cause() != null) { - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); - } else { - throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + ChannelFuture future; + try { + future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); + } catch (Exception e) { + logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e); + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + } + if (future.isSuccess()) { + logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); + } else if (future.cause() != null) { + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); + } else { + throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); + } } - // - isStarted.compareAndSet(false, true); } /** - * init netty channel + * init netty channel + * * @param ch socket channel - * @throws Exception */ - private void initNettyChannel(NioSocketChannel ch) throws Exception{ + private void initNettyChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder", encoder); pipeline.addLast("decoder", new NettyDecoder()); @@ -172,27 +190,29 @@ private void initNettyChannel(NioSocketChannel ch) throws Exception{ } /** - * register processor + * register processor + * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { this.serverHandler.registerProcessor(commandType, processor, executor); } /** - * get default thread executor + * get default thread executor + * * @return thread executor */ public ExecutorService getDefaultExecutor() { @@ -200,12 +220,12 @@ public ExecutorService getDefaultExecutor() { } public void close() { - if(isStarted.compareAndSet(true, false)){ + if (isStarted.compareAndSet(true, false)) { try { - if(bossGroup != null){ + if (bossGroup != null) { this.bossGroup.shutdownGracefully(); } - if(workGroup != null){ + if (workGroup != null) { this.workGroup.shutdownGracefully(); } if(defaultExecutor != null){ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 17c1e44fd35a..77c6064e5b33 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -99,7 +99,12 @@ public enum CommandType { TASK_KILL_RESPONSE, /** - * ping + * HEART_BEAT + */ + HEART_BEAT, + + /** + * ping */ PING, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java index 831e05f7e709..34783369d25d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java @@ -48,6 +48,11 @@ public class NettyClientConfig { */ private int receiveBufferSize = 65535; + /** + * connect timeout millis + */ + private int connectTimeoutMillis = 3000; + public int getWorkerThreads() { return workerThreads; } @@ -88,4 +93,11 @@ public void setReceiveBufferSize(int receiveBufferSize) { this.receiveBufferSize = receiveBufferSize; } + public int getConnectTimeoutMillis() { + return connectTimeoutMillis; + } + + public void setConnectTimeoutMillis(int connectTimeoutMillis) { + this.connectTimeoutMillis = connectTimeoutMillis; + } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java index 48d78d9ad678..a988acfe17ed 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.handler; -import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -25,16 +25,24 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + /** - * netty client request handler + * netty client request handler */ @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @@ -42,12 +50,14 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); /** - * netty client + * netty client */ private final NettyRemotingClient nettyRemotingClient; + private static byte[] heartBeatData = "heart_beat".getBytes(); + /** - * callback thread executor + * callback thread executor */ private final ExecutorService callbackExecutor; @@ -57,19 +67,19 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final ConcurrentHashMap> processors; /** - * default executor + * default executor */ private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); - public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){ + public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) { this.nettyRemotingClient = nettyRemotingClient; this.callbackExecutor = callbackExecutor; this.processors = new ConcurrentHashMap(); } /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle + * When the current channel is not active, + * the current channel has reached the end of its life cycle * * @param ctx channel handler context * @throws Exception @@ -81,7 +91,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } /** - * The current channel reads data from the remote + * The current channel reads data from the remote * * @param ctx channel handler context * @param msg message @@ -89,55 +99,55 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived(ctx.channel(), (Command) msg); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; - if(executorRef == null){ + if (executorRef == null) { executorRef = defaultExecutor; } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); } /** - * process received logic + * process received logic * * @param command command */ private void processReceived(final Channel channel, final Command command) { ResponseFuture future = ResponseFuture.getFuture(command.getOpaque()); - if(future != null){ + if (future != null) { future.setResponseCommand(command); future.release(); - if(future.getInvokeCallback() != null){ + if (future.getInvokeCallback() != null) { this.callbackExecutor.submit(new Runnable() { @Override public void run() { future.executeInvokeCallback(); } }); - } else{ + } else { future.putResponse(command); } - } else{ + } else { processByCommandType(channel, command); } } @@ -163,9 +173,10 @@ public void processByCommandType(final Channel channel, final Command command) { } /** - * caught exception - * @param ctx channel handler context - * @param cause cause + * caught exception + * + * @param ctx channel handler context + * @param cause cause * @throws Exception */ @Override @@ -175,4 +186,18 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.channel().close(); } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + Command heartBeat = new Command(); + heartBeat.setType(CommandType.HEART_BEAT); + heartBeat.setBody(heartBeatData); + ctx.writeAndFlush(heartBeat) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + + } else { + super.userEventTriggered(ctx, evt); + } + } + } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java index da2a6ff8bf3a..09e41e9b5412 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java @@ -17,22 +17,30 @@ package org.apache.dolphinscheduler.remote.handler; -import io.netty.channel.*; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; + + /** - * netty server request handler + * netty server request handler */ @ChannelHandler.Sharable public class NettyServerHandler extends ChannelInboundHandlerAdapter { @@ -40,22 +48,23 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); /** - * netty remote server + * netty remote server */ private final NettyRemotingServer nettyRemotingServer; /** - * server processors queue + * server processors queue */ private final ConcurrentHashMap> processors = new ConcurrentHashMap(); - public NettyServerHandler(NettyRemotingServer nettyRemotingServer){ + public NettyServerHandler(NettyRemotingServer nettyRemotingServer) { this.nettyRemotingServer = nettyRemotingServer; } /** - * When the current channel is not active, - * the current channel has reached the end of its life cycle + * When the current channel is not active, + * the current channel has reached the end of its life cycle + * * @param ctx channel handler context * @throws Exception */ @@ -73,38 +82,39 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - processReceived(ctx.channel(), (Command)msg); + processReceived(ctx.channel(), (Command) msg); } /** * register processor * * @param commandType command type - * @param processor processor + * @param processor processor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { this.registerProcessor(commandType, processor, null); } /** - * register processor + * register processor * * @param commandType command type - * @param processor processor - * @param executor thread executor + * @param processor processor + * @param executor thread executor */ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { ExecutorService executorRef = executor; - if(executorRef == null){ + if (executorRef == null) { executorRef = nettyRemotingServer.getDefaultExecutor(); } this.processors.putIfAbsent(commandType, new Pair<>(processor, executorRef)); } /** - * process received logic + * process received logic + * * @param channel channel - * @param msg message + * @param msg message */ private void processReceived(final Channel channel, final Command msg) { final CommandType commandType = msg.getType(); @@ -132,22 +142,22 @@ public void run() { } /** - * caught exception + * caught exception * - * @param ctx channel handler context + * @param ctx channel handler context * @param cause cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("exceptionCaught : {}",cause.getMessage(), cause); + logger.error("exceptionCaught : {}", cause.getMessage(), cause); ctx.channel().close(); } /** - * channel write changed + * channel write changed * - * @param ctx channel handler context + * @param ctx channel handler context * @throws Exception */ @Override @@ -158,16 +168,25 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio if (!ch.isWritable()) { if (logger.isWarnEnabled()) { logger.warn("{} is not writable, over high water level : {}", - ch, config.getWriteBufferHighWaterMark()); + ch, config.getWriteBufferHighWaterMark()); } config.setAutoRead(false); } else { if (logger.isWarnEnabled()) { logger.warn("{} is writable, to low water : {}", - ch, config.getWriteBufferLowWaterMark()); + ch, config.getWriteBufferLowWaterMark()); } config.setAutoRead(true); } } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + ctx.channel().close(); + } else { + super.userEventTriggered(ctx, evt); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 48736ca6940c..43b0789e2021 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -19,7 +19,6 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; - /** * constant */ @@ -29,6 +28,10 @@ public class Constants { public static final String SLASH = "/"; + public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000; + + public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 60; + /** * charset */ @@ -42,4 +45,14 @@ public class Constants { public static final String LOCAL_ADDRESS = IPUtils.getFirstNoLoopbackIP4Address(); + /** + * netty epoll enable switch + */ + public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable", "true"); + + /** + * OS Name + */ + public static final String OS_NAME = System.getProperty("os.name"); + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java new file mode 100644 index 000000000000..cc62287ebfb1 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NettyUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.utils; + +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +/** + * NettyUtils + */ +public class NettyUtils { + + private NettyUtils() { + } + + public static boolean useEpoll() { + String osName = Constants.OS_NAME; + if (!osName.toLowerCase().contains("linux")) { + return false; + } + if (!Epoll.isAvailable()) { + return false; + } + String enableNettyEpoll = Constants.NETTY_EPOLL_ENABLE; + return Boolean.parseBoolean(enableNettyEpoll); + } + + public static Class getServerSocketChannelClass() { + if (useEpoll()) { + return EpollServerSocketChannel.class; + } + return NioServerSocketChannel.class; + } + + public static Class getSocketChannelClass() { + if (useEpoll()) { + return EpollSocketChannel.class; + } + return NioSocketChannel.class; + } + +} \ No newline at end of file