Skip to content

Commit

Permalink
feature: 原子操作能力支持滚动执行 TencentBlueKing#446
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jan 13, 2022
1 parent 7866085 commit 8fa5a4c
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,63 @@
* 作业执行状态
*/
public enum RunStatusEnum {
BLANK(1, "等待执行"), RUNNING(2, "正在执行"), SUCCESS(3, "执行成功"),
FAIL(4, "执行失败"), SKIPPED(5, "跳过"), IGNORE_ERROR(6, "忽略错误"),
WAITING(7, "等待用户"), TERMINATED(8, "手动结束"), ABNORMAL_STATE(9, "状态异常"),
STOPPING(10, "强制终止中"), STOP_SUCCESS(11, "强制终止成功"), CONFIRM_TERMINATED(13, "确认终止");
/**
* 等待执行
*/
BLANK(1),
/**
* 正在执行
*/
RUNNING(2),
/**
* 执行成功
*/
SUCCESS(3),
/**
* 执行失败
*/
FAIL(4),
/**
* 跳过
*/
SKIPPED(5),
/**
* 忽略错误
*/
IGNORE_ERROR(6),
/**
* 等待用户
*/
WAITING(7),
/**
* 手动结束
*/
TERMINATED(8),
/**
* 状态异常
*/
ABNORMAL_STATE(9),
/**
* 强制终止中
*/
STOPPING(10),
/**
* 强制终止成功
*/
STOP_SUCCESS(11),
/**
* 确认终止
*/
CONFIRM_TERMINATED(13),
/**
* 滚动等待
*/
ROLLING_WAITING(14);

private final Integer value;
private final String name;

RunStatusEnum(Integer val, String name) {
RunStatusEnum(Integer val) {
this.value = val;
this.name = name;
}

public static RunStatusEnum valueOf(int status) {
Expand All @@ -55,8 +101,6 @@ public static RunStatusEnum valueOf(int status) {

/**
* 获取终止态的状态列表
*
* @return
*/
public static List<Integer> getFinishedStatusValueList() {
List<Integer> finishedStatusValueList = new ArrayList<>();
Expand All @@ -75,10 +119,10 @@ public Integer getValue() {
return value;
}

public String getName() {
return name;
}

/**
 * Returns the i18n message key for this status.
 *
 * <p>The key is the fixed prefix {@code task.run.status.} followed by the
 * lower-cased name of the enum constant, e.g. {@code task.run.status.skipped}.
 *
 * @return the i18n key for this status
 */
public String getI18nKey() {
    // Lower-case with Locale.ROOT so the key does not depend on the JVM default locale
    // (with a Turkish/Azeri default locale, "I" would otherwise become a dotless "ı").
    return "task.run.status." + this.name().toLowerCase(java.util.Locale.ROOT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,4 @@ public interface GseTaskLogDAO {

GseTaskLogDTO getGseTaskLog(long stepInstanceId, int executeCount);

void deleteGseTaskLog(long stepInstanceId, int executeCount);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,29 @@
import com.tencent.bk.job.execute.model.GseTaskLogDTO;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.TableField;
import org.jooq.generated.tables.GseTaskLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

@Repository
public class GseTaskLogDAOImpl implements GseTaskLogDAO {
private DSLContext create;
private final DSLContext dslContext;

private static final GseTaskLog TABLE = GseTaskLog.GSE_TASK_LOG;
private static final TableField<?, ?>[] ALL_FIELDS = {TABLE.ID, TABLE.STEP_INSTANCE_ID, TABLE.EXECUTE_COUNT,
TABLE.BATCH, TABLE.START_TIME, TABLE.END_TIME, TABLE.TOTAL_TIME, TABLE.STATUS, TABLE.GSE_TASK_ID};

@Autowired
public GseTaskLogDAOImpl(@Qualifier("job-execute-dsl-context") DSLContext create) {
this.create = create;
public GseTaskLogDAOImpl(@Qualifier("job-execute-dsl-context") DSLContext dslContext) {
this.dslContext = dslContext;
}

@Override
public GseTaskLogDTO getStepLastExecuteLog(long stepInstanceId) {
GseTaskLog t = GseTaskLog.GSE_TASK_LOG;
Record record = create.select(t.STEP_INSTANCE_ID, t.EXECUTE_COUNT, t.START_TIME, t.END_TIME, t.TOTAL_TIME,
t.STATUS, t.GSE_TASK_ID)
Record record = dslContext.select(ALL_FIELDS)
.from(t)
.where(t.STEP_INSTANCE_ID.eq(stepInstanceId))
.orderBy(t.EXECUTE_COUNT.desc())
Expand All @@ -61,56 +65,56 @@ private GseTaskLogDTO extractInfo(Record record) {
return null;
}
GseTaskLogDTO gseTaskLogDTO = new GseTaskLogDTO();
GseTaskLog t = GseTaskLog.GSE_TASK_LOG;

gseTaskLogDTO.setStepInstanceId(record.get(t.STEP_INSTANCE_ID));
gseTaskLogDTO.setExecuteCount(record.get(t.EXECUTE_COUNT));
gseTaskLogDTO.setStartTime(record.get(t.START_TIME));
gseTaskLogDTO.setEndTime(record.get(t.END_TIME));
gseTaskLogDTO.setTotalTime(record.get(t.TOTAL_TIME));
gseTaskLogDTO.setStatus(record.get(t.STATUS).intValue());
gseTaskLogDTO.setGseTaskId(record.get(t.GSE_TASK_ID));
gseTaskLogDTO.setStepInstanceId(record.get(TABLE.STEP_INSTANCE_ID));
gseTaskLogDTO.setExecuteCount(record.get(TABLE.EXECUTE_COUNT));
gseTaskLogDTO.setBatch(record.get(TABLE.BATCH));
gseTaskLogDTO.setStartTime(record.get(TABLE.START_TIME));
gseTaskLogDTO.setEndTime(record.get(TABLE.END_TIME));
gseTaskLogDTO.setTotalTime(record.get(TABLE.TOTAL_TIME));
gseTaskLogDTO.setStatus(record.get(TABLE.STATUS).intValue());
gseTaskLogDTO.setGseTaskId(record.get(TABLE.GSE_TASK_ID));
return gseTaskLogDTO;
}

@Override
public void saveGseTaskLog(GseTaskLogDTO gseTaskLog) {
GseTaskLog t = GseTaskLog.GSE_TASK_LOG;
create.insertInto(t, t.STEP_INSTANCE_ID, t.EXECUTE_COUNT, t.START_TIME, t.END_TIME, t.TOTAL_TIME, t.STATUS,
t.GSE_TASK_ID)
.values(gseTaskLog.getStepInstanceId(),
dslContext.insertInto(
TABLE,
TABLE.STEP_INSTANCE_ID,
TABLE.EXECUTE_COUNT,
TABLE.BATCH,
TABLE.START_TIME,
TABLE.END_TIME,
TABLE.TOTAL_TIME,
TABLE.STATUS,
TABLE.GSE_TASK_ID)
.values(
gseTaskLog.getStepInstanceId(),
gseTaskLog.getExecuteCount(),
(short) gseTaskLog.getBatch(),
gseTaskLog.getStartTime(),
gseTaskLog.getEndTime(),
gseTaskLog.getTotalTime(),
JooqDataTypeUtil.getByteFromInteger(gseTaskLog.getStatus()),
gseTaskLog.getGseTaskId())
.onDuplicateKeyUpdate()
.set(t.START_TIME, gseTaskLog.getStartTime())
.set(t.END_TIME, gseTaskLog.getEndTime())
.set(t.TOTAL_TIME, gseTaskLog.getTotalTime())
.set(t.STATUS, JooqDataTypeUtil.getByteFromInteger(gseTaskLog.getStatus())).set(t.GSE_TASK_ID,
gseTaskLog.getGseTaskId())
.set(TABLE.START_TIME, gseTaskLog.getStartTime())
.set(TABLE.END_TIME, gseTaskLog.getEndTime())
.set(TABLE.TOTAL_TIME, gseTaskLog.getTotalTime())
.set(TABLE.STATUS, JooqDataTypeUtil.getByteFromInteger(gseTaskLog.getStatus()))
.set(TABLE.GSE_TASK_ID, gseTaskLog.getGseTaskId())
.execute();
}

@Override
public GseTaskLogDTO getGseTaskLog(long stepInstanceId, int executeCount) {
GseTaskLog t = GseTaskLog.GSE_TASK_LOG;
Record record = create.select(t.STEP_INSTANCE_ID, t.EXECUTE_COUNT, t.START_TIME, t.END_TIME, t.TOTAL_TIME,
t.STATUS, t.GSE_TASK_ID).from(t)
.where(t.STEP_INSTANCE_ID.eq(stepInstanceId))
.and(t.EXECUTE_COUNT.eq(executeCount))
Record record = dslContext.select(ALL_FIELDS).from(TABLE)
.where(TABLE.STEP_INSTANCE_ID.eq(stepInstanceId))
.and(TABLE.EXECUTE_COUNT.eq(executeCount))
.and(TABLE.BATCH.eq((short) 0))
.fetchOne();
return extractInfo(record);
}

/**
 * Deletes the GSE task log rows matching the given step instance and execute count.
 *
 * @param stepInstanceId step instance id to match
 * @param executeCount   execute count to match
 */
@Override
public void deleteGseTaskLog(long stepInstanceId, int executeCount) {
    GseTaskLog logTable = GseTaskLog.GSE_TASK_LOG;
    create.deleteFrom(logTable)
        .where(logTable.STEP_INSTANCE_ID.eq(stepInstanceId))
        .and(logTable.EXECUTE_COUNT.eq(executeCount))
        .execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package com.tencent.bk.job.execute.engine;

import brave.Tracing;
import com.tencent.bk.job.common.model.dto.IpDTO;
import com.tencent.bk.job.common.redis.util.LockUtils;
import com.tencent.bk.job.common.util.date.DateUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
Expand Down Expand Up @@ -56,6 +57,7 @@
import com.tencent.bk.job.execute.service.AgentService;
import com.tencent.bk.job.execute.service.GseTaskLogService;
import com.tencent.bk.job.execute.service.LogService;
import com.tencent.bk.job.execute.service.RollingConfigService;
import com.tencent.bk.job.execute.service.StepInstanceVariableValueService;
import com.tencent.bk.job.execute.service.TaskInstanceService;
import com.tencent.bk.job.execute.service.TaskInstanceVariableService;
Expand Down Expand Up @@ -101,6 +103,7 @@ public class GseTaskManager implements SmartLifecycle {
private final ExecuteMonitor executeMonitor;
private final StorageSystemConfig storageSystemConfig;
private final JobExecuteConfig jobExecuteConfig;
private final RollingConfigService rollingConfigService;
private final Object lifecycleMonitor = new Object();
private final RunningTaskCounter<String> counter = new RunningTaskCounter<>("GseTask-Counter");
/**
Expand Down Expand Up @@ -156,7 +159,8 @@ public GseTaskManager(ResultHandleManager resultHandleManager,
GseTasksExceptionCounter gseTasksExceptionCounter,
Tracing tracing,
ExecuteMonitor executeMonitor,
JobExecuteConfig jobExecuteConfig) {
JobExecuteConfig jobExecuteConfig,
RollingConfigService rollingConfigService) {
this.resultHandleManager = resultHandleManager;
this.taskInstanceService = taskInstanceService;
this.gseTaskLogService = gseTaskLogService;
Expand All @@ -174,6 +178,7 @@ public GseTaskManager(ResultHandleManager resultHandleManager,
this.tracing = tracing;
this.executeMonitor = executeMonitor;
this.jobExecuteConfig = jobExecuteConfig;
this.rollingConfigService = rollingConfigService;
}

/**
Expand Down Expand Up @@ -226,9 +231,14 @@ public void startStep(long stepInstanceId, String requestId) {
watch.start("init-task-executor");
int executeCount = stepInstance.getExecuteCount();
Set<String> executeIps = new HashSet<>();
stepInstance.getTargetServers().getIpList().forEach(ipDTO -> {
executeIps.add(ipDTO.getCloudAreaId() + ":" + ipDTO.getIp());
});
if (stepInstance.isRollingStep()) {
List<IpDTO> rollingServers = rollingConfigService.getRollingServers(stepInstance.getRollingConfigId()
, stepInstanceId, stepInstance.getBatch());
rollingServers.forEach(ipDTO -> executeIps.add(ipDTO.getCloudAreaId() + ":" + ipDTO.getIp()));
} else {
stepInstance.getTargetServers().getIpList()
.forEach(ipDTO -> executeIps.add(ipDTO.getCloudAreaId() + ":" + ipDTO.getIp()));
}
watch.stop();

watch.start("init-gse-task-executor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.tencent.bk.job.execute.model.StepInstanceDTO;
import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.model.TaskNotifyDTO;
import com.tencent.bk.job.execute.service.StepInstanceService;
import com.tencent.bk.job.execute.service.TaskInstanceService;
import com.tencent.bk.job.manage.common.consts.notify.ExecuteStatusEnum;
import com.tencent.bk.job.manage.common.consts.notify.ResourceTypeEnum;
Expand Down Expand Up @@ -71,14 +72,17 @@
@Slf4j
public class StepListener {
private final TaskInstanceService taskInstanceService;
private final StepInstanceService stepInstanceService;
private final TaskExecuteMQEventDispatcher taskControlMsgSender;
private final FilePrepareService filePrepareService;

/**
 * Creates the listener and stores each injected collaborator in the field of the same name.
 *
 * @param taskInstanceService  task instance service
 * @param stepInstanceService  step instance service
 * @param taskControlMsgSender dispatcher for task execution MQ events
 * @param filePrepareService   file prepare service
 */
@Autowired
public StepListener(TaskInstanceService taskInstanceService,
StepInstanceService stepInstanceService,
TaskExecuteMQEventDispatcher taskControlMsgSender,
FilePrepareService filePrepareService) {
this.taskInstanceService = taskInstanceService;
this.stepInstanceService = stepInstanceService;
this.taskControlMsgSender = taskControlMsgSender;
this.filePrepareService = filePrepareService;
}
Expand Down Expand Up @@ -234,17 +238,29 @@ private void startStep(StepInstanceBaseDTO stepInstance) {
long stepInstanceId = stepInstance.getId();
long taskInstanceId = stepInstance.getTaskInstanceId();

// 只有当步骤状态为'等待用户'和'未执行'时可以启动步骤
if (RunStatusEnum.BLANK.getValue() == stepStatus || RunStatusEnum.WAITING.getValue() == stepStatus) {
// 只有当步骤状态为“等待用户”、“未执行”、“滚动等待”时可以启动步骤
if (RunStatusEnum.BLANK.getValue() == stepStatus
|| RunStatusEnum.WAITING.getValue() == stepStatus
|| RunStatusEnum.ROLLING_WAITING.getValue() == stepStatus) {

taskInstanceService.updateStepExecutionInfo(stepInstanceId, RunStatusEnum.RUNNING,
DateUtils.currentTimeMillis(), null, null);
taskInstanceService.updateTaskStatus(taskInstanceId, RunStatusEnum.RUNNING.getValue());

// 如果是滚动步骤,需要更新滚动进度
if (stepInstance.isRollingStep()) {
int currentRollingBatch = stepInstance.getBatch() + 1;
stepInstance.setBatch(currentRollingBatch);
stepInstanceService.updateStepCurrentBatch(stepInstanceId, currentRollingBatch);
}

int stepType = stepInstance.getExecuteType();
if (EXECUTE_SCRIPT.getValue() == stepType || StepExecuteTypeEnum.EXECUTE_SQL.getValue() == stepType) {
taskControlMsgSender.startGseStep(stepInstanceId);
} else if (TaskStepTypeEnum.FILE.getValue() == stepType) {
filePrepareService.prepareFileForGseTask(stepInstanceId);
// 如果不是滚动步骤或者是第一批次滚动执行,那么需要为后续的分发阶段准备本地/第三方源文件
if (!stepInstance.isRollingStep() || stepInstance.isFirstRollingBatch()) {
filePrepareService.prepareFileForGseTask(stepInstanceId);
}
} else if (TaskStepTypeEnum.APPROVAL.getValue() == stepType) {
executeConfirmStep(stepInstance);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ public void confirmStepRestart(long stepInstanceId) {

@Override
public void startStep(long stepInstanceId) {
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.START.getValue());
msg.setTime(LocalDateTime.now());
stepOutput.send(MessageBuilder.withPayload(msg).build());
log.info("Send start step control message successfully, stepInstanceId={}", stepInstanceId);
StepEvent event = new StepEvent();
event.setStepInstanceId(stepInstanceId);
event.setAction(StepActionEnum.START.getValue());
event.setTime(LocalDateTime.now());
stepOutput.send(MessageBuilder.withPayload(event).build());
log.info("Send start step event successfully, event={}", event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class GseTaskLogDTO {
/**
* 滚动执行批次
*/
private Integer batch;
private int batch;
/**
* 任务开始时间
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,11 @@ public int getTargetServerTotalCount() {
/**
 * Whether this step is a rolling step.
 *
 * @return {@code true} when {@code rollingConfigId} is non-null and greater than 0
 */
public boolean isRollingStep() {
    if (this.rollingConfigId == null) {
        return false;
    }
    return this.rollingConfigId > 0;
}

/**
 * Whether the step is on the first batch of a rolling execution.
 *
 * @return {@code true} when {@code batch} equals 1
 */
public boolean isFirstRollingBatch() {
    final boolean onFirstBatch = this.batch == 1;
    return onFirstBatch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public interface GseTaskLogService {

GseTaskLogDTO getGseTaskLog(long stepInstanceId, int executeCount);

void deleteGseTaskLog(long stepInstanceId, int executeCount);

void clearAllIpLog(long stepInstanceId, int executeCount);

void batchSaveIpLog(List<GseTaskIpLogDTO> ipLogList);
Expand Down
Loading

0 comments on commit 8fa5a4c

Please sign in to comment.