Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#3094 from wangyu096/issue_3093
Browse files Browse the repository at this point in the history
perf: 正在执行中的作业存活状态维持方案优化 TencentBlueKing#3093
  • Loading branch information
wangyu096 authored Jul 4, 2024
2 parents a75f0fc + bec9543 commit 3ab94ac
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ ResponseEntity<?> handleResourceExhaustedException(HttpServletRequest request, R
} else {
log.info(errorMsg);
}
return new ResponseEntity<>(EsbResp.buildCommonFailResp(ex), HttpStatus.TOO_MANY_REQUESTS);
return new ResponseEntity<>(EsbResp.buildCommonFailResp(ex), HttpStatus.OK);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.execute.constants;

/**
* redis key 定义
*/
public interface RedisKeys {
/**
* 正在执行中的作业-ZSET
*/
String RUNNING_JOB_ZSET_KEY = "job:execute:running:job";

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public NotAliveJobDetector(RunningJobResourceQuotaManager runningJobResourceQuot
}

/**
* 兜底方案。为了防止系统异常、程序 bug 等原因导致 redis 中的作业记录没有被清理,需要定时清理。每小时触发一次
* 兜底方案。为了防止系统异常、程序 bug 等原因导致 redis 中的作业记录没有被清理,需要定时清理。每10min触发一次
*/
@Scheduled(cron = "0 0 0/1 * * ?")
@Scheduled(cron = "0 0/10 * * * ?")
public void detectNotAliveJob() {
try {
if (LockUtils.tryGetDistributedLock("job:execute:not:alive:job:detect:lock", requestId, 60000L)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,27 @@


import com.tencent.bk.job.common.util.ThreadUtils;
import com.tencent.bk.job.execute.constants.RedisKeys;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -52,7 +57,6 @@
@Component
@EnableScheduling
public class RunningJobKeepaliveManager {
private static final String RUNNING_JOB_ZSET_KEY = "job:execute:running:job";
private final Object lock = new Object();
private final RedisTemplate<String, String> redisTemplate;
private final Map<Long, KeepaliveTask> runningJobKeepaliveTasks = new ConcurrentHashMap<>();
Expand All @@ -75,7 +79,7 @@ private KeepaliveTask updateTaskKeepaliveInfo(long jobInstanceId) {
while (maxWaitingSeconds > 0) {
try {
long timestamp = System.currentTimeMillis();
redisTemplate.opsForZSet().add(RUNNING_JOB_ZSET_KEY, String.valueOf(jobInstanceId), timestamp);
updateJobLatestAliveTimestamp(jobInstanceId, timestamp);
return new KeepaliveTask(jobInstanceId, timestamp);
} catch (Throwable e) {
log.error("Update running job keepalive task error, jobInstanceId: " + jobInstanceId, e);
Expand All @@ -87,6 +91,22 @@ private KeepaliveTask updateTaskKeepaliveInfo(long jobInstanceId) {
return null;
}

private void updateJobLatestAliveTimestamp(Long jobInstanceId, long currentTimestamp) {
RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer();
byte[] key = stringRedisSerializer.serialize(RedisKeys.RUNNING_JOB_ZSET_KEY);
byte[] member = stringRedisSerializer.serialize(String.valueOf(jobInstanceId));
redisTemplate.execute((RedisCallback<Object>) connection -> {
connection.zAdd(
Objects.requireNonNull(key),
currentTimestamp,
Objects.requireNonNull(member),
// 只有 member 存在的时候才会更新,避免写入一些异常的作业数据
RedisZSetCommands.ZAddArgs.ifExists()
);
return null;
});
}

public void stopKeepaliveTask(long jobInstanceId) {
log.info("Stop running job keepalive task : {}", jobInstanceId);
KeepaliveTask keepaliveTask = runningJobKeepaliveTasks.get(jobInstanceId);
Expand Down Expand Up @@ -121,8 +141,7 @@ public void refreshTaskKeepaliveInfo() {
try {
// 二次确认,防止在运行期间任务被移除
if (runningJobKeepaliveTasks.get(jobInstanceId) != null) {
redisTemplate.opsForZSet().add(RUNNING_JOB_ZSET_KEY, String.valueOf(jobInstanceId),
currentTimestamp);
updateJobLatestAliveTimestamp(jobInstanceId, currentTimestamp);
keepaliveTask.setTimestamp(currentTimestamp);
refreshJobInstanceIds.add(jobInstanceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.tencent.bk.job.common.metrics.CommonMetricValues;
import com.tencent.bk.job.common.model.dto.ResourceScope;
import com.tencent.bk.job.common.service.quota.RunningJobResourceQuotaStore;
import com.tencent.bk.job.execute.constants.RedisKeys;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -67,7 +68,6 @@ public class RunningJobResourceQuotaManager {
private static final String RESOURCE_SCOPE_RUNNING_JOB_COUNT_HASH_KEY =
"job:execute:running:job:count:resource_scope";
private static final String APP_RUNNING_JOB_COUNT_HASH_KEY = "job:execute:running:job:count:app";
private static final String RUNNING_JOB_ZSET_KEY = "job:execute:running:job";

private static final List<String> LUA_SCRIPT_KEYS = new ArrayList<>();

Expand All @@ -82,7 +82,7 @@ public class RunningJobResourceQuotaManager {
private static final long JOB_EXPIRE_TIME = 3600 * 1000L;

static {
LUA_SCRIPT_KEYS.add(RUNNING_JOB_ZSET_KEY);
LUA_SCRIPT_KEYS.add(RedisKeys.RUNNING_JOB_ZSET_KEY);
LUA_SCRIPT_KEYS.add(RESOURCE_SCOPE_RUNNING_JOB_COUNT_HASH_KEY);
LUA_SCRIPT_KEYS.add(APP_RUNNING_JOB_COUNT_HASH_KEY);

Expand Down Expand Up @@ -214,7 +214,7 @@ private void recordExceedQuotaLimitRecord(String appCode, ResourceScope resource
}

public long getRunningJobTotal() {
Long count = redisTemplate.opsForZSet().size(RUNNING_JOB_ZSET_KEY);
Long count = redisTemplate.opsForZSet().size(RedisKeys.RUNNING_JOB_ZSET_KEY);
return count != null ? count : 0L;
}

Expand Down Expand Up @@ -252,8 +252,12 @@ public Set<Long> getNotAliveJobInstanceIds() {
try {
// 1 小时过期
long EXPIRE_AT = System.currentTimeMillis() - JOB_EXPIRE_TIME;
Set<String> notAliveJobInstanceIds = redisTemplate.opsForZSet().rangeByScore(RUNNING_JOB_ZSET_KEY, -1,
EXPIRE_AT);
Set<String> notAliveJobInstanceIds = redisTemplate.opsForZSet()
.rangeByScore(
RedisKeys.RUNNING_JOB_ZSET_KEY,
-1,
EXPIRE_AT
);
if (CollectionUtils.isEmpty(notAliveJobInstanceIds)) {
return Collections.emptySet();
}
Expand Down

0 comments on commit 3ab94ac

Please sign in to comment.