diff --git a/src/backend/job-execute/boot-job-execute/src/test/resources/init_schema.sql b/src/backend/job-execute/boot-job-execute/src/test/resources/init_schema.sql index f7984fe3f3..077a3ae653 100644 --- a/src/backend/job-execute/boot-job-execute/src/test/resources/init_schema.sql +++ b/src/backend/job-execute/boot-job-execute/src/test/resources/init_schema.sql @@ -268,16 +268,16 @@ CREATE TABLE IF NOT EXISTS `step_instance_variable` DEFAULT CHARSET = utf8mb4; -CREATE TABLE IF NOT EXISTS `rolling_config` +CREATE TABLE IF NOT EXISTS `task_instance_rolling_config` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `task_instance_id` bigint(20) NOT NULL DEFAULT '0', - `rolling_name` varchar(128) NOT NULL, + `config_name` varchar(128) NOT NULL, `config` longtext NOT NULL, `row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, `row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY(`id`), - KEY (`task_instance_id`) + UNIQUE KEY (`task_instance_id`,`config_name`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; @@ -293,7 +293,8 @@ CREATE TABLE IF NOT EXISTS `step_instance_task` `status` tinyint(4) NOT NULL DEFAULT '1', `row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, `row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - PRIMARY KEY (`step_instance_id`, `execute_count`, `batch`) + PRIMARY KEY(`id`), + UNIQUE KEY (`step_instance_id`, `execute_count`, `batch`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/GseTaskManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/GseTaskManager.java index 2c2f9565f1..706b0f798f 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/GseTaskManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/GseTaskManager.java @@ -39,6 +39,7 @@ import com.tencent.bk.job.execute.engine.executor.FileTaskExecutor; import com.tencent.bk.job.execute.engine.executor.SQLScriptTaskExecutor; import com.tencent.bk.job.execute.engine.executor.ScriptTaskExecutor; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.model.GseTaskExecuteResult; import com.tencent.bk.job.execute.engine.result.ResultHandleManager; import com.tencent.bk.job.execute.engine.result.ha.ResultHandleTaskKeepaliveManager; @@ -87,7 +88,7 @@ public class GseTaskManager implements SmartLifecycle { private final ResultHandleManager resultHandleManager; private final TaskInstanceService taskInstanceService; private final GseTaskLogService gseTaskLogService; - private final TaskExecuteControlMsgSender taskManager; + private final TaskExecuteEventDispatcher taskManager; private final AccountService accountService; private final LogService logService; private final TaskInstanceVariableService taskInstanceVariableService; @@ -142,7 +143,7 @@ public class GseTaskManager implements SmartLifecycle { public GseTaskManager(ResultHandleManager resultHandleManager, TaskInstanceService taskInstanceService, GseTaskLogService gseTaskLogService, - TaskExecuteControlMsgSender taskManager, + TaskExecuteEventDispatcher taskManager, AccountService accountService, LogService logService, TaskInstanceVariableService taskInstanceVariableService, diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskExecutor.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskExecutor.java index d9fd24752e..ac1afe0ca9 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskExecutor.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/AbstractGseTaskExecutor.java @@ -30,9 +30,9 @@ import com.tencent.bk.job.common.util.date.DateUtils; import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.config.JobExecuteConfig; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; import com.tencent.bk.job.execute.engine.consts.IpStatus; import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.model.GseTaskExecuteResult; import com.tencent.bk.job.execute.engine.model.GseTaskResponse; import com.tencent.bk.job.execute.engine.model.TaskVariableDTO; @@ -84,7 +84,7 @@ public abstract class AbstractGseTaskExecutor implements ResumableTask { protected StepInstanceVariableValueService stepInstanceVariableValueService; protected AgentService agentService; protected LogService logService; - protected TaskExecuteControlMsgSender taskManager; + protected TaskExecuteEventDispatcher taskManager; protected ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager; protected ExecuteMonitor executeMonitor; protected ExceptionStatusManager exceptionStatusManager; @@ -188,7 +188,7 @@ public void initDependentService(ResultHandleManager resultHandleManager, StepInstanceVariableValueService stepInstanceVariableValueService, AgentService agentService, LogService logService, - TaskExecuteControlMsgSender taskManager, + TaskExecuteEventDispatcher taskManager, ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager, ExecuteMonitor executeMonitor, JobExecuteConfig jobExecuteConfig) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepListener.java index bbf94c76c5..08a9937a32 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepListener.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/GseStepListener.java @@ -64,7 +64,7 @@ public GseStepListener(GseTaskManager gseTaskManager, @StreamListener(GseTaskProcessor.INPUT) public void handleEvent(@Payload StepEvent gseStepEvent) { - log.info("Receive gse step control message, stepInstanceId={}, action={}, requestId={}, msgSendTime={}", + log.info("Handel gse step event, stepInstanceId={}, action={}, requestId={}, msgSendTime={}", gseStepEvent.getStepInstanceId(), gseStepEvent.getAction(), gseStepEvent.getRequestId(), gseStepEvent.getTime()); long stepInstanceId = gseStepEvent.getStepInstanceId(); @@ -80,10 +80,10 @@ public void handleEvent(@Payload StepEvent gseStepEvent) { } else if (GseStepActionEnum.RETRY_ALL.getValue() == action) { gseTaskManager.retryAll(stepInstanceId, requestId); } else { - log.error("Error gse step control action:{}", action); + log.error("Error gse step action:{}", action); } } catch (Throwable e) { - String errorMsg = "Handling gse step control message error,stepInstanceId:" + stepInstanceId; + String errorMsg = "Handling gse step event error,stepInstanceId:" + stepInstanceId; log.error(errorMsg, e); handleException(stepInstanceId, e); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java index d5ec6917c4..97668fbe7d 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java @@ -29,9 +29,9 @@ import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum; import com.tencent.bk.job.execute.common.util.TaskCostCalculator; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; import com.tencent.bk.job.execute.engine.consts.JobActionEnum; import com.tencent.bk.job.execute.engine.listener.event.JobEvent; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.message.TaskProcessor; import com.tencent.bk.job.execute.engine.model.JobCallbackDTO; import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; @@ -59,15 +59,15 @@ @Slf4j public class JobListener { - private final TaskExecuteControlMsgSender taskExecuteControlMsgSender; + private final TaskExecuteEventDispatcher taskExecuteEventDispatcher; private final StatisticsService statisticsService; private final TaskInstanceService taskInstanceService; @Autowired - public JobListener(TaskExecuteControlMsgSender taskExecuteControlMsgSender, + public JobListener(TaskExecuteEventDispatcher taskExecuteEventDispatcher, StatisticsService statisticsService, TaskInstanceService taskInstanceService) { - this.taskExecuteControlMsgSender = taskExecuteControlMsgSender; + this.taskExecuteEventDispatcher = taskExecuteEventDispatcher; this.statisticsService = statisticsService; this.taskInstanceService = taskInstanceService; } @@ -115,7 +115,7 @@ private void startJob(TaskInstanceDTO taskInstance) { long firstStepId = taskInstanceService.getTaskStepIdList(taskInstanceId).get(0); taskInstanceService.updateTaskExecutionInfo(taskInstanceId, RunStatusEnum.RUNNING, firstStepId, DateUtils.currentTimeMillis(), null, null); - taskExecuteControlMsgSender.startStep(firstStepId); + taskExecuteEventDispatcher.startStep(firstStepId); // 触发任务开始统计分析 statisticsService.updateStartJobStatistics(taskInstance); } else { @@ -159,7 +159,7 @@ private void restartJob(TaskInstanceDTO taskInstance) { taskInstanceService.resetStepStatus(stepInstanceId); } - taskExecuteControlMsgSender.startTask(taskInstanceId); + taskExecuteEventDispatcher.startTask(taskInstanceId); } else { log.warn("Unsupported task instance run status for restart task, taskInstanceId={}, status={}", taskInstanceId, taskInstance.getStatus()); @@ -233,10 +233,10 @@ private void refreshJob(TaskInstanceDTO taskInstance) { statisticsService.updateEndJobStatistics(taskInstance); } else { // 进入下一步 taskInstanceService.updateTaskCurrentStepId(taskInstanceId, nextStepId); - taskExecuteControlMsgSender.startStep(nextStepId); + taskExecuteEventDispatcher.startStep(nextStepId); } // 步骤执行成功后清理产生的临时文件 - taskExecuteControlMsgSender.clearStep(currentStepId); + taskExecuteEventDispatcher.clearStep(currentStepId); } else if (RunStatusEnum.FAIL.getValue() == stepStatus) { if (currentStep.isIgnoreError()) { taskInstanceService.updateStepStatus(currentStepId, RunStatusEnum.IGNORE_ERROR.getValue()); @@ -285,7 +285,7 @@ private void goToNextStep(TaskInstanceDTO taskInstance, StepInstanceBaseDTO curr } else { // 进入下一步 taskInstanceService.updateTaskCurrentStepId(taskInstanceId, nextStepId); - taskExecuteControlMsgSender.startStep(nextStepId); + taskExecuteEventDispatcher.startStep(nextStepId); } } @@ -302,7 +302,7 @@ private void callback(TaskInstanceDTO taskInstance, long taskInstanceId, int tas e.setStatus(stepStatus); instances.add(e); dto.setStepInstances(instances); - taskExecuteControlMsgSender.sendCallback(dto); + taskExecuteEventDispatcher.sendCallback(dto); } } @@ -335,7 +335,7 @@ private void asyncNotifyFail(TaskInstanceDTO taskInstance, StepInstanceBaseDTO s taskNotifyDTO.setResourceExecuteStatus(ExecuteStatusEnum.FAIL.getStatus()); taskNotifyDTO.setStepName(stepInstance.getName()); setResourceInfo(taskInstance, stepInstance, taskNotifyDTO); - taskExecuteControlMsgSender.asyncSendNotifyMsg(taskNotifyDTO); + taskExecuteEventDispatcher.asyncSendNotifyMsg(taskNotifyDTO); } private void asyncNotifySuccess(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { @@ -343,7 +343,7 @@ private void asyncNotifySuccess(TaskInstanceDTO taskInstance, StepInstanceBaseDT taskNotifyDTO.setResourceExecuteStatus(ExecuteStatusEnum.SUCCESS.getStatus()); taskNotifyDTO.setCost(taskInstance.getTotalTime()); setResourceInfo(taskInstance, stepInstance, taskNotifyDTO); - taskExecuteControlMsgSender.asyncSendNotifyMsg(taskNotifyDTO); + taskExecuteEventDispatcher.asyncSendNotifyMsg(taskNotifyDTO); } private TaskNotifyDTO buildCommonTaskNotification(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/ResultHandleResumeListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/ResultHandleResumeListener.java index ef323c33ef..e4720d1ac2 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/ResultHandleResumeListener.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/ResultHandleResumeListener.java @@ -26,11 +26,11 @@ import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.config.StorageSystemConfig; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; import com.tencent.bk.job.execute.engine.consts.FileDirTypeConf; import com.tencent.bk.job.execute.engine.consts.IpStatus; import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager; import com.tencent.bk.job.execute.engine.listener.event.StepEvent; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.message.TaskResultHandleResumeProcessor; import com.tencent.bk.job.execute.engine.model.FileDest; import com.tencent.bk.job.execute.engine.model.JobFile; @@ -91,7 +91,7 @@ public class ResultHandleResumeListener { private final StepInstanceVariableValueService stepInstanceVariableValueService; - private final TaskExecuteControlMsgSender taskExecuteControlMsgSender; + private final TaskExecuteEventDispatcher taskExecuteEventDispatcher; private final ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager; @@ -107,7 +107,7 @@ public ResultHandleResumeListener( AgentService agentService, LogService logService, StepInstanceVariableValueService stepInstanceVariableValueService, - TaskExecuteControlMsgSender taskExecuteControlMsgSender, + TaskExecuteEventDispatcher taskExecuteEventDispatcher, ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager, ExceptionStatusManager exceptionStatusManager ) { @@ -120,7 +120,7 @@ public ResultHandleResumeListener( this.logService = logService; this.stepInstanceVariableValueService = stepInstanceVariableValueService; - this.taskExecuteControlMsgSender = taskExecuteControlMsgSender; + this.taskExecuteEventDispatcher = taskExecuteEventDispatcher; this.resultHandleTaskKeepaliveManager = resultHandleTaskKeepaliveManager; this.exceptionStatusManager = exceptionStatusManager; } @@ -168,7 +168,7 @@ public void handleEvent(StepEvent stepEvent) { taskVariablesAnalyzeResult, ipLogMap, gseTaskLog, ipLogMap.keySet(), requestId); scriptResultHandleTask.initDependentService(taskInstanceService, gseTaskLogService, logService, - taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteControlMsgSender, + taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteEventDispatcher, resultHandleTaskKeepaliveManager, exceptionStatusManager); resultHandleManager.handleDeliveredTask(scriptResultHandleTask); } else if (stepInstance.isFileStep()) { @@ -190,7 +190,7 @@ public void handleEvent(StepEvent stepEvent) { storageSystemConfig.getJobStorageRootPath(), sourceDestPathMap, sourceFileDisplayMap, requestId); fileResultHandleTask.initDependentService(taskInstanceService, gseTaskLogService, logService, - taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteControlMsgSender, + taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteEventDispatcher, resultHandleTaskKeepaliveManager, exceptionStatusManager); resultHandleManager.handleDeliveredTask(fileResultHandleTask); } else { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/RollingStepListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/RollingStepListener.java deleted file mode 100644 index 400941e232..0000000000 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/RollingStepListener.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. - * - * License for BK-JOB蓝鲸智云作业平台: - * -------------------------------------------------------------------- - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and - * to permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of - * the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO - * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -package com.tencent.bk.job.execute.engine.listener; - -import com.tencent.bk.job.common.util.date.DateUtils; -import com.tencent.bk.job.execute.common.constants.RunStatusEnum; -import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum; -import com.tencent.bk.job.execute.common.util.TaskCostCalculator; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; -import com.tencent.bk.job.execute.engine.listener.event.StepEvent; -import com.tencent.bk.job.execute.engine.message.StepProcessor; -import com.tencent.bk.job.execute.engine.prepare.FilePrepareService; -import com.tencent.bk.job.execute.model.NotifyDTO; -import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; -import com.tencent.bk.job.execute.model.StepInstanceDTO; -import com.tencent.bk.job.execute.model.TaskInstanceDTO; -import com.tencent.bk.job.execute.model.TaskNotifyDTO; -import com.tencent.bk.job.execute.service.TaskInstanceService; -import com.tencent.bk.job.manage.common.consts.notify.ExecuteStatusEnum; -import com.tencent.bk.job.manage.common.consts.notify.ResourceTypeEnum; -import com.tencent.bk.job.manage.common.consts.task.TaskStepTypeEnum; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.stereotype.Component; - -import static com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum.EXECUTE_SCRIPT; -import static com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum.EXECUTE_SQL; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.CLEAR; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.CONFIRM_CONTINUE; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.CONFIRM_RESTART; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.CONFIRM_TERMINATE; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.CONTINUE_FILE_PUSH; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.IGNORE_ERROR; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.NEXT_STEP; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.RETRY_ALL; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.RETRY_FAIL; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.SKIP; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.START; -import static com.tencent.bk.job.execute.engine.consts.StepActionEnum.STOP; - -/** - * 执行引擎流程处理-滚动步骤 - */ -@Component -@EnableBinding({StepProcessor.class}) -@Slf4j -public class RollingStepListener { - private final TaskInstanceService taskInstanceService; - private final TaskExecuteControlMsgSender taskControlMsgSender; - private final FilePrepareService filePrepareService; - - @Autowired - public RollingStepListener(TaskInstanceService taskInstanceService, - TaskExecuteControlMsgSender taskControlMsgSender, - FilePrepareService filePrepareService) { - this.taskInstanceService = taskInstanceService; - this.taskControlMsgSender = taskControlMsgSender; - this.filePrepareService = filePrepareService; - } - - /** - * 处理步骤控制相关消息,包含:预启动步骤、启动步骤、重新执行步骤和跳过步骤 - */ - @StreamListener(StepProcessor.INPUT) - public void handleEvent(StepEvent stepEvent) { - log.info("Receive step control message, stepInstanceId={}, action={}, msgSendTime={}", - stepEvent.getStepInstanceId(), - stepEvent.getAction(), stepEvent.getTime()); - long stepInstanceId = stepEvent.getStepInstanceId(); - try { - int action = stepEvent.getAction(); - StepInstanceBaseDTO stepInstance = taskInstanceService.getBaseStepInstance(stepInstanceId); - if (START.getValue() == action) { - log.info("Start step, stepInstanceId={}", stepInstanceId); - startStep(stepInstance); - } else if (SKIP.getValue() == action) { - TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(stepInstance.getTaskInstanceId()); - if (taskInstance.getCurrentStepId() == stepInstanceId) { - log.info("Skip step, stepInstanceId={}", stepInstanceId); - skipStep(stepInstance); - } else { - log.warn("Only current running step is support for skipping, stepInstanceId={}", stepInstanceId); - } - } else if (RETRY_FAIL.getValue() == action) { - log.info("Retry step fail, stepInstanceId={}", stepInstanceId); - retryStepFail(stepInstance); - } else if (RETRY_ALL.getValue() == action) { - log.info("Retry step all, stepInstanceId={}", stepInstanceId); - retryStepAll(stepInstance); - } else if (STOP.getValue() == action) { - log.info("Force stop step, stepInstanceId={}", stepInstanceId); - stopStep(stepInstance); - } else if (IGNORE_ERROR.getValue() == action) { - log.info("Ignore step error, stepInstanceId={}", stepInstanceId); - ignoreError(stepInstance); - } else if (NEXT_STEP.getValue() == action) { - log.info("Next step, stepInstanceId={}", stepInstanceId); - nextStep(stepInstance); - } else if (CONFIRM_TERMINATE.getValue() == action) { - log.info("Confirm step terminate, stepInstanceId={}", stepInstanceId); - confirmStepTerminate(stepInstance); - } else if (CONFIRM_RESTART.getValue() == action) { - log.info("Confirm step restart, stepInstanceId={}", stepInstanceId); - confirmStepRestart(stepInstance); - } else if (CONFIRM_CONTINUE.getValue() == action) { - log.info("Confirm step continue, stepInstanceId={}", stepInstanceId); - confirmStepContinue(stepInstance); - } else if (CONTINUE_FILE_PUSH.getValue() == action) { - log.info("continue file push step, stepInstanceId={}", stepInstanceId); - continueGseFileStep(stepInstance); - } else if (CLEAR.getValue() == action) { - log.info("clear step, stepInstanceId={}", stepInstanceId); - clearStep(stepInstance); - } else { - log.warn("Error step control action:{}", action); - } - } catch (Exception e) { - String errorMsg = "Handling step control message error,stepInstanceId:" + stepInstanceId; - log.warn(errorMsg, e); - } - - } - - private void confirmStepTerminate(StepInstanceBaseDTO stepInstance) { - long stepInstanceId = stepInstance.getId(); - TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(stepInstance.getTaskInstanceId()); - if (RunStatusEnum.WAITING.getValue().equals(stepInstance.getStatus())) { - Long endTime = DateUtils.currentTimeMillis(); - long taskTotalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime, - taskInstance.getTotalTime()); - taskInstanceService.updateTaskExecutionInfo(taskInstance.getId(), RunStatusEnum.CONFIRM_TERMINATED, null, - null, endTime, taskTotalTime); - long stepTotalTime = TaskCostCalculator.calculate(stepInstance.getStartTime(), endTime, - stepInstance.getTotalTime()); - taskInstanceService.updateStepExecutionInfo(stepInstanceId, RunStatusEnum.CONFIRM_TERMINATED, null, - endTime, stepTotalTime); - } else { - log.warn("Unsupported step instance status for confirm step terminate action, stepInstanceId:{}, " + - "status:{}", stepInstanceId, stepInstance.getStatus()); - } - } - - private void confirmStepRestart(StepInstanceBaseDTO stepInstance) { - long stepInstanceId = stepInstance.getId(); - if (RunStatusEnum.CONFIRM_TERMINATED.getValue().equals(stepInstance.getStatus())) { - executeConfirmStep(stepInstance); - } else { - log.warn("Unsupported step instance status for confirm-step-restart action, stepInstanceId:{}, status:{}" - , stepInstanceId, stepInstance.getStatus()); - } - } - - private void nextStep(StepInstanceBaseDTO stepInstance) { - long taskInstanceId = stepInstance.getTaskInstanceId(); - long stepInstanceId = stepInstance.getId(); - int stepStatus = stepInstance.getStatus(); - - if (RunStatusEnum.STOP_SUCCESS.getValue() == stepStatus) { - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.RUNNING.getValue()); - long endTime = DateUtils.currentTimeMillis(); - long totalTime = TaskCostCalculator.calculate(stepInstance.getStartTime(), endTime, - stepInstance.getTotalTime()); - // 终止成功,进入下一步,该步骤设置为“跳过” - taskInstanceService.updateStepExecutionInfo(stepInstanceId, RunStatusEnum.SKIPPED, null, endTime, - totalTime); - taskControlMsgSender.refreshTask(taskInstanceId); - } else { - log.warn("Unsupported step instance status for next step action, stepInstanceId:{}, status:{}", - stepInstanceId, stepInstance.getStatus()); - } - } - - private void confirmStepContinue(StepInstanceBaseDTO stepInstance) { - long taskInstanceId = stepInstance.getTaskInstanceId(); - long stepInstanceId = stepInstance.getId(); - int stepStatus = stepInstance.getStatus(); - - if (RunStatusEnum.WAITING.getValue() == stepStatus) { - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.RUNNING.getValue()); - long endTime = DateUtils.currentTimeMillis(); - long totalTime = TaskCostCalculator.calculate(stepInstance.getStartTime(), endTime, - stepInstance.getTotalTime()); - // 人工确认通过,该步骤状态标识为成功;终止成功的步骤保持状态不变 - taskInstanceService.updateStepExecutionInfo(stepInstanceId, RunStatusEnum.SUCCESS, null, endTime, - totalTime); - taskControlMsgSender.refreshTask(taskInstanceId); - } else { - log.warn("Unsupported step instance status for confirm-step-continue step action, stepInstanceId:{}, " + - "status:{}", stepInstanceId, stepInstance.getStatus()); - } - } - - private void ignoreError(StepInstanceBaseDTO stepInstance) { - if (!stepInstance.getStatus().equals(RunStatusEnum.FAIL.getValue())) { - log.warn("Current step status does not support ignore error operation! stepInstanceId:{}, status:{}", - stepInstance.getId(), stepInstance.getStatus()); - return; - } - - taskInstanceService.updateStepStatus(stepInstance.getId(), RunStatusEnum.IGNORE_ERROR.getValue()); - taskInstanceService.resetTaskExecuteInfoForResume(stepInstance.getTaskInstanceId()); - taskControlMsgSender.refreshTask(stepInstance.getTaskInstanceId()); - } - - private void startStep(StepInstanceBaseDTO stepInstance) { - int stepStatus = stepInstance.getStatus(); - long stepInstanceId = stepInstance.getId(); - long taskInstanceId = stepInstance.getTaskInstanceId(); - - // 只有当步骤状态为'等待用户'和'未执行'时可以启动步骤 - if (RunStatusEnum.BLANK.getValue() == stepStatus || RunStatusEnum.WAITING.getValue() == stepStatus) { - taskInstanceService.updateStepExecutionInfo(stepInstanceId, RunStatusEnum.RUNNING, - DateUtils.currentTimeMillis(), null, null); - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.RUNNING.getValue()); - - int stepType = stepInstance.getExecuteType(); - if (EXECUTE_SCRIPT.getValue() == stepType || StepExecuteTypeEnum.EXECUTE_SQL.getValue() == stepType) { - taskControlMsgSender.startGseStep(stepInstanceId); - } else if (TaskStepTypeEnum.FILE.getValue() == stepType) { - filePrepareService.prepareFileForGseTask(stepInstanceId); - } else if (TaskStepTypeEnum.APPROVAL.getValue() == stepType) { - executeConfirmStep(stepInstance); - } else { - log.warn("Unsupported step type, skip it! stepInstanceId={}, stepType={}", stepInstanceId, stepType); - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.SKIPPED.getValue()); - taskControlMsgSender.refreshTask(taskInstanceId); - } - } else { - log.warn("Unsupported step instance run status for starting step, stepInstanceId={}, status={}", - stepInstanceId, stepStatus); - } - } - - private void skipStep(StepInstanceBaseDTO stepInstance) { - int stepStatus = stepInstance.getStatus(); - long stepInstanceId = stepInstance.getId(); - long taskInstanceId = stepInstance.getTaskInstanceId(); - - // 只有当步骤状态为'终止中'时可以跳过步骤 - if (RunStatusEnum.STOPPING.getValue() == stepStatus) { - long now = DateUtils.currentTimeMillis(); - taskInstanceService.updateStepStartTimeIfNull(stepInstanceId, now); - taskInstanceService.updateStepStatus(stepInstanceId, RunStatusEnum.SKIPPED.getValue()); - taskInstanceService.updateStepEndTime(stepInstanceId, now); - - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.RUNNING.getValue()); - taskControlMsgSender.refreshTask(taskInstanceId); - } else { - log.warn("Unsupported step instance run status for skipping step, stepInstanceId={}, status={}", - stepInstanceId, stepStatus); - } - } - - private void stopStep(StepInstanceBaseDTO stepInstance) { - long stepInstanceId = stepInstance.getId(); - long taskInstanceId = stepInstance.getTaskInstanceId(); - - int executeType = stepInstance.getExecuteType(); - if (TaskStepTypeEnum.SCRIPT.getValue() == executeType || TaskStepTypeEnum.FILE.getValue() == executeType - || EXECUTE_SQL.getValue().equals(executeType)) { - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.STOPPING.getValue()); - } else { - log.warn("Not gse step type, can not stop! stepInstanceId={}, stepType={}", stepInstanceId, executeType); - } - } - - /** - * 第三方文件源文件拉取完成后继续GSE文件分发 - * - * @param stepInstance 步骤实例 - */ - private void continueGseFileStep(StepInstanceBaseDTO stepInstance) { - taskControlMsgSender.startGseStep(stepInstance.getId()); - } - - /** - * 重新执行步骤失败的任务 - */ - private void retryStepFail(StepInstanceBaseDTO stepInstance) { - resetStatusForRetry(stepInstance); - filePrepareService.retryPrepareFile(stepInstance.getId()); - taskControlMsgSender.retryGseStepFail(stepInstance.getId()); - } - - /** - * 从头执行步骤 - */ - private void retryStepAll(StepInstanceBaseDTO stepInstance) { - resetStatusForRetry(stepInstance); - filePrepareService.retryPrepareFile(stepInstance.getId()); - taskControlMsgSender.retryGseStepAll(stepInstance.getId()); - } - - /** - * 清理执行完的步骤 - */ - private void clearStep(StepInstanceBaseDTO stepInstance) { - int executeType = stepInstance.getExecuteType(); - // 当前仅有文件分发类步骤需要清理中间文件 - if (TaskStepTypeEnum.FILE.getValue() == executeType) { - filePrepareService.clearPreparedTmpFile(stepInstance.getId()); - } - } - - private void resetStatusForRetry(StepInstanceBaseDTO stepInstance) { - long stepInstanceId = stepInstance.getId(); - long taskInstanceId = stepInstance.getTaskInstanceId(); - - taskInstanceService.resetStepExecuteInfoForRetry(stepInstanceId); - taskInstanceService.resetTaskExecuteInfoForResume(taskInstanceId); - } - - /** - * 人工确认步骤 - */ - private void executeConfirmStep(StepInstanceBaseDTO stepInstance) { - long stepInstanceId = stepInstance.getId(); - long taskInstanceId = stepInstance.getTaskInstanceId(); - - // 只有“未执行”和“确认终止”状态的,才可以重新执行人工确认步骤 - if (RunStatusEnum.BLANK.getValue().equals(stepInstance.getStatus()) - || RunStatusEnum.CONFIRM_TERMINATED.getValue().equals(stepInstance.getStatus())) { - // 发送页面确认信息 - TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(taskInstanceId); - String stepOperator = stepInstance.getOperator(); - - if (StringUtils.isBlank(stepOperator)) { - log.info("The operator is empty, continue run step! stepInstanceId={}", stepInstanceId); - stepOperator = taskInstance.getOperator(); - stepInstance.setOperator(stepOperator); - } - taskInstanceService.updateStepStatus(stepInstanceId, RunStatusEnum.WAITING.getValue()); - taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.WAITING.getValue()); - asyncNotifyConfirm(taskInstance, stepInstance); - } else { - log.warn("Unsupported step instance run status for executing confirm step, stepInstanceId={}, status={}", - stepInstanceId, stepInstance.getStatus()); - } - } - - private void asyncNotifyConfirm(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { - StepInstanceDTO stepInstanceDetail = taskInstanceService.getStepInstanceDetail(stepInstance.getId()); - if (stepInstanceDetail == null) { - log.warn("StepInstance is not exist, stepInstanceId: {}", stepInstance.getId()); - return; - } - TaskNotifyDTO taskNotifyDTO = buildCommonTaskNotification(taskInstance, stepInstance); - taskNotifyDTO.setResourceExecuteStatus(ExecuteStatusEnum.READY.getStatus()); - taskNotifyDTO.setStepName(stepInstance.getName()); - taskNotifyDTO.setConfirmMessage(stepInstanceDetail.getConfirmMessage()); - NotifyDTO notifyDTO = new NotifyDTO(); - notifyDTO.setReceiverUsers(stepInstanceDetail.getConfirmUsers()); - notifyDTO.setReceiverRoles(stepInstanceDetail.getConfirmRoles()); - notifyDTO.setChannels(stepInstanceDetail.getNotifyChannels()); - notifyDTO.setTriggerUser(stepInstance.getOperator()); - taskNotifyDTO.setNotifyDTO(notifyDTO); - taskNotifyDTO.setResourceId(String.valueOf(taskInstance.getTaskId())); - taskNotifyDTO.setResourceType(ResourceTypeEnum.JOB.getType()); - taskNotifyDTO.setOperator(stepInstance.getOperator()); - taskControlMsgSender.asyncSendNotifyMsg(taskNotifyDTO); - } - - private TaskNotifyDTO buildCommonTaskNotification(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { - TaskNotifyDTO taskNotifyDTO = new TaskNotifyDTO(); - taskNotifyDTO.setAppId(taskInstance.getAppId()); - String operator = getOperator(taskInstance.getOperator(), stepInstance.getOperator()); - taskNotifyDTO.setStartupMode(taskInstance.getStartupMode()); - taskNotifyDTO.setOperator(operator); - taskNotifyDTO.setTaskInstanceId(taskInstance.getId()); - taskNotifyDTO.setTaskInstanceName(taskInstance.getName()); - taskNotifyDTO.setTaskId(taskInstance.getTaskId()); - return taskNotifyDTO; - } - - private String getOperator(String taskOperator, String stepOperator) { - return StringUtils.isNotEmpty(stepOperator) ? stepOperator : taskOperator; - } -} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/StepListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/StepListener.java index b96c78352c..999d773e44 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/StepListener.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/StepListener.java @@ -28,8 +28,8 @@ import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum; import com.tencent.bk.job.execute.common.util.TaskCostCalculator; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; import com.tencent.bk.job.execute.engine.listener.event.StepEvent; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.message.StepProcessor; import com.tencent.bk.job.execute.engine.prepare.FilePrepareService; import com.tencent.bk.job.execute.model.NotifyDTO; @@ -71,12 +71,12 @@ @Slf4j public class StepListener { private final TaskInstanceService taskInstanceService; - private final TaskExecuteControlMsgSender taskControlMsgSender; + private final TaskExecuteEventDispatcher taskControlMsgSender; private final FilePrepareService filePrepareService; @Autowired public StepListener(TaskInstanceService taskInstanceService, - TaskExecuteControlMsgSender taskControlMsgSender, + TaskExecuteEventDispatcher taskControlMsgSender, FilePrepareService filePrepareService) { this.taskInstanceService = taskInstanceService; this.taskControlMsgSender = taskControlMsgSender; @@ -88,7 +88,7 @@ public StepListener(TaskInstanceService taskInstanceService, */ @StreamListener(StepProcessor.INPUT) public void handleEvent(StepEvent stepEvent) { - log.info("Receive step control message, stepInstanceId={}, action={}, msgSendTime={}", + log.info("Handle step event, stepInstanceId={}, action={}, msgSendTime={}", stepEvent.getStepInstanceId(), stepEvent.getAction(), stepEvent.getTime()); long stepInstanceId = stepEvent.getStepInstanceId(); @@ -140,7 +140,7 @@ public void handleEvent(StepEvent stepEvent) { log.warn("Error step control action:{}", action); } } catch (Exception e) { - String errorMsg = "Handling step control message error,stepInstanceId:" + stepInstanceId; + String errorMsg = "Handling step event error,stepInstanceId:" + stepInstanceId; log.warn(errorMsg, e); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/JobEvent.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/JobEvent.java index baf300c381..68751728a1 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/JobEvent.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/JobEvent.java @@ -24,6 +24,7 @@ package com.tencent.bk.job.execute.engine.listener.event; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -36,6 +37,7 @@ @Data @NoArgsConstructor @AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) public class JobEvent { /** * 作业操作 diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/TaskExecuteControlMsgSender.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/TaskExecuteEventDispatcher.java similarity index 78% rename from src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/TaskExecuteControlMsgSender.java rename to src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/TaskExecuteEventDispatcher.java index e52c6675d3..4e0490a797 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/TaskExecuteControlMsgSender.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/TaskExecuteEventDispatcher.java @@ -22,137 +22,137 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.execute.engine; +package com.tencent.bk.job.execute.engine.listener.event; import com.tencent.bk.job.execute.engine.model.JobCallbackDTO; import com.tencent.bk.job.execute.model.TaskNotifyDTO; /** - * 作业执行控制消息发送 + * 作业执行事件分发 */ -public interface TaskExecuteControlMsgSender { +public interface TaskExecuteEventDispatcher { /** - * 发送启动作业的作业控制消息 + * 发送启动作业事件 * * @param taskInstanceId 作业实例ID */ void startTask(long taskInstanceId); /** - * 发送停止作业的作业控制消息 + * 发送停止作业事件 * * @param taskInstanceId 作业实例ID */ void stopTask(long taskInstanceId); /** - * 发送重头执行作业的作业控制消息 + * 发送重头执行作业事件 * * @param taskInstanceId 作业实例ID */ void restartTask(long taskInstanceId); /** - * 触发作业继续后续步骤的作业控制消息 + * 触发作业继续后续步骤事件 * * @param taskInstanceId 作业实例ID */ void refreshTask(long taskInstanceId); /** - * 触发忽略错误的作业控制消息 + * 触发忽略错误事件 * * @param stepInstanceId 步骤实例ID */ void ignoreStepError(long stepInstanceId); /** - * 发送进入下一步骤的作业控制消息 + * 发送进入下一步骤事件 * * @param stepInstanceId 步骤实例ID */ void nextStep(long stepInstanceId); /** - * 发送进入下一步骤的作业控制消息 + * 发送进入下一步骤事件 * * @param stepInstanceId 步骤实例ID */ void confirmStepContinue(long stepInstanceId); /** - * 人工确认-终止流程 + * 人工确认-终止流程事件 * * @param stepInstanceId 步骤实例ID */ void confirmStepTerminate(long stepInstanceId); /** - * 人工确认-重新发起确认 + * 人工确认-重新发起确认事件 * * @param stepInstanceId 步骤实例ID */ void confirmStepRestart(long stepInstanceId); /** - * 发送启动步骤的步骤控制消息 + * 发送启动步骤事件 * * @param stepInstanceId 步骤实例ID */ void startStep(long stepInstanceId); /** - * 发送跳过步骤的步骤控制消息 + * 发送跳过步骤事件 * * @param stepInstanceId 步骤实例ID */ void skipStep(long stepInstanceId); /** - * 发送强制终止步骤的步骤控制消息 + * 发送强制终止步骤事件 * * @param stepInstanceId 步骤实例ID */ void stopStep(long stepInstanceId); /** - * 重新执行步骤中失败的ip + * 重新执行步骤中失败的ip事件 * * @param stepInstanceId 步骤实例ID */ void retryStepFail(long stepInstanceId); /** - * 重新执行步骤 + * 重新执行步骤事件 * * @param stepInstanceId 步骤实例ID */ void retryStepAll(long stepInstanceId); /** - * 发送继续GSE文件分发步骤的步骤控制消息 + * 发送继续GSE文件分发步骤事件 * * @param stepInstanceId 步骤实例ID */ void continueGseFileStep(long stepInstanceId); /** - * 发送清理步骤的步骤控制消息:清理步骤中产生的临时文件等 + * 发送清理步骤事件:清理步骤中产生的临时文件等 * * @param stepInstanceId 步骤实例ID */ void clearStep(long stepInstanceId); /** - * 发送执行gse步骤的消息 + * 发送执行gse步骤事件 * * @param stepInstanceId 步骤实例ID */ void startGseStep(long stepInstanceId); /** - * 恢复GSE任务执行 + * 恢复GSE任务执行事件 * * @param stepInstanceId 步骤实例ID * @param executeCount 执行次数 @@ -161,37 +161,37 @@ public interface TaskExecuteControlMsgSender { void resumeGseStep(long stepInstanceId, int executeCount, String requestId); /** - * 重新执行步骤中失败的ip + * 重新执行步骤中失败的ip事件 * * @param stepInstanceId 步骤实例ID */ void retryGseStepFail(long stepInstanceId); /** - * 重新执行步骤 + * 重新执行步骤事件 * * @param stepInstanceId 步骤实例ID */ void retryGseStepAll(long stepInstanceId); /** - * 发送强制终止TSC步骤的消息 + * 发送强制终止GSE步骤事件 * * @param stepInstanceId 步骤实例ID */ void stopGseStep(long stepInstanceId); /** - * 异步发送消息通知 + * 异步发送消息通知事件 * * @param notification 消息内容 */ void asyncSendNotifyMsg(TaskNotifyDTO notification); /** - * 发送回调信息 + * 发送回调信息事件 * - * @param jobCallbackDto + * @param jobCallbackDto 回调内容 */ void sendCallback(JobCallbackDTO jobCallbackDto); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/TaskExecuteControlMsgSenderImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/TaskExecuteEventDispatcherImpl.java similarity index 94% rename from src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/TaskExecuteControlMsgSenderImpl.java rename to src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/TaskExecuteEventDispatcherImpl.java index a31d043a68..687749f314 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/TaskExecuteControlMsgSenderImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/event/TaskExecuteEventDispatcherImpl.java @@ -22,14 +22,12 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.execute.engine; +package com.tencent.bk.job.execute.engine.listener.event; import com.tencent.bk.job.common.util.json.JsonUtils; import com.tencent.bk.job.execute.engine.consts.GseStepActionEnum; import com.tencent.bk.job.execute.engine.consts.JobActionEnum; import com.tencent.bk.job.execute.engine.consts.StepActionEnum; -import com.tencent.bk.job.execute.engine.listener.event.JobEvent; -import com.tencent.bk.job.execute.engine.listener.event.StepEvent; import com.tencent.bk.job.execute.engine.message.CallbackProcessor; import com.tencent.bk.job.execute.engine.message.GseTaskProcessor; import com.tencent.bk.job.execute.engine.message.NotifyMsgProcessor; @@ -50,7 +48,7 @@ @Service @Slf4j -public class TaskExecuteControlMsgSenderImpl implements TaskExecuteControlMsgSender { +public class TaskExecuteEventDispatcherImpl implements TaskExecuteEventDispatcher { /** * 消息通道-作业 */ @@ -79,12 +77,12 @@ public class TaskExecuteControlMsgSenderImpl implements TaskExecuteControlMsgSen private final MessageChannel resultHandleTaskResumeOutput; @Autowired - public TaskExecuteControlMsgSenderImpl(@Qualifier(TaskProcessor.OUTPUT) MessageChannel taskOutput, - @Qualifier(StepProcessor.OUTPUT) MessageChannel stepOutput, - @Qualifier(GseTaskProcessor.OUTPUT) MessageChannel gseTaskOutput, - @Qualifier(NotifyMsgProcessor.OUTPUT) MessageChannel notifyMsgOutput, - @Qualifier(CallbackProcessor.OUTPUT) MessageChannel callbackOutput, - @Qualifier(TaskResultHandleResumeProcessor.OUTPUT) MessageChannel resultHandleTaskResumeOutput) { + public TaskExecuteEventDispatcherImpl(@Qualifier(TaskProcessor.OUTPUT) MessageChannel taskOutput, + @Qualifier(StepProcessor.OUTPUT) MessageChannel stepOutput, + @Qualifier(GseTaskProcessor.OUTPUT) MessageChannel gseTaskOutput, + @Qualifier(NotifyMsgProcessor.OUTPUT) MessageChannel notifyMsgOutput, + @Qualifier(CallbackProcessor.OUTPUT) MessageChannel callbackOutput, + @Qualifier(TaskResultHandleResumeProcessor.OUTPUT) MessageChannel resultHandleTaskResumeOutput) { this.taskOutput = taskOutput; this.stepOutput = stepOutput; this.gseTaskOutput = gseTaskOutput; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareServiceImpl.java index d84214ed88..dca31145e0 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareServiceImpl.java @@ -25,7 +25,7 @@ package com.tencent.bk.job.execute.engine.prepare; import com.tencent.bk.job.execute.common.constants.RunStatusEnum; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.prepare.local.LocalFilePrepareService; import com.tencent.bk.job.execute.engine.prepare.local.LocalFilePrepareTaskResultHandler; import com.tencent.bk.job.execute.engine.prepare.third.ThirdFilePrepareService; @@ -56,7 +56,7 @@ public class FilePrepareServiceImpl implements FilePrepareService { private final LocalFilePrepareService localFilePrepareService; private final ThirdFilePrepareService thirdFilePrepareService; private final TaskInstanceService taskInstanceService; - private final TaskExecuteControlMsgSender taskControlMsgSender; + private final TaskExecuteEventDispatcher taskControlMsgSender; private final ResultHandleManager resultHandleManager; @Autowired @@ -64,7 +64,7 @@ public FilePrepareServiceImpl( LocalFilePrepareService localFilePrepareService, ThirdFilePrepareService thirdFilePrepareService, TaskInstanceService taskInstanceService, - TaskExecuteControlMsgSender taskControlMsgSender, + TaskExecuteEventDispatcher taskControlMsgSender, ResultHandleManager resultHandleManager) { this.localFilePrepareService = localFilePrepareService; this.thirdFilePrepareService = thirdFilePrepareService; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java index 6e3e701300..d233a96046 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareService.java @@ -31,7 +31,7 @@ import com.tencent.bk.job.common.util.file.PathUtil; import com.tencent.bk.job.execute.client.FileSourceTaskResourceClient; import com.tencent.bk.job.execute.dao.FileSourceTaskLogDAO; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.prepare.JobTaskContext; import com.tencent.bk.job.execute.engine.result.ResultHandleManager; import com.tencent.bk.job.execute.model.FileDetailDTO; @@ -72,7 +72,7 @@ public class ThirdFilePrepareService { private final FileSourceTaskLogDAO fileSourceTaskLogDAO; private final AccountService accountService; private final LogService logService; - private final TaskExecuteControlMsgSender taskControlMsgSender; + private final TaskExecuteEventDispatcher taskControlMsgSender; private final Map taskMap = new ConcurrentHashMap<>(); @Autowired @@ -80,7 +80,7 @@ public ThirdFilePrepareService(ResultHandleManager resultHandleManager, FileSourceTaskResourceClient fileSourceTaskResource, TaskInstanceService taskInstanceService, FileSourceTaskLogDAO fileSourceTaskLogDAO, AccountService accountService, - LogService logService, TaskExecuteControlMsgSender taskControlMsgSender) { + LogService logService, TaskExecuteEventDispatcher taskControlMsgSender) { this.resultHandleManager = resultHandleManager; this.fileSourceTaskResource = fileSourceTaskResource; this.taskInstanceService = taskInstanceService; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java index 0fc95d3a1d..dc63c361ed 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java @@ -30,7 +30,7 @@ import com.tencent.bk.job.execute.client.FileSourceTaskResourceClient; import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.dao.FileSourceTaskLogDAO; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.prepare.JobTaskContext; import com.tencent.bk.job.execute.engine.result.ContinuousScheduledTask; import com.tencent.bk.job.execute.engine.result.ScheduleStrategy; @@ -82,7 +82,7 @@ public class ThirdFilePrepareTask implements ContinuousScheduledTask, JobTaskCon private TaskInstanceService taskInstanceService; private AccountService accountService; private LogService logService; - private TaskExecuteControlMsgSender taskControlMsgSender; + private TaskExecuteEventDispatcher taskControlMsgSender; private FileSourceTaskLogDAO fileSourceTaskLogDAO; private ThirdFilePrepareTaskResultHandler resultHandler; private int pullTimes = 0; @@ -112,7 +112,7 @@ public void initDependentService( TaskInstanceService taskInstanceService, AccountService accountService, LogService logService, - TaskExecuteControlMsgSender taskControlMsgSender, + TaskExecuteEventDispatcher taskControlMsgSender, FileSourceTaskLogDAO fileSourceTaskLogDAO ) { this.fileSourceTaskResource = fileSourceTaskResource; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java index ee00083918..ba91c6305d 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java @@ -30,9 +30,9 @@ import com.tencent.bk.job.common.util.date.DateUtils; import com.tencent.bk.job.common.util.json.JsonUtils; import com.tencent.bk.job.execute.common.constants.RunStatusEnum; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; import com.tencent.bk.job.execute.engine.consts.IpStatus; import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.model.GseLog; import com.tencent.bk.job.execute.engine.model.GseLogBatchPullResult; import com.tencent.bk.job.execute.engine.model.GseTaskExecuteResult; @@ -91,7 +91,7 @@ public abstract class AbstractResultHandleTask implements ContinuousScheduled protected GseTaskLogService gseTaskLogService; protected TaskInstanceVariableService taskInstanceVariableService; protected StepInstanceVariableValueService stepInstanceVariableValueService; - protected TaskExecuteControlMsgSender taskManager; + protected TaskExecuteEventDispatcher taskManager; protected ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager; protected ExceptionStatusManager exceptionStatusManager; /** @@ -245,7 +245,7 @@ public void initDependentService(TaskInstanceService taskInstanceService, LogService logService, TaskInstanceVariableService taskInstanceVariableService, StepInstanceVariableValueService stepInstanceVariableValueService, - TaskExecuteControlMsgSender taskManager, + TaskExecuteEventDispatcher taskManager, ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager, ExceptionStatusManager exceptionStatusManager ) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ha/NotAliveResultHandleTaskDetector.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ha/NotAliveResultHandleTaskDetector.java index 71a090fc9c..9bdc536436 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ha/NotAliveResultHandleTaskDetector.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ha/NotAliveResultHandleTaskDetector.java @@ -25,7 +25,7 @@ package com.tencent.bk.job.execute.engine.result.ha; import com.tencent.bk.job.common.redis.util.LockUtils; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.monitor.metrics.ExecuteMonitor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; @@ -44,15 +44,15 @@ @EnableScheduling public class NotAliveResultHandleTaskDetector { private final ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager; - private final TaskExecuteControlMsgSender taskExecuteControlMsgSender; + private final TaskExecuteEventDispatcher taskExecuteEventDispatcher; private final ExecuteMonitor executeMonitor; private final String requestId = UUID.randomUUID().toString(); public NotAliveResultHandleTaskDetector(ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager, - TaskExecuteControlMsgSender taskExecuteControlMsgSender, + TaskExecuteEventDispatcher taskExecuteEventDispatcher, ExecuteMonitor executeMonitor) { this.resultHandleTaskKeepaliveManager = resultHandleTaskKeepaliveManager; - this.taskExecuteControlMsgSender = taskExecuteControlMsgSender; + this.taskExecuteEventDispatcher = taskExecuteEventDispatcher; this.executeMonitor = executeMonitor; } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceRollingConfigDTO.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceRollingConfigDTO.java new file mode 100644 index 0000000000..cbe96dcaa5 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/TaskInstanceRollingConfigDTO.java @@ -0,0 +1,55 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.model; + +import com.tencent.bk.job.execute.model.db.RollingConfigDO; +import lombok.Data; + +/** + * 执行作业实例滚动区间配置 + */ +@Data +public class TaskInstanceRollingConfigDTO { + /** + * id + */ + private Long id; + + /** + * 执行作业实例id + */ + private Long taskInstanceId; + + /** + * 滚动区间名称 + */ + private String configName; + + /** + * 滚动配置 + */ + private RollingConfigDO config; +} + diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/db/RollingConfigDO.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/db/RollingConfigDO.java new file mode 100644 index 0000000000..b7fb97dbe2 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/model/db/RollingConfigDO.java @@ -0,0 +1,77 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.model.db; + +import com.tencent.bk.job.common.model.dto.IpDTO; +import lombok.Data; + +import java.util.List; + +/** + * 执行作业实例滚动区间配置DO + */ +@Data +public class RollingConfigDO { + /** + * 滚动区间名称 + */ + private String name; + /** + * 滚动区间包含的步骤实例ID + */ + private List includeStepInstanceIdList; + /** + * 参与滚动的步骤实例ID + */ + private List rollingStepInstanceIdList; + /** + * 不参与滚动的步骤实例ID(全量执行) + */ + private List excludeStepInstanceIdList; + /** + * 滚动策略 + */ + private Integer rollingMode; + /** + * 滚动表达式 + */ + private String rollingExpr; + /** + * 目标服务器滚动分批 + */ + private List rollingTargetServerBatch; + + @Data + private static class ServerBatch { + /** + * 滚动执行批次 + */ + private Integer batch; + /** + * 该批次的目标服务器 + */ + private List servers; + } +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java index 66f4e533c2..bdbe4bc94f 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/TaskExecuteServiceImpl.java @@ -57,7 +57,7 @@ import com.tencent.bk.job.execute.constants.StepOperationEnum; import com.tencent.bk.job.execute.constants.TaskOperationEnum; import com.tencent.bk.job.execute.constants.UserOperationEnum; -import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender; +import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher; import com.tencent.bk.job.execute.engine.model.TaskVariableDTO; import com.tencent.bk.job.execute.engine.util.TimeoutUtils; import com.tencent.bk.job.execute.model.AccountDTO; @@ -148,7 +148,7 @@ public class TaskExecuteServiceImpl implements TaskExecuteService { private final ApplicationService applicationService; private final AccountService accountService; private final ScriptService scriptService; - private final TaskExecuteControlMsgSender controlMsgSender; + private final TaskExecuteEventDispatcher controlMsgSender; private final TaskPlanService taskPlanService; private final TaskInstanceVariableService taskInstanceVariableService; private final ServerService serverService; @@ -168,7 +168,7 @@ public class TaskExecuteServiceImpl implements TaskExecuteService { public TaskExecuteServiceImpl(ApplicationService applicationService, AccountService accountService, TaskInstanceService taskInstanceService, - TaskExecuteControlMsgSender controlMsgSender, + TaskExecuteEventDispatcher controlMsgSender, TaskPlanService taskPlanService, TaskInstanceVariableService taskInstanceVariableService, ServerService serverService,