Skip to content

Commit

Permalink
feature: 原子操作能力支持滚动执行 TencentBlueKing#446
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jan 11, 2022
1 parent c08ccf5 commit 9bb2635
Show file tree
Hide file tree
Showing 19 changed files with 225 additions and 494 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,16 @@ CREATE TABLE IF NOT EXISTS `step_instance_variable`
DEFAULT CHARSET = utf8mb4;


CREATE TABLE IF NOT EXISTS `rolling_config`
CREATE TABLE IF NOT EXISTS `task_instance_rolling_config`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`task_instance_id` bigint(20) NOT NULL DEFAULT '0',
`rolling_name` varchar(128) NOT NULL,
`config_name` varchar(128) NOT NULL,
`config` longtext NOT NULL,
`row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY(`id`),
KEY (`task_instance_id`)
UNIQUE KEY (`task_instance_id`,`config_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

Expand All @@ -293,7 +293,8 @@ CREATE TABLE IF NOT EXISTS `step_instance_task`
`status` tinyint(4) NOT NULL DEFAULT '1',
`row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`step_instance_id`, `execute_count`, `batch`)
PRIMARY KEY(`id`),
UNIQUE KEY (`step_instance_id`, `execute_count`, `batch`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.tencent.bk.job.execute.engine.executor.FileTaskExecutor;
import com.tencent.bk.job.execute.engine.executor.SQLScriptTaskExecutor;
import com.tencent.bk.job.execute.engine.executor.ScriptTaskExecutor;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher;
import com.tencent.bk.job.execute.engine.model.GseTaskExecuteResult;
import com.tencent.bk.job.execute.engine.result.ResultHandleManager;
import com.tencent.bk.job.execute.engine.result.ha.ResultHandleTaskKeepaliveManager;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class GseTaskManager implements SmartLifecycle {
private final ResultHandleManager resultHandleManager;
private final TaskInstanceService taskInstanceService;
private final GseTaskLogService gseTaskLogService;
private final TaskExecuteControlMsgSender taskManager;
private final TaskExecuteEventDispatcher taskManager;
private final AccountService accountService;
private final LogService logService;
private final TaskInstanceVariableService taskInstanceVariableService;
Expand Down Expand Up @@ -142,7 +143,7 @@ public class GseTaskManager implements SmartLifecycle {
public GseTaskManager(ResultHandleManager resultHandleManager,
TaskInstanceService taskInstanceService,
GseTaskLogService gseTaskLogService,
TaskExecuteControlMsgSender taskManager,
TaskExecuteEventDispatcher taskManager,
AccountService accountService,
LogService logService,
TaskInstanceVariableService taskInstanceVariableService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import com.tencent.bk.job.common.util.date.DateUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.config.JobExecuteConfig;
import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender;
import com.tencent.bk.job.execute.engine.consts.IpStatus;
import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher;
import com.tencent.bk.job.execute.engine.model.GseTaskExecuteResult;
import com.tencent.bk.job.execute.engine.model.GseTaskResponse;
import com.tencent.bk.job.execute.engine.model.TaskVariableDTO;
Expand Down Expand Up @@ -84,7 +84,7 @@ public abstract class AbstractGseTaskExecutor implements ResumableTask {
protected StepInstanceVariableValueService stepInstanceVariableValueService;
protected AgentService agentService;
protected LogService logService;
protected TaskExecuteControlMsgSender taskManager;
protected TaskExecuteEventDispatcher taskManager;
protected ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager;
protected ExecuteMonitor executeMonitor;
protected ExceptionStatusManager exceptionStatusManager;
Expand Down Expand Up @@ -188,7 +188,7 @@ public void initDependentService(ResultHandleManager resultHandleManager,
StepInstanceVariableValueService stepInstanceVariableValueService,
AgentService agentService,
LogService logService,
TaskExecuteControlMsgSender taskManager,
TaskExecuteEventDispatcher taskManager,
ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager,
ExecuteMonitor executeMonitor,
JobExecuteConfig jobExecuteConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public GseStepListener(GseTaskManager gseTaskManager,

@StreamListener(GseTaskProcessor.INPUT)
public void handleEvent(@Payload StepEvent gseStepEvent) {
log.info("Receive gse step control message, stepInstanceId={}, action={}, requestId={}, msgSendTime={}",
log.info("Handel gse step event, stepInstanceId={}, action={}, requestId={}, msgSendTime={}",
gseStepEvent.getStepInstanceId(),
gseStepEvent.getAction(), gseStepEvent.getRequestId(), gseStepEvent.getTime());
long stepInstanceId = gseStepEvent.getStepInstanceId();
Expand All @@ -80,10 +80,10 @@ public void handleEvent(@Payload StepEvent gseStepEvent) {
} else if (GseStepActionEnum.RETRY_ALL.getValue() == action) {
gseTaskManager.retryAll(stepInstanceId, requestId);
} else {
log.error("Error gse step control action:{}", action);
log.error("Error gse step action:{}", action);
}
} catch (Throwable e) {
String errorMsg = "Handling gse step control message error,stepInstanceId:" + stepInstanceId;
String errorMsg = "Handling gse step event error,stepInstanceId:" + stepInstanceId;
log.error(errorMsg, e);
handleException(stepInstanceId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum;
import com.tencent.bk.job.execute.common.util.TaskCostCalculator;
import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender;
import com.tencent.bk.job.execute.engine.consts.JobActionEnum;
import com.tencent.bk.job.execute.engine.listener.event.JobEvent;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher;
import com.tencent.bk.job.execute.engine.message.TaskProcessor;
import com.tencent.bk.job.execute.engine.model.JobCallbackDTO;
import com.tencent.bk.job.execute.model.StepInstanceBaseDTO;
Expand Down Expand Up @@ -59,15 +59,15 @@
@Slf4j
public class JobListener {

private final TaskExecuteControlMsgSender taskExecuteControlMsgSender;
private final TaskExecuteEventDispatcher taskExecuteEventDispatcher;
private final StatisticsService statisticsService;
private final TaskInstanceService taskInstanceService;

@Autowired
public JobListener(TaskExecuteControlMsgSender taskExecuteControlMsgSender,
public JobListener(TaskExecuteEventDispatcher taskExecuteEventDispatcher,
StatisticsService statisticsService,
TaskInstanceService taskInstanceService) {
this.taskExecuteControlMsgSender = taskExecuteControlMsgSender;
this.taskExecuteEventDispatcher = taskExecuteEventDispatcher;
this.statisticsService = statisticsService;
this.taskInstanceService = taskInstanceService;
}
Expand Down Expand Up @@ -115,7 +115,7 @@ private void startJob(TaskInstanceDTO taskInstance) {
long firstStepId = taskInstanceService.getTaskStepIdList(taskInstanceId).get(0);
taskInstanceService.updateTaskExecutionInfo(taskInstanceId, RunStatusEnum.RUNNING, firstStepId,
DateUtils.currentTimeMillis(), null, null);
taskExecuteControlMsgSender.startStep(firstStepId);
taskExecuteEventDispatcher.startStep(firstStepId);
// 触发任务开始统计分析
statisticsService.updateStartJobStatistics(taskInstance);
} else {
Expand Down Expand Up @@ -159,7 +159,7 @@ private void restartJob(TaskInstanceDTO taskInstance) {
taskInstanceService.resetStepStatus(stepInstanceId);
}

taskExecuteControlMsgSender.startTask(taskInstanceId);
taskExecuteEventDispatcher.startTask(taskInstanceId);
} else {
log.warn("Unsupported task instance run status for restart task, taskInstanceId={}, status={}",
taskInstanceId, taskInstance.getStatus());
Expand Down Expand Up @@ -233,10 +233,10 @@ private void refreshJob(TaskInstanceDTO taskInstance) {
statisticsService.updateEndJobStatistics(taskInstance);
} else { // 进入下一步
taskInstanceService.updateTaskCurrentStepId(taskInstanceId, nextStepId);
taskExecuteControlMsgSender.startStep(nextStepId);
taskExecuteEventDispatcher.startStep(nextStepId);
}
// 步骤执行成功后清理产生的临时文件
taskExecuteControlMsgSender.clearStep(currentStepId);
taskExecuteEventDispatcher.clearStep(currentStepId);
} else if (RunStatusEnum.FAIL.getValue() == stepStatus) {
if (currentStep.isIgnoreError()) {
taskInstanceService.updateStepStatus(currentStepId, RunStatusEnum.IGNORE_ERROR.getValue());
Expand Down Expand Up @@ -285,7 +285,7 @@ private void goToNextStep(TaskInstanceDTO taskInstance, StepInstanceBaseDTO curr
} else { // 进入下一步
taskInstanceService.updateTaskCurrentStepId(taskInstanceId, nextStepId);

taskExecuteControlMsgSender.startStep(nextStepId);
taskExecuteEventDispatcher.startStep(nextStepId);
}
}

Expand All @@ -302,7 +302,7 @@ private void callback(TaskInstanceDTO taskInstance, long taskInstanceId, int tas
e.setStatus(stepStatus);
instances.add(e);
dto.setStepInstances(instances);
taskExecuteControlMsgSender.sendCallback(dto);
taskExecuteEventDispatcher.sendCallback(dto);
}
}

Expand Down Expand Up @@ -335,15 +335,15 @@ private void asyncNotifyFail(TaskInstanceDTO taskInstance, StepInstanceBaseDTO s
taskNotifyDTO.setResourceExecuteStatus(ExecuteStatusEnum.FAIL.getStatus());
taskNotifyDTO.setStepName(stepInstance.getName());
setResourceInfo(taskInstance, stepInstance, taskNotifyDTO);
taskExecuteControlMsgSender.asyncSendNotifyMsg(taskNotifyDTO);
taskExecuteEventDispatcher.asyncSendNotifyMsg(taskNotifyDTO);
}

private void asyncNotifySuccess(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) {
TaskNotifyDTO taskNotifyDTO = buildCommonTaskNotification(taskInstance, stepInstance);
taskNotifyDTO.setResourceExecuteStatus(ExecuteStatusEnum.SUCCESS.getStatus());
taskNotifyDTO.setCost(taskInstance.getTotalTime());
setResourceInfo(taskInstance, stepInstance, taskNotifyDTO);
taskExecuteControlMsgSender.asyncSendNotifyMsg(taskNotifyDTO);
taskExecuteEventDispatcher.asyncSendNotifyMsg(taskNotifyDTO);
}

private TaskNotifyDTO buildCommonTaskNotification(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@

import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.config.StorageSystemConfig;
import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender;
import com.tencent.bk.job.execute.engine.consts.FileDirTypeConf;
import com.tencent.bk.job.execute.engine.consts.IpStatus;
import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager;
import com.tencent.bk.job.execute.engine.listener.event.StepEvent;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteEventDispatcher;
import com.tencent.bk.job.execute.engine.message.TaskResultHandleResumeProcessor;
import com.tencent.bk.job.execute.engine.model.FileDest;
import com.tencent.bk.job.execute.engine.model.JobFile;
Expand Down Expand Up @@ -91,7 +91,7 @@ public class ResultHandleResumeListener {

private final StepInstanceVariableValueService stepInstanceVariableValueService;

private final TaskExecuteControlMsgSender taskExecuteControlMsgSender;
private final TaskExecuteEventDispatcher taskExecuteEventDispatcher;

private final ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager;

Expand All @@ -107,7 +107,7 @@ public ResultHandleResumeListener(
AgentService agentService,
LogService logService,
StepInstanceVariableValueService stepInstanceVariableValueService,
TaskExecuteControlMsgSender taskExecuteControlMsgSender,
TaskExecuteEventDispatcher taskExecuteEventDispatcher,
ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager,
ExceptionStatusManager exceptionStatusManager
) {
Expand All @@ -120,7 +120,7 @@ public ResultHandleResumeListener(
this.logService = logService;

this.stepInstanceVariableValueService = stepInstanceVariableValueService;
this.taskExecuteControlMsgSender = taskExecuteControlMsgSender;
this.taskExecuteEventDispatcher = taskExecuteEventDispatcher;
this.resultHandleTaskKeepaliveManager = resultHandleTaskKeepaliveManager;
this.exceptionStatusManager = exceptionStatusManager;
}
Expand Down Expand Up @@ -168,7 +168,7 @@ public void handleEvent(StepEvent stepEvent) {
taskVariablesAnalyzeResult, ipLogMap, gseTaskLog, ipLogMap.keySet(),
requestId);
scriptResultHandleTask.initDependentService(taskInstanceService, gseTaskLogService, logService,
taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteControlMsgSender,
taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteEventDispatcher,
resultHandleTaskKeepaliveManager, exceptionStatusManager);
resultHandleManager.handleDeliveredTask(scriptResultHandleTask);
} else if (stepInstance.isFileStep()) {
Expand All @@ -190,7 +190,7 @@ public void handleEvent(StepEvent stepEvent) {
storageSystemConfig.getJobStorageRootPath(), sourceDestPathMap, sourceFileDisplayMap,
requestId);
fileResultHandleTask.initDependentService(taskInstanceService, gseTaskLogService, logService,
taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteControlMsgSender,
taskInstanceVariableService, stepInstanceVariableValueService, taskExecuteEventDispatcher,
resultHandleTaskKeepaliveManager, exceptionStatusManager);
resultHandleManager.handleDeliveredTask(fileResultHandleTask);
} else {
Expand Down
Loading

0 comments on commit 9bb2635

Please sign in to comment.