diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java index 81da35dbca..4c3f2efbfb 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/evict/TaskEvictPolicyExecutor.java @@ -24,34 +24,23 @@ package com.tencent.bk.job.execute.engine.evict; -import com.tencent.bk.job.execute.common.constants.RunStatusEnum; -import com.tencent.bk.job.execute.common.util.TaskCostCalculator; -import com.tencent.bk.job.execute.model.StepInstanceBaseDTO; import com.tencent.bk.job.execute.model.TaskInstanceDTO; -import com.tencent.bk.job.execute.service.StepInstanceService; -import com.tencent.bk.job.execute.service.TaskInstanceService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * 任务驱逐策略执行器,用于判定某个任务在当前驱逐策略下是否应当被驱逐,提供任务被驱逐后更新任务相关状态的方法 + * 任务驱逐策略执行器,用于判定某个任务在当前驱逐策略下是否应当被驱逐 */ @Slf4j @Component public class TaskEvictPolicyExecutor { private final TaskEvictPolicyManager taskEvictPolicyManager; - private final TaskInstanceService taskInstanceService; - private final StepInstanceService stepInstanceService; @Autowired - public TaskEvictPolicyExecutor(TaskEvictPolicyManager taskEvictPolicyManager, - TaskInstanceService taskInstanceService, - StepInstanceService stepInstanceService) { + public TaskEvictPolicyExecutor(TaskEvictPolicyManager taskEvictPolicyManager) { this.taskEvictPolicyManager = taskEvictPolicyManager; - this.taskInstanceService = taskInstanceService; - this.stepInstanceService = stepInstanceService; } /** @@ -67,58 +56,4 @@ public boolean shouldEvictTask(TaskInstanceDTO taskInstance) { } return policy.needToEvict(taskInstance); } - - /** - * 更新被驱逐的任务相关状态为被丢弃状态 - * - * @param taskInstance 任务实例 - * @param stepInstance 步骤实例 - */ - public void updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) { - long endTime = System.currentTimeMillis(); - Long taskInstanceId = stepInstance.getTaskInstanceId(); - // 将进行中的被驱逐任务的步骤实例状态更新为“被丢弃”状态 - if (!RunStatusEnum.isFinishedStatus(stepInstance.getStatus())) { - long totalTime = TaskCostCalculator.calculate( - stepInstance.getStartTime(), - endTime, - stepInstance.getTotalTime() - ); - stepInstanceService.updateStepExecutionInfo( - stepInstance.getId(), - RunStatusEnum.ABANDONED, - null, - endTime, - totalTime - ); - } else { - log.info( - "stepInstance {} already enter a final state:{}", - stepInstance.getId(), - stepInstance.getStatus() - ); - } - // 将进行中的被驱逐任务外层状态更新为“被丢弃”状态 - if (!RunStatusEnum.isFinishedStatus(taskInstance.getStatus())) { - long totalTime = TaskCostCalculator.calculate( - taskInstance.getStartTime(), - endTime, - taskInstance.getTotalTime() - ); - taskInstanceService.updateTaskExecutionInfo( - taskInstanceId, - RunStatusEnum.ABANDONED, - null, - null, - endTime, - totalTime - ); - } else { - log.info( - "taskInstance {} already enter a final state:{}", - taskInstanceId, - taskInstance.getStatus() - ); - } - } } diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/GseTaskManager.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/GseTaskManager.java index 4f2e592bd5..21825de1da 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/GseTaskManager.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/GseTaskManager.java @@ -26,7 +26,6 @@ import com.tencent.bk.job.common.constant.ErrorCode; import com.tencent.bk.job.common.exception.InternalException; -import com.tencent.bk.job.common.gse.GseClient; import com.tencent.bk.job.common.redis.util.LockUtils; import com.tencent.bk.job.execute.common.constants.RunStatusEnum; import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum; @@ -39,37 +38,25 @@ import com.tencent.bk.job.execute.engine.listener.event.EventSource; import com.tencent.bk.job.execute.engine.listener.event.StepEvent; import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher; -import com.tencent.bk.job.execute.engine.result.ResultHandleManager; -import com.tencent.bk.job.execute.engine.result.ha.ResultHandleTaskKeepaliveManager; import com.tencent.bk.job.execute.engine.util.RunningTaskCounter; -import com.tencent.bk.job.execute.engine.variable.JobBuildInVariableResolver; import com.tencent.bk.job.execute.model.GseTaskDTO; import com.tencent.bk.job.execute.model.StepInstanceDTO; import com.tencent.bk.job.execute.model.TaskInstanceDTO; import com.tencent.bk.job.execute.monitor.ExecuteMetricNames; import com.tencent.bk.job.execute.monitor.metrics.ExecuteMonitor; -import com.tencent.bk.job.execute.monitor.metrics.GseTasksExceptionCounter; -import com.tencent.bk.job.execute.service.AccountService; -import com.tencent.bk.job.execute.service.AgentService; import com.tencent.bk.job.execute.service.FileExecuteObjectTaskService; import com.tencent.bk.job.execute.service.GseTaskService; -import com.tencent.bk.job.execute.service.LogService; import com.tencent.bk.job.execute.service.ScriptExecuteObjectTaskService; import com.tencent.bk.job.execute.service.StepInstanceService; -import com.tencent.bk.job.execute.service.StepInstanceVariableValueService; import com.tencent.bk.job.execute.service.TaskInstanceService; -import com.tencent.bk.job.execute.service.TaskInstanceVariableService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.sleuth.Tracer; import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; import org.springframework.util.StopWatch; -import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -80,34 +67,19 @@ @Slf4j public class GseTaskManager implements SmartLifecycle { private final EngineDependentServiceHolder engineDependentServiceHolder; - private final ResultHandleManager resultHandleManager; private final TaskInstanceService taskInstanceService; private final GseTaskService gseTaskService; private final ScriptExecuteObjectTaskService scriptExecuteObjectTaskService; private final FileExecuteObjectTaskService fileExecuteObjectTaskService; private final TaskExecuteMQEventDispatcher taskExecuteMQEventDispatcher; - private final AccountService accountService; - private final LogService logService; - private final TaskInstanceVariableService taskInstanceVariableService; - private final StepInstanceVariableValueService stepInstanceVariableValueService; - private final AgentService agentService; - private final ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager; - private final JobBuildInVariableResolver jobBuildInVariableResolver; - private final Tracer tracer; private final ExecuteMonitor executeMonitor; private final FileDistributeConfig fileDistributeConfig; private final JobExecuteConfig jobExecuteConfig; private final TaskEvictPolicyExecutor taskEvictPolicyExecutor; - private final GseTasksExceptionCounter gseTasksExceptionCounter; private final StepInstanceService stepInstanceService; - private final GseClient gseClient; private final Object lifecycleMonitor = new Object(); private final RunningTaskCounter counter = new RunningTaskCounter<>("GseTask-Counter"); - /** - * 正在执行中的任务 - */ - private final Map startingGseTasks = new ConcurrentHashMap<>(); private volatile boolean running = false; private volatile boolean active = false; @@ -137,49 +109,27 @@ public class GseTaskManager implements SmartLifecycle { */ @Autowired public GseTaskManager(EngineDependentServiceHolder engineDependentServiceHolder, - ResultHandleManager resultHandleManager, TaskInstanceService taskInstanceService, GseTaskService gseTaskService, TaskExecuteMQEventDispatcher taskExecuteMQEventDispatcher, - AccountService accountService, - LogService logService, - TaskInstanceVariableService taskInstanceVariableService, - StepInstanceVariableValueService stepInstanceVariableValueService, - JobBuildInVariableResolver jobBuildInVariableResolver, FileDistributeConfig fileDistributeConfig, - AgentService agentService, - ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager, - GseTasksExceptionCounter gseTasksExceptionCounter, - Tracer tracer, ExecuteMonitor executeMonitor, JobExecuteConfig jobExecuteConfig, TaskEvictPolicyExecutor taskEvictPolicyExecutor, ScriptExecuteObjectTaskService scriptExecuteObjectTaskService, FileExecuteObjectTaskService fileExecuteObjectTaskService, - StepInstanceService stepInstanceService, - GseClient gseClient) { + StepInstanceService stepInstanceService) { this.engineDependentServiceHolder = engineDependentServiceHolder; - this.resultHandleManager = resultHandleManager; this.taskInstanceService = taskInstanceService; this.gseTaskService = gseTaskService; this.scriptExecuteObjectTaskService = scriptExecuteObjectTaskService; this.fileExecuteObjectTaskService = fileExecuteObjectTaskService; this.taskExecuteMQEventDispatcher = taskExecuteMQEventDispatcher; - this.accountService = accountService; - this.logService = logService; - this.taskInstanceVariableService = taskInstanceVariableService; - this.stepInstanceVariableValueService = stepInstanceVariableValueService; - this.jobBuildInVariableResolver = jobBuildInVariableResolver; this.fileDistributeConfig = fileDistributeConfig; - this.agentService = agentService; - this.resultHandleTaskKeepaliveManager = resultHandleTaskKeepaliveManager; - this.gseTasksExceptionCounter = gseTasksExceptionCounter; - this.tracer = tracer; this.executeMonitor = executeMonitor; this.jobExecuteConfig = jobExecuteConfig; this.taskEvictPolicyExecutor = taskEvictPolicyExecutor; this.stepInstanceService = stepInstanceService; - this.gseClient = gseClient; } /** @@ -219,7 +169,7 @@ public void startTask(GseTaskDTO gseTask, String requestId) { // 如果任务应当被驱逐,直接置为被丢弃状态 if (taskEvictPolicyExecutor.shouldEvictTask(taskInstance)) { log.warn("Evict job, taskInstanceId: {}, gseTask: {}", taskInstance.getId(), taskName); - taskEvictPolicyExecutor.updateEvictedTaskStatus(taskInstance, stepInstance); + finishGseTask(gseTask, RunStatusEnum.ABANDONED); watch.stop(); return; } @@ -228,11 +178,7 @@ public void startTask(GseTaskDTO gseTask, String requestId) { if (taskInstance.getStatus() == RunStatusEnum.STOPPING) { log.info("Task instance status is stopping, stop executing the step! taskInstanceId:{}, " + "stepInstanceId:{}", taskInstance.getId(), stepInstance.getId()); - gseTask.setStatus(RunStatusEnum.STOP_SUCCESS.getValue()); - gseTaskService.updateGseTask(gseTask); - taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.refreshStep(stepInstanceId, - EventSource.buildGseTaskEventSource(stepInstanceId, stepInstance.getExecuteCount(), - stepInstance.getBatch(), gseTask.getId()))); + finishGseTask(gseTask, RunStatusEnum.STOP_SUCCESS); watch.stop(); return; } @@ -244,7 +190,7 @@ public void startTask(GseTaskDTO gseTask, String requestId) { watch.start("executeTask"); counter.add(taskName); - executeTask(startCommand, gseTask); + executeTask(startCommand); watch.stop(); success = true; } finally { @@ -263,6 +209,20 @@ public void startTask(GseTaskDTO gseTask, String requestId) { } } + private void finishGseTask(GseTaskDTO gseTask, RunStatusEnum status) { + gseTask.setStatus(status.getValue()); + long endTime = System.currentTimeMillis(); + gseTask.setEndTime(endTime); + if (gseTask.getStartTime() == null) { + gseTask.setStartTime(endTime); + } + gseTask.setTotalTime(endTime - gseTask.getStartTime()); + gseTaskService.updateGseTask(gseTask); + taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.refreshStep(gseTask.getStepInstanceId(), + EventSource.buildGseTaskEventSource(gseTask.getStepInstanceId(), gseTask.getExecuteCount(), + gseTask.getBatch(), gseTask.getId()))); + } + private String buildGseTaskLockKey(GseTaskDTO gseTask) { return "job:running:gse:task:" + gseTask.getId(); } @@ -361,14 +321,11 @@ private AbstractGseTaskStartCommand initGseTaskStartCommand(String requestId, return gseTaskStartCommand; } - private void executeTask(AbstractGseTaskStartCommand startCommand, - GseTaskDTO gseTask) { + private void executeTask(AbstractGseTaskStartCommand startCommand) { try { - startingGseTasks.put(gseTask.getTaskUniqueName(), startCommand); incrementRunningTasksCount(startCommand); startCommand.execute(); } finally { - startingGseTasks.remove(gseTask.getTaskUniqueName()); decrementRunningTasksCount(startCommand); } } @@ -491,33 +448,6 @@ public int getPhase() { return DestroyOrder.GSE_TASK_HANDLER; } - /** - * 返回正在执行的任务数量 - * - * @return 任务数量 - */ - public int getRunningTaskCount() { - return this.runningTasks.get(); - } - - /** - * 返回正在执行的文件任务数量 - * - * @return 任务数量 - */ - public int getRunningFileTaskCount() { - return this.runningFileTasks.get(); - } - - /** - * 返回正在执行的脚本任务数量 - * - * @return 任务数量 - */ - public int getRunningScriptTaskCount() { - return this.runningScriptTasks.get(); - } - /** * 返回累计处理的文件任务数量 * diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java index fd2daf4edb..911e9e0524 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/executor/ScriptGseTaskStartCommand.java @@ -635,7 +635,7 @@ protected final void handleStartGseTaskError(GseTaskResponse gseTaskResponse) { now); scriptLogs.add(scriptLog); - // AgentTask 结果更新 + // ExecuteObjectTask 结果更新 executeObjectTask.setGseTaskId(gseTask.getId()); executeObjectTask.setStartTime(gseTask.getStartTime()); executeObjectTask.setEndTime(now); diff --git a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java index 7e2dc42f86..b50757cde8 100644 --- a/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java +++ b/src/backend/job-execute/service-job-execute/src/main/java/com/tencent/bk/job/execute/engine/result/AbstractResultHandleTask.java @@ -289,11 +289,10 @@ private boolean checkAndEvictTaskIfNeed(StopWatch watch) { watch.start("check-evict-task"); if (taskEvictPolicyExecutor.shouldEvictTask(taskInstance)) { log.info("taskInstance {} evicted", taskInstance.getId()); - // 更新任务与步骤状态 - taskEvictPolicyExecutor.updateEvictedTaskStatus(taskInstance, stepInstance); // 停止日志拉取调度 this.executeResult = GseTaskExecuteResult.DISCARDED; - finishGseTask(this.executeResult, false); + finishGseTask(this.executeResult); + watch.stop(); return true; } @@ -379,13 +378,13 @@ public void execute() { // 如果任务已结束 if (this.executeResult != GseTaskExecuteResult.RUNNING) { watch.start("finish-gse-task"); - finishGseTask(this.executeResult, true); + finishGseTask(this.executeResult); watch.stop(); } } catch (Throwable e) { log.error("[" + gseTaskInfo + "]: result handle error.", e); this.executeResult = GseTaskExecuteResult.EXCEPTION; - finishGseTask(this.executeResult, true); + finishGseTask(this.executeResult); } finally { this.isRunning = false; LockUtils.releaseDistributedLock(lockKey, requestId); @@ -471,7 +470,7 @@ private boolean checkTaskTimeout() { this.executeResult = GseTaskExecuteResult.FAILED; saveFailInfoForUnfinishedExecuteObjectTask(ExecuteObjectTaskStatusEnum.LOG_ERROR, "Task execution may be abnormal or timeout."); - finishGseTask(GseTaskExecuteResult.FAILED, true); + finishGseTask(GseTaskExecuteResult.FAILED); isTimeout = true; } return isTimeout; @@ -494,7 +493,7 @@ private boolean checkEmptyGseResult(GseTaskResult gseTaskResult) { saveFailInfoForUnfinishedExecuteObjectTask(ExecuteObjectTaskStatusEnum.LOG_ERROR, "Execution result " + "log " + "always empty."); - finishGseTask(GseTaskExecuteResult.FAILED, true); + finishGseTask(GseTaskExecuteResult.FAILED); isAbnormal = true; } } else { @@ -535,10 +534,9 @@ protected void dealTargetExecuteObjectFinish(ExecuteObjectGseKey executeObjectGs /** * 设置GSE TASK 完成状态并分发事件 * - * @param result 任务执行结果 - * @param dispatchRefreshEvent 是否分发Refresh事件 + * @param result 任务执行结果 */ - private void finishGseTask(GseTaskExecuteResult result, boolean dispatchRefreshEvent) { + private void finishGseTask(GseTaskExecuteResult result) { int gseTaskExecuteResult = result.getResultCode(); // 处理GSE任务执行结果 @@ -550,15 +548,12 @@ private void finishGseTask(GseTaskExecuteResult result, boolean dispatchRefreshE updateGseTaskExecutionInfo(result, endTime, gseTotalTime); - if (dispatchRefreshEvent) { - taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.refreshStep(stepInstanceId, - EventSource.buildGseTaskEventSource(stepInstanceId, stepInstance.getExecuteCount(), - stepInstance.getBatch(), gseTask.getId()))); - } + taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.refreshStep(stepInstanceId, + EventSource.buildGseTaskEventSource(stepInstanceId, stepInstance.getExecuteCount(), + stepInstance.getBatch(), gseTask.getId()))); } - private void updateGseTaskExecutionInfo(GseTaskExecuteResult result, long endTime, - long totalTime) { + private void updateGseTaskExecutionInfo(GseTaskExecuteResult result, long endTime, long totalTime) { gseTask.setStatus(analyseGseTaskStatus(result).getValue()); gseTask.setEndTime(endTime);