Skip to content

Commit

Permalink
perf: file-gateway调度逻辑优化 TencentBlueKing#2977
Browse files Browse the repository at this point in the history
改用RoundRobin策略调度file-worker,避免内存数据不准确导致的负载不均衡
  • Loading branch information
jsonwan committed May 15, 2024
1 parent 4c72e45 commit 3a15f07
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public class MetricsConstants {

// tag
// Tag key naming the module that a metric belongs to.
public static final String TAG_KEY_MODULE = "module";
// Tag key carrying the outcome of a dispatch attempt (see the TAG_VALUE_DISPATCH_RESULT_* values).
public static final String TAG_KEY_DISPATCH_RESULT = "dispatchResult";

// value
// Presumably the TAG_KEY_MODULE value for metrics reported by file-worker; usage is not visible here.
public static final String TAG_VALUE_MODULE_FILE_WORKER = "fileWorker";
// TAG_KEY_MODULE value for metrics reported by file-gateway.
public static final String TAG_VALUE_MODULE_FILE_GATEWAY = "fileGateway";
// TAG_KEY_DISPATCH_RESULT value for a dispatch that produced a file worker.
public static final String TAG_VALUE_DISPATCH_RESULT_TRUE = "true";
// TAG_KEY_DISPATCH_RESULT value for a dispatch that produced no file worker.
public static final String TAG_VALUE_DISPATCH_RESULT_FALSE = "false";

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@
import com.tencent.bk.job.file_gateway.service.DispatchService;
import com.tencent.bk.job.file_gateway.service.FileWorkerService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Slf4j
Expand All @@ -55,6 +56,8 @@ public class DispatchServiceImpl implements DispatchService {
private final FileWorkerDAO fileWorkerDAO;
private final FileWorkerService fileWorkerService;

private final AtomicLong dispatchCount = new AtomicLong(0);

@Autowired
public DispatchServiceImpl(FileWorkerDAO fileWorkerDAO,
AbilityTagService abilityTagService,
Expand Down Expand Up @@ -144,16 +147,42 @@ private List<FileWorkerDTO> getFileWorkerByScopeAndAbilityTag(Long appId,
@Override
public FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO) {
Timer.Sample sample = Timer.start(meterRegistry);
FileWorkerDTO fileWorkerDTO = findBestFileWorkerIndeed(fileSourceDTO);
long nanoSeconds = sample.stop(meterRegistry.timer(MetricsConstants.NAME_FILE_GATEWAY_DISPATCH_TIME,
MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_FILE_GATEWAY));
long millis = TimeUnit.NANOSECONDS.toMillis(nanoSeconds);
if (millis > 2000) {
log.warn("Dispatch time over 2000ms, fileSourceDTO={}", fileSourceDTO.getBasicDesc());
FileWorkerDTO fileWorkerDTO = null;
try {
fileWorkerDTO = findBestFileWorkerIndeed(fileSourceDTO);
} catch (Exception e) {
log.warn("Fail to findBestFileWorker", e);
} finally {
dispatchCount.incrementAndGet();
long nanoSeconds = sample.stop(
meterRegistry.timer(
MetricsConstants.NAME_FILE_GATEWAY_DISPATCH_TIME,
buildDispatchResult(fileWorkerDTO)
)
);
long millis = TimeUnit.NANOSECONDS.toMillis(nanoSeconds);
if (millis > 2000) {
log.warn("SLOW: Dispatch time over 2000ms, fileSourceDTO={}", fileSourceDTO.getBasicDesc());
}
}
return fileWorkerDTO;
}

/**
 * Builds the metric tags describing one dispatch attempt.
 *
 * <p>The result always contains the file-gateway module tag first, followed by a
 * dispatch-result tag whose value depends on whether a worker was found.
 *
 * @param fileWorkerDTO the dispatched worker, or {@code null} if none was found
 * @return the two tags, in the order module tag then dispatch-result tag
 */
private Iterable<Tag> buildDispatchResult(FileWorkerDTO fileWorkerDTO) {
    // A null worker means the dispatch did not yield a usable worker.
    String dispatchResult = fileWorkerDTO != null
        ? MetricsConstants.TAG_VALUE_DISPATCH_RESULT_TRUE
        : MetricsConstants.TAG_VALUE_DISPATCH_RESULT_FALSE;
    List<Tag> tags = new ArrayList<>();
    tags.add(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_FILE_GATEWAY));
    tags.add(Tag.of(MetricsConstants.TAG_KEY_DISPATCH_RESULT, dispatchResult));
    return tags;
}

private FileWorkerDTO findWorkerByAuto(FileSourceDTO fileSourceDTO) {
FileWorkerDTO fileWorkerDTO = null;
String workerSelectScope = fileSourceDTO.getWorkerSelectScope();
Expand Down Expand Up @@ -198,10 +227,10 @@ private FileWorkerDTO findWorkerByAuto(FileSourceDTO fileSourceDTO) {
return onlineStatus != null && onlineStatus.intValue() == 1;
}).collect(Collectors.toList());
if (!fileWorkerDTOList.isEmpty()) {
// 按策略调度:内存占用最小
fileWorkerDTOList.sort(Comparator.comparing(FileWorkerDTO::getMemRate));
log.debug("ordered fileWorkerDTOList:{}", fileWorkerDTOList);
fileWorkerDTO = fileWorkerDTOList.get(0);
// 按策略调度:RoundRobin
int index = (int) (dispatchCount.get() % fileWorkerDTOList.size());
log.debug("choose fileWorker index: {}", index);
fileWorkerDTO = fileWorkerDTOList.get(index);
} else {
log.error("Cannot find available file worker, abilityTagList={}", abilityTagList);
}
Expand Down

0 comments on commit 3a15f07

Please sign in to comment.