Skip to content

Commit

Permalink
feat: 执行引擎任务调度配额限制 TencentBlueKing#261
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jun 27, 2024
1 parent 26cf457 commit 0d57697
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public class ResourceQuotaLimitProperties {
@Data
@NoArgsConstructor
public static class ResourceQuotaLimitProp {
/**
* 是否启用配额限制
*/
private boolean enabled;
/**
* 配额容量(比如任务数量、存储占用大小等)
*/
Expand All @@ -67,9 +71,11 @@ public static class ResourceQuotaLimitProp {
*/
private QuotaLimitProp appQuotaLimit;

public ResourceQuotaLimitProp(String capacity,
public ResourceQuotaLimitProp(boolean enabled,
String capacity,
QuotaLimitProp resourceScopeQuotaLimit,
QuotaLimitProp appQuotaLimit) {
this.enabled = enabled;
this.capacity = capacity;
this.resourceScopeQuotaLimit = resourceScopeQuotaLimit;
this.appQuotaLimit = appQuotaLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public ResourceQuotaLimit parse(ResourceQuotaLimitProperties.ResourceQuotaLimitP

ResourceQuotaLimit resourceQuota = new ResourceQuotaLimit();
try {
resourceQuota.setEnabled(resourceQuotaLimitProp.isEnabled());
resourceQuota.setCapacityExpr(resourceQuotaLimitProp.getCapacity());
Long capacity = parseCapacity(resourceQuotaLimitProp.getCapacity());
resourceQuota.setCapacity(capacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ void parseByPercentageLimitValue() {
ResourceQuotaLimitProperties.QuotaLimitProp appQuotaLimit =
new ResourceQuotaLimitProperties.QuotaLimitProp("2%", "bk-soap=20%,bk-nodeman=10%");
ResourceQuotaLimitProperties.ResourceQuotaLimitProp resourceQuotaLimitProp =
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp("1000", resourceScopeQuotaLimit, appQuotaLimit);
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(true, "1000", resourceScopeQuotaLimit,
appQuotaLimit);

ResourceQuotaLimit resourceQuota = parser.parse(resourceQuotaLimitProp);

Expand Down Expand Up @@ -75,7 +76,7 @@ void parseByExplicitLimitValue() {
ResourceQuotaLimitProperties.QuotaLimitProp appQuotaLimit =
new ResourceQuotaLimitProperties.QuotaLimitProp("20", "bk-soap=100,bk-nodeman=150");
ResourceQuotaLimitProperties.ResourceQuotaLimitProp resourceQuotaLimitProp =
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(null, resourceScopeQuotaLimit, appQuotaLimit);
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(true, null, resourceScopeQuotaLimit, appQuotaLimit);

ResourceQuotaLimit resourceQuota = parser.parse(resourceQuotaLimitProp);

Expand Down Expand Up @@ -103,34 +104,39 @@ void parseInvalidConfig() {

assertThatThrownBy(() -> parser.parse(
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(
true,
"",
new ResourceQuotaLimitProperties.QuotaLimitProp("2%", null),
null)))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(
true,
"",
null,
new ResourceQuotaLimitProperties.QuotaLimitProp("2%", null))))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(
true,
"1000",
new ResourceQuotaLimitProperties.QuotaLimitProp("2%", "error_custom_settings"),
null)))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(
true,
"1000",
null,
new ResourceQuotaLimitProperties.QuotaLimitProp("2%", "error_custom_settings"))))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(
new ResourceQuotaLimitProperties.ResourceQuotaLimitProp(
true,
"1000G",
null,
new ResourceQuotaLimitProperties.QuotaLimitProp("2%", null))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.springframework.stereotype.Component;

/**
* 执行引擎依赖的服务的托管类,用于程序快速获取到依赖的 Service; 而不需要通过构造方法一一传入
* 执行引擎依赖的服务的托管类。用于简化一些较为复杂的类构造函数的参数列表
*/
@Component
@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ public void handleEvent(JobEvent jobEvent) {
log.info("Handle job event, event: {}, duration: {}ms", jobEvent, jobEvent.duration());
long jobInstanceId = jobEvent.getJobInstanceId();
JobActionEnum action = JobActionEnum.valueOf(jobEvent.getAction());
TaskInstanceDTO taskInstance = null;
TaskInstanceDTO taskInstance;
try {
taskInstance = taskInstanceService.getTaskInstance(jobInstanceId);
taskInstance = taskInstanceService.getTaskInstance(jobInstanceId);
switch (action) {
case START:
startJob(taskInstance);
Expand All @@ -115,7 +115,6 @@ public void handleEvent(JobEvent jobEvent) {
} catch (Throwable e) {
String errorMsg = "Handle job event error, jobInstanceId=" + jobInstanceId;
log.error(errorMsg, e);
finishJob(taskInstance, null, RunStatusEnum.ABNORMAL_STATE);
}
}

Expand Down Expand Up @@ -242,34 +241,36 @@ private void finishJob(TaskInstanceDTO taskInstance,
StepInstanceBaseDTO stepInstance,
RunStatusEnum jobStatus) {
long jobInstanceId = taskInstance.getId();
long stepInstanceId = stepInstance.getId();
Long endTime = DateUtils.currentTimeMillis();
long totalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime,
taskInstance.getTotalTime());
taskInstance.setEndTime(endTime);
taskInstance.setTotalTime(totalTime);
taskInstance.setStatus(jobStatus);
taskInstanceService.updateTaskExecutionInfo(jobInstanceId, jobStatus, null, null, endTime, totalTime);
try {
Long endTime = DateUtils.currentTimeMillis();
long totalTime = TaskCostCalculator.calculate(taskInstance.getStartTime(), endTime,
taskInstance.getTotalTime());
taskInstance.setEndTime(endTime);
taskInstance.setTotalTime(totalTime);
taskInstance.setStatus(jobStatus);
taskInstanceService.updateTaskExecutionInfo(jobInstanceId, jobStatus, null, null, endTime, totalTime);

// 作业执行结果消息通知
if (RunStatusEnum.SUCCESS == jobStatus || RunStatusEnum.IGNORE_ERROR == jobStatus) {
notifyService.asyncSendMQSuccessTaskNotification(taskInstance, stepInstance);
} else {
notifyService.asyncSendMQFailTaskNotification(taskInstance, stepInstance);
}

// 从资源配额中删除该作业实例
runningJobResourceQuotaManager.removeJob(
taskInstance.getAppCode(),
GlobalAppScopeMappingService.get().getScopeByAppId(taskInstance.getAppId()),
jobInstanceId
);
// 触发作业结束统计分析
statisticsService.updateEndJobStatistics(taskInstance);

// 作业执行结果消息通知
if (RunStatusEnum.SUCCESS == jobStatus || RunStatusEnum.IGNORE_ERROR == jobStatus) {
notifyService.asyncSendMQSuccessTaskNotification(taskInstance, stepInstance);
} else {
notifyService.asyncSendMQFailTaskNotification(taskInstance, stepInstance);
// 作业执行完成回调
callback(taskInstance);
} finally {
// 从资源配额中删除该作业实例
runningJobResourceQuotaManager.removeJob(
taskInstance.getAppCode(),
GlobalAppScopeMappingService.get().getScopeByAppId(taskInstance.getAppId()),
jobInstanceId
);
}

// 触发作业结束统计分析
statisticsService.updateEndJobStatistics(taskInstance);

// 作业执行完成回调
callback(taskInstance, jobInstanceId, jobStatus.getValue(), stepInstanceId, stepInstance.getStatus());
}

/**
Expand Down Expand Up @@ -334,19 +335,22 @@ private void startStep(StepInstanceBaseDTO stepInstance) {
}
}

private void callback(TaskInstanceDTO taskInstance, long jobInstanceId, int taskStatus, long currentStepId,
RunStatusEnum stepStatus) {
private void callback(TaskInstanceDTO taskInstance) {
if (StringUtils.isNotBlank(taskInstance.getCallbackUrl())) {
JobCallbackDTO callback = new JobCallbackDTO();
callback.setId(jobInstanceId);
callback.setStatus(taskStatus);
callback.setId(taskInstance.getId());
callback.setStatus(taskInstance.getStatus().getValue());
callback.setCallbackUrl(taskInstance.getCallbackUrl());
Collection<JobCallbackDTO.StepInstanceStatus> stepInstanceList = Lists.newArrayList();
JobCallbackDTO.StepInstanceStatus stepInstance = new JobCallbackDTO.StepInstanceStatus();
stepInstance.setId(currentStepId);
stepInstance.setStatus(stepStatus.getValue());
stepInstanceList.add(stepInstance);
callback.setStepInstances(stepInstanceList);
Collection<JobCallbackDTO.StepInstanceStatus> stepInstanceStatusList = Lists.newArrayList();
List<StepInstanceBaseDTO> stepInstanceList =
stepInstanceService.listBaseStepInstanceByTaskInstanceId(taskInstance.getId());
stepInstanceList.forEach(stepInstance -> {
JobCallbackDTO.StepInstanceStatus stepInstanceStatus = new JobCallbackDTO.StepInstanceStatus();
stepInstanceStatus.setId(stepInstance.getId());
stepInstanceStatus.setStatus(stepInstance.getStatus().getValue());
stepInstanceStatusList.add(stepInstanceStatus);
});
callback.setStepInstances(stepInstanceStatusList);
taskExecuteMQEventDispatcher.dispatchCallbackMsg(callback);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.tencent.bk.job.execute.engine.result.ContinuousScheduledTask;
import com.tencent.bk.job.execute.engine.result.ScheduleStrategy;
import com.tencent.bk.job.execute.engine.result.StopTaskCounter;
import com.tencent.bk.job.execute.engine.result.TaskContext;
import com.tencent.bk.job.execute.model.StepInstanceDTO;
import com.tencent.bk.job.execute.model.TaskInstanceDTO;
import com.tencent.bk.job.execute.service.StepInstanceService;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class FilePrepareControlTask implements ContinuousScheduledTask {
private final FilePrepareTaskResultHandler filePrepareTaskResultHandler;
private final StepInstanceService stepInstanceService;
private final long startTimeMills;
private final TaskContext taskContext;

public FilePrepareControlTask(
FilePrepareService filePrepareService,
Expand All @@ -81,6 +83,7 @@ public FilePrepareControlTask(
this.filePrepareTaskResultHandler = filePrepareTaskResultHandler;
this.stepInstanceService = stepInstanceService;
this.startTimeMills = System.currentTimeMillis();
this.taskContext = new TaskContext(stepInstance.getTaskInstanceId());
}

@Override
Expand Down Expand Up @@ -175,6 +178,11 @@ public String getTaskId() {
return "FilePrepareControlTask-" + stepInstance.getId() + "_" + stepInstance.getExecuteCount();
}

/**
 * Returns the task context held by this task; it is created in the constructor
 * from the step instance's task instance id.
 *
 * @return the task context of this file-prepare control task
 */
@Override
public TaskContext getTaskContext() {
    return this.taskContext;
}

@Override
public String toString() {
return getTaskId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.tencent.bk.job.execute.engine.result.ContinuousScheduledTask;
import com.tencent.bk.job.execute.engine.result.ScheduleStrategy;
import com.tencent.bk.job.execute.engine.result.StopTaskCounter;
import com.tencent.bk.job.execute.engine.result.TaskContext;
import com.tencent.bk.job.execute.model.AccountDTO;
import com.tencent.bk.job.execute.model.ExecuteTargetDTO;
import com.tencent.bk.job.execute.model.FileDetailDTO;
Expand Down Expand Up @@ -105,6 +106,8 @@ public class ThirdFilePrepareTask implements ContinuousScheduledTask, JobTaskCon
*/
private volatile boolean isStopped = false;

private final TaskContext taskContext;

public ThirdFilePrepareTask(
StepInstanceDTO stepInstance,
List<FileSourceDTO> fileSourceList,
Expand All @@ -116,7 +119,7 @@ public ThirdFilePrepareTask(
this.batchTaskId = batchTaskId;
this.isForRetry = isForRetry;
this.resultHandler = resultHandler;

this.taskContext = new TaskContext(stepInstance.getTaskInstanceId());
}

public void initDependentService(
Expand Down Expand Up @@ -518,6 +521,11 @@ public String getTaskId() {
return "file_source_batch_task:" + this.stepInstance.getId() + ":" + this.stepInstance.getExecuteCount();
}

/**
 * Returns the task context held by this task; it is created in the constructor
 * from the step instance's task instance id.
 *
 * @return the task context of this third-party file prepare task
 */
@Override
public TaskContext getTaskContext() {
    return this.taskContext;
}

@Override
public boolean isForRetry() {
return isForRetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public ExpiredJobDetector(RunningJobResourceQuotaManager runningJobResourceQuota
}

/**
* 兜底方案。为了防止系统异常导致 redis 中的作业没有被清理,需要定时清理。每天触发一次
* Fallback safeguard: if a system failure leaves job records behind in redis, this scheduled task cleans them up. Triggered every 5 minutes.
*/
@Scheduled(cron = "0 0/5 * * * ?")
public void detectExpiredJob() {
Expand All @@ -72,13 +72,14 @@ public void detectExpiredJob() {
expiredJobInstanceIds.forEach(expiredJobInstanceId -> {
TaskInstanceDTO taskInstance = taskInstanceService.getTaskInstance(expiredJobInstanceId);
if (taskInstance != null) {
log.info("Remove expire job : {}", expiredJobInstanceId);
runningJobResourceQuotaManager.removeJob(
taskInstance.getAppCode(),
GlobalAppScopeMappingService.get().getScopeByAppId(taskInstance.getAppId()),
expiredJobInstanceId
);
} else {
log.error("Job instance not found, expiredJobInstanceId : {}", expiredJobInstanceId);
log.error("Job instance record not found, expiredJobInstanceId : {}", expiredJobInstanceId);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* 正在执行中的作业状态维持
* 正在执行中的作业存活状态维持
*/
@Slf4j
@Component
Expand Down Expand Up @@ -99,7 +99,7 @@ public void stopKeepaliveTask(long jobInstanceId) {
}
}

@Scheduled(cron = "0 * * * * ?")
// Run once per minute, at second 0. Spring cron expressions start with a seconds field;
// a "*" in that position would fire this refresh every second.
@Scheduled(cron = "0 0/1 * * * ?")
public void refreshTaskKeepaliveInfo() {
log.info("Refresh running job keepalive task start...");
if (runningJobKeepaliveTasks.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ public abstract class AbstractResultHandleTask<T> implements ContinuousScheduled

protected final RunningJobKeepaliveManager runningJobKeepaliveManager;

private TaskContext taskContext;

protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentServiceHolder,
ExecuteObjectTaskService executeObjectTaskService,
JobExecuteConfig jobExecuteConfig,
Expand All @@ -241,7 +243,7 @@ protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentS

this.executeObjectTaskService = executeObjectTaskService;
this.jobExecuteConfig = jobExecuteConfig;

this.requestId = requestId;
this.taskInstance = taskInstance;
this.taskInstanceId = taskInstance.getId();
Expand All @@ -254,6 +256,7 @@ protected AbstractResultHandleTask(EngineDependentServiceHolder engineDependentS
this.gseTask = gseTask;
this.executeObjectTasks = executeObjectTasks;
this.gseTaskInfo = buildGseTaskInfo(stepInstance.getTaskInstanceId(), gseTask);
this.taskContext = new TaskContext(taskInstanceId);

targetExecuteObjectTasks.values().forEach(executeObjectTask ->
this.targetExecuteObjectGseKeys.add(executeObjectTask.getExecuteObject().toExecuteObjectGseKey()));
Expand Down Expand Up @@ -627,7 +630,7 @@ private void tryStopImmediately() {
if (!this.isRunning) {
log.info("ResultHandleTask-onStop start, task: {}", gseTaskInfo);
resultHandleTaskKeepaliveManager.stopKeepaliveInfoTask(getTaskId());
runningJobKeepaliveManager.stopKeepaliveTask(getJobInstanceId());
runningJobKeepaliveManager.stopKeepaliveTask(taskInstanceId);

taskExecuteMQEventDispatcher.dispatchResultHandleTaskResumeEvent(
ResultHandleTaskResumeEvent.resume(gseTask.getStepInstanceId(),
Expand Down Expand Up @@ -723,7 +726,8 @@ protected final GseTaskExecuteResult getExecuteResult() {
*/
abstract GseTaskExecuteResult analyseGseTaskResult(GseTaskResult<T> gseTaskResult);

public long getJobInstanceId() {
return this.taskInstanceId;
@Override
public TaskContext getTaskContext() {
return this.taskContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ public String toString() {
public String getTaskId() {
return this.task.getTaskId();
}

/**
 * Returns the task context of the wrapped task by delegating to it.
 *
 * @return the wrapped task's context
 */
@Override
public TaskContext getTaskContext() {
    return this.task.getTaskContext();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public void handleDeliveredTask(ContinuousScheduledTask task) {

if (task instanceof AbstractResultHandleTask) {
resultHandleTaskKeepaliveManager.addRunningTaskKeepaliveInfo(task.getTaskId());
runningJobKeepaliveManager.addKeepaliveTask(((AbstractResultHandleTask<?>) task).getJobInstanceId());
}
runningJobKeepaliveManager.addKeepaliveTask(task.getTaskContext().getJobInstanceId());
this.tasksQueue.add(scheduleTask);
if (task instanceof ScriptResultHandleTask) {
ScriptResultHandleTask scriptResultHandleTask = (ScriptResultHandleTask) task;
Expand Down
Loading

0 comments on commit 0d57697

Please sign in to comment.