Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 作业驱逐功能纳入配额管理 #3090 #3091

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,23 @@

package com.tencent.bk.job.execute.engine.evict;

import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.common.util.TaskCostCalculator;
import com.tencent.bk.job.execute.model.StepInstanceBaseDTO;
import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.service.StepInstanceService;
import com.tencent.bk.job.execute.service.TaskInstanceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 任务驱逐策略执行器,用于判定某个任务在当前驱逐策略下是否应当被驱逐,提供任务被驱逐后更新任务相关状态的方法
* 任务驱逐策略执行器,用于判定某个任务在当前驱逐策略下是否应当被驱逐
*/
@Slf4j
@Component
public class TaskEvictPolicyExecutor {

private final TaskEvictPolicyManager taskEvictPolicyManager;
private final TaskInstanceService taskInstanceService;
private final StepInstanceService stepInstanceService;

@Autowired
public TaskEvictPolicyExecutor(TaskEvictPolicyManager taskEvictPolicyManager,
TaskInstanceService taskInstanceService,
StepInstanceService stepInstanceService) {
public TaskEvictPolicyExecutor(TaskEvictPolicyManager taskEvictPolicyManager) {
this.taskEvictPolicyManager = taskEvictPolicyManager;
this.taskInstanceService = taskInstanceService;
this.stepInstanceService = stepInstanceService;
}

/**
Expand All @@ -67,58 +56,4 @@ public boolean shouldEvictTask(TaskInstanceDTO taskInstance) {
}
return policy.needToEvict(taskInstance);
}

/**
 * Moves an evicted task, and the step it is currently running, to the ABANDONED status.
 * <p>
 * The step instance is handled first, then the enclosing task instance. Either one is left
 * untouched (only an info log is written) when its status is already a finished status.
 * Both updates share the same end timestamp, taken once at the start of this method.
 *
 * @param taskInstance task instance being evicted
 * @param stepInstance step instance of the evicted task; also supplies the task instance id
 */
public void updateEvictedTaskStatus(TaskInstanceDTO taskInstance, StepInstanceBaseDTO stepInstance) {
    final long evictedAt = System.currentTimeMillis();
    final Long evictedTaskId = stepInstance.getTaskInstanceId();

    // Step level: only a step that has not finished yet is switched to ABANDONED.
    if (RunStatusEnum.isFinishedStatus(stepInstance.getStatus())) {
        log.info("stepInstance {} already enter a final state:{}", stepInstance.getId(), stepInstance.getStatus());
    } else {
        long stepCost = TaskCostCalculator.calculate(
            stepInstance.getStartTime(), evictedAt, stepInstance.getTotalTime());
        stepInstanceService.updateStepExecutionInfo(
            stepInstance.getId(), RunStatusEnum.ABANDONED, null, evictedAt, stepCost);
    }

    // Task level: same rule applied to the outer task instance.
    if (RunStatusEnum.isFinishedStatus(taskInstance.getStatus())) {
        log.info("taskInstance {} already enter a final state:{}", evictedTaskId, taskInstance.getStatus());
    } else {
        long taskCost = TaskCostCalculator.calculate(
            taskInstance.getStartTime(), evictedAt, taskInstance.getTotalTime());
        taskInstanceService.updateTaskExecutionInfo(
            evictedTaskId, RunStatusEnum.ABANDONED, null, null, evictedAt, taskCost);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.tencent.bk.job.common.constant.ErrorCode;
import com.tencent.bk.job.common.exception.InternalException;
import com.tencent.bk.job.common.gse.GseClient;
import com.tencent.bk.job.common.redis.util.LockUtils;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum;
Expand All @@ -39,37 +38,25 @@
import com.tencent.bk.job.execute.engine.listener.event.EventSource;
import com.tencent.bk.job.execute.engine.listener.event.StepEvent;
import com.tencent.bk.job.execute.engine.listener.event.TaskExecuteMQEventDispatcher;
import com.tencent.bk.job.execute.engine.result.ResultHandleManager;
import com.tencent.bk.job.execute.engine.result.ha.ResultHandleTaskKeepaliveManager;
import com.tencent.bk.job.execute.engine.util.RunningTaskCounter;
import com.tencent.bk.job.execute.engine.variable.JobBuildInVariableResolver;
import com.tencent.bk.job.execute.model.GseTaskDTO;
import com.tencent.bk.job.execute.model.StepInstanceDTO;
import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.monitor.ExecuteMetricNames;
import com.tencent.bk.job.execute.monitor.metrics.ExecuteMonitor;
import com.tencent.bk.job.execute.monitor.metrics.GseTasksExceptionCounter;
import com.tencent.bk.job.execute.service.AccountService;
import com.tencent.bk.job.execute.service.AgentService;
import com.tencent.bk.job.execute.service.FileExecuteObjectTaskService;
import com.tencent.bk.job.execute.service.GseTaskService;
import com.tencent.bk.job.execute.service.LogService;
import com.tencent.bk.job.execute.service.ScriptExecuteObjectTaskService;
import com.tencent.bk.job.execute.service.StepInstanceService;
import com.tencent.bk.job.execute.service.StepInstanceVariableValueService;
import com.tencent.bk.job.execute.service.TaskInstanceService;
import com.tencent.bk.job.execute.service.TaskInstanceVariableService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -80,34 +67,19 @@
@Slf4j
public class GseTaskManager implements SmartLifecycle {
private final EngineDependentServiceHolder engineDependentServiceHolder;
private final ResultHandleManager resultHandleManager;
private final TaskInstanceService taskInstanceService;
private final GseTaskService gseTaskService;
private final ScriptExecuteObjectTaskService scriptExecuteObjectTaskService;
private final FileExecuteObjectTaskService fileExecuteObjectTaskService;
private final TaskExecuteMQEventDispatcher taskExecuteMQEventDispatcher;
private final AccountService accountService;
private final LogService logService;
private final TaskInstanceVariableService taskInstanceVariableService;
private final StepInstanceVariableValueService stepInstanceVariableValueService;
private final AgentService agentService;
private final ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager;
private final JobBuildInVariableResolver jobBuildInVariableResolver;
private final Tracer tracer;
private final ExecuteMonitor executeMonitor;
private final FileDistributeConfig fileDistributeConfig;
private final JobExecuteConfig jobExecuteConfig;
private final TaskEvictPolicyExecutor taskEvictPolicyExecutor;
private final GseTasksExceptionCounter gseTasksExceptionCounter;
private final StepInstanceService stepInstanceService;
private final GseClient gseClient;

private final Object lifecycleMonitor = new Object();
private final RunningTaskCounter<String> counter = new RunningTaskCounter<>("GseTask-Counter");
/**
* 正在执行中的任务
*/
private final Map<String, AbstractGseTaskStartCommand> startingGseTasks = new ConcurrentHashMap<>();
private volatile boolean running = false;
private volatile boolean active = false;

Expand Down Expand Up @@ -137,49 +109,27 @@ public class GseTaskManager implements SmartLifecycle {
*/
@Autowired
public GseTaskManager(EngineDependentServiceHolder engineDependentServiceHolder,
ResultHandleManager resultHandleManager,
TaskInstanceService taskInstanceService,
GseTaskService gseTaskService,
TaskExecuteMQEventDispatcher taskExecuteMQEventDispatcher,
AccountService accountService,
LogService logService,
TaskInstanceVariableService taskInstanceVariableService,
StepInstanceVariableValueService stepInstanceVariableValueService,
JobBuildInVariableResolver jobBuildInVariableResolver,
FileDistributeConfig fileDistributeConfig,
AgentService agentService,
ResultHandleTaskKeepaliveManager resultHandleTaskKeepaliveManager,
GseTasksExceptionCounter gseTasksExceptionCounter,
Tracer tracer,
ExecuteMonitor executeMonitor,
JobExecuteConfig jobExecuteConfig,
TaskEvictPolicyExecutor taskEvictPolicyExecutor,
ScriptExecuteObjectTaskService scriptExecuteObjectTaskService,
FileExecuteObjectTaskService fileExecuteObjectTaskService,
StepInstanceService stepInstanceService,
GseClient gseClient) {
StepInstanceService stepInstanceService) {
this.engineDependentServiceHolder = engineDependentServiceHolder;
this.resultHandleManager = resultHandleManager;
this.taskInstanceService = taskInstanceService;
this.gseTaskService = gseTaskService;
this.scriptExecuteObjectTaskService = scriptExecuteObjectTaskService;
this.fileExecuteObjectTaskService = fileExecuteObjectTaskService;
this.taskExecuteMQEventDispatcher = taskExecuteMQEventDispatcher;
this.accountService = accountService;
this.logService = logService;
this.taskInstanceVariableService = taskInstanceVariableService;
this.stepInstanceVariableValueService = stepInstanceVariableValueService;
this.jobBuildInVariableResolver = jobBuildInVariableResolver;
this.fileDistributeConfig = fileDistributeConfig;
this.agentService = agentService;
this.resultHandleTaskKeepaliveManager = resultHandleTaskKeepaliveManager;
this.gseTasksExceptionCounter = gseTasksExceptionCounter;
this.tracer = tracer;
this.executeMonitor = executeMonitor;
this.jobExecuteConfig = jobExecuteConfig;
this.taskEvictPolicyExecutor = taskEvictPolicyExecutor;
this.stepInstanceService = stepInstanceService;
this.gseClient = gseClient;
}

/**
Expand Down Expand Up @@ -219,7 +169,7 @@ public void startTask(GseTaskDTO gseTask, String requestId) {
// 如果任务应当被驱逐,直接置为被丢弃状态
if (taskEvictPolicyExecutor.shouldEvictTask(taskInstance)) {
log.warn("Evict job, taskInstanceId: {}, gseTask: {}", taskInstance.getId(), taskName);
taskEvictPolicyExecutor.updateEvictedTaskStatus(taskInstance, stepInstance);
finishGseTask(gseTask, RunStatusEnum.ABANDONED);
watch.stop();
return;
}
Expand All @@ -228,11 +178,7 @@ public void startTask(GseTaskDTO gseTask, String requestId) {
if (taskInstance.getStatus() == RunStatusEnum.STOPPING) {
log.info("Task instance status is stopping, stop executing the step! taskInstanceId:{}, "
+ "stepInstanceId:{}", taskInstance.getId(), stepInstance.getId());
gseTask.setStatus(RunStatusEnum.STOP_SUCCESS.getValue());
gseTaskService.updateGseTask(gseTask);
taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.refreshStep(stepInstanceId,
EventSource.buildGseTaskEventSource(stepInstanceId, stepInstance.getExecuteCount(),
stepInstance.getBatch(), gseTask.getId())));
finishGseTask(gseTask, RunStatusEnum.STOP_SUCCESS);
watch.stop();
return;
}
Expand All @@ -244,7 +190,7 @@ public void startTask(GseTaskDTO gseTask, String requestId) {

watch.start("executeTask");
counter.add(taskName);
executeTask(startCommand, gseTask);
executeTask(startCommand);
watch.stop();
success = true;
} finally {
Expand All @@ -263,6 +209,20 @@ public void startTask(GseTaskDTO gseTask, String requestId) {
}
}

/**
 * Closes a GSE task with the given final status without running it.
 * <p>
 * Sets the status and end time on the task, uses the end time as start time when no start
 * time was recorded, derives the total time from the two, stores the task through
 * {@code gseTaskService} and finally dispatches a refresh-step event for the owning step.
 *
 * @param gseTask GSE task to close; it is modified in place
 * @param status  final status to record on the task
 */
private void finishGseTask(GseTaskDTO gseTask, RunStatusEnum status) {
    gseTask.setStatus(status.getValue());

    final long finishedAt = System.currentTimeMillis();
    gseTask.setEndTime(finishedAt);
    // A task closed before it ever started has no start time; reuse the end time for it.
    if (gseTask.getStartTime() == null) {
        gseTask.setStartTime(finishedAt);
    }
    gseTask.setTotalTime(finishedAt - gseTask.getStartTime());

    gseTaskService.updateGseTask(gseTask);

    // Notify the step so that it re-evaluates its own state from the stored GSE task.
    EventSource gseTaskSource = EventSource.buildGseTaskEventSource(
        gseTask.getStepInstanceId(),
        gseTask.getExecuteCount(),
        gseTask.getBatch(),
        gseTask.getId()
    );
    StepEvent refreshEvent = StepEvent.refreshStep(gseTask.getStepInstanceId(), gseTaskSource);
    taskExecuteMQEventDispatcher.dispatchStepEvent(refreshEvent);
}

/**
 * Builds the lock key for a GSE task by appending the task id to a fixed prefix.
 *
 * @param gseTask GSE task whose id is used in the key
 * @return lock key string
 */
private String buildGseTaskLockKey(GseTaskDTO gseTask) {
    final String lockKeyPrefix = "job:running:gse:task:";
    return lockKeyPrefix + gseTask.getId();
}
Expand Down Expand Up @@ -361,14 +321,11 @@ private AbstractGseTaskStartCommand initGseTaskStartCommand(String requestId,
return gseTaskStartCommand;
}

private void executeTask(AbstractGseTaskStartCommand startCommand,
GseTaskDTO gseTask) {
private void executeTask(AbstractGseTaskStartCommand startCommand) {
try {
startingGseTasks.put(gseTask.getTaskUniqueName(), startCommand);
incrementRunningTasksCount(startCommand);
startCommand.execute();
} finally {
startingGseTasks.remove(gseTask.getTaskUniqueName());
decrementRunningTasksCount(startCommand);
}
}
Expand Down Expand Up @@ -491,33 +448,6 @@ public int getPhase() {
return DestroyOrder.GSE_TASK_HANDLER;
}

/**
 * Returns the number of tasks currently being executed.
 *
 * @return current value of the running-task counter
 */
public int getRunningTaskCount() {
    return runningTasks.get();
}

/**
 * Returns the number of file tasks currently being executed.
 *
 * @return current value of the running-file-task counter
 */
public int getRunningFileTaskCount() {
    return runningFileTasks.get();
}

/**
 * Returns the number of script tasks currently being executed.
 *
 * @return current value of the running-script-task counter
 */
public int getRunningScriptTaskCount() {
    return runningScriptTasks.get();
}

/**
* 返回累计处理的文件任务数量
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ protected final void handleStartGseTaskError(GseTaskResponse gseTaskResponse) {
now);
scriptLogs.add(scriptLog);

// AgentTask 结果更新
// ExecuteObjectTask 结果更新
executeObjectTask.setGseTaskId(gseTask.getId());
executeObjectTask.setStartTime(gseTask.getStartTime());
executeObjectTask.setEndTime(now);
Expand Down
Loading
Loading