Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: file-gateway调度逻辑优化 #2977 #2985

Merged
merged 3 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ public class MetricsConstants {

// tag
public static final String TAG_KEY_MODULE = "module";
public static final String TAG_KEY_REQUEST_SOURCE = "requestSource";
public static final String TAG_KEY_DISPATCH_RESULT = "dispatchResult";

// value
public static final String TAG_VALUE_MODULE_FILE_WORKER = "fileWorker";
public static final String TAG_VALUE_MODULE_FILE_GATEWAY = "fileGateway";
public static final String TAG_VALUE_DISPATCH_RESULT_TRUE = "true";
public static final String TAG_VALUE_DISPATCH_RESULT_FALSE = "false";

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public interface DispatchService {
* 根据文件源找到一个最适合的FileWorker
*
* @param fileSourceDTO 文件源对象
* @param requestSource 请求来源
* @return 选中的对接文件源的FileWorker对象
*/
FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO);
FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO, String requestSource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@
import com.tencent.bk.job.file_gateway.service.DispatchService;
import com.tencent.bk.job.file_gateway.service.FileWorkerService;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Slf4j
Expand All @@ -55,6 +56,8 @@ public class DispatchServiceImpl implements DispatchService {
private final FileWorkerDAO fileWorkerDAO;
private final FileWorkerService fileWorkerService;

private final AtomicLong roundRobinCount = new AtomicLong(0);

@Autowired
public DispatchServiceImpl(FileWorkerDAO fileWorkerDAO,
AbilityTagService abilityTagService,
Expand Down Expand Up @@ -142,24 +145,50 @@ private List<FileWorkerDTO> getFileWorkerByScopeAndAbilityTag(Long appId,
}

@Override
public FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO) {
public FileWorkerDTO findBestFileWorker(FileSourceDTO fileSourceDTO, String requestSource) {
Timer.Sample sample = Timer.start(meterRegistry);
FileWorkerDTO fileWorkerDTO = findBestFileWorkerIndeed(fileSourceDTO);
long nanoSeconds = sample.stop(meterRegistry.timer(MetricsConstants.NAME_FILE_GATEWAY_DISPATCH_TIME,
MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_FILE_GATEWAY));
long millis = TimeUnit.NANOSECONDS.toMillis(nanoSeconds);
if (millis > 2000) {
log.warn("Dispatch time over 2000ms, fileSourceDTO={}", fileSourceDTO.getBasicDesc());
FileWorkerDTO fileWorkerDTO = null;
try {
fileWorkerDTO = findBestFileWorkerIndeed(fileSourceDTO);
} catch (Exception e) {
log.warn("Fail to findBestFileWorker", e);
} finally {
long nanoSeconds = sample.stop(
meterRegistry.timer(
MetricsConstants.NAME_FILE_GATEWAY_DISPATCH_TIME,
buildDispatchResult(fileWorkerDTO, requestSource)
)
);
long millis = TimeUnit.NANOSECONDS.toMillis(nanoSeconds);
if (millis > 2000) {
log.warn("SLOW: Dispatch time over 2000ms, fileSourceDTO={}", fileSourceDTO.getBasicDesc());
}
}
return fileWorkerDTO;
}

private Iterable<Tag> buildDispatchResult(FileWorkerDTO fileWorkerDTO, String requestSource) {
List<Tag> tagList = new ArrayList<>();
tagList.add(Tag.of(MetricsConstants.TAG_KEY_MODULE, MetricsConstants.TAG_VALUE_MODULE_FILE_GATEWAY));
tagList.add(Tag.of(MetricsConstants.TAG_KEY_REQUEST_SOURCE, requestSource));
if (fileWorkerDTO == null) {
tagList.add(
Tag.of(MetricsConstants.TAG_KEY_DISPATCH_RESULT, MetricsConstants.TAG_VALUE_DISPATCH_RESULT_FALSE)
);
} else {
tagList.add(
Tag.of(MetricsConstants.TAG_KEY_DISPATCH_RESULT, MetricsConstants.TAG_VALUE_DISPATCH_RESULT_TRUE)
);
}
return tagList;
}

private FileWorkerDTO findWorkerByAuto(FileSourceDTO fileSourceDTO) {
FileWorkerDTO fileWorkerDTO = null;
String workerSelectScope = fileSourceDTO.getWorkerSelectScope();
List<String> abilityTagList = abilityTagService.getAbilityTagList(fileSourceDTO);
List<FileWorkerDTO> fileWorkerDTOList;
if (abilityTagList == null || abilityTagList.size() == 0) {
if (abilityTagList == null || abilityTagList.isEmpty()) {
// 无能力标签要求,任选一个FileWorker
fileWorkerDTOList = getFileWorkerByScope(fileSourceDTO.getAppId(), workerSelectScope);
if (fileWorkerDTOList.isEmpty()) {
Expand Down Expand Up @@ -198,10 +227,9 @@ private FileWorkerDTO findWorkerByAuto(FileSourceDTO fileSourceDTO) {
return onlineStatus != null && onlineStatus.intValue() == 1;
}).collect(Collectors.toList());
if (!fileWorkerDTOList.isEmpty()) {
// 按策略调度:内存占用最小
fileWorkerDTOList.sort(Comparator.comparing(FileWorkerDTO::getMemRate));
log.debug("ordered fileWorkerDTOList:{}", fileWorkerDTOList);
fileWorkerDTO = fileWorkerDTOList.get(0);
// 按策略调度:RoundRobin
int index = (int) (roundRobinCount.getAndIncrement() % fileWorkerDTOList.size());
fileWorkerDTO = fileWorkerDTOList.get(index);
} else {
log.error("Cannot find available file worker, abilityTagList={}", abilityTagList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ public FileServiceImpl(FileSourceService fileSourceService,
this.jobHttpClient = jobHttpClient;
}

private FileWorkerDTO getFileWorker(FileSourceDTO fileSourceDTO) {
private FileWorkerDTO getFileWorker(FileSourceDTO fileSourceDTO, String requestSource) {
if (fileSourceDTO == null) {
throw new InternalException(ErrorCode.FILE_SOURCE_NOT_EXIST);
}
return dispatchService.findBestFileWorker(fileSourceDTO);
return dispatchService.findBestFileWorker(fileSourceDTO, requestSource);
}

@Override
public boolean isFileAvailable(String username, Long appId, Integer fileSourceId) {
FileSourceDTO fileSourceDTO = fileSourceService.getFileSourceById(appId, fileSourceId);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO, "isFileAvailable");
if (fileWorkerDTO == null) {
throw new InternalException(ErrorCode.CAN_NOT_FIND_AVAILABLE_FILE_WORKER);
}
Expand All @@ -103,7 +103,7 @@ public FileNodesVO listFileNode(String username, Long appId, Integer fileSourceI
if (name == null) name = "";
final String finalName = name;
FileSourceDTO fileSourceDTO = fileSourceService.getFileSourceById(appId, fileSourceId);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO, "listFileNode");
if (fileWorkerDTO == null) {
throw new InternalException(ErrorCode.CAN_NOT_FIND_AVAILABLE_FILE_WORKER);
}
Expand All @@ -129,7 +129,10 @@ public FileNodesVO listFileNode(String username, Long appId, Integer fileSourceI
@Override
public Boolean executeAction(String username, Long appId, Integer fileSourceId, ExecuteActionReq executeActionReq) {
FileSourceDTO fileSourceDTO = fileSourceService.getFileSourceById(appId, fileSourceId);
FileWorkerDTO fileWorkerDTO = getFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = getFileWorker(
fileSourceDTO,
"executeAction" + executeActionReq.getActionCode()
);
if (fileWorkerDTO == null) {
throw new InternalException(ErrorCode.CAN_NOT_FIND_AVAILABLE_FILE_WORKER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ public TaskInfoDTO startFileSourceDownloadTaskWithId(String username, Long appId
if (fileSourceDTO == null) {
throw new RuntimeException("FileSource not exist, fileSourceId=" + fileSourceId.toString());
}
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(
fileSourceDTO,
"DownloadTask(appId=" + appId + ")"
);
if (fileWorkerDTO == null) {
throw new RuntimeException(String.format("Cannot match fileWorker for FileSourceTask,appId=%d," +
"stepInstanceId=%d,fileSourceId=%d,filePathList=%s", appId, stepInstanceId, fileSourceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void run() {
// 2.删除现有FileSourceTask任务
fileSourceTaskService.deleteFileSourceTaskById(fileSourceTaskId);
log.debug("delete fileSourceTask {}", fileSourceTaskId);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO, "ReDispatch");
log.debug("found bestWorker:{}", fileSourceDTO);
if (fileWorkerDTO != null) {
// 3.重新派发任务
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void run() {
pageSize
);
for (FileSourceDTO fileSourceDTO : fileSourceDTOList) {
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(fileSourceDTO);
FileWorkerDTO fileWorkerDTO = dispatchService.findBestFileWorker(
fileSourceDTO, "FileSourceStatusUpdateTask"
);
int status;
if (fileWorkerDTO == null) {
log.info(
Expand Down
Loading