Skip to content

Commit

Permalink
feature: 原子操作能力支持滚动执行 TencentBlueKing#446
Browse files: browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jan 11, 2022
1 parent 5d9e1b2 commit c08ccf5
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 75 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,10 +28,15 @@
import com.tencent.bk.job.execute.engine.consts.GseStepActionEnum;
import com.tencent.bk.job.execute.engine.consts.JobActionEnum;
import com.tencent.bk.job.execute.engine.consts.StepActionEnum;
import com.tencent.bk.job.execute.engine.message.*;
import com.tencent.bk.job.execute.engine.listener.event.JobEvent;
import com.tencent.bk.job.execute.engine.listener.event.StepEvent;
import com.tencent.bk.job.execute.engine.message.CallbackProcessor;
import com.tencent.bk.job.execute.engine.message.GseTaskProcessor;
import com.tencent.bk.job.execute.engine.message.NotifyMsgProcessor;
import com.tencent.bk.job.execute.engine.message.StepProcessor;
import com.tencent.bk.job.execute.engine.message.TaskProcessor;
import com.tencent.bk.job.execute.engine.message.TaskResultHandleResumeProcessor;
import com.tencent.bk.job.execute.engine.model.JobCallbackDTO;
import com.tencent.bk.job.execute.engine.model.StepControlMessage;
import com.tencent.bk.job.execute.engine.model.TaskControlMessage;
import com.tencent.bk.job.execute.model.TaskNotifyDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -91,7 +96,7 @@ public TaskExecuteControlMsgSenderImpl(@Qualifier(TaskProcessor.OUTPUT) MessageC
@Override
public void startTask(long taskInstanceId) {
log.info("Begin to send start task control message, taskInstanceId={}", taskInstanceId);
TaskControlMessage msg = new TaskControlMessage();
JobEvent msg = new JobEvent();
msg.setTaskInstanceId(taskInstanceId);
msg.setAction(JobActionEnum.START.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -102,7 +107,7 @@ public void startTask(long taskInstanceId) {
@Override
public void stopTask(long taskInstanceId) {
log.info("Begin to send stop task control message, taskInstanceId={}", taskInstanceId);
TaskControlMessage msg = new TaskControlMessage();
JobEvent msg = new JobEvent();
msg.setTaskInstanceId(taskInstanceId);
msg.setAction(JobActionEnum.STOP.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -113,7 +118,7 @@ public void stopTask(long taskInstanceId) {
@Override
public void restartTask(long taskInstanceId) {
log.info("Begin to send restart task control message, taskInstanceId={}", taskInstanceId);
TaskControlMessage msg = new TaskControlMessage();
JobEvent msg = new JobEvent();
msg.setTaskInstanceId(taskInstanceId);
msg.setAction(JobActionEnum.RESTART.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -124,7 +129,7 @@ public void restartTask(long taskInstanceId) {
@Override
public void refreshTask(long taskInstanceId) {
log.info("Begin to send refresh task control message, taskInstanceId={}", taskInstanceId);
TaskControlMessage msg = new TaskControlMessage();
JobEvent msg = new JobEvent();
msg.setTaskInstanceId(taskInstanceId);
msg.setAction(JobActionEnum.REFRESH.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -135,7 +140,7 @@ public void refreshTask(long taskInstanceId) {
@Override
public void ignoreStepError(long stepInstanceId) {
log.info("Begin to send ignore-error step control message, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.IGNORE_ERROR.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -146,7 +151,7 @@ public void ignoreStepError(long stepInstanceId) {
@Override
public void nextStep(long stepInstanceId) {
log.info("Begin to send next-step step control message, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.NEXT_STEP.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -157,7 +162,7 @@ public void nextStep(long stepInstanceId) {
@Override
public void confirmStepContinue(long stepInstanceId) {
log.info("Begin to send confirm-continue step control message, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.CONFIRM_CONTINUE.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -168,7 +173,7 @@ public void confirmStepContinue(long stepInstanceId) {
@Override
public void confirmStepTerminate(long stepInstanceId) {
log.info("Begin to send confirm-terminate step control message, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.CONFIRM_TERMINATE.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -179,7 +184,7 @@ public void confirmStepTerminate(long stepInstanceId) {
@Override
public void confirmStepRestart(long stepInstanceId) {
log.info("Begin to send confirm-restart step control message, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.CONFIRM_RESTART.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -189,7 +194,7 @@ public void confirmStepRestart(long stepInstanceId) {

@Override
public void startStep(long stepInstanceId) {
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.START.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -200,7 +205,7 @@ public void startStep(long stepInstanceId) {
@Override
public void skipStep(long stepInstanceId) {
log.info("Begin to send skip step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.SKIP.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -211,7 +216,7 @@ public void skipStep(long stepInstanceId) {
@Override
public void stopStep(long stepInstanceId) {
log.info("Begin to send stop step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.STOP.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -222,7 +227,7 @@ public void stopStep(long stepInstanceId) {
@Override
public void retryStepFail(long stepInstanceId) {
log.info("Begin to send retry-step-fail step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.RETRY_FAIL.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -233,7 +238,7 @@ public void retryStepFail(long stepInstanceId) {
@Override
public void retryStepAll(long stepInstanceId) {
log.info("Begin to send retry-step-all step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.RETRY_ALL.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -245,7 +250,7 @@ public void retryStepAll(long stepInstanceId) {
public void continueGseFileStep(long stepInstanceId) {
log.info("Begin to send continue-gse-file-step step control message successfully, stepInstanceId={}",
stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.CONTINUE_FILE_PUSH.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -256,7 +261,7 @@ public void continueGseFileStep(long stepInstanceId) {
@Override
public void clearStep(long stepInstanceId) {
log.info("Begin to send clear-step step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(StepActionEnum.CLEAR.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -267,7 +272,7 @@ public void clearStep(long stepInstanceId) {
@Override
public void startGseStep(long stepInstanceId) {
log.info("Begin to send start gse step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(GseStepActionEnum.START.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -281,7 +286,7 @@ public void resumeGseStep(long stepInstanceId, int executeCount, String requestI
log.info("Begin to send resume gse step control message successfully, stepInstanceId={}, executeCount={}, " +
"requestId={}",
stepInstanceId, executeCount, requestId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setExecuteCount(executeCount);
msg.setAction(StepActionEnum.RESUME.getValue());
Expand All @@ -295,7 +300,7 @@ public void resumeGseStep(long stepInstanceId, int executeCount, String requestI
@Override
public void retryGseStepFail(long stepInstanceId) {
log.info("Begin to send retry gse step fail control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(GseStepActionEnum.RETRY_FAIL.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -307,7 +312,7 @@ public void retryGseStepFail(long stepInstanceId) {
@Override
public void retryGseStepAll(long stepInstanceId) {
log.info("Begin to send retry gse step all control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(GseStepActionEnum.RETRY_ALL.getValue());
msg.setTime(LocalDateTime.now());
Expand All @@ -319,7 +324,7 @@ public void retryGseStepAll(long stepInstanceId) {
@Override
public void stopGseStep(long stepInstanceId) {
log.info("Begin to send stop gse step control message successfully, stepInstanceId={}", stepInstanceId);
StepControlMessage msg = new StepControlMessage();
StepEvent msg = new StepEvent();
msg.setStepInstanceId(stepInstanceId);
msg.setAction(GseStepActionEnum.STOP.getValue());
msg.setTime(LocalDateTime.now());
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,8 +29,8 @@
import com.tencent.bk.job.execute.engine.GseTaskManager;
import com.tencent.bk.job.execute.engine.consts.GseStepActionEnum;
import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager;
import com.tencent.bk.job.execute.engine.listener.event.StepEvent;
import com.tencent.bk.job.execute.engine.message.GseTaskProcessor;
import com.tencent.bk.job.execute.engine.model.StepControlMessage;
import com.tencent.bk.job.execute.monitor.metrics.GseTasksExceptionCounter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -54,27 +54,25 @@ public class GseStepListener {
private final GseTasksExceptionCounter gseTasksExceptionCounter;

@Autowired
public GseStepListener(
GseTaskManager gseTaskManager,
ExceptionStatusManager exceptionStatusManager,
GseTasksExceptionCounter gseTasksExceptionCounter
) {
public GseStepListener(GseTaskManager gseTaskManager,
ExceptionStatusManager exceptionStatusManager,
GseTasksExceptionCounter gseTasksExceptionCounter) {
this.gseTaskManager = gseTaskManager;
this.exceptionStatusManager = exceptionStatusManager;
this.gseTasksExceptionCounter = gseTasksExceptionCounter;
}

@StreamListener(GseTaskProcessor.INPUT)
public void handleMessage(@Payload StepControlMessage gseStepControlMessage) {
public void handleEvent(@Payload StepEvent gseStepEvent) {
log.info("Receive gse step control message, stepInstanceId={}, action={}, requestId={}, msgSendTime={}",
gseStepControlMessage.getStepInstanceId(),
gseStepControlMessage.getAction(), gseStepControlMessage.getRequestId(), gseStepControlMessage.getTime());
long stepInstanceId = gseStepControlMessage.getStepInstanceId();
String requestId = gseStepControlMessage.getRequestId();
gseStepEvent.getStepInstanceId(),
gseStepEvent.getAction(), gseStepEvent.getRequestId(), gseStepEvent.getTime());
long stepInstanceId = gseStepEvent.getStepInstanceId();
String requestId = gseStepEvent.getRequestId();
try {
int action = gseStepControlMessage.getAction();
int action = gseStepEvent.getAction();
if (GseStepActionEnum.START.getValue() == action) {
gseTaskManager.startStep(stepInstanceId, gseStepControlMessage.getRequestId());
gseTaskManager.startStep(stepInstanceId, gseStepEvent.getRequestId());
} else if (GseStepActionEnum.STOP.getValue() == action) {
gseTaskManager.stopStep(stepInstanceId, requestId);
} else if (GseStepActionEnum.RETRY_FAIL.getValue() == action) {
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -31,9 +31,9 @@
import com.tencent.bk.job.execute.common.util.TaskCostCalculator;
import com.tencent.bk.job.execute.engine.TaskExecuteControlMsgSender;
import com.tencent.bk.job.execute.engine.consts.JobActionEnum;
import com.tencent.bk.job.execute.engine.listener.event.JobEvent;
import com.tencent.bk.job.execute.engine.message.TaskProcessor;
import com.tencent.bk.job.execute.engine.model.JobCallbackDTO;
import com.tencent.bk.job.execute.engine.model.TaskControlMessage;
import com.tencent.bk.job.execute.model.StepInstanceBaseDTO;
import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.model.TaskNotifyDTO;
Expand Down Expand Up @@ -77,31 +77,30 @@ public JobListener(TaskExecuteControlMsgSender taskExecuteControlMsgSender,
* 处理和作业相关的控制消息:启动作业、停止作业、重启作业、忽略错误和作业状态刷新
*/
@StreamListener(TaskProcessor.INPUT)
public void handleMessage(TaskControlMessage taskControlMessage) {
log.info("Receive task control message, taskInstanceId={}, action={}, msgSendTime={}",
taskControlMessage.getTaskInstanceId(),
taskControlMessage.getAction(), taskControlMessage.getTime());
long taskInstanceId = taskControlMessage.getTaskInstanceId();
int action = taskControlMessage.getAction();
public void handleEvent(JobEvent jobEvent) {
log.info("Handle job event, taskInstanceId={}, action={}, eventTime={}",
jobEvent.getTaskInstanceId(), jobEvent.getAction(), jobEvent.getTime());
long taskInstanceId = jobEvent.getTaskInstanceId();
int action = jobEvent.getAction();
try {
TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(taskInstanceId);
if (JobActionEnum.START.getValue() == action) {
log.info("Start task, taskInstanceId={}", taskInstanceId);
log.info("Start job, taskInstanceId={}", taskInstanceId);
startJob(taskInstance);
} else if (JobActionEnum.STOP.getValue() == action) {
log.info("Stop task, taskInstanceId={}", taskInstanceId);
log.info("Stop job, taskInstanceId={}", taskInstanceId);
stopJob(taskInstance);
} else if (JobActionEnum.RESTART.getValue() == action) {
log.info("Restart task, taskInstanceId={}", taskInstanceId);
log.info("Restart job, taskInstanceId={}", taskInstanceId);
restartJob(taskInstance);
} else if (JobActionEnum.REFRESH.getValue() == action) {
log.info("Refresh task, taskInstanceId={}", taskInstanceId);
log.info("Refresh job, taskInstanceId={}", taskInstanceId);
refreshJob(taskInstance);
} else {
log.warn("Error task control action:{}", action);
}
} catch (Exception e) {
String errorMsg = "Handling task control message error,taskInstanceId=" + taskInstanceId;
String errorMsg = "Handle job event error, taskInstanceId=" + taskInstanceId;
log.error(errorMsg, e);
}
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,10 +30,10 @@
import com.tencent.bk.job.execute.engine.consts.FileDirTypeConf;
import com.tencent.bk.job.execute.engine.consts.IpStatus;
import com.tencent.bk.job.execute.engine.exception.ExceptionStatusManager;
import com.tencent.bk.job.execute.engine.listener.event.StepEvent;
import com.tencent.bk.job.execute.engine.message.TaskResultHandleResumeProcessor;
import com.tencent.bk.job.execute.engine.model.FileDest;
import com.tencent.bk.job.execute.engine.model.JobFile;
import com.tencent.bk.job.execute.engine.model.StepControlMessage;
import com.tencent.bk.job.execute.engine.model.TaskVariableDTO;
import com.tencent.bk.job.execute.engine.model.TaskVariablesAnalyzeResult;
import com.tencent.bk.job.execute.engine.result.FileResultHandleTask;
Expand Down Expand Up @@ -130,14 +130,14 @@ public ResultHandleResumeListener(
* 恢复被中断的作业结果处理任务
*/
@StreamListener(TaskResultHandleResumeProcessor.INPUT)
public void handleMessage(StepControlMessage stepControlMessage) {
public void handleEvent(StepEvent stepEvent) {
log.info("Receive result handle task resume control message, action: {}, stepInstanceId: {}, executeCount: {}, requestId: {}, msgSendTime={}",
stepControlMessage.getAction(), stepControlMessage.getStepInstanceId(),
stepControlMessage.getExecuteCount(),
stepControlMessage.getRequestId(), stepControlMessage.getTime());
long stepInstanceId = stepControlMessage.getStepInstanceId();
int executeCount = stepControlMessage.getExecuteCount();
String requestId = StringUtils.isNotEmpty(stepControlMessage.getRequestId()) ? stepControlMessage.getRequestId()
stepEvent.getAction(), stepEvent.getStepInstanceId(),
stepEvent.getExecuteCount(),
stepEvent.getRequestId(), stepEvent.getTime());
long stepInstanceId = stepEvent.getStepInstanceId();
int executeCount = stepEvent.getExecuteCount();
String requestId = StringUtils.isNotEmpty(stepEvent.getRequestId()) ? stepEvent.getRequestId()
: UUID.randomUUID().toString();
try {
StepInstanceDTO stepInstance = taskInstanceService.getStepInstanceDetail(stepInstanceId);
Expand Down
Loading

0 comments on commit c08ccf5

Please sign in to comment.