Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#2983 from jsonwan/github_perf/thir…
Browse files Browse the repository at this point in the history
…d_file

perf: file-gateway调度逻辑优化 TencentBlueKing#2977
  • Loading branch information
wangyu096 authored May 14, 2024
2 parents db912c4 + bc2518f commit 4c72e45
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public interface FileTaskDAO {

List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize);

List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte> statusSet, Integer start,
List<String> listTimeoutFileSourceTaskIds(Long startTimeMills,
Long endTimeMills,
Collection<Byte> statusSet, Integer start,
Integer pageSize);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,17 @@ public List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, I
}

@Override
public List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills,
Collection<Byte> statusSet, Integer start, Integer pageSize) {
public List<String> listTimeoutFileSourceTaskIds(Long startTimeMills,
Long endTimeMills,
Collection<Byte> statusSet,
Integer start,
Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (expireTimeMills != null) {
conditions.add(defaultTable.LAST_MODIFY_TIME.le(System.currentTimeMillis() - expireTimeMills));
if (startTimeMills != null) {
conditions.add(defaultTable.LAST_MODIFY_TIME.greaterOrEqual(startTimeMills));
}
if (endTimeMills != null) {
conditions.add(defaultTable.LAST_MODIFY_TIME.lessOrEqual(endTimeMills));
}
if (statusSet != null && !statusSet.isEmpty()) {
conditions.add(defaultTable.STATUS.in(statusSet));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,33 @@

package com.tencent.bk.job.file_gateway.task.dispatch;

import com.tencent.bk.job.common.redis.util.HeartBeatRedisLock;
import com.tencent.bk.job.common.redis.util.HeartBeatRedisLockConfig;
import com.tencent.bk.job.common.redis.util.LockResult;
import com.tencent.bk.job.common.util.TimeUtil;
import com.tencent.bk.job.common.util.ip.IpUtils;
import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.dao.filesource.FileTaskDAO;
import com.tencent.bk.job.file_gateway.service.ReDispatchService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;

import java.util.List;

@Slf4j
@Service
public class ReDispatchTimeoutTask {

private static final String machineIp = IpUtils.getFirstMachineIP();
private static final String REDIS_KEY_REDISPATCH_TASK_RUNNING_MACHINE =
"file-gateway:reDispatch-task-running-machine";
private final FileTaskDAO fileTaskDAO;
private final ReDispatchService reDispatchService;
private final RedisTemplate<String, String> redisTemplate;

@Value("${job.file-gateway.reDispatch.timeoutTask.enabled:true}")
private boolean reDispatchTimeoutTaskEnabled = true;
Expand All @@ -49,9 +60,11 @@ public class ReDispatchTimeoutTask {

@Autowired
public ReDispatchTimeoutTask(FileTaskDAO fileTaskDAO,
ReDispatchService reDispatchService) {
ReDispatchService reDispatchService,
RedisTemplate<String, String> redisTemplate) {
this.fileTaskDAO = fileTaskDAO;
this.reDispatchService = reDispatchService;
this.redisTemplate = redisTemplate;
}

public void run() {
Expand All @@ -60,19 +73,58 @@ public void run() {
".file-gateway.reDispatch.timeoutTask.enabled=true");
return;
}
// 分布式唯一性保证
HeartBeatRedisLockConfig config = HeartBeatRedisLockConfig.getDefault();
config.setHeartBeatThreadName("reDispatchTaskRedisKeyHeartBeatThread");
HeartBeatRedisLock lock = new HeartBeatRedisLock(
redisTemplate,
REDIS_KEY_REDISPATCH_TASK_RUNNING_MACHINE,
machineIp,
config
);
LockResult lockResult = lock.lock();
if (!lockResult.isLockGotten()) {
log.info(
"lock {} gotten by another machine: {}, return",
REDIS_KEY_REDISPATCH_TASK_RUNNING_MACHINE,
lockResult.getLockValue()
);
return;
}
try {
reDispatchFileSourceTasks();
} finally {
lockResult.tryToRelease();
}
}

private void reDispatchFileSourceTasks() {
StopWatch watch = new StopWatch("reDispatchFileSourceTasks");
watch.start("listTimeoutFileSourceTaskIds");
// 找出未结束且长时间无响应的任务,无响应且未结束的任务就应当被重调度了
long fileSourceTaskStatusExpireTimeMills = reDispatchTaskTimeoutSeconds * 1000L;

long intervalStart = computeReDispatchIntervalStart();
long intervalEnd = computeReDispatchIntervalEnd();
List<String> timeoutFileSourceTaskIdList = fileTaskDAO.listTimeoutFileSourceTaskIds(
fileSourceTaskStatusExpireTimeMills,
intervalStart,
intervalEnd,
TaskStatusEnum.getRunningStatusSet(),
0,
-1
);
watch.stop();
if (timeoutFileSourceTaskIdList.isEmpty()) {
log.info("no fileSourceTask need to be reDispatch");
return;
}
log.info(
"find {} fileSourceTask to reDispatch: {}",
"find {} fileSourceTask between [{},{}] to reDispatch: {}",
timeoutFileSourceTaskIdList.size(),
TimeUtil.formatTime(intervalStart),
TimeUtil.formatTime(intervalEnd),
timeoutFileSourceTaskIdList
);
watch.start("reDispatch Tasks");
// 进行超时重调度
for (String fileSourceTaskId : timeoutFileSourceTaskIdList) {
boolean result = reDispatchService.reDispatchByGateway(fileSourceTaskId, 0L, 5000L);
Expand All @@ -83,5 +135,41 @@ public void run() {
result
);
}
watch.stop();
if (watch.getTotalTimeSeconds() > 10) {
log.warn(
"SLOW: reDispatched {} fileSourceTask, timeConsuming:{}",
timeoutFileSourceTaskIdList.size(),
watch.prettyPrint()
);
} else {
log.info(
"reDispatched {} fileSourceTask, timeConsuming: {}s",
timeoutFileSourceTaskIdList.size(),
watch.getTotalTimeSeconds()
);
}
}

/**
 * Computes the start of the re-dispatch time interval.
 *
 * @return start of the re-dispatch interval (epoch milliseconds): the current time minus 30 minutes
 */
private long computeReDispatchIntervalStart() {
    // Only tasks from the most recent half hour are re-dispatched.
    final long halfHourMillis = 1000L * 60 * 30;
    final long nowMillis = System.currentTimeMillis();
    return nowMillis - halfHourMillis;
}

/**
 * Computes the end of the re-dispatch time interval.
 *
 * @return end of the re-dispatch interval (epoch milliseconds): the current time minus the
 *         configured timeout ({@code reDispatchTaskTimeoutSeconds}, converted to milliseconds)
 */
private long computeReDispatchIntervalEnd() {
    // Tasks whose status has not been updated for longer than the timeout are re-dispatched.
    final long statusExpireMillis = reDispatchTaskTimeoutSeconds * 1000L;
    final long nowMillis = System.currentTimeMillis();
    return nowMillis - statusExpireMillis;
}
}

0 comments on commit 4c72e45

Please sign in to comment.