Skip to content

Commit

Permalink
Merge pull request #1114 from wangyu096/feature/rolling
Browse files Browse the repository at this point in the history
feature: 原子操作能力支持滚动执行 #446
  • Loading branch information
wangyu096 authored Jul 12, 2022
2 parents b8c6bcf + 4e02122 commit 7d52a47
Show file tree
Hide file tree
Showing 32 changed files with 509 additions and 368 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ Response<List<StepExecutionRecordVO>> listStepExecutionHistory(
String scopeId,
@ApiParam(value = "步骤实例ID", name = "stepInstanceId", required = true)
@PathVariable("stepInstanceId")
Long stepInstanceId
Long stepInstanceId,
@ApiParam(value = "滚动批次,非滚动步骤不需要传入", name = "batch")
@RequestParam(value = "batch", required = false)
Integer batch
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class TaskOperationLogVO {
private Long stepInstanceId;
@ApiModelProperty("步骤执行次数")
private Integer retry;
@ApiModelProperty("滚动执行批次")
private Integer batch;
@ApiModelProperty("步骤名称")
private String stepName;
@ApiModelProperty("操作时间")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public void testBatchSaveAgentTasks() {
List<AgentTaskDTO> agentTaskList = new ArrayList<>();
AgentTaskDTO agentTask1 = new AgentTaskDTO();
agentTask1.setStepInstanceId(100L);
agentTask1.setExecuteCount(0);
agentTask1.setExecuteCount(1);
agentTask1.setActualExecuteCount(1);
agentTask1.setBatch(1);
agentTask1.setFileTaskMode(FileTaskModeEnum.UPLOAD);
agentTask1.setHostId(101L);
Expand All @@ -99,7 +100,8 @@ public void testBatchSaveAgentTasks() {

AgentTaskDTO agentTask2 = new AgentTaskDTO();
agentTask2.setStepInstanceId(100L);
agentTask2.setExecuteCount(0);
agentTask2.setExecuteCount(1);
agentTask2.setActualExecuteCount(1);
agentTask2.setBatch(1);
agentTask2.setFileTaskMode(FileTaskModeEnum.DOWNLOAD);
agentTask2.setHostId(102L);
Expand All @@ -115,10 +117,11 @@ public void testBatchSaveAgentTasks() {

fileAgentTaskDAO.batchSaveAgentTasks(agentTaskList);

AgentTaskDTO agentTask1Return = fileAgentTaskDAO.getAgentTaskByHostId(100L, 0, 1,
AgentTaskDTO agentTask1Return = fileAgentTaskDAO.getAgentTaskByHostId(100L, 1, 1,
FileTaskModeEnum.UPLOAD, 101L);
assertThat(agentTask1Return.getStepInstanceId()).isEqualTo(100L);
assertThat(agentTask1Return.getExecuteCount()).isEqualTo(0L);
assertThat(agentTask1Return.getExecuteCount()).isEqualTo(1L);
assertThat(agentTask1Return.getActualExecuteCount()).isEqualTo(1L);
assertThat(agentTask1Return.getBatch()).isEqualTo(1);
assertThat(agentTask1Return.getFileTaskMode()).isEqualTo(FileTaskModeEnum.UPLOAD);
assertThat(agentTask1Return.getHostId()).isEqualTo(101L);
Expand All @@ -131,10 +134,11 @@ public void testBatchSaveAgentTasks() {
assertThat(agentTask1Return.getStatus()).isEqualTo(1);


AgentTaskDTO agentTask2Return = fileAgentTaskDAO.getAgentTaskByHostId(100L, 0, 1,
AgentTaskDTO agentTask2Return = fileAgentTaskDAO.getAgentTaskByHostId(100L, 1, 1,
FileTaskModeEnum.DOWNLOAD, 102L);
assertThat(agentTask2Return.getStepInstanceId()).isEqualTo(100L);
assertThat(agentTask2Return.getExecuteCount()).isEqualTo(0L);
assertThat(agentTask2Return.getExecuteCount()).isEqualTo(1L);
assertThat(agentTask2Return.getActualExecuteCount()).isEqualTo(1L);
assertThat(agentTask2Return.getBatch()).isEqualTo(1);
assertThat(agentTask2Return.getFileTaskMode()).isEqualTo(FileTaskModeEnum.DOWNLOAD);
assertThat(agentTask2Return.getHostId()).isEqualTo(102L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void testBatchSaveAgentTasks() {
List<AgentTaskDTO> agentTaskList = new ArrayList<>();
AgentTaskDTO agentTask1 = new AgentTaskDTO();
agentTask1.setStepInstanceId(100L);
agentTask1.setExecuteCount(0);
agentTask1.setExecuteCount(1);
agentTask1.setActualExecuteCount(1);
agentTask1.setBatch(1);
agentTask1.setHostId(101L);
agentTask1.setAgentId("0:127.0.0.1");
Expand All @@ -102,7 +103,8 @@ public void testBatchSaveAgentTasks() {

AgentTaskDTO agentTask2 = new AgentTaskDTO();
agentTask2.setStepInstanceId(100L);
agentTask2.setExecuteCount(0);
agentTask2.setExecuteCount(1);
agentTask2.setActualExecuteCount(1);
agentTask2.setBatch(1);
agentTask2.setHostId(102L);
agentTask2.setAgentId("0:127.0.0.2");
Expand All @@ -120,9 +122,10 @@ public void testBatchSaveAgentTasks() {

scriptAgentTaskDAO.batchSaveAgentTasks(agentTaskList);

AgentTaskDTO agentTask1Return = scriptAgentTaskDAO.getAgentTaskByHostId(100L, 0, 1, 101L);
AgentTaskDTO agentTask1Return = scriptAgentTaskDAO.getAgentTaskByHostId(100L, 1, 1, 101L);
assertThat(agentTask1Return.getStepInstanceId()).isEqualTo(100L);
assertThat(agentTask1Return.getExecuteCount()).isEqualTo(0L);
assertThat(agentTask1Return.getExecuteCount()).isEqualTo(1L);
assertThat(agentTask1Return.getActualExecuteCount()).isEqualTo(1L);
assertThat(agentTask1Return.getBatch()).isEqualTo(1);
assertThat(agentTask1Return.getHostId()).isEqualTo(101L);
assertThat(agentTask1Return.getAgentId()).isEqualTo("0:127.0.0.1");
Expand All @@ -136,9 +139,10 @@ public void testBatchSaveAgentTasks() {
assertThat(agentTask1Return.getExitCode()).isEqualTo(1);


AgentTaskDTO agentTask2Return = scriptAgentTaskDAO.getAgentTaskByHostId(100L, 0, 1, 102L);
AgentTaskDTO agentTask2Return = scriptAgentTaskDAO.getAgentTaskByHostId(100L, 1, 1, 102L);
assertThat(agentTask2Return.getStepInstanceId()).isEqualTo(100L);
assertThat(agentTask2Return.getExecuteCount()).isEqualTo(0L);
assertThat(agentTask2Return.getExecuteCount()).isEqualTo(1L);
assertThat(agentTask2Return.getActualExecuteCount()).isEqualTo(1L);
assertThat(agentTask2Return.getBatch()).isEqualTo(1);
assertThat(agentTask2Return.getHostId()).isEqualTo(102L);
assertThat(agentTask2Return.getGseTaskId()).isEqualTo(1001L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void queryRollingTask() {
@Test
@DisplayName("根据步骤实例ID查询步骤滚动任务")
void listRollingTasks() {
List<StepInstanceRollingTaskDTO> rollingTasks = stepInstanceRollingTaskDAO.listRollingTasks(1L);
List<StepInstanceRollingTaskDTO> rollingTasks = stepInstanceRollingTaskDAO.listRollingTasks(1L, null, null);
assertThat(rollingTasks).hasSize(2);

StepInstanceRollingTaskDTO rollingTask1 = rollingTasks.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ CREATE TABLE IF NOT EXISTS `step_instance_confirm`

CREATE TABLE IF NOT EXISTS `gse_task_log`
(
`step_instance_id` bigint(20) NOT NULL DEFAULT '0',
`execute_count` int(11) NOT NULL DEFAULT '0',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(11) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`gse_task_id` varchar(64) DEFAULT NULL,
`row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`step_instance_id` bigint(20) NOT NULL DEFAULT '0',
`execute_count` int(11) NOT NULL DEFAULT '0',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(11) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`gse_task_id` varchar(64) DEFAULT NULL,
`row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`step_instance_id`, `execute_count`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
Expand Down Expand Up @@ -188,66 +188,71 @@ CREATE TABLE IF NOT EXISTS `gse_task_ip_log`
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `gse_task`
(
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`step_instance_id` bigint(20) NOT NULL DEFAULT '0',
`execute_count` int(11) NOT NULL DEFAULT '0',
`execute_count` smallint(6) NOT NULL DEFAULT '0',
`batch` smallint(6) NOT NULL DEFAULT '0',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(11) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`gse_task_id` varchar(64) DEFAULT NULL,
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(11) DEFAULT NULL,
`status` tinyint(4) DEFAULT '1',
`gse_task_id` varchar(64) DEFAULT NULL,
`row_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY (`step_instance_id`,`execute_count`,`batch`),
UNIQUE KEY (`step_instance_id`, `execute_count`, `batch`),
KEY (`gse_task_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `gse_script_agent_task`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`step_instance_id` bigint(20) NOT NULL,
`execute_count` int(11) NOT NULL DEFAULT '0',
`batch` smallint(6) NOT NULL DEFAULT '0',
`host_id` bigint(20) NOT NULL DEFAULT '0',
`agent_id` varchar(64) NOT NULL DEFAULT '',
`gse_task_id` bigint(20) NOT NULL DEFAULT '0',
`status` int(11) DEFAULT '1',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(20) DEFAULT NULL,
`error_code` int(11) DEFAULT '0',
`exit_code` int(11) DEFAULT NULL,
`tag` varchar(256) DEFAULT '',
`log_offset` int(11) NOT NULL DEFAULT '0',
`row_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY (`step_instance_id`,`execute_count`,`batch`,`host_id`),
KEY (`step_instance_id`,`host_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
(
`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`step_instance_id` bigint(20) NOT NULL,
`execute_count` smallint(6) NOT NULL DEFAULT '0',
`actual_execute_count` smallint(6) DEFAULT NULL,
`batch` smallint(6) NOT NULL DEFAULT '0',
`host_id` bigint(20) NOT NULL DEFAULT '0',
`agent_id` varchar(64) NOT NULL DEFAULT '',
`gse_task_id` bigint(20) NOT NULL DEFAULT '0',
`status` int(11) DEFAULT '1',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(20) DEFAULT NULL,
`error_code` int(11) DEFAULT '0',
`exit_code` int(11) DEFAULT NULL,
`tag` varchar(256) DEFAULT '',
`log_offset` int(11) NOT NULL DEFAULT '0',
`row_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY (`step_instance_id`, `execute_count`, `batch`, `host_id`),
KEY (`step_instance_id`, `host_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `gse_file_agent_task`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`step_instance_id` bigint(20) NOT NULL,
`execute_count` int(11) NOT NULL DEFAULT '0',
`batch` smallint(6) NOT NULL DEFAULT '0',
`host_id` bigint(20) NOT NULL DEFAULT '0',
`agent_id` varchar(64) NOT NULL DEFAULT '',
`mode` tinyint(1) NOT NULL,
`gse_task_id` bigint(20) NOT NULL DEFAULT '0',
`status` int(11) DEFAULT '1',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(20) DEFAULT NULL,
`error_code` int(11) DEFAULT '0',
`row_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY (`step_instance_id`,`execute_count`,`batch`,`mode`,`host_id`),
KEY (`step_instance_id`,`host_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
(
`id` bigint(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`step_instance_id` bigint(20) NOT NULL,
`execute_count` smallint(6) NOT NULL DEFAULT '0',
`actual_execute_count` smallint(6) DEFAULT NULL,
`batch` smallint(6) NOT NULL DEFAULT '0',
`host_id` bigint(20) NOT NULL DEFAULT '0',
`agent_id` varchar(64) NOT NULL DEFAULT '',
`mode` tinyint(1) NOT NULL,
`gse_task_id` bigint(20) NOT NULL DEFAULT '0',
`status` int(11) DEFAULT '1',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(20) DEFAULT NULL,
`error_code` int(11) DEFAULT '0',
`row_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY (`step_instance_id`, `execute_count`, `batch`, `mode`, `host_id`),
KEY (`step_instance_id`, `host_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `operation_log`
(
Expand Down Expand Up @@ -339,16 +344,16 @@ CREATE TABLE IF NOT EXISTS `rolling_config`

CREATE TABLE IF NOT EXISTS `step_instance_rolling_task`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`step_instance_id` bigint(20) NOT NULL DEFAULT '0',
`batch` smallint(6) NOT NULL DEFAULT '0',
`execute_count` tinyint(4) NOT NULL DEFAULT '0',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(11) DEFAULT NULL,
`status` tinyint(4) NOT NULL DEFAULT '1',
`row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`step_instance_id` bigint(20) NOT NULL DEFAULT '0',
`batch` smallint(6) NOT NULL DEFAULT '0',
`execute_count` smallint(6) NOT NULL DEFAULT '0',
`start_time` bigint(20) DEFAULT NULL,
`end_time` bigint(20) DEFAULT NULL,
`total_time` bigint(11) DEFAULT NULL,
`status` tinyint(4) NOT NULL DEFAULT '1',
`row_create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`row_update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY (`step_instance_id`, `batch`, `execute_count`)
) ENGINE = InnoDB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,10 +909,11 @@ public Response<List<StepExecutionRecordVO>> listStepExecutionHistory(String use
AppResourceScope appResourceScope,
String scopeType,
String scopeId,
Long stepInstanceId) {
Long stepInstanceId,
Integer batch) {

List<StepExecutionRecordDTO> stepExecutionRecords = taskResultService.listStepExecutionHistory(username,
appResourceScope.getAppId(), stepInstanceId);
appResourceScope.getAppId(), stepInstanceId, batch);

return Response.buildSuccessResp(stepExecutionRecords.stream().map(stepExecutionRecord -> {
StepExecutionRecordVO vo = new StepExecutionRecordVO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ public Response<List<TaskOperationLogVO>> getTaskInstanceOperationLog(String use
vo.setStepInstanceId(detail.getStepInstanceId());
vo.setStepName(detail.getStepName());
vo.setRetry(detail.getExecuteCount());
vo.setBatch(detail.getBatch());
vo.setDetail(buildDetail(operationLog.getOperationEnum(), operationLog.getDetail()));
vos.add(vo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,19 @@ AgentTaskDTO getAgentTaskByHostId(Long stepInstanceId, Integer executeCount, Int
FileTaskModeEnum mode, long hostId);

/**
* 获取Agent任务实际执行成功的executeCount值(重试场景)
* 判断步骤实例的Agent Task 记录是否存在
*
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次;如果传入null或者0,忽略该参数
* @param mode 文件分发任务模式
* @param hostId 主机ID
* @return Agent任务实际执行成功的executeCount值
*/
int getActualSuccessExecuteCount(long stepInstanceId, Integer batch, FileTaskModeEnum mode, long hostId);
boolean isStepInstanceRecordExist(long stepInstanceId);

/**
* 判断步骤实例的Agent Task 记录是否存在
* 更新Agent任务实际执行的步骤重试次数
*
* @param stepInstanceId 步骤实例ID
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次;传入null将忽略该条件
* @param actualExecuteCount Agent任务实际执行的步骤重试次数
*/
boolean isStepInstanceRecordExist(long stepInstanceId);
void updateActualExecuteCount(long stepInstanceId, Integer batch, int actualExecuteCount);

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,21 +138,22 @@ List<AgentTaskDTO> listAgentTasks(Long stepInstanceId,
AgentTaskDTO getAgentTaskByHostId(Long stepInstanceId, Integer executeCount, Integer batch, long hostId);

/**
* 获取Agent任务实际执行成功的executeCount值(重试场景)
* 判断步骤实例的Agent Task 记录是否存在
*
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次;如果传入null或者0,忽略该参数
* @param hostId 主机ID
* @return Agent任务实际执行成功的executeCount值
*/
int getActualSuccessExecuteCount(long stepInstanceId, Integer batch, long hostId);
boolean isStepInstanceRecordExist(long stepInstanceId);

/**
* 判断步骤实例的Agent Task 记录是否存在
* 更新Agent任务实际执行的步骤重试次数
*
* @param stepInstanceId 步骤实例ID
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次;传入null将忽略该条件
* @param actualExecuteCount Agent任务实际执行的步骤重试次数
*/
boolean isStepInstanceRecordExist(long stepInstanceId);
void updateActualExecuteCount(long stepInstanceId, Integer batch, int actualExecuteCount);




}
Loading

0 comments on commit 7d52a47

Please sign in to comment.