Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX_#3789][remote]cherry pick from dev to support netty heart beat #3913

Merged
merged 2 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,73 @@

package org.apache.dolphinscheduler.remote;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;

/**
* remoting netty server
* remoting netty server
*/
public class NettyRemotingServer {

private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);

/**
* server bootstrap
* server bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();

/**
* encoder
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();

/**
* default executor
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);

/**
* boss group
*/
private final NioEventLoopGroup bossGroup;
private final EventLoopGroup bossGroup;

/**
* worker group
* worker group
*/
private final NioEventLoopGroup workGroup;
private final EventLoopGroup workGroup;

/**
* server config
* server config
*/
private final NettyServerConfig serverConfig;

/**
* server handler
* server handler
*/
private final NettyServerHandler serverHandler = new NettyServerHandler(this);

Expand All @@ -89,123 +93,139 @@ public class NettyRemotingServer {
private final AtomicBoolean isStarted = new AtomicBoolean(false);

/**
* server init
* server init
*
* @param serverConfig server config
*/
public NettyRemotingServer(final NettyServerConfig serverConfig){
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
if (NettyUtils.useEpoll()) {
this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
} else {
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});

/**
* server start
*/
public void start(){
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);

if(this.isStarted.get()){
return;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
}
}

this.serverBootstrap
/**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NioServerSocketChannel.class)
.channel(NettyUtils.getServerSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) throws Exception {
initNettyChannel(ch);
}
});

ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit",e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
} else {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
if (future.isSuccess()) {
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (future.cause() != null) {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
} else {
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
}
//
isStarted.compareAndSet(false, true);
}

/**
* init netty channel
* init netty channel
*
* @param ch socket channel
* @throws Exception
*/
private void initNettyChannel(NioSocketChannel ch) throws Exception{
private void initNettyChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder);
pipeline.addLast("decoder", new NettyDecoder());
pipeline.addLast("handler", serverHandler);
}

/**
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}

/**
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor);
}

/**
* get default thread executor
* get default thread executor
*
* @return thread executor
*/
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}

public void close() {
if(isStarted.compareAndSet(true, false)){
if (isStarted.compareAndSet(true, false)) {
try {
if(bossGroup != null){
if (bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if(workGroup != null){
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
if(defaultExecutor != null){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ public enum CommandType {
TASK_KILL_RESPONSE,

/**
* ping
* HEART_BEAT
*/
HEART_BEAT,

/**
* ping
*/
PING,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public class NettyClientConfig {
*/
private int receiveBufferSize = 65535;

/**
* connect timeout millis
*/
private int connectTimeoutMillis = 3000;

public int getWorkerThreads() {
return workerThreads;
}
Expand Down Expand Up @@ -88,4 +93,11 @@ public void setReceiveBufferSize(int receiveBufferSize) {
this.receiveBufferSize = receiveBufferSize;
}

public int getConnectTimeoutMillis() {
return connectTimeoutMillis;
}

public void setConnectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
}
}
Loading