Skip to content

Commit

Permalink
feature: Job 支持容器执行 - 脚本任务 TencentBlueKing#2631
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Dec 30, 2023
1 parent 085eb7b commit 3d7690d
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,6 @@ private void initFileSourceExecuteObjectTasks() {
}

executeObjectTask.setExecuteObject(sourceExecuteObject);
if (isSupportExecuteObject) {
executeObjectTask.setExecuteObjectId(sourceExecuteObject.getId());
} else {
executeObjectTask.setHostId(sourceExecuteObject.getHost().getHostId());
executeObjectTask.setAgentId(sourceExecuteObject.getHost().getAgentId());
}

executeObjectTasks.add(executeObjectTask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private void startStep(StepEvent stepEvent, StepInstanceDTO stepInstance) {
}

Long gseTaskId = saveInitialGseTask(stepInstance);
saveGseAgentTasksForStartStep(gseTaskId, stepInstance, rollingConfig);
saveExecuteObjectTasksForStartStep(gseTaskId, stepInstance, rollingConfig);

taskInstanceService.updateStepExecutionInfo(stepInstanceId, RunStatusEnum.RUNNING,
stepInstance.getStartTime() == null ? DateUtils.currentTimeMillis() : null, null, null);
Expand Down Expand Up @@ -253,35 +253,35 @@ private Long saveInitialGseTask(StepInstanceDTO stepInstance) {
* @param stepInstance 步骤实例
* @param rollingConfig 滚动配置
*/
private void saveGseAgentTasksForStartStep(Long gseTaskId,
StepInstanceDTO stepInstance,
RollingConfigDTO rollingConfig) {
private void saveExecuteObjectTasksForStartStep(Long gseTaskId,
StepInstanceDTO stepInstance,
RollingConfigDTO rollingConfig) {
long stepInstanceId = stepInstance.getId();
int executeCount = stepInstance.getExecuteCount();
int batch = stepInstance.getBatch();

if (stepInstance.isRollingStep()) {
// 滚动步骤
saveGseAgentTasksForStartRollingStep(gseTaskId, stepInstance, rollingConfig);
saveGseExecuteObjectTasksForStartRollingStep(gseTaskId, stepInstance, rollingConfig);
} else {
// 普通步骤,启动的时候需要初始化所有AgentTask
List<ExecuteObjectTask> agentTasks = new ArrayList<>(
// 普通步骤,启动的时候需要初始化所有ExecuteObjectTask
List<ExecuteObjectTask> executeObjectTasks = new ArrayList<>(
buildInitialExecuteObjectTasks(stepInstanceId, executeCount, executeCount, batch,
gseTaskId, stepInstance.getTargetExecuteObjects().getDecorateExecuteObjects()));
saveAgentTasks(stepInstance, agentTasks);
saveExecuteObjectTasks(stepInstance, executeObjectTasks);
}
}

/**
* 启动滚动执行步骤的时候保存 GSE Agent 任务
* 启动滚动执行步骤的时候保存执行对象任务
*
* @param gseTaskId GSE任务ID
* @param stepInstance 步骤实例
* @param rollingConfig 滚动配置
*/
private void saveGseAgentTasksForStartRollingStep(Long gseTaskId,
StepInstanceDTO stepInstance,
RollingConfigDTO rollingConfig) {
private void saveGseExecuteObjectTasksForStartRollingStep(Long gseTaskId,
StepInstanceDTO stepInstance,
RollingConfigDTO rollingConfig) {
long stepInstanceId = stepInstance.getId();
int executeCount = stepInstance.getExecuteCount();
int batch = stepInstance.getBatch();
Expand All @@ -303,15 +303,15 @@ private void saveGseAgentTasksForStartRollingStep(Long gseTaskId,
)
);
});
saveAgentTasks(stepInstance, executeObjectTasks);
saveExecuteObjectTasks(stepInstance, executeObjectTasks);
} else {
// 暂时不支持,滚动执行二期需求
log.warn("All rolling step is not supported!");
throw new NotImplementedException("All rolling step is not supported",
ErrorCode.NOT_SUPPORT_FEATURE);
}
} else {
// 滚动执行步骤除了第一批次,后续的批次仅更新 AgentTask 的 actualExecuteCount、gse_task_id
// 滚动执行步骤除了第一批次,后续的批次仅更新 ExecuteObjectTask 的 actualExecuteCount、gse_task_id
if (stepInstance.isScriptStep()) {
scriptExecuteObjectTaskService.updateTaskFields(stepInstance, executeCount, batch, executeCount,
gseTaskId);
Expand Down Expand Up @@ -505,7 +505,7 @@ private void retryStepFail(StepInstanceDTO stepInstance) {
}

Long gseTaskId = saveInitialGseTask(stepInstance);
saveAgentTasksForRetryFail(stepInstance, stepInstance.getExecuteCount(), stepInstance.getBatch(),
saveExecuteObjectTasksForRetryFail(stepInstance, stepInstance.getExecuteCount(), stepInstance.getBatch(),
gseTaskId);

startGseTask(stepInstance, gseTaskId);
Expand All @@ -520,61 +520,61 @@ private boolean isStepSupportRetry(RunStatusEnum stepStatus) {
|| RunStatusEnum.STOP_SUCCESS == stepStatus;
}

private void saveAgentTasksForRetryFail(StepInstanceBaseDTO stepInstance, int executeCount, Integer batch,
Long gseTaskId) {
List<ExecuteObjectTask> retryAgentTasks = listTargetAgentTasks(stepInstance, executeCount - 1);
private void saveExecuteObjectTasksForRetryFail(StepInstanceBaseDTO stepInstance, int executeCount, Integer batch,
Long gseTaskId) {
List<ExecuteObjectTask> retryExecuteObjectTasks = listTargetExecuteObjectTasks(stepInstance, executeCount - 1);

for (ExecuteObjectTask retryAgentTask : retryAgentTasks) {
retryAgentTask.setExecuteCount(executeCount);
if (batch != null && retryAgentTask.getBatch() != batch) {
for (ExecuteObjectTask retryExecuteObjectTask : retryExecuteObjectTasks) {
retryExecuteObjectTask.setExecuteCount(executeCount);
if (batch != null && retryExecuteObjectTask.getBatch() != batch) {
continue;
}
// 只有失败的目标主机才需要参与重试
if (!ExecuteObjectTaskStatusEnum.isSuccess(retryAgentTask.getStatus())) {
retryAgentTask.setActualExecuteCount(executeCount);
retryAgentTask.resetTaskInitialStatus();
retryAgentTask.setGseTaskId(gseTaskId);
if (!ExecuteObjectTaskStatusEnum.isSuccess(retryExecuteObjectTask.getStatus())) {
retryExecuteObjectTask.setActualExecuteCount(executeCount);
retryExecuteObjectTask.resetTaskInitialStatus();
retryExecuteObjectTask.setGseTaskId(gseTaskId);
}
}

saveAgentTasks(stepInstance, retryAgentTasks);
saveExecuteObjectTasks(stepInstance, retryExecuteObjectTasks);
}


private void saveAgentTasksForRetryAll(StepInstanceBaseDTO stepInstance, int executeCount, Integer batch,
Long gseTaskId) {
List<ExecuteObjectTask> retryAgentTasks = listTargetAgentTasks(stepInstance, executeCount - 1);
private void saveExecuteObjectTasksForRetryAll(StepInstanceBaseDTO stepInstance, int executeCount, Integer batch,
Long gseTaskId) {
List<ExecuteObjectTask> retryExecuteObjectTasks = listTargetExecuteObjectTasks(stepInstance, executeCount - 1);

for (ExecuteObjectTask retryAgentTask : retryAgentTasks) {
retryAgentTask.setExecuteCount(executeCount);
if (batch != null && retryAgentTask.getBatch() != batch) {
for (ExecuteObjectTask retryExecuteObjectTask : retryExecuteObjectTasks) {
retryExecuteObjectTask.setExecuteCount(executeCount);
if (batch != null && retryExecuteObjectTask.getBatch() != batch) {
continue;
}
retryAgentTask.setActualExecuteCount(executeCount);
retryAgentTask.resetTaskInitialStatus();
retryAgentTask.setGseTaskId(gseTaskId);
retryExecuteObjectTask.setActualExecuteCount(executeCount);
retryExecuteObjectTask.resetTaskInitialStatus();
retryExecuteObjectTask.setGseTaskId(gseTaskId);
}

saveAgentTasks(stepInstance, retryAgentTasks);
saveExecuteObjectTasks(stepInstance, retryExecuteObjectTasks);
}

private List<ExecuteObjectTask> listTargetAgentTasks(StepInstanceBaseDTO stepInstance, int executeCount) {
List<ExecuteObjectTask> agentTasks = Collections.emptyList();
private List<ExecuteObjectTask> listTargetExecuteObjectTasks(StepInstanceBaseDTO stepInstance, int executeCount) {
List<ExecuteObjectTask> executeObjectTasks = Collections.emptyList();
if (stepInstance.isScriptStep()) {
agentTasks = scriptExecuteObjectTaskService.listTasks(stepInstance, executeCount, null);
executeObjectTasks = scriptExecuteObjectTaskService.listTasks(stepInstance, executeCount, null);
} else if (stepInstance.isFileStep()) {
agentTasks = fileExecuteObjectTaskService.listTasks(stepInstance, executeCount, null,
executeObjectTasks = fileExecuteObjectTaskService.listTasks(stepInstance, executeCount, null,
FileTaskModeEnum.DOWNLOAD);
}
return agentTasks;
return executeObjectTasks;
}

private void saveAgentTasks(StepInstanceBaseDTO stepInstance, List<ExecuteObjectTask> agentTasks) {
if (CollectionUtils.isNotEmpty(agentTasks)) {
private void saveExecuteObjectTasks(StepInstanceBaseDTO stepInstance, List<ExecuteObjectTask> executeObjectTasks) {
if (CollectionUtils.isNotEmpty(executeObjectTasks)) {
if (stepInstance.isScriptStep()) {
scriptExecuteObjectTaskService.batchSaveTasks(agentTasks);
scriptExecuteObjectTaskService.batchSaveTasks(executeObjectTasks);
} else if (stepInstance.isFileStep()) {
fileExecuteObjectTaskService.batchSaveTasks(agentTasks);
fileExecuteObjectTaskService.batchSaveTasks(executeObjectTasks);
}
}
}
Expand Down Expand Up @@ -604,7 +604,7 @@ private void retryStepAll(StepInstanceDTO stepInstance) {
}

Long gseTaskId = saveInitialGseTask(stepInstance);
saveAgentTasksForRetryAll(stepInstance, stepInstance.getExecuteCount(), stepInstance.getBatch(),
saveExecuteObjectTasksForRetryAll(stepInstance, stepInstance.getExecuteCount(), stepInstance.getBatch(),
gseTaskId);

startGseTask(stepInstance, gseTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,11 @@ public ExecuteObjectVO toExecuteObjectVO() {
}
return vo;
}

/**
* 判断是否支持执行对象特性
*/
public boolean isSupportExecuteObjectFeature() {
return id != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,12 @@
import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum;
import com.tencent.bk.job.execute.engine.model.ExecuteObject;
import com.tencent.bk.job.logsvr.consts.FileTaskModeEnum;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
* GSE 执行对象任务
*/
@Getter
@Setter
@ToString
@NoArgsConstructor
public class ExecuteObjectTask {
Expand Down Expand Up @@ -277,4 +273,172 @@ public boolean isTarget() {
public boolean isSuccess() {
return ExecuteObjectTaskStatusEnum.isSuccess(status);
}

public long getTaskInstanceId() {
return taskInstanceId;
}

public void setTaskInstanceId(long taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}

public long getStepInstanceId() {
return stepInstanceId;
}

public void setStepInstanceId(long stepInstanceId) {
this.stepInstanceId = stepInstanceId;
}

public int getExecuteCount() {
return executeCount;
}

public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}

public Integer getActualExecuteCount() {
return actualExecuteCount;
}

public void setActualExecuteCount(Integer actualExecuteCount) {
this.actualExecuteCount = actualExecuteCount;
}

public int getBatch() {
return batch;
}

public void setBatch(int batch) {
this.batch = batch;
}

public Long getGseTaskId() {
return gseTaskId;
}

public void setGseTaskId(Long gseTaskId) {
this.gseTaskId = gseTaskId;
}

public String getExecuteObjectId() {
return executeObjectId;
}

public void setExecuteObjectId(String executeObjectId) {
this.executeObjectId = executeObjectId;
}

public ExecuteObjectTypeEnum getExecuteObjectType() {
return executeObjectType;
}

public void setExecuteObjectType(ExecuteObjectTypeEnum executeObjectType) {
this.executeObjectType = executeObjectType;
}

public ExecuteObject getExecuteObject() {
return executeObject;
}

@Deprecated
@CompatibleImplementation(name = "execute_object", deprecatedVersion = "3.9.x", type = CompatibleType.HISTORY_DATA,
explain = "兼容老数据,数据失效后可删除")
public Long getHostId() {
return hostId;
}

@Deprecated
@CompatibleImplementation(name = "execute_object", deprecatedVersion = "3.9.x", type = CompatibleType.HISTORY_DATA,
explain = "兼容老数据,数据失效后可删除")
public void setHostId(Long hostId) {
this.hostId = hostId;
}

@Deprecated
@CompatibleImplementation(name = "execute_object", deprecatedVersion = "3.9.x", type = CompatibleType.HISTORY_DATA,
explain = "兼容老数据,数据失效后可删除")
public String getAgentId() {
return agentId;
}

@Deprecated
@CompatibleImplementation(name = "execute_object", deprecatedVersion = "3.9.x", type = CompatibleType.HISTORY_DATA,
explain = "兼容老数据,数据失效后可删除")
public void setAgentId(String agentId) {
this.agentId = agentId;
}

public ExecuteObjectTaskStatusEnum getStatus() {
return status;
}

public Long getStartTime() {
return startTime;
}

public Long getEndTime() {
return endTime;
}

public Long getTotalTime() {
return totalTime;
}

public int getErrorCode() {
return errorCode;
}

public Integer getExitCode() {
return exitCode;
}

public String getTag() {
return tag;
}

public void setTag(String tag) {
this.tag = tag;
}

public int getScriptLogOffset() {
return scriptLogOffset;
}

public String getScriptLogContent() {
return scriptLogContent;
}

public void setScriptLogContent(String scriptLogContent) {
this.scriptLogContent = scriptLogContent;
}

public FileTaskModeEnum getFileTaskMode() {
return fileTaskMode;
}

public void setFileTaskMode(FileTaskModeEnum fileTaskMode) {
this.fileTaskMode = fileTaskMode;
}

public boolean isChanged() {
return changed;
}

public void setChanged(boolean changed) {
this.changed = changed;
}

public void setExecuteObject(ExecuteObject executeObject) {
if (executeObject != null) {
executeObjectType = executeObject.getType();
executeObjectId = executeObject.getId();
if (executeObject.isHostExecuteObject()) {
// 兼容老数据,发布完成后可删除
hostId = executeObject.getHost().getHostId();
agentId = executeObject.getHost().getAgentId();
}
}
}
}
Loading

0 comments on commit 3d7690d

Please sign in to comment.