Skip to content

Commit

Permalink
feature: Job 支持容器执行 - 脚本任务 TencentBlueKing#2631
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jan 1, 2024
1 parent 84a7d06 commit df8c95c
Show file tree
Hide file tree
Showing 37 changed files with 606 additions and 460 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
import java.util.List;
import java.util.Map;

/**
* @since 6/11/2019 10:26
*/

@Data
public class JobContext {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ void queryRollingConfig() {
assertThat(stepRollingConfigs.get(101L).isBatch()).isEqualTo(false);
assertThat(stepRollingConfigs.get(102L).isBatch()).isEqualTo(true);
assertThat(stepRollingConfigs.get(103L).isBatch()).isEqualTo(true);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList()).hasSize(3);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getBatch()).isEqualTo(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getMergedExecuteObjects()).hasSize(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getMergedExecuteObjects().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getMergedExecuteObjects().get(0).getHost().getIp()).isEqualTo("127.0.0.1");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(1).getBatch()).isEqualTo(2);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(1).getMergedExecuteObjects()).hasSize(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(1).getMergedExecuteObjects().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(1).getMergedExecuteObjects().get(0).getHost().getIp()).isEqualTo("127.0.0.2");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(2).getBatch()).isEqualTo(3);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(2).getMergedExecuteObjects()).hasSize(2);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(2).getMergedExecuteObjects().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(2).getMergedExecuteObjects().get(0).getHost().getIp()).isEqualTo("127.0.0.3");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(2).getMergedExecuteObjects().get(1).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(2).getMergedExecuteObjects().get(1).getHost().getIp()).isEqualTo("127.0.0.4");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly()).hasSize(3);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getBatch()).isEqualTo(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getExecuteObjectsCompatibly()).hasSize(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getExecuteObjectsCompatibly().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getExecuteObjectsCompatibly().get(0).getHost().getIp()).isEqualTo("127.0.0.1");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(1).getBatch()).isEqualTo(2);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(1).getExecuteObjectsCompatibly()).hasSize(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(1).getExecuteObjectsCompatibly().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(1).getExecuteObjectsCompatibly().get(0).getHost().getIp()).isEqualTo("127.0.0.2");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(2).getBatch()).isEqualTo(3);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(2).getExecuteObjectsCompatibly()).hasSize(2);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(2).getExecuteObjectsCompatibly().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(2).getExecuteObjectsCompatibly().get(0).getHost().getIp()).isEqualTo("127.0.0.3");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(2).getExecuteObjectsCompatibly().get(1).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(2).getExecuteObjectsCompatibly().get(1).getHost().getIp()).isEqualTo("127.0.0.4");
}

