diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java index 7df8e01b3d45..c663365dd9bb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/TaskExecutionContextCacheManager.java @@ -42,7 +42,16 @@ public interface TaskExecutionContextCacheManager { /** * remove taskInstance by taskInstanceId + * * @param taskInstanceId taskInstanceId */ void removeByTaskInstanceId(Integer taskInstanceId); + + /** + * If the value for the specified key is present and non-null,then perform the update,otherwise it will return false + * + * @param taskExecutionContext taskExecutionContext + * @return status + */ + boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java index 009332f05c1d..418c68d7df80 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/impl/TaskExecutionContextCacheManagerImpl.java @@ -59,10 +59,17 @@ public void cacheTaskExecutionContext(TaskExecutionContext taskExecutionContext) /** * remove taskInstance by taskInstanceId + * * @param taskInstanceId taskInstanceId */ @Override public void removeByTaskInstanceId(Integer taskInstanceId) { taskExecutionContextCache.remove(taskInstanceId); } + + @Override + public boolean updateTaskExecutionContext(TaskExecutionContext taskExecutionContext) { + taskExecutionContextCache.computeIfPresent(taskExecutionContext.getTaskInstanceId(), (k, v) -> taskExecutionContext); + return taskExecutionContextCache.containsKey(taskExecutionContext.getTaskInstanceId()); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 8d475c92ac5a..f8b95678ecab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -39,9 +39,12 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,28 +65,45 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final ExecutorService workerExecService; /** - * worker config + * worker config */ private final WorkerConfig workerConfig; /** - * task callback service + * task callback service */ private final TaskCallbackService taskCallbackService; - public TaskExecuteProcessor(){ + /** + * taskExecutionContextCacheManager + */ + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + + public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); + this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); + } + + /** + * Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache + * + * @param taskExecutionContext task + */ + private void setTaskCache(TaskExecutionContext taskExecutionContext) { + TaskExecutionContext preTaskCache = new TaskExecutionContext(); + preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); } @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), - String.format("invalid command type : %s", command.getType())); + String.format("invalid command type : %s", command.getType())); TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( - command.getBody(), TaskExecuteRequestCommand.class); + command.getBody(), TaskExecuteRequestCommand.class); logger.info("received command : {}", taskRequestCommand); @@ -99,6 +119,7 @@ public void process(Channel channel, Command command) { logger.error("task execution context is null"); return; } + setTaskCache(taskExecutionContext); // custom logger Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskExecutionContext.getProcessDefineId(), @@ -120,6 +141,7 @@ public void process(Channel channel, Command command) { String errorLog = String.format("create execLocalPath : %s", execLocalPath); LoggerUtils.logError(Optional.ofNullable(logger), errorLog, ex); LoggerUtils.logError(Optional.ofNullable(taskLogger), errorLog, ex); + taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); } FileUtils.taskLoggerThreadLocal.remove(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index cf0c051e02f8..c9f2cd48b530 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -102,15 +102,17 @@ public void process(Channel channel, Command command) { * @return kill result */ private Pair> doKill(TaskKillRequestCommand killCommand){ - List appIds = Collections.EMPTY_LIST; + List appIds = Collections.emptyList(); try { - TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); + int taskInstanceId = killCommand.getTaskInstanceId(); + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); Integer processId = taskExecutionContext.getProcessId(); - if (processId == null || processId.equals(0)){ - logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId()); - return Pair.of(false, appIds); + if (processId.equals(0)) { + taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); + logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); + return Pair.of(true, appIds); } String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index a451ce38960a..6f8f3f9e63b2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -47,6 +47,7 @@ import java.util.regex.Pattern; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.common.Constants.EXIT_CODE_SUCCESS; /** @@ -160,12 +161,18 @@ private void buildProcess(String commandFile) throws IOException { * @return CommandExecuteResult * @throws Exception if error throws Exception */ - public CommandExecuteResult run(String execCommand) throws Exception{ + public CommandExecuteResult run(String execCommand) throws Exception { CommandExecuteResult result = new CommandExecuteResult(); - + int taskInstanceId = taskExecutionContext.getTaskInstanceId(); + // If the task has been killed, then the task in the cache is null + if (null == taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { + result.setExitStatusCode(EXIT_CODE_KILL); + return result; + } if (StringUtils.isEmpty(execCommand)) { + taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); return result; } @@ -187,7 +194,12 @@ public CommandExecuteResult run(String execCommand) throws Exception{ // cache processId taskExecutionContext.setProcessId(processId); - taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + boolean updateTaskExecutionContextStatus = taskExecutionContextCacheManager.updateTaskExecutionContext(taskExecutionContext); + if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) { + ProcessUtils.kill(taskExecutionContext); + result.setExitStatusCode(EXIT_CODE_KILL); + return result; + } // print process id logger.info("process start, process id is: {}", processId);