Skip to content

Commit

Permalink
feature: 原子操作能力支持滚动执行 TencentBlueKing#446
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jan 17, 2022
1 parent 84a94b0 commit 3ce1583
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ public EsbResp<List<EsbStepInstanceResultAndLog>> getJobInstanceLogUsingPost(Esb
taskInstanceService.listStepInstanceByTaskInstanceId(taskInstanceId);
List<EsbStepInstanceResultAndLog> stepInstResultAndLogList = Lists.newArrayList();
for (StepInstanceBaseDTO stepInstance : stepInstanceList) {
// TODO Rolling
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstance.getId(),
stepInstance.getExecuteCount());
stepInstance.getExecuteCount(), 0);
if (null == gseTask) {
EsbStepInstanceResultAndLog stepInstResultAndLog = new EsbStepInstanceResultAndLog();
stepInstResultAndLog.setFinished(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public EsbResp<EsbStepInstanceStatusDTO> getJobStepInstanceStatus(EsbGetStepInst
}

EsbStepInstanceStatusDTO resultData = new EsbStepInstanceStatusDTO();
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, stepInstance.getExecuteCount());
// TODO Rolling
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, stepInstance.getExecuteCount(), 0);
if (null == gseTask) {
resultData.setIsFinished(false);
return EsbResp.buildSuccessResp(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,26 @@

import com.tencent.bk.job.execute.model.GseTaskDTO;

/**
* GseTaskDAO
*/
public interface GseTaskDAO {

/**
* 保存/更新 GSE 任务
*
* @param gseTask GSE 任务
*/
void saveGseTask(GseTaskDTO gseTask);

GseTaskDTO getGseTask(long stepInstanceId, int executeCount);
/**
* 获取 GSE 任务
*
* @param stepInstanceId 步骤实例ID
* @param executeCount 步骤执行次数
* @param batch 滚动执行批次
* @return GSE 任务
*/
GseTaskDTO getGseTask(long stepInstanceId, int executeCount, int batch);

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ public void saveGseTask(GseTaskDTO gseTask) {
}

@Override
public GseTaskDTO getGseTask(long stepInstanceId, int executeCount) {
public GseTaskDTO getGseTask(long stepInstanceId, int executeCount, int batch) {
Record record = dslContext.select(ALL_FIELDS).from(TABLE)
.where(TABLE.STEP_INSTANCE_ID.eq(stepInstanceId))
.and(TABLE.EXECUTE_COUNT.eq(executeCount))
.and(TABLE.BATCH.eq((short) 0))
.and(TABLE.BATCH.eq((short) batch))
.fetchOne();
return extractInfo(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void stopStep(long stepInstanceId, String requestId) {

TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(stepInstance.getTaskInstanceId());
int executeCount = stepInstance.getExecuteCount();
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount);
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount, stepInstance.getBatch());
if (null == gseTask) {
log.info("Get gseTask return null, stepInstanceId: {}, executeCount:{}", stepInstanceId, executeCount);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public void execute() {
watch.stop();

watch.start("get-gse-task-log-from-db");
gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount);
gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount, rollingBatch);
watch.stop();

boolean shouldSendTaskToGseServer = (gseTask == null || StringUtils.isEmpty(gseTask.getGseTaskId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public GseTaskExecuteResult stopGseTask() {
agentList.add(src);
}

GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount);
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount, rollingBatch);
if (gseTask == null || StringUtils.isEmpty(gseTask.getGseTaskId())) {
log.warn("Gse Task not send to gse server, not support stop");
return new GseTaskExecuteResult(GseTaskExecuteResult.RESULT_CODE_STOP_FAILED, "Termination failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public GseTaskExecuteResult stopGseTask() {
List<api_agent> agentList = GseRequestUtils.buildAgentList(jobIpSet, accountInfo.getAccount(),
accountInfo.getPassword());
api_stop_task_request stopTaskRequest = new api_stop_task_request();
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount);
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount, rollingBatch);
if (gseTask == null || StringUtils.isEmpty(gseTask.getGseTaskId())) {
log.warn("Gse Task not send to gse server, not support stop");
return new GseTaskExecuteResult(GseTaskExecuteResult.RESULT_CODE_STOP_FAILED, "Termination failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void handleEvent(StepEvent stepEvent) {
try {
StepInstanceDTO stepInstance = taskInstanceService.getStepInstanceDetail(stepInstanceId);
TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(stepInstance.getTaskInstanceId());
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount);
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstanceId, executeCount, stepInstance.getBatch());

if (!checkIsTaskResumeable(stepInstance, gseTask)) {
log.warn("Task can not resume, stepStatus: {}, gseTaskStatus: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ public interface GseTaskService {
*
* @param stepInstanceId 步骤实例ID
* @param executeCount 步骤执行次数
* @param batch 滚动执行批次
* @return GSE 任务
*/
GseTaskDTO getGseTask(long stepInstanceId, int executeCount);
GseTaskDTO getGseTask(long stepInstanceId, int executeCount, int batch);

/**
* 批量保存 GSE Agent 任务
Expand Down Expand Up @@ -116,7 +117,8 @@ void batchUpdateGseAgentTasks(long stepInstanceId,
List<GseAgentTaskDTO> listGseAgentTasksByResultType(Long stepInstanceId, Integer executeCount, Integer resultType,
String tag);

List<GseAgentTaskDTO> getGseAgentTaskContentByResultType(Long stepInstanceId, Integer executeCount, Integer resultType,
List<GseAgentTaskDTO> getGseAgentTaskContentByResultType(Long stepInstanceId, Integer executeCount,
Integer resultType,
String tag);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public void saveGseTask(GseTaskDTO gseTask) {
}

@Override
public GseTaskDTO getGseTask(long stepInstanceId, int executeCount) {
return gseTaskDao.getGseTask(stepInstanceId, executeCount);
public GseTaskDTO getGseTask(long stepInstanceId, int executeCount, int batch) {
return gseTaskDao.getGseTask(stepInstanceId, executeCount, batch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ public StepExecutionDetailDTO getStepExecutionResult(String username, Long appId
query.setExecuteCount(finalExecuteCount);
watch.stop();

GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstance.getId(), finalExecuteCount);
// TODO Rolling
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstance.getId(), finalExecuteCount, 0);
if (gseTask == null) {
return buildNotStartStepExecutionResult(appId, stepInstance, finalExecuteCount,
query.getMaxAgentTasksForResultGroup(), query.getSearchIp());
Expand Down Expand Up @@ -885,8 +886,9 @@ public List<IpDTO> getHostsByResultType(String username, Long appId, Long stepIn
}
}

// TODO Rolling
GseTaskDTO gseTask = gseTaskService.getGseTask(stepInstance.getId(),
stepInstance.getExecuteCount());
stepInstance.getExecuteCount(), 0);
if (gseTask == null) {
if (stepInstance.getTargetServers().getIpList() != null) {
return stepInstance.getTargetServers().getIpList();
Expand Down

0 comments on commit 3ce1583

Please sign in to comment.