diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 01fb840303d2..8a803a2d0f09 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean { @Autowired private RoundRobinHostManager hostManager; - private final ConcurrentHashMap executorManagers; + private final ConcurrentHashMap> executorManagers; public ExecutorDispatcher(){ this.executorManagers = new ConcurrentHashMap<>(); @@ -61,11 +61,11 @@ public ExecutorDispatcher(){ * @param context context * @throws ExecuteException */ - public void dispatch(final ExecutionContext context) throws ExecuteException { + public Boolean dispatch(final ExecutionContext context) throws ExecuteException { /** * get executor manager */ - ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); + ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); if(executorManager == null){ throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType()); } @@ -83,7 +83,7 @@ public void dispatch(final ExecutionContext context) throws ExecuteException { /** * task execute */ - executorManager.execute(context); + return executorManager.execute(context); } finally { executorManager.afterExecute(context); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java index e1f0c3c97602..c0be5a875f55 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java @@ -23,7 +23,7 @@ /** * abstract executor manager */ -public abstract class AbstractExecutorManager implements ExecutorManager{ +public abstract class AbstractExecutorManager implements ExecutorManager{ /** * before execute , add time monitor , timeout diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java index 1d78d2f08f04..9b0b9af0e47a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java @@ -23,7 +23,7 @@ /** * executor manager */ -public interface ExecutorManager { +public interface ExecutorManager { /** * before execute @@ -37,7 +37,7 @@ public interface ExecutorManager { * @param context context * @throws ExecuteException */ - void execute(ExecutionContext context) throws ExecuteException; + T execute(ExecutionContext context) throws ExecuteException; /** * after execute diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index cf1a2646a66d..bdfe71cf5f6b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -46,7 +46,7 @@ * netty executor manager */ @Service -public class NettyExecutorManager extends AbstractExecutorManager{ +public class NettyExecutorManager extends AbstractExecutorManager{ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); @@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{ public NettyExecutorManager(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + /** + * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor + * register EXECUTE_TASK_ACK command type TaskAckProcessor + */ this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); } @@ -74,7 +78,7 @@ public NettyExecutorManager(){ * @throws ExecuteException */ @Override - public void execute(ExecutionContext context) throws ExecuteException { + public Boolean execute(ExecutionContext context) throws ExecuteException { /** * all nodes @@ -118,6 +122,8 @@ public void execute(ExecutionContext context) throws ExecuteException { } } } + + return success; } /** @@ -189,7 +195,7 @@ private Set getAllNodes(ExecutionContext context){ break; case CLIENT: break; - default: + default: throw new IllegalArgumentException("invalid executor type : " + executorType); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f675493bf3e9..9bf69ddec847 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -20,22 +20,10 @@ import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.utils.BeanContext; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.exceptions.RemotingException; -import org.apache.dolphinscheduler.remote.future.InvokeCallback; -import org.apache.dolphinscheduler.remote.future.ResponseFuture; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -48,7 +36,6 @@ import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.Callable; @@ -138,14 +125,16 @@ public void kill(){ * dispatch task to worker * @param taskInstance */ - public void dispatch(TaskInstance taskInstance){ + private Boolean dispatch(TaskInstance taskInstance){ TaskExecutionContext context = getTaskExecutionContext(taskInstance); ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER); try { - dispatcher.dispatch(executionContext); + return dispatcher.dispatch(executionContext); } catch (ExecuteException e) { logger.error("execute exception", e); + return false; } + } /** @@ -234,8 +223,7 @@ protected TaskInstance submit(){ } if(submitDB && !submitQueue){ // submit task to queue - dispatch(task); - submitQueue = true; + submitQueue = dispatch(task); } if(submitDB && submitQueue){ return task;