@Test
Expand All @@ -122,7 +122,7 @@ void saveRollingConfig() {
rollingConfig.setStepRollingConfigs(stepRollingConfigs);
List<RollingExecuteObjectsBatchDO> executeObjectsBatchList = new ArrayList<>();
List<ExecuteObject> executeObjects = new ArrayList<>();
executeObjects.add(new ExecuteObject(new HostDTO(1L, 0L, "127.0.0.1")));
executeObjects.add(ExecuteObject.buildCompatibleExecuteObject((new HostDTO(1L, 0L, "127.0.0.1"))));
RollingExecuteObjectsBatchDO hostBatch1 = new RollingExecuteObjectsBatchDO(1, executeObjects);
executeObjectsBatchList.add(hostBatch1);
rollingConfig.setExecuteObjectsBatchList(executeObjectsBatchList);
Expand All @@ -148,11 +148,11 @@ void saveRollingConfig() {
assertThat(savedStepRollingConfigs.get(1001L).isBatch()).isEqualTo(false);
assertThat(savedStepRollingConfigs.get(1002L).isBatch()).isEqualTo(true);
assertThat(savedStepRollingConfigs.get(1003L).isBatch()).isEqualTo(true);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList()).hasSize(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getBatch()).isEqualTo(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getMergedExecuteObjects().get(0).getHost().getHostId()).isEqualTo(1L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getMergedExecuteObjects().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList().get(0).getMergedExecuteObjects().get(0).getHost().getIp()).isEqualTo("127.0.0.1");
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly()).hasSize(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getBatch()).isEqualTo(1);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getExecuteObjectsCompatibly().get(0).getHost().getHostId()).isEqualTo(1L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getExecuteObjectsCompatibly().get(0).getHost().getBkCloudId()).isEqualTo(0L);
assertThat(savedTaskInstanceRollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly().get(0).getExecuteObjectsCompatibly().get(0).getHost().getIp()).isEqualTo("127.0.0.1");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.tencent.bk.job.common.model.dto.HostDTO;
import com.tencent.bk.job.execute.api.esb.v2.EsbGetStepInstanceStatusResource;
import com.tencent.bk.job.execute.engine.consts.ExecuteObjectTaskStatusEnum;
import com.tencent.bk.job.execute.engine.model.ExecuteObject;
import com.tencent.bk.job.execute.model.ExecuteObjectTask;
import com.tencent.bk.job.execute.model.ResultGroupDTO;
import com.tencent.bk.job.execute.model.StepExecutionDetailDTO;
Expand Down Expand Up @@ -116,7 +117,13 @@ private EsbStepInstanceStatusDTO.StepInstance convertStepInstance(StepInstanceBa
stepInst.setId(stepInstance.getId());
stepInst.setEndTime(stepInstance.getEndTime());
stepInst.setStartTime(stepInstance.getStartTime());
stepInst.setIpList(convertToIpListStr(stepInstance.getTargetExecuteObjects().getIpList()));
stepInst.setIpList(
convertToIpListStr(
stepInstance.getTargetExecuteObjects()
.getExecuteObjectsCompatibly()
.stream()
.map(ExecuteObject::getHost)
.collect(Collectors.toList())));
stepInst.setName(stepInstance.getName());
stepInst.setOperator(stepInstance.getOperator());
stepInst.setExecuteCount(stepInstance.getExecuteCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private List<FileSourceDTO> convertConfigFileSource(String userName,
files.add(new FileDetailDTO(configFileLocalPath));
fileSourceDTO.setFiles(files);
// 设置配置文件所在机器IP信息
fileSourceDTO.setServers(agentService.getLocalServersDTO());
fileSourceDTO.setServers(agentService.getLocalHostExecuteObjectDTO());
fileSourceDTOS.add(fileSourceDTO);
});
return fileSourceDTOS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private List<FileSourceDTO> convertConfigFileSource(
files.add(new FileDetailDTO(configFileLocalPath));
fileSourceDTO.setFiles(files);
// 设置配置文件所在机器IP信息
fileSourceDTO.setServers(agentService.getLocalServersDTO());
fileSourceDTO.setServers(agentService.getLocalHostExecuteObjectDTO());
fileSourceDTOS.add(fileSourceDTO);
});
return fileSourceDTOS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private ExecuteVariableVO convertToVariableVO(TaskVariableDTO variable) {
vo.setRequired(variable.isRequired() ? 1 : 0);
if (variable.getType() == TaskVariableTypeEnum.HOST_LIST.getType()) {
ExecuteObjectsDTO servers = variable.getTargetServers();
if (servers != null && servers.getIpList() != null) {
if (servers != null && servers.getExecuteObjectsCompatibly() != null) {
TaskTargetVO taskTargetVO = servers.convertToTaskTargetVO();
vo.setTargetValue(taskTargetVO);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private void initFileSourceExecuteObjectTasks() {
}
}
List<ExecuteObjectTask> executeObjectTasks = new ArrayList<>();
boolean isSupportExecuteObject = stepInstance.isSupportExecuteObject();
boolean isSupportExecuteObject = stepInstance.isSupportExecuteObjectFeature();
for (ExecuteObject sourceExecuteObject : sourceExecuteObjects) {
ExecuteObjectTask executeObjectTask = new ExecuteObjectTask();
executeObjectTask.setStepInstanceId(stepInstanceId);
Expand Down Expand Up @@ -448,7 +448,7 @@ private ServiceExecuteObjectLogDTO initExecuteObjectLogIfAbsent(
executeObjectLogDTO.setStepInstanceId(stepInstanceId);
executeObjectLogDTO.setExecuteCount(executeCount);
}
if (stepInstance.isSupportExecuteObject()) {
if (stepInstance.isSupportExecuteObjectFeature()) {
executeObjectLogDTO.setExecuteObjectId(executeObject.getId());
} else {
executeObjectLogDTO.setHostId(executeObject.getHost().getHostId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,9 @@ private void appendImportVariablesDeclareScript(StringBuffer sb, List<TaskVariab
TaskVariableDTO hostVariableValue = hostVariableMap.get(importVariable);
String formattedHosts = "";
if (hostVariableValue.getTargetServers() != null
&& hostVariableValue.getTargetServers().getIpList() != null) {
formattedHosts = VariableResolveUtils.formatHosts(hostVariableValue.getTargetServers().getIpList());
&& hostVariableValue.getTargetServers().getExecuteObjectsCompatibly() != null) {
formattedHosts = VariableResolveUtils.formatHosts(
hostVariableValue.getTargetServers().getHostsCompatibly());
}
variableValues.put(importVariable, formattedHosts);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void saveExecuteObjectTasksForStartStep(Long gseTaskId,
// 普通步骤,启动的时候需要初始化所有ExecuteObjectTask
List<ExecuteObjectTask> executeObjectTasks = new ArrayList<>(
buildInitialExecuteObjectTasks(stepInstanceId, executeCount, executeCount, batch,
gseTaskId, stepInstance.getTargetExecuteObjects().getMergedExecuteObjects()));
gseTaskId, stepInstance.getTargetExecuteObjects().getExecuteObjectsCompatibly()));
saveExecuteObjectTasks(stepInstance, executeObjectTasks);
}
}
Expand All @@ -289,7 +289,7 @@ private void saveGseExecuteObjectTasksForStartRollingStep(Long gseTaskId,
List<ExecuteObjectTask> executeObjectTasks = new ArrayList<>();
if (rollingConfig.isBatchRollingStep(stepInstanceId)) {
List<RollingExecuteObjectsBatchDO> executeObjectsBatchList =
rollingConfig.getConfigDetail().getMergedExecuteObjectsBatchList();
rollingConfig.getConfigDetail().getExecuteObjectsBatchListCompatibly();
executeObjectsBatchList.forEach(executeObjectsBatch -> {
executeObjectTasks.addAll(
buildInitialExecuteObjectTasks(
Expand All @@ -298,7 +298,7 @@ private void saveGseExecuteObjectTasksForStartRollingStep(Long gseTaskId,
executeObjectsBatch.getBatch() == 1 ? executeCount : null,
executeObjectsBatch.getBatch(),
executeObjectsBatch.getBatch() == 1 ? gseTaskId : 0,
executeObjectsBatch.getMergedExecuteObjects()
executeObjectsBatch.getExecuteObjectsCompatibly()
)
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.tencent.bk.job.common.annotation.CompatibleImplementation;
import com.tencent.bk.job.common.annotation.PersistenceObject;
import com.tencent.bk.job.common.constant.CompatibleType;
import com.tencent.bk.job.common.constant.ExecuteObjectTypeEnum;
import com.tencent.bk.job.common.gse.v2.model.Agent;
import com.tencent.bk.job.common.gse.v2.model.ExecuteObjectGseKey;
Expand Down Expand Up @@ -88,16 +90,26 @@ public class ExecuteObject implements Cloneable {
@JsonIgnore
private ExecuteObjectGseKey executeObjectGseKey;

public ExecuteObject(Container container) {
this.type = ExecuteObjectTypeEnum.CONTAINER;
this.container = container;
this.resourceId = container.getId();
}

public ExecuteObject(HostDTO host) {
this.type = ExecuteObjectTypeEnum.HOST;
this.host = host;
this.resourceId = host.getHostId();
// public ExecuteObject(Container container) {
// this.type = ExecuteObjectTypeEnum.CONTAINER;
// this.container = container;
// this.resourceId = container.getId();
// }
//
// public ExecuteObject(HostDTO host) {
// this.type = ExecuteObjectTypeEnum.HOST;
// this.host = host;
// this.resourceId = host.getHostId();
// }

@CompatibleImplementation(name = "execute_object", deprecatedVersion = "3.9.x", type = CompatibleType.HISTORY_DATA,
explain = "数据失效后该构造方法可以删除")
public static ExecuteObject buildCompatibleExecuteObject(HostDTO host) {
ExecuteObject executeObject = new ExecuteObject();
executeObject.setType(ExecuteObjectTypeEnum.HOST);
executeObject.setResourceId(host.getHostId());
executeObject.setHost(host);
return executeObject;
}

@JsonCreator(mode = JsonCreator.Mode.DELEGATING)
Expand Down Expand Up @@ -137,7 +149,8 @@ public ExecuteObjectGseKey toExecuteObjectGseKey() {
if (isHostExecuteObject()) {
executeObjectGseKey = ExecuteObjectGseKey.ofHost(host.getAgentId());
} else {
executeObjectGseKey = ExecuteObjectGseKey.ofContainer(container.getNodeAgentId(), container.getContainerId());
executeObjectGseKey = ExecuteObjectGseKey.ofContainer(container.getNodeAgentId(),
container.getContainerId());
}
return executeObjectGseKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private void fillLocalFileSourceHost(List<FileSourceDTO> fileSourceList, StepIns
boolean isGseV2Task = stepInstance.isTargetGseV2Agent();
fileSourceList.forEach(fileSourceDTO -> {
if (fileSourceDTO.getFileType() == TaskFileTypeEnum.LOCAL.getType() || fileSourceDTO.isLocalUpload()) {
ExecuteObjectsDTO localHost = agentService.getLocalServersDTO();
ExecuteObjectsDTO localHost = agentService.getLocalHostExecuteObjectDTO();
if (!isGseV2Task) {
// 如果目标Agent是GSE V1, 那么源Agent也必须要GSE1.0 Agent,设置agentId={云区域:ip}
localHost.getIpList().forEach(host -> host.setAgentId(host.toCloudIp()));
Expand Down
Loading

0 comments on commit df8c95c

Please sign in to comment.