Skip to content

Commit

Permalink
feat: 执行引擎任务调度配额限制 TencentBlueKing#261
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jun 14, 2024
1 parent e7a7438 commit 8fd13f4
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
1244029=步骤 [{0}] 的源文件执行对象为空
1244030=作业引用的执行对象不存在。不存在的执行对象个数:{0},执行对象列表[{1}]
1244031=Label Selector 不合法
1244032=当前执行的作业总量超过配额限制

## 业务错误-定时任务(job-crontab)
1245008=删除定时任务失败
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
1244029=Step [{0}] source execute object is empty
1244030=Execute object referenced by the job does not exist. Number of non-existent execution objects: {0}, execution object list: [{1}]
1244031=Invalid Label Selector
1244032=The total number of jobs currently executed exceeds the quota limit

## Business error - job-crontab
1245008=Failed to delete cron
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
1244029=Step [{0}] source execute object is empty
1244030=Execute object referenced by the job does not exist. Number of non-existent execution objects: {0}, execution object list: [{1}]
1244031=Invalid Label Selector
1244032=The total number of jobs currently executed exceeds the quota limit

## Business error - job-crontab
1245008=Failed to delete cron
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
1244029=步骤 [{0}] 的源文件执行对象为空
1244030=作业引用的执行对象不存在。不存在的执行对象个数:{0},执行对象列表[{1}]
1244031=Label Selector 不合法
1244032=当前执行的作业总量超过配额限制

## 业务错误-定时任务(job-crontab)
1245008=删除定时任务失败
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@
1244029=步骤 [{0}] 的源文件执行对象为空
1244030=作业引用的执行对象不存在。不存在的执行对象个数:{0},执行对象列表[{1}]
1244031=Label Selector 不合法
1244032=当前执行的作业总量超过配额限制

## 业务错误-定时任务(job-crontab)
1245008=删除定时任务失败
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.tencent.bk.job.common.service;

