diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ExecutionObjectTypeEnum.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/CompatibleType.java similarity index 84% rename from src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ExecutionObjectTypeEnum.java rename to src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/CompatibleType.java index 9502cefd3b..4008483600 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ExecutionObjectTypeEnum.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/CompatibleType.java @@ -25,26 +25,22 @@ package com.tencent.bk.job.common.constant; /** - * 执行对象类型 + * 兼容类型 */ -public enum ExecutionObjectTypeEnum { +public enum CompatibleType { /** - * 主机 + * 历史数据兼容。历史数据失效之后可删除 */ - HOST(1), + HISTORY_DATA, /** - * 容器 + * API 兼容。API 废弃之后可删除 */ - CONTAINER(2); - - ExecutionObjectTypeEnum(int type) { - this.type = type; - } - - private final int type; + API, + /** + * 发布兼容。发布完成后可删除 + */ + DEPLOY; - public int getValue() { - return this.type; + CompatibleType() { } - } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTypeEnum.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ExecuteObjectTypeEnum.java similarity index 78% rename from src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTypeEnum.java rename to src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ExecuteObjectTypeEnum.java index f24daeebf7..b4ef99d4c7 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/consts/ExecuteObjectTypeEnum.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/constant/ExecuteObjectTypeEnum.java @@ -22,7 +22,9 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.execute.engine.consts; +package com.tencent.bk.job.common.constant; + +import com.fasterxml.jackson.annotation.JsonValue; /** * 执行对象类型 @@ -37,27 +39,23 @@ public enum ExecuteObjectTypeEnum { */ CONTAINER(2); + ExecuteObjectTypeEnum(int type) { + this.type = type; + } - private final int value; + private final int type; - ExecuteObjectTypeEnum(int value) { - this.value = value; + @JsonValue + public int getValue() { + return this.type; } - public static ExecuteObjectTypeEnum valueOf(Integer value) { - if (value == null) { - return null; - } - for (ExecuteObjectTypeEnum type : values()) { - if (type.value == value) { - return type; + public static ExecuteObjectTypeEnum valOf(int type) { + for (ExecuteObjectTypeEnum typeEnum : values()) { + if (typeEnum.getValue() == type) { + return typeEnum; } } - throw new IllegalArgumentException("No ExecuteObjectTypeEnum constant: " + value); + throw new IllegalArgumentException("No ExecuteObjectTypeEnum constant: " + type); } - - public final int getValue() { - return value; - } - } diff --git a/src/backend/commons/esb-sdk/src/main/java/com/tencent/bk/job/common/esb/model/EsbAppScopeReq.java b/src/backend/commons/esb-sdk/src/main/java/com/tencent/bk/job/common/esb/model/EsbAppScopeReq.java index 353c2b70b7..80f5b9d6aa 100644 --- a/src/backend/commons/esb-sdk/src/main/java/com/tencent/bk/job/common/esb/model/EsbAppScopeReq.java +++ b/src/backend/commons/esb-sdk/src/main/java/com/tencent/bk/job/common/esb/model/EsbAppScopeReq.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.tencent.bk.job.common.annotation.CompatibleImplementation; +import com.tencent.bk.job.common.constant.CompatibleType; import com.tencent.bk.job.common.constant.ResourceScopeTypeEnum; import com.tencent.bk.job.common.esb.validate.EsbAppScopeReqGroupSequenceProvider; import com.tencent.bk.job.common.model.dto.AppResourceScope; @@ -50,7 +51,8 @@ public class EsbAppScopeReq extends EsbJobReq { /** * 兼容字段,表示cmdb 业务/业务集ID */ - @CompatibleImplementation(explain = "兼容字段,表示业务ID或者业务集ID", deprecatedVersion = "3.6.x") + @CompatibleImplementation(name = "bizId", type = CompatibleType.API, + explain = "兼容字段,表示业务ID或者业务集ID", deprecatedVersion = "3.6.x") @JsonProperty("bk_biz_id") @Min(value = 1L, message = "{validation.constraints.InvalidBkBizId.message}", groups = UseBkBizIdParam.class) private Long bizId; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/AgentTaskService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/AgentTaskService.java deleted file mode 100644 index ce9ae6b949..0000000000 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/AgentTaskService.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. - * - * License for BK-JOB蓝鲸智云作业平台: - * -------------------------------------------------------------------- - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and - * to permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of - * the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO - * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -package com.tencent.bk.job.execute.service; - -import com.tencent.bk.job.common.annotation.CompatibleImplementation; -import com.tencent.bk.job.common.constant.Order; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupBaseDTO; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupDTO; -import com.tencent.bk.job.execute.model.ExecuteObjectTask; -import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; -import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; - -import java.util.Collection; -import java.util.List; - -/** - * GSE Agent 任务 Service - */ -public interface AgentTaskService { - - /** - * 批量保存(insert/update) GSE Agent 任务 - * - * @param agentTasks GSE Agent 任务列表 - */ - void batchSaveAgentTasks(Collection agentTasks); - - /** - * 批量更新Agent任务 - * - * @param agentTasks Agent任务 - */ - void batchUpdateAgentTasks(Collection agentTasks); - - /** - * 获取执行成功的Agent任务数量 - * - * @param stepInstanceId 步骤实例ID - * @param executeCount 步骤执行次数 - * @return 执行成功的Agent数量 - */ - int getSuccessAgentTaskCount(long stepInstanceId, int executeCount); - - - /** - * 根据GSE任务ID获取Agent任务 - * - * @param gseTaskId GSE任务ID - * @return Agent任务 - */ - List listAgentTasksByGseTaskId(Long gseTaskId); - - /** - * 获取Agent任务 - * - * @param stepInstanceId 步骤实例ID - * @param executeCount 执行次数 - * @param batch 滚动执行批次;传入null或者0将忽略该参数 - * @return Agent任务 - */ - List listAgentTasks(Long stepInstanceId, - Integer executeCount, - Integer batch); - - /** - * 获取Agent任务详情并分组 - * - * @param stepInstance 步骤实例 - * @param executeCount 执行次数 - * @param batch 滚动执行批次;如果传入null或者0,将忽略该参数 - */ - List listAndGroupAgentTasks(StepInstanceBaseDTO stepInstance, - int executeCount, - Integer batch); - - /** - * 获取任务结果分组 - * - * @param stepInstanceId 步骤实例ID - * @param executeCount 执行次数 - * @param batch 滚动执行批次;传入null或者0将忽略该参数 - * @return Agent任务 - */ - List listResultGroups(long stepInstanceId, - int executeCount, - Integer batch); - - /** - * 根据执行结果查询Agent任务详情(排序、限制返回数量) - 包含主机详情 - * - * @param stepInstance 步骤实例 - * @param executeCount 执行次数 - * @param batch 滚动执行批次;如果传入null或者0,忽略该参数 - * @param status 任务状态 - * @param tag 用户自定义分组标签 - * @param limit 最大返回数量 - * @param orderField 排序字段 - * @param order 排序方式 - * @return Agent任务 - */ - List listAgentTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - Integer status, - String tag, - Integer limit, - String orderField, - Order order); - - /** - * 根据结果分组获取Agent任务详情 - 包含主机详情 - * - * @param stepInstance 步骤实例 - * @param executeCount 执行次数 - * @param batch 滚动执行批次;如果传入null或者0,忽略该参数 - * @param status 任务状态 - * @param tag 用户自定义分组标签 - * @return Agent任务 - */ - List listAgentTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - Integer status, - String tag); - - /** - * 获取Agent任务详情 - 包含主机详情 - * - * @param stepInstance 步骤实例 - * @param executeCount 执行次数 - * @param batch 滚动执行批次;传入null或者0将忽略该参数 - * @return Agent任务 - */ - List listAgentTaskDetail(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch); - - /** - * 获取Agent任务实际执行成功的executeCount值(重试场景,兼容历史数据) - * - * @param stepInstanceId 步骤实例ID - * @param cloudIp 云区域+ip - * @return Agent任务实际执行成功的executeCount值 - */ - @CompatibleImplementation(name = "rolling_execution", explain = "兼容历史数据", deprecatedVersion = "3.7.x") - int getActualSuccessExecuteCount(long stepInstanceId, String cloudIp); - - /** - * 批量更新AgentTask的字段 - * - * @param stepInstanceId 条件 - 步骤实例ID - * @param executeCount 条件 - 重试次数 - * @param batch 条件 - 滚动执行批次;传入null将忽略该条件 - * @param actualExecuteCount 值 - Agent任务实际执行的步骤重试次数;如果传入null,则不更新 - * @param gseTaskId 值 - Agent任务对应的GSE_TASK_ID;如果传入null,则不更新 - */ - void updateAgentTaskFields(long stepInstanceId, - int executeCount, - Integer batch, - Integer actualExecuteCount, - Long gseTaskId); - - -} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ExecuteObjectTaskService.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ExecuteObjectTaskService.java new file mode 100644 index 0000000000..c35dede883 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/ExecuteObjectTaskService.java @@ -0,0 +1,177 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.service; + +import com.tencent.bk.job.common.constant.Order; +import com.tencent.bk.job.execute.model.ExecuteObjectTask; +import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; +import com.tencent.bk.job.execute.model.ResultGroupBaseDTO; +import com.tencent.bk.job.execute.model.ResultGroupDTO; +import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; + +import java.util.Collection; +import java.util.List; + +/** + * 执行对象任务 Service + */ +public interface ExecuteObjectTaskService { + + /** + * 批量保存(insert/update) 任务 + * + * @param tasks 任务列表 + */ + void batchSaveTasks(Collection tasks); + + /** + * 批量更新执行对象任务 + * + * @param tasks 执行对象任务 + */ + void batchUpdateTasks(Collection tasks); + + /** + * 获取执行成功的执行对象任务数量 + * + * @param stepInstanceId 步骤实例ID + * @param executeCount 步骤执行次数 + * @return 执行成功的任务数量 + */ + int getSuccessTaskCount(long stepInstanceId, int executeCount); + + + /** + * 根据GSE任务ID获取执行对象任务 + * + * @param stepInstance 步骤实例 + * @param gseTaskId GSE任务ID + * @return 执行对象任务 + */ + List listTasksByGseTaskId(StepInstanceBaseDTO stepInstance, Long gseTaskId); + + /** + * 获取执行对象任务 + * + * @param stepInstance 步骤实例 + * @param executeCount 执行次数 + * @param batch 滚动执行批次;传入null或者0将忽略该参数 + * @return 执行对象任务 + */ + List listTasks(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch); + + /** + * 获取执行对象任务详情并分组 + * + * @param stepInstance 步骤实例 + * @param executeCount 执行次数 + * @param batch 滚动执行批次;如果传入null或者0,将忽略该参数 + */ + List listAndGroupTasks(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch); + + /** + * 获取任务结果分组 + * + * @param stepInstance 步骤实例 + * @param executeCount 执行次数 + * @param batch 滚动执行批次;传入null或者0将忽略该参数 + * @return 执行对象任务 + */ + List listResultGroups(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch); + + /** + * 根据执行结果查询执行对象任务详情(排序、限制返回数量) - 包含主机详情 + * + * @param stepInstance 步骤实例 + * @param executeCount 执行次数 + * @param batch 滚动执行批次;如果传入null或者0,忽略该参数 + * @param status 任务状态 + * @param tag 用户自定义分组标签 + * @param limit 最大返回数量 + * @param orderField 排序字段 + * @param order 排序方式 + * @return 执行对象任务 + */ + List listTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + Integer status, + String tag, + Integer limit, + String orderField, + Order order); + + /** + * 根据结果分组获取执行对象任务详情 - 包含主机详情 + * + * @param stepInstance 步骤实例 + * @param executeCount 执行次数 + * @param batch 滚动执行批次;如果传入null或者0,忽略该参数 + * @param status 任务状态 + * @param tag 用户自定义分组标签 + * @return 执行对象任务 + */ + List listTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + Integer status, + String tag); + + /** + * 获取执行对象任务详情 - 包含主机详情 + * + * @param stepInstance 步骤实例 + * @param executeCount 执行次数 + * @param batch 滚动执行批次;传入null或者0将忽略该参数 + * @return 执行对象任务 + */ + List listTaskDetail(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch); + + + /** + * 批量更新任务的字段 + * + * @param stepInstance 条件 - 步骤实例 + * @param executeCount 条件 - 重试次数 + * @param batch 条件 - 滚动执行批次;传入null将忽略该条件 + * @param actualExecuteCount 值 - 执行对象任务实际执行的步骤重试次数;如果传入null,则不更新 + * @param gseTaskId 值 - 执行对象任务对应的GSE_TASK_ID;如果传入null,则不更新 + */ + void updateTaskFields(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch, + Integer actualExecuteCount, + Long gseTaskId); + + +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/AbstractAgentTaskServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/AbstractAgentTaskServiceImpl.java deleted file mode 100644 index 67bb282a59..0000000000 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/AbstractAgentTaskServiceImpl.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.tencent.bk.job.execute.service.impl; - -import com.tencent.bk.job.common.cc.sdk.BkNetClient; -import com.tencent.bk.job.common.model.dto.HostDTO; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupDTO; -import com.tencent.bk.job.execute.model.ExecuteObjectTask; -import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; -import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; -import com.tencent.bk.job.execute.service.AgentTaskService; -import com.tencent.bk.job.execute.service.StepInstanceService; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Agent 任务Service的公共实现 - */ -public abstract class AbstractAgentTaskServiceImpl implements AgentTaskService { - private final StepInstanceService stepInstanceService; - - public AbstractAgentTaskServiceImpl(StepInstanceService stepInstanceService) { - this.stepInstanceService = stepInstanceService; - } - - protected final List fillHostDetail(StepInstanceBaseDTO stepInstance, - List agentTasks) { - if (CollectionUtils.isEmpty(agentTasks)) { - return Collections.emptyList(); - } - - List agentTaskDetailList; - boolean hasIpInfo = agentTasks.stream().anyMatch(agentTask -> StringUtils.isNotEmpty(agentTask.getCloudIp())); - if (!hasIpInfo) { - // 从当前版本开始AgentTask不会包含主机详细信息(ip\云区域等),需要从StepInstance反查 - Map hosts = stepInstanceService.computeStepHosts(stepInstance, HostDTO::getHostId); - agentTaskDetailList = agentTasks - .stream() - .map(agentTask -> { - ExecuteObjectTaskDetail agentTaskDetail = new ExecuteObjectTaskDetail(agentTask); - HostDTO host = hosts.get(agentTask.getHostId()); - agentTaskDetail.setCloudIp(host.toCloudIp()); - agentTaskDetail.setBkCloudId(host.getBkCloudId()); - agentTaskDetail.setIp(host.getIp()); - agentTaskDetail.setIpv6(host.getIpv6()); - agentTaskDetail.setBkCloudName(BkNetClient.getCloudAreaNameFromCache(host.getBkCloudId())); - return agentTaskDetail; - }).collect(Collectors.toList()); - } else { - // 历史版本AgentTask会包含ipv4信息 - agentTaskDetailList = agentTasks - .stream() - .map(ExecuteObjectTaskDetail::new) - .collect(Collectors.toList()); - agentTaskDetailList.forEach( - agentTaskDetail -> agentTaskDetail.setBkCloudName( - BkNetClient.getCloudAreaNameFromCache(agentTaskDetail.getBkCloudId()))); - } - return agentTaskDetailList; - } - - protected final List groupAgentTasks(List agentTasks) { - List resultGroups = new ArrayList<>(); - agentTasks.stream() - .collect(Collectors.groupingBy( - agentTask -> new AgentTaskResultGroupDTO(agentTask.getStatus().getValue(), agentTask.getTag()))) - .forEach((resultGroup, groupedAgentTasks) -> { - resultGroup.setTotalAgentTasks(groupedAgentTasks.size()); - resultGroup.setAgentTasks(groupedAgentTasks); - resultGroups.add(resultGroup); - }); - return resultGroups; - } - - -} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/AbstractExecuteObjectTaskServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/AbstractExecuteObjectTaskServiceImpl.java new file mode 100644 index 0000000000..7c0524a67b --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/AbstractExecuteObjectTaskServiceImpl.java @@ -0,0 +1,97 @@ +package com.tencent.bk.job.execute.service.impl; + +import com.tencent.bk.job.common.model.dto.HostDTO; +import com.tencent.bk.job.execute.engine.model.ExecuteObject; +import com.tencent.bk.job.execute.model.ExecuteObjectTask; +import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; +import com.tencent.bk.job.execute.model.ResultGroupDTO; +import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; +import com.tencent.bk.job.execute.service.ExecuteObjectTaskService; +import com.tencent.bk.job.execute.service.StepInstanceService; +import com.tencent.bk.job.execute.service.TaskInstanceExecuteObjectService; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * 执行对象任务公共实现 Service + */ +public abstract class AbstractExecuteObjectTaskServiceImpl implements ExecuteObjectTaskService { + protected final StepInstanceService stepInstanceService; + protected final TaskInstanceExecuteObjectService taskInstanceExecuteObjectService; + + public AbstractExecuteObjectTaskServiceImpl(StepInstanceService stepInstanceService, + TaskInstanceExecuteObjectService taskInstanceExecuteObjectService) { + this.stepInstanceService = stepInstanceService; + this.taskInstanceExecuteObjectService = taskInstanceExecuteObjectService; + } + + protected final List fillExecuteObjectDetail(StepInstanceBaseDTO stepInstance, + List tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return Collections.emptyList(); + } + + List taskList; + boolean hasExecuteObjectId = tasks.stream().anyMatch(task -> task.getExecuteObjId() != null); + if (hasExecuteObjectId) { + taskList = tasks + .stream() + .map(ExecuteObjectTaskDetail::new) + .collect(Collectors.toList()); + + long taskInstanceId = stepInstance.getTaskInstanceId(); + Map executeObjectMap = new HashMap<>(); + List executeObjects; + if (tasks.size() > 1000) { + executeObjects = taskInstanceExecuteObjectService.listExecuteObjectsByJobInstanceId(taskInstanceId); + } else { + // in 查询控制个数在 1000 以内 + Set executeObjectIds = + tasks.stream().map(ExecuteObjectTask::getExecuteObjId).collect(Collectors.toSet()); + executeObjects = taskInstanceExecuteObjectService.listExecuteObjectsByIds(executeObjectIds); + } + if (CollectionUtils.isEmpty(executeObjects)) { + return taskList; + } + executeObjects.forEach(executeObject -> executeObjectMap.put(executeObject.getId(), executeObject)); + + taskList.forEach( + task -> task.setExecuteObject(executeObjectMap.get(task.getExecuteObjId()))); + } else { + // 兼容老版本不支持执行对象的数据 + Map hosts = stepInstanceService.computeStepHosts(stepInstance, HostDTO::getHostId); + taskList = tasks + .stream() + .map(task -> { + ExecuteObjectTaskDetail taskDetail = new ExecuteObjectTaskDetail(task); + HostDTO host = hosts.get(task.getHostId()); + ExecuteObject executeObject = new ExecuteObject(null, host); + taskDetail.setExecuteObject(executeObject); + return taskDetail; + }).collect(Collectors.toList()); + } + return taskList; + } + + protected final List groupTasks(List tasks) { + List resultGroups = new ArrayList<>(); + tasks.stream() + .collect(Collectors.groupingBy( + task -> new ResultGroupDTO(task.getStatus().getValue(), task.getTag()))) + .forEach((resultGroup, groupedExecuteObjectTasks) -> { + resultGroup.setTotal(groupedExecuteObjectTasks.size()); + resultGroup.setExecuteObjectTasks(groupedExecuteObjectTasks); + resultGroups.add(resultGroup); + }); + return resultGroups; + } + + +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/FileAgentTaskServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/FileAgentTaskServiceImpl.java deleted file mode 100644 index 4c5f84ea89..0000000000 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/FileAgentTaskServiceImpl.java +++ /dev/null @@ -1,243 +0,0 @@ -package com.tencent.bk.job.execute.service.impl; - -import com.tencent.bk.job.common.constant.Order; -import com.tencent.bk.job.common.model.dto.HostDTO; -import com.tencent.bk.job.execute.dao.FileAgentTaskDAO; -import com.tencent.bk.job.execute.dao.GseTaskIpLogDAO; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupBaseDTO; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupDTO; -import com.tencent.bk.job.execute.model.ExecuteObjectTask; -import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; -import com.tencent.bk.job.execute.model.FileSourceDTO; -import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; -import com.tencent.bk.job.execute.model.StepInstanceDTO; -import com.tencent.bk.job.execute.service.FileAgentTaskService; -import com.tencent.bk.job.execute.service.StepInstanceService; -import com.tencent.bk.job.logsvr.consts.FileTaskModeEnum; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -@Service -@Slf4j -public class FileAgentTaskServiceImpl - extends AbstractAgentTaskServiceImpl - implements FileAgentTaskService { - - private final FileAgentTaskDAO fileAgentTaskDAO; - private final GseTaskIpLogDAO gseTaskIpLogDAO; - - @Autowired - public FileAgentTaskServiceImpl(FileAgentTaskDAO fileAgentTaskDAO, - StepInstanceService stepInstanceService, - GseTaskIpLogDAO gseTaskIpLogDAO) { - super(stepInstanceService); - this.fileAgentTaskDAO = fileAgentTaskDAO; - this.gseTaskIpLogDAO = gseTaskIpLogDAO; - } - - @Override - public void batchSaveAgentTasks(Collection agentTasks) { - if (CollectionUtils.isEmpty(agentTasks)) { - return; - } - fileAgentTaskDAO.batchSaveAgentTasks(agentTasks); - } - - @Override - public void batchUpdateAgentTasks(Collection agentTasks) { - if (CollectionUtils.isEmpty(agentTasks)) { - return; - } - fileAgentTaskDAO.batchUpdateAgentTasks(agentTasks); - } - - @Override - public int getSuccessAgentTaskCount(long stepInstanceId, int executeCount) { - if (isStepInstanceRecordExist(stepInstanceId)) { - return fileAgentTaskDAO.getSuccessAgentTaskCount(stepInstanceId, executeCount); - } else { - return gseTaskIpLogDAO.getSuccessAgentTaskCount(stepInstanceId, executeCount); - } - } - - @Override - public List listAndGroupAgentTasks(StepInstanceBaseDTO stepInstance, - int executeCount, - Integer batch) { - List resultGroups = new ArrayList<>(); - - List agentTasks = listAgentTasks(stepInstance.getId(), executeCount, batch, - FileTaskModeEnum.DOWNLOAD); - if (CollectionUtils.isEmpty(agentTasks)) { - return resultGroups; - } - - List agentTaskDetailList = fillHostDetail(stepInstance, agentTasks); - - resultGroups = groupAgentTasks(agentTaskDetailList); - - return resultGroups.stream().sorted().collect(Collectors.toList()); - } - - @Override - public List listResultGroups(long stepInstanceId, - int executeCount, - Integer batch) { - List resultGroups; - resultGroups = fileAgentTaskDAO.listResultGroups(stepInstanceId, executeCount, batch); - if (CollectionUtils.isEmpty(resultGroups)) { - // 兼容历史数据 - resultGroups = gseTaskIpLogDAO.listResultGroups(stepInstanceId, executeCount); - } - return resultGroups; - } - - @Override - public List listAgentTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - Integer status, - String tag) { - List agentTasks = fileAgentTaskDAO.listAgentTaskByResultGroup(stepInstance.getId(), - executeCount, batch, status); - if (CollectionUtils.isEmpty(agentTasks)) { - // 兼容历史数据 - agentTasks = gseTaskIpLogDAO.listAgentTaskByResultGroup(stepInstance.getId(), executeCount, status, tag); - } - return fillHostDetail(stepInstance, agentTasks); - } - - @Override - public List listAgentTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - Integer status, - String tag, - Integer limit, - String orderField, - Order order) { - List agentTasks = fileAgentTaskDAO.listAgentTaskByResultGroup(stepInstance.getId(), - executeCount, batch, status, limit, orderField, order); - if (CollectionUtils.isEmpty(agentTasks)) { - // 兼容历史数据 - agentTasks = gseTaskIpLogDAO.listAgentTaskByResultGroup(stepInstance.getId(), executeCount, status, tag, - limit, orderField, order); - } - return fillHostDetail(stepInstance, agentTasks); - } - - @Override - public List listAgentTasks(Long stepInstanceId, Integer executeCount, Integer batch, - FileTaskModeEnum fileTaskMode) { - List agentTasks = fileAgentTaskDAO.listAgentTasks(stepInstanceId, executeCount, batch, - fileTaskMode); - // 兼容历史数据 - if (CollectionUtils.isEmpty(agentTasks)) { - agentTasks = gseTaskIpLogDAO.listAgentTasks(stepInstanceId, executeCount); - if (CollectionUtils.isNotEmpty(agentTasks) && fileTaskMode != null) { - agentTasks = agentTasks.stream().filter(agentTask -> agentTask.getFileTaskMode() == fileTaskMode) - .collect(Collectors.toList()); - } - } - - return agentTasks; - } - - @Override - public List listAgentTasksByGseTaskId(Long gseTaskId) { - return fileAgentTaskDAO.listAgentTasksByGseTaskId(gseTaskId); - } - - @Override - public ExecuteObjectTask getAgentTaskByHost(StepInstanceDTO stepInstance, Integer executeCount, Integer batch, - FileTaskModeEnum fileTaskMode, HostDTO host) { - ExecuteObjectTask agentTask = null; - Long hostId = host.getHostId(); - if (hostId != null) { - // 根据hostId查询 - agentTask = fileAgentTaskDAO.getAgentTaskByHostId(stepInstance.getId(), executeCount, batch, - fileTaskMode, hostId); - } else if (StringUtils.isNotEmpty(host.toCloudIp())) { - // 根据ip查询的模式,有两种情况,数据可能在gse_file_agent_task/gse_task_ip_log表中,优先查询gse_file_agent_task - HostDTO queryHost = getStepHostByIp(stepInstance, host.toCloudIp()); - if (queryHost != null) { - agentTask = fileAgentTaskDAO.getAgentTaskByHostId(stepInstance.getId(), executeCount, batch, - fileTaskMode, queryHost.getHostId()); - } else { - // 根据ip查询gse_task_ip_log表中的数据 - agentTask = gseTaskIpLogDAO.getAgentTaskByIp(stepInstance.getId(), executeCount, host.toCloudIp()); - } - } - return agentTask; - } - - private HostDTO getStepHostByIp(StepInstanceDTO stepInstance, String cloudIp) { - HostDTO queryHost = stepInstance.getTargetServers().getIpList().stream() - .filter(targetHost -> cloudIp.equals(targetHost.toCloudIp())) - .findFirst() - .orElse(null); - if (queryHost == null) { - if (CollectionUtils.isNotEmpty(stepInstance.getFileSourceList())) { - for (FileSourceDTO fileSource : stepInstance.getFileSourceList()) { - if (fileSource.getServers() != null - && CollectionUtils.isNotEmpty(fileSource.getServers().getIpList())) { - queryHost = fileSource.getServers().getIpList().stream() - .filter(sourceHost -> cloudIp.equals(sourceHost.toCloudIp())) - .findFirst() - .orElse(null); - if (queryHost != null) { - break; - } - } - } - } - } - - return queryHost; - } - - @Override - public List listAgentTasks(Long stepInstanceId, Integer executeCount, Integer batch) { - List agentTasks = fileAgentTaskDAO.listAgentTasks(stepInstanceId, executeCount, batch, null); - if (CollectionUtils.isEmpty(agentTasks)) { - // 兼容历史数据 - agentTasks = gseTaskIpLogDAO.listAgentTasks(stepInstanceId, executeCount); - } - return agentTasks; - } - - @Override - public int getActualSuccessExecuteCount(long stepInstanceId, String cloudIp) { - // 兼容历史数据 - return gseTaskIpLogDAO.getActualSuccessExecuteCount(stepInstanceId, cloudIp); - } - - @Override - public List listAgentTaskDetail(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch) { - List agentTasks = listAgentTasks(stepInstance.getId(), executeCount, batch); - return fillHostDetail(stepInstance, agentTasks); - } - - private boolean isStepInstanceRecordExist(long stepInstanceId) { - return fileAgentTaskDAO.isStepInstanceRecordExist(stepInstanceId); - } - - @Override - public void updateAgentTaskFields(long stepInstanceId, - int executeCount, - Integer batch, - Integer actualExecuteCount, - Long gseTaskId) { - fileAgentTaskDAO.updateAgentTaskFields(stepInstanceId, executeCount, batch, actualExecuteCount, gseTaskId); - } -} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/FileExecuteObjectTaskServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/FileExecuteObjectTaskServiceImpl.java new file mode 100644 index 0000000000..206136363e --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/FileExecuteObjectTaskServiceImpl.java @@ -0,0 +1,325 @@ +package com.tencent.bk.job.execute.service.impl; + +import com.tencent.bk.job.common.constant.Order; +import com.tencent.bk.job.common.model.dto.HostDTO; +import com.tencent.bk.job.execute.dao.FileAgentTaskDAO; +import com.tencent.bk.job.execute.dao.FileExecuteObjectTaskDAO; +import com.tencent.bk.job.execute.engine.model.ExecuteObject; +import com.tencent.bk.job.execute.model.ExecuteObjectCompositeKey; +import com.tencent.bk.job.execute.model.ExecuteObjectTask; +import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; +import com.tencent.bk.job.execute.model.FileSourceDTO; +import com.tencent.bk.job.execute.model.ResultGroupBaseDTO; +import com.tencent.bk.job.execute.model.ResultGroupDTO; +import com.tencent.bk.job.execute.model.ServersDTO; +import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; +import com.tencent.bk.job.execute.model.StepInstanceDTO; +import com.tencent.bk.job.execute.service.FileExecuteObjectTaskService; +import com.tencent.bk.job.execute.service.StepInstanceService; +import com.tencent.bk.job.execute.service.TaskInstanceExecuteObjectService; +import com.tencent.bk.job.logsvr.consts.FileTaskModeEnum; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class FileExecuteObjectTaskServiceImpl + extends AbstractExecuteObjectTaskServiceImpl + implements FileExecuteObjectTaskService { + + private final FileExecuteObjectTaskDAO fileExecuteObjectTaskDAO; + private final FileAgentTaskDAO fileAgentTaskDAO; + + @Autowired + public FileExecuteObjectTaskServiceImpl(FileExecuteObjectTaskDAO fileExecuteObjectTaskDAO, + StepInstanceService stepInstanceService, + FileAgentTaskDAO fileAgentTaskDAO, + TaskInstanceExecuteObjectService taskInstanceExecuteObjectService) { + super(stepInstanceService, taskInstanceExecuteObjectService); + this.fileAgentTaskDAO = fileAgentTaskDAO; + this.fileExecuteObjectTaskDAO = fileExecuteObjectTaskDAO; + } + + @Override + public void batchSaveTasks(Collection tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return; + } + ExecuteObjectTask anyTask = tasks.stream().findAny().orElse(null); + + if (Objects.requireNonNull(anyTask).getExecuteObjId() != null) { + fileExecuteObjectTaskDAO.batchSaveTasks(tasks); + } else { + fileAgentTaskDAO.batchSaveAgentTasks(tasks); + } + } + + @Override + public void batchUpdateTasks(Collection tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return; + } + ExecuteObjectTask anyTask = tasks.stream().findAny().orElse(null); + if (Objects.requireNonNull(anyTask).getExecuteObjId() != null) { + fileExecuteObjectTaskDAO.batchUpdateTasks(tasks); + } else { + fileAgentTaskDAO.batchUpdateAgentTasks(tasks); + } + } + + @Override + public int getSuccessTaskCount(long stepInstanceId, int executeCount) { + if (isStepInstanceRecordExist(stepInstanceId)) { + return fileExecuteObjectTaskDAO.getSuccessTaskCount(stepInstanceId, executeCount); + } else { + return fileAgentTaskDAO.getSuccessAgentTaskCount(stepInstanceId, executeCount); + } + } + + @Override + public List listTasks(StepInstanceBaseDTO stepInstance, Integer executeCount, Integer batch) { + return listTasks(stepInstance, executeCount, batch, null); + } + + /** + * 判断是否使用执行对象的方式存储 + * + * @param stepInstance 步骤实例 + */ + private boolean isUsingExecuteObject(StepInstanceBaseDTO stepInstance) { + ServersDTO servers = stepInstance.getTargetServers(); + if (CollectionUtils.isNotEmpty(servers.getExecuteObjects())) { + ExecuteObject executeObject = servers.getExecuteObjects().stream().findAny().orElse(null); + return Objects.requireNonNull(executeObject).getId() != null; + } else { + return false; + } + } + + @Override + public List listTasksByGseTaskId(StepInstanceBaseDTO stepInstance, Long gseTaskId) { + List executeObjectTasks; + + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = fileExecuteObjectTaskDAO.listTasksByGseTaskId(gseTaskId); + } else { + // 兼容老版本数据 + executeObjectTasks = fileAgentTaskDAO.listAgentTasksByGseTaskId(gseTaskId); + } + return executeObjectTasks; + } + + @Override + public ExecuteObjectTask getTaskByExecuteObjectCompositeKey(StepInstanceDTO stepInstance, + Integer executeCount, + Integer batch, + FileTaskModeEnum fileTaskMode, + ExecuteObjectCompositeKey executeObjectCompositeKey) { + ExecuteObjectTask executeObjectTask = null; + long stepInstanceId = stepInstance.getId(); + + if (isUsingExecuteObject(stepInstance)) { + String executeObjectResourceId = executeObjectCompositeKey.getResourceId(); + if (executeObjectResourceId != null) { + ExecuteObject executeObject = taskInstanceExecuteObjectService.getExecuteObject( + executeObjectCompositeKey.getExecuteObjectType(), executeObjectCompositeKey.getResourceId()); + if (executeObject == null) { + return null; + } + executeObjectTask = fileExecuteObjectTaskDAO.getTaskByExecuteObjectId(stepInstanceId, executeCount, + batch, fileTaskMode, executeObject.getId()); + } else { + // 兼容使用<云区域+ip> 的方式查询主机执行任务 + ExecuteObject executeObject = getStepHostExecuteObjectByCloudIp( + stepInstance, executeObjectCompositeKey.getCloudIp()); + executeObjectTask = fileExecuteObjectTaskDAO.getTaskByExecuteObjectId(stepInstance.getId(), + executeCount, batch, fileTaskMode, executeObject.getId()); + } + } else { + // 兼容老版本不使用执行对象的场景(仅支持主机) + Long hostId = executeObjectCompositeKey.getHostId(); + if (hostId != null) { + executeObjectTask = fileAgentTaskDAO.getAgentTaskByHostId(stepInstanceId, executeCount, + batch, fileTaskMode, hostId); + } else { + // 兼容使用<云区域+ip> 的方式查询主机执行任务 + HostDTO host = getStepHostByCloudIp(stepInstance, executeObjectCompositeKey.getCloudIp()); + if (host != null) { + executeObjectTask = fileAgentTaskDAO.getAgentTaskByHostId(stepInstanceId, executeCount, + batch, fileTaskMode, host.getHostId()); + } + } + + } + return executeObjectTask; + } + + private ExecuteObject getStepHostExecuteObjectByCloudIp(StepInstanceBaseDTO stepInstance, String cloudIp) { + return stepInstance.getTargetServers().getExecuteObjects() + .stream() + .filter(executeObject -> { + HostDTO host = executeObject.getHost(); + if (host == null) { + return false; + } + return cloudIp.equals(host.toCloudIp()); + }) + .findFirst() + .orElse(null); + } + + private HostDTO getStepHostByCloudIp(StepInstanceDTO stepInstance, + String cloudIp) { + HostDTO matchHost = stepInstance.getTargetServers().getIpList() + .stream() + .filter(host -> cloudIp.equals(host.toCloudIp())) + .findFirst() + .orElse(null); + if (matchHost == null) { + if (CollectionUtils.isNotEmpty(stepInstance.getFileSourceList())) { + for (FileSourceDTO fileSource : stepInstance.getFileSourceList()) { + if (fileSource.getServers() != null + && CollectionUtils.isNotEmpty(fileSource.getServers().getIpList())) { + matchHost = fileSource.getServers().getIpList().stream() + .filter(sourceHost -> cloudIp.equals(sourceHost.toCloudIp())) + .findFirst() + .orElse(null); + if (matchHost != null) { + break; + } + } + } + } + } + + return matchHost; + } + + @Override + public List listAndGroupTasks(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch) { + List resultGroups = new ArrayList<>(); + List executeObjectTasks = listTasks(stepInstance, executeCount, batch); + if (CollectionUtils.isEmpty(executeObjectTasks)) { + return resultGroups; + } + + List agentTaskDetailList = fillExecuteObjectDetail(stepInstance, executeObjectTasks); + resultGroups = groupTasks(agentTaskDetailList); + + return resultGroups.stream().sorted().collect(Collectors.toList()); + } + + @Override + public List listResultGroups(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch) { + List resultGroups; + long stepInstanceId = stepInstance.getId(); + + if (isUsingExecuteObject(stepInstance)) { + resultGroups = fileExecuteObjectTaskDAO.listResultGroups(stepInstanceId, executeCount, batch); + } else { + // 兼容历史数据 + resultGroups = fileAgentTaskDAO.listResultGroups(stepInstanceId, executeCount, batch); + } + return resultGroups; + } + + @Override + public List listTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + Integer status, + String tag) { + List executeObjectTasks; + + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = fileExecuteObjectTaskDAO.listTaskByResultGroup(stepInstance.getId(), + executeCount, batch, status); + } else { + // 兼容历史数据 + executeObjectTasks = fileAgentTaskDAO.listAgentTaskByResultGroup( + stepInstance.getId(), executeCount, batch, status); + } + + return fillExecuteObjectDetail(stepInstance, executeObjectTasks); + } + + + @Override + public List listTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + Integer status, + String tag, + Integer limit, + String orderField, + Order order) { + List executeObjectTasks; + + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = fileExecuteObjectTaskDAO.listTaskByResultGroup(stepInstance.getId(), + executeCount, batch, status, limit, orderField, order); + } else { + // 兼容历史数据 + executeObjectTasks = fileAgentTaskDAO.listAgentTaskByResultGroup(stepInstance.getId(), executeCount, + batch, status, limit, orderField, order); + } + + return fillExecuteObjectDetail(stepInstance, executeObjectTasks); + } + + @Override + public List listTaskDetail(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch) { + List executeObjectTasks = listTasks(stepInstance, executeCount, batch); + return fillExecuteObjectDetail(stepInstance, executeObjectTasks); + } + + private boolean isStepInstanceRecordExist(long stepInstanceId) { + return fileExecuteObjectTaskDAO.isStepInstanceRecordExist(stepInstanceId); + } + + @Override + public void updateTaskFields(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch, + Integer actualExecuteCount, + Long gseTaskId) { + if (isUsingExecuteObject(stepInstance)) { + fileExecuteObjectTaskDAO.updateTaskFields(stepInstance.getId(), executeCount, batch, + actualExecuteCount, gseTaskId); + } else { + // 兼容老版本方式 + fileAgentTaskDAO.updateAgentTaskFields(stepInstance.getId(), executeCount, batch, + actualExecuteCount, gseTaskId); + } + } + + @Override + public List listTasks(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + FileTaskModeEnum fileTaskMode) { + List executeObjectTasks; + long stepInstanceId = stepInstance.getId(); + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = fileExecuteObjectTaskDAO.listTasks(stepInstanceId, executeCount, batch, fileTaskMode); + } else { + // 兼容老版本数据 + executeObjectTasks = fileAgentTaskDAO.listAgentTasks(stepInstanceId, executeCount, batch, fileTaskMode); + } + return executeObjectTasks; + } +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ScriptAgentTaskServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ScriptAgentTaskServiceImpl.java deleted file mode 100644 index ab86780eef..0000000000 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ScriptAgentTaskServiceImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -package com.tencent.bk.job.execute.service.impl; - -import com.tencent.bk.job.common.constant.Order; -import com.tencent.bk.job.common.model.dto.HostDTO; -import com.tencent.bk.job.execute.dao.GseTaskIpLogDAO; -import com.tencent.bk.job.execute.dao.ScriptAgentTaskDAO; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupBaseDTO; -import com.tencent.bk.job.execute.model.AgentTaskResultGroupDTO; -import com.tencent.bk.job.execute.model.ExecuteObjectTask; -import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; -import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; -import com.tencent.bk.job.execute.service.ScriptAgentTaskService; -import com.tencent.bk.job.execute.service.StepInstanceService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -@Service -@Slf4j -public class ScriptAgentTaskServiceImpl - extends AbstractAgentTaskServiceImpl - implements ScriptAgentTaskService { - - private final ScriptAgentTaskDAO scriptAgentTaskDAO; - private final GseTaskIpLogDAO gseTaskIpLogDAO; - - @Autowired - public ScriptAgentTaskServiceImpl(ScriptAgentTaskDAO scriptAgentTaskDAO, - StepInstanceService stepInstanceService, - GseTaskIpLogDAO gseTaskIpLogDAO) { - super(stepInstanceService); - this.scriptAgentTaskDAO = scriptAgentTaskDAO; - this.gseTaskIpLogDAO = gseTaskIpLogDAO; - } - - @Override - public void batchSaveAgentTasks(Collection agentTasks) { - if (CollectionUtils.isEmpty(agentTasks)) { - return; - } - scriptAgentTaskDAO.batchSaveAgentTasks(agentTasks); - } - - @Override - public void batchUpdateAgentTasks(Collection agentTasks) { - if (CollectionUtils.isEmpty(agentTasks)) { - return; - } - scriptAgentTaskDAO.batchUpdateAgentTasks(agentTasks); - } - - @Override - public int getSuccessAgentTaskCount(long stepInstanceId, int executeCount) { - if (isStepInstanceRecordExist(stepInstanceId)) { - return scriptAgentTaskDAO.getSuccessAgentTaskCount(stepInstanceId, executeCount); - } else { - return gseTaskIpLogDAO.getSuccessAgentTaskCount(stepInstanceId, executeCount); - } - } - - @Override - public List listAgentTasks(Long stepInstanceId, Integer executeCount, Integer batch) { - List agentTasks = scriptAgentTaskDAO.listAgentTasks(stepInstanceId, executeCount, batch); - if (CollectionUtils.isEmpty(agentTasks)) { - // 兼容历史数据 - agentTasks = gseTaskIpLogDAO.listAgentTasks(stepInstanceId, executeCount); - } - return agentTasks; - } - - @Override - public List listAgentTasksByGseTaskId(Long gseTaskId) { - return scriptAgentTaskDAO.listAgentTasksByGseTaskId(gseTaskId); - } - - @Override - public ExecuteObjectTask getAgentTaskByHost(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - HostDTO host) { - ExecuteObjectTask agentTask = null; - Long hostId = host.getHostId(); - if (hostId != null) { - // 根据hostId查询 - agentTask = scriptAgentTaskDAO.getAgentTaskByHostId(stepInstance.getId(), executeCount, batch, hostId); - } else if (StringUtils.isNotEmpty(host.toCloudIp())) { - // 根据ip查询的模式,有两种情况,数据可能在gse_script_agent_task/gse_task_ip_log表中,优先查询gse_script_agent_task - HostDTO queryHost = getStepHostByIp(stepInstance, host.toCloudIp()); - if (queryHost != null && queryHost.getHostId() != null) { - agentTask = scriptAgentTaskDAO.getAgentTaskByHostId(stepInstance.getId(), executeCount, batch, - queryHost.getHostId()); - } else { - // 根据ip查询gse_task_ip_log表中的数据 - agentTask = gseTaskIpLogDAO.getAgentTaskByIp(stepInstance.getId(), executeCount, host.toCloudIp()); - } - } - return agentTask; - } - - private HostDTO getStepHostByIp(StepInstanceBaseDTO stepInstance, String cloudIp) { - return stepInstance.getTargetServers().getIpList().stream() - .filter(targetHost -> cloudIp.equals(targetHost.toCloudIp())) - .findFirst() - .orElse(null); - } - - @Override - public int getActualSuccessExecuteCount(long stepInstanceId, String cloudIp) { - // 兼容历史数据 - return gseTaskIpLogDAO.getActualSuccessExecuteCount(stepInstanceId, cloudIp); - } - - @Override - public List listAndGroupAgentTasks(StepInstanceBaseDTO stepInstance, - int executeCount, - Integer batch) { - List resultGroups = new ArrayList<>(); - List agentTasks = listAgentTasks(stepInstance.getId(), executeCount, batch); - if (CollectionUtils.isEmpty(agentTasks)) { - return resultGroups; - } - - List agentTaskDetailList = fillHostDetail(stepInstance, agentTasks); - resultGroups = groupAgentTasks(agentTaskDetailList); - - return resultGroups.stream().sorted().collect(Collectors.toList()); - } - - @Override - public List listResultGroups(long stepInstanceId, - int executeCount, - Integer batch) { - List resultGroups; - resultGroups = scriptAgentTaskDAO.listResultGroups(stepInstanceId, executeCount, batch); - if (CollectionUtils.isEmpty(resultGroups)) { - // 兼容历史数据 - resultGroups = gseTaskIpLogDAO.listResultGroups(stepInstanceId, executeCount); - } - return resultGroups; - } - - @Override - public List listAgentTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - Integer status, - String tag) { - List agentTasks = scriptAgentTaskDAO.listAgentTaskByResultGroup(stepInstance.getId(), - executeCount, batch, status, tag); - if (CollectionUtils.isEmpty(agentTasks)) { - // 兼容历史数据 - agentTasks = gseTaskIpLogDAO.listAgentTaskByResultGroup(stepInstance.getId(), executeCount, status, tag); - } - return fillHostDetail(stepInstance, agentTasks); - } - - - @Override - public List listAgentTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch, - Integer status, - String tag, - Integer limit, - String orderField, - Order order) { - List agentTasks = scriptAgentTaskDAO.listAgentTaskByResultGroup(stepInstance.getId(), - executeCount, batch, status, tag, limit, orderField, order); - if (CollectionUtils.isEmpty(agentTasks)) { - // 兼容历史数据 - agentTasks = gseTaskIpLogDAO.listAgentTaskByResultGroup(stepInstance.getId(), executeCount, status, tag, - limit, orderField, order); - } - return fillHostDetail(stepInstance, agentTasks); - } - - @Override - public List listAgentTaskDetail(StepInstanceBaseDTO stepInstance, - Integer executeCount, - Integer batch) { - List agentTasks = listAgentTasks(stepInstance.getId(), executeCount, batch); - return fillHostDetail(stepInstance, agentTasks); - } - - private boolean isStepInstanceRecordExist(long stepInstanceId) { - return scriptAgentTaskDAO.isStepInstanceRecordExist(stepInstanceId); - } - - @Override - public void updateAgentTaskFields(long stepInstanceId, - int executeCount, - Integer batch, - Integer actualExecuteCount, - Long gseTaskId) { - scriptAgentTaskDAO.updateAgentTaskFields(stepInstanceId, executeCount, batch, actualExecuteCount, gseTaskId); - } -} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ScriptExecuteObjectTaskServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ScriptExecuteObjectTaskServiceImpl.java new file mode 100644 index 0000000000..d07237e736 --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/ScriptExecuteObjectTaskServiceImpl.java @@ -0,0 +1,295 @@ +package com.tencent.bk.job.execute.service.impl; + +import com.tencent.bk.job.common.constant.Order; +import com.tencent.bk.job.common.model.dto.HostDTO; +import com.tencent.bk.job.execute.dao.ScriptAgentTaskDAO; +import com.tencent.bk.job.execute.dao.ScriptExecuteObjectTaskDAO; +import com.tencent.bk.job.execute.engine.model.ExecuteObject; +import com.tencent.bk.job.execute.model.ExecuteObjectCompositeKey; +import com.tencent.bk.job.execute.model.ExecuteObjectTask; +import com.tencent.bk.job.execute.model.ExecuteObjectTaskDetail; +import com.tencent.bk.job.execute.model.ResultGroupBaseDTO; +import com.tencent.bk.job.execute.model.ResultGroupDTO; +import com.tencent.bk.job.execute.model.ServersDTO; +import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; +import com.tencent.bk.job.execute.service.ScriptExecuteObjectTaskService; +import com.tencent.bk.job.execute.service.StepInstanceService; +import com.tencent.bk.job.execute.service.TaskInstanceExecuteObjectService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class ScriptExecuteObjectTaskServiceImpl + extends AbstractExecuteObjectTaskServiceImpl + implements ScriptExecuteObjectTaskService { + + private final ScriptExecuteObjectTaskDAO scriptExecuteObjectTaskDAO; + private final ScriptAgentTaskDAO scriptAgentTaskDAO; + + @Autowired + public ScriptExecuteObjectTaskServiceImpl(ScriptExecuteObjectTaskDAO scriptExecuteObjectTaskDAO, + StepInstanceService stepInstanceService, + ScriptAgentTaskDAO scriptAgentTaskDAO, + TaskInstanceExecuteObjectService taskInstanceExecuteObjectService) { + super(stepInstanceService, taskInstanceExecuteObjectService); + this.scriptExecuteObjectTaskDAO = scriptExecuteObjectTaskDAO; + this.scriptAgentTaskDAO = scriptAgentTaskDAO; + } + + @Override + public void batchSaveTasks(Collection tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return; + } + ExecuteObjectTask anyTask = tasks.stream().findAny().orElse(null); + + if (Objects.requireNonNull(anyTask).getExecuteObjId() != null) { + scriptExecuteObjectTaskDAO.batchSaveTasks(tasks); + } else { + scriptAgentTaskDAO.batchSaveAgentTasks(tasks); + } + } + + @Override + public void batchUpdateTasks(Collection tasks) { + if (CollectionUtils.isEmpty(tasks)) { + return; + } + ExecuteObjectTask anyTask = tasks.stream().findAny().orElse(null); + if (Objects.requireNonNull(anyTask).getExecuteObjId() != null) { + scriptExecuteObjectTaskDAO.batchUpdateTasks(tasks); + } else { + scriptAgentTaskDAO.batchUpdateAgentTasks(tasks); + } + } + + @Override + public int getSuccessTaskCount(long stepInstanceId, int executeCount) { + if (isStepInstanceRecordExist(stepInstanceId)) { + return scriptExecuteObjectTaskDAO.getSuccessTaskCount(stepInstanceId, executeCount); + } else { + return scriptAgentTaskDAO.getSuccessAgentTaskCount(stepInstanceId, executeCount); + } + } + + @Override + public List listTasks(StepInstanceBaseDTO stepInstance, Integer executeCount, Integer batch) { + List executeObjectTasks; + long stepInstanceId = stepInstance.getId(); + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = scriptExecuteObjectTaskDAO.listTasks(stepInstanceId, executeCount, batch); + } else { + // 兼容老版本数据 + executeObjectTasks = scriptAgentTaskDAO.listAgentTasks(stepInstanceId, executeCount, batch); + } + return executeObjectTasks; + } + + /** + * 判断是否使用执行对象的方式存储 + * + * @param stepInstance 步骤实例 + */ + private boolean isUsingExecuteObject(StepInstanceBaseDTO stepInstance) { + ServersDTO servers = stepInstance.getTargetServers(); + if (CollectionUtils.isNotEmpty(servers.getExecuteObjects())) { + ExecuteObject executeObject = servers.getExecuteObjects().stream().findAny().orElse(null); + return Objects.requireNonNull(executeObject).getId() != null; + } else { + return false; + } + } + + @Override + public List listTasksByGseTaskId(StepInstanceBaseDTO stepInstance, Long gseTaskId) { + List executeObjectTasks; + + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = scriptExecuteObjectTaskDAO.listTasksByGseTaskId(gseTaskId); + } else { + // 兼容老版本数据 + executeObjectTasks = scriptAgentTaskDAO.listAgentTasksByGseTaskId(gseTaskId); + } + return executeObjectTasks; + } + + @Override + public ExecuteObjectTask getTaskByExecuteObjectCompositeKey(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + ExecuteObjectCompositeKey executeObjectCompositeKey) { + ExecuteObjectTask executeObjectTask = null; + long stepInstanceId = stepInstance.getId(); + + if (isUsingExecuteObject(stepInstance)) { + String executeObjectResourceId = executeObjectCompositeKey.getResourceId(); + if (executeObjectResourceId != null) { + ExecuteObject executeObject = taskInstanceExecuteObjectService.getExecuteObject( + executeObjectCompositeKey.getExecuteObjectType(), executeObjectCompositeKey.getResourceId()); + if (executeObject == null) { + return null; + } + executeObjectTask = scriptExecuteObjectTaskDAO.getTaskByExecuteObjectId(stepInstanceId, executeCount, + batch, executeObject.getId()); + } else { + // 兼容使用<云区域+ip> 的方式查询主机执行任务 + ExecuteObject executeObject = getStepHostExecuteObjectByCloudIp( + stepInstance, executeObjectCompositeKey.getCloudIp()); + executeObjectTask = scriptExecuteObjectTaskDAO.getTaskByExecuteObjectId(stepInstance.getId(), + executeCount, batch, executeObject.getId()); + } + } else { + // 兼容老版本不使用执行对象的场景(仅支持主机) + Long hostId = executeObjectCompositeKey.getHostId(); + if (hostId != null) { + executeObjectTask = scriptAgentTaskDAO.getAgentTaskByHostId(stepInstanceId, executeCount, + batch, hostId); + } else { + // 兼容使用<云区域+ip> 的方式查询主机执行任务 + HostDTO host = getStepHostByCloudIp(stepInstance, executeObjectCompositeKey.getCloudIp()); + if (host != null) { + executeObjectTask = scriptAgentTaskDAO.getAgentTaskByHostId(stepInstanceId, executeCount, + batch, host.getHostId()); + } + } + + } + return executeObjectTask; + } + + private ExecuteObject getStepHostExecuteObjectByCloudIp(StepInstanceBaseDTO stepInstance, String cloudIp) { + return stepInstance.getTargetServers().getExecuteObjects() + .stream() + .filter(executeObject -> { + HostDTO host = executeObject.getHost(); + if (host == null) { + return false; + } + return cloudIp.equals(host.toCloudIp()); + }) + .findFirst() + .orElse(null); + } + + private HostDTO getStepHostByCloudIp(StepInstanceBaseDTO stepInstance, + String cloudIp) { + return stepInstance.getTargetServers().getIpList() + .stream() + .filter(targetHost -> cloudIp.equals(targetHost.toCloudIp())) + .findFirst() + .orElse(null); + } + + @Override + public List listAndGroupTasks(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch) { + List resultGroups = new ArrayList<>(); + List executeObjectTasks = listTasks(stepInstance, executeCount, batch); + if (CollectionUtils.isEmpty(executeObjectTasks)) { + return resultGroups; + } + + List agentTaskDetailList = fillExecuteObjectDetail(stepInstance, executeObjectTasks); + resultGroups = groupTasks(agentTaskDetailList); + + return resultGroups.stream().sorted().collect(Collectors.toList()); + } + + @Override + public List listResultGroups(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch) { + List resultGroups; + long stepInstanceId = stepInstance.getId(); + + if (isUsingExecuteObject(stepInstance)) { + resultGroups = scriptExecuteObjectTaskDAO.listResultGroups(stepInstanceId, executeCount, batch); + } else { + // 兼容历史数据 + resultGroups = scriptAgentTaskDAO.listResultGroups(stepInstanceId, executeCount, batch); + } + return resultGroups; + } + + @Override + public List listTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + Integer status, + String tag) { + List executeObjectTasks; + + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = scriptExecuteObjectTaskDAO.listTasksByResultGroup(stepInstance.getId(), + executeCount, batch, status, tag); + } else { + // 兼容历史数据 + executeObjectTasks = scriptAgentTaskDAO.listAgentTaskByResultGroup( + stepInstance.getId(), executeCount, batch, status, tag); + } + + return fillExecuteObjectDetail(stepInstance, executeObjectTasks); + } + + + @Override + public List listTaskDetailByResultGroup(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch, + Integer status, + String tag, + Integer limit, + String orderField, + Order order) { + List executeObjectTasks; + + if (isUsingExecuteObject(stepInstance)) { + executeObjectTasks = scriptExecuteObjectTaskDAO.listTasksByResultGroup(stepInstance.getId(), + executeCount, batch, status, tag, limit, orderField, order); + } else { + // 兼容历史数据 + executeObjectTasks = scriptAgentTaskDAO.listAgentTaskByResultGroup(stepInstance.getId(), executeCount, + batch, status, tag, limit, orderField, order); + } + + return fillExecuteObjectDetail(stepInstance, executeObjectTasks); + } + + @Override + public List listTaskDetail(StepInstanceBaseDTO stepInstance, + Integer executeCount, + Integer batch) { + List executeObjectTasks = listTasks(stepInstance, executeCount, batch); + return fillExecuteObjectDetail(stepInstance, executeObjectTasks); + } + + private boolean isStepInstanceRecordExist(long stepInstanceId) { + return scriptExecuteObjectTaskDAO.isStepInstanceRecordExist(stepInstanceId); + } + + @Override + public void updateTaskFields(StepInstanceBaseDTO stepInstance, + int executeCount, + Integer batch, + Integer actualExecuteCount, + Long gseTaskId) { + if (isUsingExecuteObject(stepInstance)) { + scriptExecuteObjectTaskDAO.updateTaskFields(stepInstance.getId(), executeCount, batch, + actualExecuteCount, gseTaskId); + } else { + // 兼容老版本方式 + scriptAgentTaskDAO.updateAgentTaskFields(stepInstance.getId(), executeCount, batch, + actualExecuteCount, gseTaskId); + } + } +} diff --git a/src/backend/job-logsvr/service-job-logsvr/src/main/java/com/tencent/bk/job/logsvr/service/impl/LogServiceImpl.java b/src/backend/job-logsvr/service-job-logsvr/src/main/java/com/tencent/bk/job/logsvr/service/impl/LogServiceImpl.java index d7e468c92c..4e47fc8363 100644 --- a/src/backend/job-logsvr/service-job-logsvr/src/main/java/com/tencent/bk/job/logsvr/service/impl/LogServiceImpl.java +++ b/src/backend/job-logsvr/service-job-logsvr/src/main/java/com/tencent/bk/job/logsvr/service/impl/LogServiceImpl.java @@ -130,7 +130,6 @@ private List> buildUpdateOpsFileTask(List> updateOps = new ArrayList<>(); taskExecuteObjectLogs.forEach(taskExecuteObjectLog -> { long stepInstanceId = taskExecuteObjectLog.getStepInstanceId(); - String ip = taskExecuteObjectLog.getIp(); int executeCount = taskExecuteObjectLog.getExecuteCount(); Integer batch = taskExecuteObjectLog.getBatch(); List fileTaskLogs = taskExecuteObjectLog.getFileTaskLogs();