Skip to content

Commit

Permalink
Refactor worker (#7)
Browse files Browse the repository at this point in the history
* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
  • Loading branch information
Technoboy- and qiaozhanwei authored Feb 25, 2020
1 parent 89a70f8 commit f0f8644
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired
private RoundRobinHostManager hostManager;

private final ConcurrentHashMap<ExecutorType, ExecutorManager> executorManagers;
private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;

public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>();
Expand All @@ -61,11 +61,11 @@ public ExecutorDispatcher(){
* @param context context
* @throws ExecuteException
*/
public void dispatch(final ExecutionContext context) throws ExecuteException {
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
/**
* get executor manager
*/
ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType());
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if(executorManager == null){
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
Expand All @@ -83,7 +83,7 @@ public void dispatch(final ExecutionContext context) throws ExecuteException {
/**
* task execute
*/
executorManager.execute(context);
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* abstract executor manager
*/
public abstract class AbstractExecutorManager implements ExecutorManager{
public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{

/**
* before execute , add time monitor , timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* executor manager
*/
public interface ExecutorManager {
public interface ExecutorManager<T> {

/**
* before execute
Expand All @@ -37,7 +37,7 @@ public interface ExecutorManager {
* @param context context
* @throws ExecuteException
*/
void execute(ExecutionContext context) throws ExecuteException;
T execute(ExecutionContext context) throws ExecuteException;

/**
* after execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* netty executor manager
*/
@Service
public class NettyExecutorManager extends AbstractExecutorManager{
public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{

private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class);

Expand All @@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{
public NettyExecutorManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
/**
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
Expand All @@ -74,7 +78,7 @@ public NettyExecutorManager(){
* @throws ExecuteException
*/
@Override
public void execute(ExecutionContext context) throws ExecuteException {
public Boolean execute(ExecutionContext context) throws ExecuteException {

/**
* all nodes
Expand Down Expand Up @@ -118,6 +122,8 @@ public void execute(ExecutionContext context) throws ExecuteException {
}
}
}

return success;
}

/**
Expand Down Expand Up @@ -189,7 +195,7 @@ private Set<String> getAllNodes(ExecutionContext context){
break;
case CLIENT:
break;
default:
default:
throw new IllegalArgumentException("invalid executor type : " + executorType);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,10 @@
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
Expand All @@ -48,7 +36,6 @@
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.Callable;

Expand Down Expand Up @@ -138,14 +125,16 @@ public void kill(){
* dispatch task to worker
* @param taskInstance
*/
public void dispatch(TaskInstance taskInstance){
private Boolean dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
try {
dispatcher.dispatch(executionContext);
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
logger.error("execute exception", e);
return false;
}

}

/**
Expand Down Expand Up @@ -234,8 +223,7 @@ protected TaskInstance submit(){
}
if(submitDB && !submitQueue){
// submit task to queue
dispatch(task);
submitQueue = true;
submitQueue = dispatch(task);
}
if(submitDB && submitQueue){
return task;
Expand Down

0 comments on commit f0f8644

Please sign in to comment.