import com.tencent.bk.job.common.service.quota.ResourceQuotaStore;
import com.tencent.bk.job.common.util.feature.FeatureStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -32,8 +33,9 @@
public class CommonServiceAutoConfiguration {

@Bean
public ConfigRefreshEventListener configRefreshEventListener(FeatureStore featureStore) {
return new ConfigRefreshEventListener(featureStore);
public ConfigRefreshEventListener configRefreshEventListener(FeatureStore featureStore,
ResourceQuotaStore resourceQuotaStore) {
return new ConfigRefreshEventListener(featureStore, resourceQuotaStore);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.tencent.bk.job.common.service;

import com.tencent.bk.job.common.service.quota.ResourceQuotaStore;
import com.tencent.bk.job.common.util.feature.FeatureStore;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -41,8 +42,11 @@ public class ConfigRefreshEventListener {

private final FeatureStore featureStore;

public ConfigRefreshEventListener(FeatureStore featureStore) {
private final ResourceQuotaStore resourceQuotaStore;

public ConfigRefreshEventListener(FeatureStore featureStore, ResourceQuotaStore resourceQuotaStore) {
this.featureStore = featureStore;
this.resourceQuotaStore = resourceQuotaStore;
log.info("Init ConfigRefreshEventListener");
}

Expand All @@ -56,7 +60,7 @@ public void onEvent(EnvironmentChangeEvent event) {
if (log.isInfoEnabled()) {
log.info("Handle EnvironmentChangeEvent, event: {}", printEnvironmentChangeEvent(event));
}
reloadFeatureToggleIfChanged(event.getKeys());
reload(event.getKeys());
}

private String printEnvironmentChangeEvent(EnvironmentChangeEvent event) {
Expand All @@ -67,18 +71,17 @@ private String printEnvironmentChangeEvent(EnvironmentChangeEvent event) {
.toString();
}


/**
* 重载特性开关配置
* 重载配置
*/
private void reloadFeatureToggleIfChanged(Set<String> changedKeys) {
private void reload(Set<String> changedKeys) {
if (CollectionUtils.isEmpty(changedKeys)) {
return;
}
boolean isFeatureToggleConfigChanged =
changedKeys.stream().anyMatch(changedKey -> changedKey.startsWith("job.features."));
if (isFeatureToggleConfigChanged) {
if (changedKeys.stream().anyMatch(changedKey -> changedKey.startsWith("job.features."))) {
featureStore.load(true);
} else if (changedKeys.stream().anyMatch(changedKey -> changedKey.startsWith("job.resourceScopeQuotaLimit."))) {
resourceQuotaStore.load(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public class CounterResourceQuota extends ResourceQuota {
/**
* 解析之后的自定义业务配额限制
*/
private Map<String, Long> resourceScopeLimits = new HashMap<>();
private Map<String, Long> customResourceScopeLimits = new HashMap<>();

public CounterResourceQuota(String capacityExpr, String globalLimitExpr, String customLimitExpr) {
super(capacityExpr, globalLimitExpr, customLimitExpr);
}

public long getLimit(ResourceScope resourceScope) {
String resourceScopeUniqueId = resourceScope.getResourceScopeUniqueId();
Long limit = resourceScopeLimits.get(resourceScopeUniqueId);
Long limit = customResourceScopeLimits.get(resourceScopeUniqueId);
if (limit == null) {
limit = globalLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
package com.tencent.bk.job.common.service.quota;

import com.tencent.bk.job.common.service.quota.config.ResourceQuotaConfig;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
* 计数类型的配额配置解析
Expand All @@ -34,20 +38,50 @@ public class CounterResourceQuotaConfigParser implements ResourceQuotaConfigPars
@Override
public ResourceQuota parse(ResourceQuotaConfig resourceQuotaConfig) throws ResourceQuotaConfigParseException {
CounterResourceQuota resourceQuota = new CounterResourceQuota(
resourceQuotaConfig.getCapacity(),
resourceQuotaConfig.getGlobalLimit(),
resourceQuotaConfig.getCustomLimit()
resourceQuotaConfig.getCapacity(),
resourceQuotaConfig.getGlobalLimit(),
resourceQuotaConfig.getCustomLimit()
);
try {
Long capacity = Long.parseLong(resourceQuotaConfig.getCapacity());
resourceQuota.setCapacity(capacity);
Long capacity = null;
if (StringUtils.isNotBlank(resourceQuotaConfig.getCapacity())) {
capacity = Long.parseLong(resourceQuotaConfig.getCapacity());
resourceQuota.setCapacity(capacity);
}

long globalLimit = computeLimitValue(capacity, resourceQuotaConfig.getGlobalLimit());
resourceQuota.setGlobalLimit(globalLimit);

String globalLimitExpr = resourceQuotaConfig.getGlobalLimit().trim();
if (globalLimitExpr.endsWith("%"))
String customLimitExpr = resourceQuotaConfig.getCustomLimit();
if (StringUtils.isNotBlank(customLimitExpr)) {
Map<String, Long> resourceScopeLimits = new HashMap<>();

return null;
customLimitExpr = customLimitExpr.trim();
String[] limitExprForResourceScopes = customLimitExpr.split(",");
for (String resourceScopeLimit : limitExprForResourceScopes) {
String[] resourceScopeLimitParts = resourceScopeLimit.split("=");
String resourceScope = resourceScopeLimitParts[0];
String limitExpr = resourceScopeLimitParts[1];
Long limit = computeLimitValue(capacity, limitExpr);
resourceScopeLimits.put(resourceScope, limit);
}
resourceQuota.setCustomResourceScopeLimits(resourceScopeLimits);
}
return resourceQuota;
} catch (Throwable e) {
throw new ResourceQuotaConfigParseException(e);
}
}

private long computeLimitValue(Long capacity, String limitExpr) {
long limit;
if (limitExpr.endsWith("%")) {
String percentageValueStr = limitExpr.substring(0, limitExpr.length() - 1);
int percentageValue = Integer.parseInt(percentageValueStr);
limit = capacity * percentageValue / 100;
} else {
limit = Long.parseLong(limitExpr);
}
return limit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public ResourceScopeResourceQuotaManager resourceScopeResourceQuotaManager(Resou
}

@Bean
public ResourceQuotaLoadApplicationRunner featureLoadApplicationRunner(ResourceQuotaStore resourceQuotaStore) {
public ResourceQuotaLoadApplicationRunner resourceQuotaLoadApplicationRunner(
ResourceQuotaStore resourceQuotaStore) {
return new ResourceQuotaLoadApplicationRunner(resourceQuotaStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
package com.tencent.bk.job.common.service.quota.config;

import lombok.Data;
import lombok.NoArgsConstructor;

/**
* 资源配额限制
*/
@Data
@NoArgsConstructor
public class ResourceQuotaConfig {
/**
* 配额容量表达式,表示整个作业平台能使用的最大资源
Expand All @@ -45,4 +47,10 @@ public class ResourceQuotaConfig {
* 自定义业务配额表达式。会覆盖全局业务配额的配置
*/
private String customLimit;

public ResourceQuotaConfig(String capacity, String globalLimit, String customLimit) {
this.capacity = capacity;
this.globalLimit = globalLimit;
this.customLimit = customLimit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.common.service.quota;

import com.tencent.bk.job.common.service.quota.config.ResourceQuotaConfig;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

class CounterResourceQuotaConfigParserTest {

@Test
void parse() {
CounterResourceQuotaConfigParser parser = new CounterResourceQuotaConfigParser();

ResourceQuotaConfig resourceQuotaConfig = new ResourceQuotaConfig("1000", "1%", "biz:2=100,biz:3=20%");
CounterResourceQuota resourceQuota = (CounterResourceQuota) parser.parse(resourceQuotaConfig);
assertThat(resourceQuota.getCapacityExpr()).isEqualTo("1000");
assertThat(resourceQuota.getGlobalLimitExpr()).isEqualTo("1%");
assertThat(resourceQuota.getCustomLimitExpr()).isEqualTo("biz:2=100,biz:3=20%");
assertThat(resourceQuota.getCapacity()).isEqualTo(1000L);
assertThat(resourceQuota.getGlobalLimit()).isEqualTo(10L);
assertThat(resourceQuota.getCustomResourceScopeLimits()).isNotEmpty();
assertThat(resourceQuota.getCustomResourceScopeLimits().get("biz:2")).isEqualTo(100L);
assertThat(resourceQuota.getCustomResourceScopeLimits().get("biz:3")).isEqualTo(200L);

resourceQuotaConfig = new ResourceQuotaConfig("1000", "10", "biz:2=100,biz:3=20%");
resourceQuota = (CounterResourceQuota) parser.parse(resourceQuotaConfig);
assertThat(resourceQuota.getCapacity()).isEqualTo(1000L);
assertThat(resourceQuota.getGlobalLimit()).isEqualTo(10L);
assertThat(resourceQuota.getCustomResourceScopeLimits()).isNotEmpty();
assertThat(resourceQuota.getCustomResourceScopeLimits().get("biz:2")).isEqualTo(100L);
assertThat(resourceQuota.getCustomResourceScopeLimits().get("biz:3")).isEqualTo(200L);

resourceQuotaConfig = new ResourceQuotaConfig(null, "10", "biz:2=100");
resourceQuota = (CounterResourceQuota) parser.parse(resourceQuotaConfig);
assertThat(resourceQuota.getCapacity()).isNull();
assertThat(resourceQuota.getGlobalLimit()).isEqualTo(10L);
assertThat(resourceQuota.getCustomResourceScopeLimits()).isNotEmpty();
assertThat(resourceQuota.getCustomResourceScopeLimits().get("biz:2")).isEqualTo(100L);

resourceQuotaConfig = new ResourceQuotaConfig(null, "10", null);
resourceQuota = (CounterResourceQuota) parser.parse(resourceQuotaConfig);
assertThat(resourceQuota.getCapacity()).isNull();
assertThat(resourceQuota.getGlobalLimit()).isEqualTo(10L);
assertThat(resourceQuota.getCustomResourceScopeLimits()).isEmpty();
}

@Test
void parseInvalidConfig() {
CounterResourceQuotaConfigParser parser = new CounterResourceQuotaConfigParser();

assertThatThrownBy(() -> parser.parse(new ResourceQuotaConfig("", "", "")))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(new ResourceQuotaConfig("", "2%", "")))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(new ResourceQuotaConfig("", "2.1%", "")))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(new ResourceQuotaConfig("1000", "2%", "bizss")))
.isInstanceOf(ResourceQuotaConfigParseException.class);

assertThatThrownBy(() -> parser.parse(new ResourceQuotaConfig("1000GB", "2%", null)))
.isInstanceOf(ResourceQuotaConfigParseException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ public class ErrorCode {
// 执行对象不存在。无效的{0}个执行对象:[{1}]
public static final int EXECUTE_OBJECT_NOT_EXIST = 1244030;
public static final int INVALID_LABEL_SELECTOR = 1244031;
// 当前执行的作业总量超过配额限制
public static final int RESOURCE_QUOTA_LIMIT_JOB_INSTANCE = 1244032;
// 作业执行 end

// 定时作业 start
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.execute.common.exception;

import com.tencent.bk.job.common.constant.ErrorCode;
import com.tencent.bk.job.common.exception.ResourceExhaustedException;

/**
* 业务正在执行作业数量超过配额异常
*/
public class RunningJobInstanceQuotaExceedException extends ResourceExhaustedException {

public RunningJobInstanceQuotaExceedException() {
super(ErrorCode.RESOURCE_QUOTA_LIMIT_JOB_INSTANCE);
}
}
Loading

0 comments on commit 8fd13f4

Please sign in to comment.