diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/ResourceQuotaLimitProperties.java b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/ResourceQuotaLimitProperties.java index 5e635bd12a..1037caa541 100644 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/ResourceQuotaLimitProperties.java +++ b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/ResourceQuotaLimitProperties.java @@ -52,6 +52,10 @@ public class ResourceQuotaLimitProperties { @Data @NoArgsConstructor public static class ResourceQuotaLimitProp { + /** + * 是否启用配额限制 + */ + private boolean enabled; /** * 配额容量(比如任务数量、存储占用大小等) */ @@ -67,9 +71,11 @@ public static class ResourceQuotaLimitProp { */ private QuotaLimitProp appQuotaLimit; - public ResourceQuotaLimitProp(String capacity, + public ResourceQuotaLimitProp(boolean enabled, + String capacity, QuotaLimitProp resourceScopeQuotaLimit, QuotaLimitProp appQuotaLimit) { + this.enabled = enabled; this.capacity = capacity; this.resourceScopeQuotaLimit = resourceScopeQuotaLimit; this.appQuotaLimit = appQuotaLimit; diff --git a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/parser/CounterResourceQuotaConfigParser.java b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/parser/CounterResourceQuotaConfigParser.java index b1e17e993b..966b0a4e3d 100644 --- a/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/parser/CounterResourceQuotaConfigParser.java +++ b/src/backend/commons/common-service/src/main/java/com/tencent/bk/job/common/service/quota/config/parser/CounterResourceQuotaConfigParser.java @@ -42,6 +42,7 @@ public ResourceQuotaLimit parse(ResourceQuotaLimitProperties.ResourceQuotaLimitP ResourceQuotaLimit resourceQuota = new ResourceQuotaLimit(); try { + resourceQuota.setEnabled(resourceQuotaLimitProp.isEnabled()); resourceQuota.setCapacityExpr(resourceQuotaLimitProp.getCapacity()); Long capacity = parseCapacity(resourceQuotaLimitProp.getCapacity()); resourceQuota.setCapacity(capacity); diff --git a/src/backend/commons/common-service/src/test/java/com/tencent/bk/job/common/service/quota/CounterResourceQuotaConfigParserTest.java b/src/backend/commons/common-service/src/test/java/com/tencent/bk/job/common/service/quota/CounterResourceQuotaConfigParserTest.java index 0f96c69972..6711a683c0 100644 --- a/src/backend/commons/common-service/src/test/java/com/tencent/bk/job/common/service/quota/CounterResourceQuotaConfigParserTest.java +++ b/src/backend/commons/common-service/src/test/java/com/tencent/bk/job/common/service/quota/CounterResourceQuotaConfigParserTest.java @@ -43,7 +43,8 @@ void parseByPercentageLimitValue() { ResourceQuotaLimitProperties.QuotaLimitProp appQuotaLimit = new ResourceQuotaLimitProperties.QuotaLimitProp("2%", "bk-soap=20%,bk-nodeman=10%"); ResourceQuotaLimitProperties.ResourceQuotaLimitProp resourceQuotaLimitProp = - new ResourceQuotaLimitProperties.ResourceQuotaLimitProp("1000", resourceScopeQuotaLimit, appQuotaLimit); + new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(true, "1000", resourceScopeQuotaLimit, + appQuotaLimit); ResourceQuotaLimit resourceQuota = parser.parse(resourceQuotaLimitProp); @@ -75,7 +76,7 @@ void parseByExplicitLimitValue() { ResourceQuotaLimitProperties.QuotaLimitProp appQuotaLimit = new ResourceQuotaLimitProperties.QuotaLimitProp("20", "bk-soap=100,bk-nodeman=150"); ResourceQuotaLimitProperties.ResourceQuotaLimitProp resourceQuotaLimitProp = - new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(null, resourceScopeQuotaLimit, appQuotaLimit); + new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(true, null, resourceScopeQuotaLimit, appQuotaLimit); ResourceQuotaLimit resourceQuota = parser.parse(resourceQuotaLimitProp); @@ -103,6 +104,7 @@ void parseInvalidConfig() { assertThatThrownBy(() -> parser.parse( new ResourceQuotaLimitProperties.ResourceQuotaLimitProp( + true, "", new ResourceQuotaLimitProperties.QuotaLimitProp("2%", null), null))) @@ -110,6 +112,7 @@ void parseInvalidConfig() { assertThatThrownBy(() -> parser.parse( new ResourceQuotaLimitProperties.ResourceQuotaLimitProp( + true, "", null, new ResourceQuotaLimitProperties.QuotaLimitProp("2%", null)))) @@ -117,6 +120,7 @@ void parseInvalidConfig() { assertThatThrownBy(() -> parser.parse( new ResourceQuotaLimitProperties.ResourceQuotaLimitProp( + true, "1000", new ResourceQuotaLimitProperties.QuotaLimitProp("2%", "error_custom_settings"), null))) @@ -124,6 +128,7 @@ void parseInvalidConfig() { assertThatThrownBy(() -> parser.parse( new ResourceQuotaLimitProperties.ResourceQuotaLimitProp( + true, "1000", null, new ResourceQuotaLimitProperties.QuotaLimitProp("2%", "error_custom_settings")))) @@ -131,6 +136,7 @@ void parseInvalidConfig() { assertThatThrownBy(() -> parser.parse( new ResourceQuotaLimitProperties.ResourceQuotaLimitProp( + true, "1000G", null, new ResourceQuotaLimitProperties.QuotaLimitProp("2%", null)))) diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/EngineDependentServiceHolder.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/EngineDependentServiceHolder.java index 837bddc38e..47f63d8e7a 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/EngineDependentServiceHolder.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/EngineDependentServiceHolder.java @@ -46,7 +46,7 @@ import org.springframework.stereotype.Component; /** - * 执行引擎依赖的服务的托管类,用于程序快速获取到依赖的 Service; 而不需要通过构造方法一一传入 + * 执行引擎依赖的服务的托管类。用于简化一些较为复杂的类构造函数的参数列表 */ @Component @Getter diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java index 421afb78a8..9a3213befb 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/listener/JobListener.java @@ -96,9 +96,9 @@ public void handleEvent(JobEvent jobEvent) { log.info("Handle job event, event: {}, duration: {}ms", jobEvent, jobEvent.duration()); long jobInstanceId = jobEvent.getJobInstanceId(); JobActionEnum action = JobActionEnum.valueOf(jobEvent.getAction()); - TaskInstanceDTO taskInstance = null; + TaskInstanceDTO taskInstance; try { - taskInstance = taskInstanceService.getTaskInstance(jobInstanceId); + taskInstance = taskInstanceService.getTaskInstance(jobInstanceId); switch (action) { case START: startJob(taskInstance); @@ -115,7 +115,6 @@ public void handleEvent(JobEvent jobEvent) { } catch (Throwable e) { String errorMsg = "Handle job event error, jobInstanceId=" + jobInstanceId; log.error(errorMsg, e); - finishJob(taskInstance, null, RunStatusEnum.ABNORMAL_STATE); } } @@ -242,34 +241,36 @@ private void finishJob(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance, RunStatusEnum jobStatus) { long jobInstanceId = taskInstance.getId(); - long stepInstanceId = stepInstance.getId(); - Long endTime = DateUtils.currentTimeMillis(); - long totalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime, - taskInstance.getTotalTime()); - taskInstance.setEndTime(endTime); - taskInstance.setTotalTime(totalTime); - taskInstance.setStatus(jobStatus); - taskInstanceService.updateTaskExecutionInfo(jobInstanceId, jobStatus, null, null, endTime, totalTime); + try { + Long endTime = DateUtils.currentTimeMillis(); + long totalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime, + taskInstance.getTotalTime()); + taskInstance.setEndTime(endTime); + taskInstance.setTotalTime(totalTime); + taskInstance.setStatus(jobStatus); + taskInstanceService.updateTaskExecutionInfo(jobInstanceId, jobStatus, null, null, endTime, totalTime); + + // 作业执行结果消息通知 + if (RunStatusEnum.SUCCESS == jobStatus || RunStatusEnum.IGNORE_ERROR == jobStatus) { + notifyService.asyncSendMQSuccessTaskNotification(taskInstance, stepInstance); + } else { + notifyService.asyncSendMQFailTaskNotification(taskInstance, stepInstance); + } - // 从资源配额中删除该作业实例 - runningJobResourceQuotaManager.removeJob( - taskInstance.getAppCode(), - GlobalAppScopeMappingService.get().getScopeByAppId(taskInstance.getAppId()), - jobInstanceId - ); + // 触发作业结束统计分析 + statisticsService.updateEndJobStatistics(taskInstance); - // 作业执行结果消息通知 - if (RunStatusEnum.SUCCESS == jobStatus || RunStatusEnum.IGNORE_ERROR == jobStatus) { - notifyService.asyncSendMQSuccessTaskNotification(taskInstance, stepInstance); - } else { - notifyService.asyncSendMQFailTaskNotification(taskInstance, stepInstance); + // 作业执行完成回调 + callback(taskInstance); + } finally { + // 从资源配额中删除该作业实例 + runningJobResourceQuotaManager.removeJob( + taskInstance.getAppCode(), + GlobalAppScopeMappingService.get().getScopeByAppId(taskInstance.getAppId()), + jobInstanceId + ); } - // 触发作业结束统计分析 - statisticsService.updateEndJobStatistics(taskInstance); - - // 作业执行完成回调 - callback(taskInstance, jobInstanceId, jobStatus.getValue(), stepInstanceId, stepInstance.getStatus()); } /** @@ -334,19 +335,22 @@ private void startStep(StepInstanceBaseDTO stepInstance) { } } - private void callback(TaskInstanceDTO taskInstance, long jobInstanceId, int taskStatus, long currentStepId, - RunStatusEnum stepStatus) { + private void callback(TaskInstanceDTO taskInstance) { if (StringUtils.isNotBlank(taskInstance.getCallbackUrl())) { JobCallbackDTO callback = new JobCallbackDTO(); - callback.setId(jobInstanceId); - callback.setStatus(taskStatus); + callback.setId(taskInstance.getId()); + callback.setStatus(taskInstance.getStatus().getValue()); callback.setCallbackUrl(taskInstance.getCallbackUrl()); - Collection stepInstanceList = Lists.newArrayList(); - JobCallbackDTO.StepInstanceStatus stepInstance = new JobCallbackDTO.StepInstanceStatus(); - stepInstance.setId(currentStepId); - stepInstance.setStatus(stepStatus.getValue()); - stepInstanceList.add(stepInstance); - callback.setStepInstances(stepInstanceList); + Collection stepInstanceStatusList = Lists.newArrayList(); + List stepInstanceList = + stepInstanceService.listBaseStepInstanceByTaskInstanceId(taskInstance.getId()); + stepInstanceList.forEach(stepInstance -> { + JobCallbackDTO.StepInstanceStatus stepInstanceStatus = new JobCallbackDTO.StepInstanceStatus(); + stepInstanceStatus.setId(stepInstance.getId()); + stepInstanceStatus.setStatus(stepInstance.getStatus().getValue()); + stepInstanceStatusList.add(stepInstanceStatus); + }); + callback.setStepInstances(stepInstanceStatusList); taskExecuteMQEventDispatcher.dispatchCallbackMsg(callback); } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareControlTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareControlTask.java index e42fe8b867..8a39fc3588 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareControlTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/FilePrepareControlTask.java @@ -30,6 +30,7 @@ import com.tencent.bk.job.execute.engine.result.ContinuousScheduledTask; import com.tencent.bk.job.execute.engine.result.ScheduleStrategy; import com.tencent.bk.job.execute.engine.result.StopTaskCounter; +import com.tencent.bk.job.execute.engine.result.TaskContext; import com.tencent.bk.job.execute.model.StepInstanceDTO; import com.tencent.bk.job.execute.model.TaskInstanceDTO; import com.tencent.bk.job.execute.service.StepInstanceService; @@ -62,6 +63,7 @@ public class FilePrepareControlTask implements ContinuousScheduledTask { private final FilePrepareTaskResultHandler filePrepareTaskResultHandler; private final StepInstanceService stepInstanceService; private final long startTimeMills; + private final TaskContext taskContext; public FilePrepareControlTask( FilePrepareService filePrepareService, @@ -81,6 +83,7 @@ public FilePrepareControlTask( this.filePrepareTaskResultHandler = filePrepareTaskResultHandler; this.stepInstanceService = stepInstanceService; this.startTimeMills = System.currentTimeMillis(); + this.taskContext = new TaskContext(stepInstance.getTaskInstanceId()); } @Override @@ -175,6 +178,11 @@ public String getTaskId() { return "FilePrepareControlTask-" + stepInstance.getId() + "_" + stepInstance.getExecuteCount(); } + @Override + public TaskContext getTaskContext() { + return taskContext; + } + @Override public String toString() { return getTaskId(); diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java index fabce77dd4..df17894ce4 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/prepare/third/ThirdFilePrepareTask.java @@ -40,6 +40,7 @@ import com.tencent.bk.job.execute.engine.result.ContinuousScheduledTask; import com.tencent.bk.job.execute.engine.result.ScheduleStrategy; import com.tencent.bk.job.execute.engine.result.StopTaskCounter; +import com.tencent.bk.job.execute.engine.result.TaskContext; import com.tencent.bk.job.execute.model.AccountDTO; import com.tencent.bk.job.execute.model.ExecuteTargetDTO; import com.tencent.bk.job.execute.model.FileDetailDTO; @@ -105,6 +106,8 @@ public class ThirdFilePrepareTask implements ContinuousScheduledTask, JobTaskCon */ private volatile boolean isStopped = false; + private final TaskContext taskContext; + public ThirdFilePrepareTask( StepInstanceDTO stepInstance, List fileSourceList, @@ -116,7 +119,7 @@ public ThirdFilePrepareTask( this.batchTaskId = batchTaskId; this.isForRetry = isForRetry; this.resultHandler = resultHandler; - + this.taskContext = new TaskContext(stepInstance.getTaskInstanceId()); } public void initDependentService( @@ -518,6 +521,11 @@ public String getTaskId() { return "file_source_batch_task:" + this.stepInstance.getId() + ":" + this.stepInstance.getExecuteCount(); } + @Override + public TaskContext getTaskContext() { + return taskContext; + } + @Override public boolean isForRetry() { return isForRetry; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/ExpiredJobDetector.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/ExpiredJobDetector.java index bd6803a966..143c9c8652 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/ExpiredJobDetector.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/ExpiredJobDetector.java @@ -57,7 +57,7 @@ public ExpiredJobDetector(RunningJobResourceQuotaManager runningJobResourceQuota } /** - * 兜底方案。为了防止系统异常导致 redis 中的作业没有被清理,需要定时清理。每天触发一次 + * 兜底方案。为了防止系统异常导致 redis 中的作业记录没有被清理,需要定时清理。每天触发一次 */ @Scheduled(cron = "0 0/5 * * * ?") public void detectExpiredJob() { @@ -72,13 +72,14 @@ public void detectExpiredJob() { expiredJobInstanceIds.forEach(expiredJobInstanceId -> { TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(expiredJobInstanceId); if (taskInstance != null) { + log.info("Remove expire job : {}", expiredJobInstanceId); runningJobResourceQuotaManager.removeJob( taskInstance.getAppCode(), GlobalAppScopeMappingService.get().getScopeByAppId(taskInstance.getAppId()), expiredJobInstanceId ); } else { - log.error("Job instance not found, expiredJobInstanceId : {}", expiredJobInstanceId); + log.error("Job instance record not found, expiredJobInstanceId : {}", expiredJobInstanceId); } }); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/RunningJobKeepaliveManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/RunningJobKeepaliveManager.java index f2e92bd1f2..d9a0701c4c 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/RunningJobKeepaliveManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/quota/limit/RunningJobKeepaliveManager.java @@ -46,7 +46,7 @@ import java.util.concurrent.ConcurrentHashMap; /** - * 正在执行中的作业状态维持 + * 正在执行中的作业存活状态维持 */ @Slf4j @Component @@ -99,7 +99,7 @@ public void stopKeepaliveTask(long jobInstanceId) { } } - @Scheduled(cron = "0 * * * * ?") + @Scheduled(cron = "* 0/1 * * * ?") public void refreshTaskKeepaliveInfo() { log.info("Refresh running job keepalive task start..."); if (runningJobKeepaliveTasks.isEmpty()) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java index 96f5fc20da..7e2dc42f86 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java @@ -217,6 +217,8 @@ public abstract class AbstractResultHandleTask implements ContinuousScheduled protected final RunningJobKeepaliveManager runningJobKeepaliveManager; + private TaskContext taskContext; + protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentServiceHolder, ExecuteObjectTaskService executeObjectTaskService, JobExecuteConfig jobExecuteConfig, @@ -241,7 +243,7 @@ protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentS this.executeObjectTaskService = executeObjectTaskService; this.jobExecuteConfig = jobExecuteConfig; - + this.requestId = requestId; this.taskInstance = taskInstance; this.taskInstanceId = taskInstance.getId(); @@ -254,6 +256,7 @@ protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentS this.gseTask = gseTask; this.executeObjectTasks = executeObjectTasks; this.gseTaskInfo = buildGseTaskInfo(stepInstance.getTaskInstanceId(), gseTask); + this.taskContext = new TaskContext(taskInstanceId); targetExecuteObjectTasks.values().forEach(executeObjectTask -> this.targetExecuteObjectGseKeys.add(executeObjectTask.getExecuteObject().toExecuteObjectGseKey())); @@ -627,7 +630,7 @@ private void tryStopImmediately() { if (!this.isRunning) { log.info("ResultHandleTask-onStop start, task: {}", gseTaskInfo); resultHandleTaskKeepaliveManager.stopKeepaliveInfoTask(getTaskId()); - runningJobKeepaliveManager.stopKeepaliveTask(getJobInstanceId()); + runningJobKeepaliveManager.stopKeepaliveTask(taskInstanceId); taskExecuteMQEventDispatcher.dispatchResultHandleTaskResumeEvent( ResultHandleTaskResumeEvent.resume(gseTask.getStepInstanceId(), @@ -723,7 +726,8 @@ protected final GseTaskExecuteResult getExecuteResult() { */ abstract GseTaskExecuteResult analyseGseTaskResult(GseTaskResult gseTaskResult); - public long getJobInstanceId() { - return this.taskInstanceId; + @Override + public TaskContext getTaskContext() { + return this.taskContext; } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/DelayedTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/DelayedTask.java index 5f09e0bbe4..cb5b1c804a 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/DelayedTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/DelayedTask.java @@ -117,4 +117,9 @@ public String toString() { public String getTaskId() { return this.task.getTaskId(); } + + @Override + public TaskContext getTaskContext() { + return task.getTaskContext(); + } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java index 1909379d63..96b1517bfa 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ResultHandleManager.java @@ -174,8 +174,8 @@ public void handleDeliveredTask(ContinuousScheduledTask task) { if (task instanceof AbstractResultHandleTask) { resultHandleTaskKeepaliveManager.addRunningTaskKeepaliveInfo(task.getTaskId()); - runningJobKeepaliveManager.addKeepaliveTask(((AbstractResultHandleTask) task).getJobInstanceId()); } + runningJobKeepaliveManager.addKeepaliveTask(task.getTaskContext().getJobInstanceId()); this.tasksQueue.add(scheduleTask); if (task instanceof ScriptResultHandleTask) { ScriptResultHandleTask scriptResultHandleTask = (ScriptResultHandleTask) task; diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java index b8e443df21..ce887b1042 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/ScheduledContinuousResultHandleTask.java @@ -159,14 +159,13 @@ public void doExecute() { if (task instanceof ScriptResultHandleTask) { ScriptResultHandleTask scriptTask = (ScriptResultHandleTask) task; resultHandleTaskKeepaliveManager.stopKeepaliveInfoTask(scriptTask.getTaskId()); - runningJobKeepaliveManager.stopKeepaliveTask(scriptTask.getJobInstanceId()); sampler.decrementScriptTask(scriptTask.getAppId()); } else if (task instanceof FileResultHandleTask) { FileResultHandleTask fileTask = (FileResultHandleTask) task; resultHandleTaskKeepaliveManager.stopKeepaliveInfoTask(fileTask.getTaskId()); - runningJobKeepaliveManager.stopKeepaliveTask(fileTask.getJobInstanceId()); sampler.decrementFileTask(fileTask.getAppId()); } + runningJobKeepaliveManager.stopKeepaliveTask(task.getTaskContext().getJobInstanceId()); resultHandleManager.getScheduledTasks().remove(task.getTaskId()); } if (isExecutable) { diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/Task.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/Task.java index 08ec592c1a..9797f292e2 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/Task.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/Task.java @@ -43,4 +43,6 @@ public interface Task { default String getTaskType() { return "default"; } + + TaskContext getTaskContext(); } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/TaskContext.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/TaskContext.java new file mode 100644 index 0000000000..ee3ad9b59d --- /dev/null +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/TaskContext.java @@ -0,0 +1,41 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.execute.engine.result; + +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 任务上下文 + */ +@Data +@NoArgsConstructor +public class TaskContext { + private Long jobInstanceId; + + public TaskContext(Long jobInstanceId) { + this.jobInstanceId = jobInstanceId; + } +} diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/NotifyServiceImpl.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/NotifyServiceImpl.java index 2e753af7f1..053a93ac2e 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/NotifyServiceImpl.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/service/impl/NotifyServiceImpl.java @@ -274,7 +274,7 @@ private Map getTemplateVariablesMap(TaskNotifyDTO taskNotifyDTO, successIpCount = scriptExecuteObjectTaskService.getSuccessTaskCount(stepInstanceDTO.getId(), stepInstanceDTO.getExecuteCount()); } else if (stepInstanceDTO.isFileStep()) { - successIpCount = scriptExecuteObjectTaskService.getSuccessTaskCount(stepInstanceDTO.getId(), + successIpCount = fileExecuteObjectTaskService.getSuccessTaskCount(stepInstanceDTO.getId(), stepInstanceDTO.getExecuteCount()); } variablesMap.put("task.step.failed_cnt", String.valueOf(totalTargetIpCount - successIpCount));