Skip to content

Commit

Permalink
fix: file-worker任务状态更新请求无序到达导致第三方源文件偶现分发失败 TencentBlueKing#2434
Browse files Browse the repository at this point in the history
1.修复Trace数据断链;
2.优化重调度逻辑,支持参数可配置;
3.重构部分代码。
  • Loading branch information
jsonwan committed Sep 26, 2023
1 parent 7627e94 commit 5e72128
Show file tree
Hide file tree
Showing 37 changed files with 544 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void logRespStr(String respStr) {
}

private void logAndThrow(ResponseEntity<String> respEntity) {
log.error("Fail to request fileWorker, status={}, msg={}", respEntity.getStatusCode(), respEntity.getBody());
log.error("Fail to request, status={}, msg={}", respEntity.getStatusCode(), respEntity.getBody());
throw new ServiceException(
ErrorType.INTERNAL,
ErrorCode.FAIL_TO_REQUEST_FILE_WORKER_WITH_REASON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface FileSourceTaskDAO {

FileSourceTaskDTO getFileSourceTaskById(String id);

FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id);

Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status);

List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface FileTaskDAO {

int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId);

FileTaskDTO getFileTaskByIdForUpdate(Long id);

FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ val record = dslContext.selectFrom(defaultTable).where(
}
}

@Override
public FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status) {
List<Condition> conditions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) {
).execute();
}

@Override
public FileTaskDTO getFileTaskByIdForUpdate(Long id) {
List<Condition> conditions = new ArrayList<>();
conditions.add(defaultTable.ID.eq(id));
val record = dslContext.selectFrom(defaultTable).where(
conditions
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath) {
List<Condition> conditions = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.file_gateway.service;

import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO;

public interface FileSourceTaskUpdateService {

String updateFileSourceTask(String batchTaskId,
String fileSourceTaskId,
Long fileTaskId,
FileTaskProgressDTO fileTaskProgressDTO);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,28 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.tencent.bk.job.common.constant.ErrorCode;
import com.tencent.bk.job.common.exception.InternalException;
import com.tencent.bk.job.common.exception.NotFoundException;
import com.tencent.bk.job.common.exception.ServiceException;
import com.tencent.bk.job.common.model.Response;
import com.tencent.bk.job.common.model.http.HttpReq;
import com.tencent.bk.job.common.util.ArrayUtil;
import com.tencent.bk.job.common.util.file.FileSizeUtil;
import com.tencent.bk.job.common.util.file.PathUtil;
import com.tencent.bk.job.common.util.http.JobHttpClient;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.common.constants.FileDistStatusEnum;
import com.tencent.bk.job.file_gateway.consts.TaskCommandEnum;
import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum;
import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceBatchTaskDAO;
import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceDAO;
import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceTaskDAO;
import com.tencent.bk.job.file_gateway.dao.filesource.FileTaskDAO;
import com.tencent.bk.job.file_gateway.dao.filesource.FileWorkerDAO;
import com.tencent.bk.job.file_gateway.model.dto.FileSourceBatchTaskDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileSourceDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileTaskDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO;
import com.tencent.bk.job.file_gateway.model.dto.FileWorkerDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.FileLogPieceDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.FileSourceTaskStatusDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.TaskInfoDTO;
import com.tencent.bk.job.file_gateway.model.resp.inner.ThirdFileSourceTaskLogDTO;
import com.tencent.bk.job.file_gateway.service.DispatchService;
import com.tencent.bk.job.file_gateway.service.FileSourceTaskService;
import com.tencent.bk.job.file_gateway.service.context.TaskContext;
import com.tencent.bk.job.file_gateway.service.context.impl.DefaultTaskContext;
import com.tencent.bk.job.file_gateway.service.listener.FileTaskStatusChangeListener;
import com.tencent.bk.job.file_gateway.service.FileSourceTaskUpdateService;
import com.tencent.bk.job.file_gateway.service.remote.FileSourceTaskReqGenService;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.helpers.MessageFormatter;
Expand All @@ -70,7 +60,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -82,7 +71,7 @@ public class FileSourceTaskServiceImpl implements FileSourceTaskService {

public static final String PREFIX_REDIS_TASK_LOG = "job:file-gateway:taskLog:";

private final FileSourceBatchTaskDAO fileSourceBatchTaskDAO;
private final FileSourceTaskUpdateService fileSourceTaskUpdateService;
private final FileSourceTaskDAO fileSourceTaskDAO;
private final FileTaskDAO fileTaskDAO;
private final FileWorkerDAO fileworkerDAO;
Expand All @@ -91,21 +80,18 @@ public class FileSourceTaskServiceImpl implements FileSourceTaskService {
private final FileSourceTaskReqGenService fileSourceTaskReqGenService;
private final RedisTemplate<String, Object> redisTemplate;
private final JobHttpClient jobHttpClient;
private final List<FileTaskStatusChangeListener> fileTaskStatusChangeListenerList = new ArrayList<>();

@Autowired
public FileSourceTaskServiceImpl(
FileSourceBatchTaskDAO fileSourceBatchTaskDAO,
FileSourceTaskDAO fileSourceTaskDAO,
FileTaskDAO fileTaskDAO,
FileWorkerDAO fileworkerDAO,
FileSourceDAO fileSourceDAO,
DispatchService dispatchService,
FileSourceTaskReqGenService fileSourceTaskReqGenService,
@Qualifier("jsonRedisTemplate") RedisTemplate<String, Object> redisTemplate,
FileTaskStatusChangeListener fileTaskStatusChangeListener,
JobHttpClient jobHttpClient) {
this.fileSourceBatchTaskDAO = fileSourceBatchTaskDAO;
public FileSourceTaskServiceImpl(FileSourceTaskUpdateService fileSourceTaskUpdateService,
FileSourceTaskDAO fileSourceTaskDAO,
FileTaskDAO fileTaskDAO,
FileWorkerDAO fileworkerDAO,
FileSourceDAO fileSourceDAO,
DispatchService dispatchService,
FileSourceTaskReqGenService fileSourceTaskReqGenService,
@Qualifier("jsonRedisTemplate") RedisTemplate<String, Object> redisTemplate,
JobHttpClient jobHttpClient) {
this.fileSourceTaskUpdateService = fileSourceTaskUpdateService;
this.fileSourceTaskDAO = fileSourceTaskDAO;
this.fileTaskDAO = fileTaskDAO;
this.fileworkerDAO = fileworkerDAO;
Expand All @@ -114,11 +100,6 @@ public FileSourceTaskServiceImpl(
this.fileSourceTaskReqGenService = fileSourceTaskReqGenService;
this.redisTemplate = redisTemplate;
this.jobHttpClient = jobHttpClient;
addFileTaskStatusChangeListener(fileTaskStatusChangeListener);
}

public void addFileTaskStatusChangeListener(FileTaskStatusChangeListener listener) {
this.fileTaskStatusChangeListenerList.add(listener);
}

@Override
Expand Down Expand Up @@ -197,65 +178,6 @@ public TaskInfoDTO startFileSourceDownloadTaskWithId(String username, Long appId
fileWorkerDTO.getCloudAreaId(), fileWorkerDTO.getInnerIp());
}

private void writeLog(FileSourceTaskDTO fileSourceTaskDTO,
FileWorkerDTO fileWorkerDTO,
FileTaskProgressDTO fileTaskProgressDTO) {
String taskId = fileSourceTaskDTO.getId();
String fileSizeStr = FileSizeUtil.getFileSizeStr(fileTaskProgressDTO.getFileSize());
ThirdFileSourceTaskLogDTO thirdFileSourceTaskLog = new ThirdFileSourceTaskLogDTO();
String sourceCloudIp = fileWorkerDTO.getCloudIp();
thirdFileSourceTaskLog.setIp(sourceCloudIp);
// 追加文件源名称
// 日志定位坐标:(文件源,文件路径),需要区分不同文件源下相同文件路径的日志
FileSourceDTO fileSourceDTO = fileSourceDAO.getFileSourceById(fileSourceTaskDTO.getFileSourceId());
if (fileSourceDTO == null) {
throw new NotFoundException(ErrorCode.FILE_SOURCE_NOT_EXIST,
ArrayUtil.toArray("fileSourceId:" + fileSourceTaskDTO.getFileSourceId()));
}
String filePathWithSourceAlias = PathUtil.joinFilePath(
fileSourceDTO.getAlias(),
fileTaskProgressDTO.getFilePath()
);
List<FileLogPieceDTO> fileLogPieceList = new ArrayList<>();
FileLogPieceDTO fileLogPiece = new FileLogPieceDTO();
fileLogPiece.setContent(buildFileLogContent(fileTaskProgressDTO, filePathWithSourceAlias, fileSizeStr));
fileLogPiece.setDisplaySrcFile(filePathWithSourceAlias);
fileLogPiece.setProcess(buildProcessStr(fileTaskProgressDTO));
fileLogPiece.setSize(fileSizeStr);
fileLogPiece.setSrcIp(sourceCloudIp);
fileLogPiece.setStatus(FileDistStatusEnum.PULLING.getValue());
fileLogPiece.setStatusDesc(FileDistStatusEnum.PULLING.getName());
fileLogPieceList.add(fileLogPiece);
thirdFileSourceTaskLog.setFileTaskLogs(fileLogPieceList);
// 写入Redis
redisTemplate.opsForList().rightPush(PREFIX_REDIS_TASK_LOG + taskId, thirdFileSourceTaskLog);
// 一小时后过期
redisTemplate.expireAt(PREFIX_REDIS_TASK_LOG + taskId, new Date(System.currentTimeMillis() + 3600 * 1000));
}

@SuppressWarnings("StringBufferReplaceableByString")
private String buildFileLogContent(FileTaskProgressDTO fileTaskProgressDTO,
String filePathWithSourceAlias,
String fileSizeStr) {
StringBuilder sb = new StringBuilder();
sb.append("FileName: ");
sb.append(filePathWithSourceAlias);
sb.append(" FileSize: ");
sb.append(fileSizeStr);
sb.append(" ");
sb.append("Speed: ");
sb.append(fileTaskProgressDTO.getSpeed());
sb.append(" Progress: ");
sb.append(fileTaskProgressDTO.getProgress());
sb.append("% Detail: ");
sb.append(fileTaskProgressDTO.getContent());
return sb.toString();
}

private String buildProcessStr(FileTaskProgressDTO fileTaskProgressDTO) {
return "" + fileTaskProgressDTO.getProgress() + "%";
}

@Override
public String updateFileSourceTask(FileTaskProgressDTO fileTaskProgressDTO) {
String fileSourceTaskId = fileTaskProgressDTO.getFileSourceTaskId();
Expand All @@ -272,97 +194,19 @@ public String updateFileSourceTask(FileTaskProgressDTO fileTaskProgressDTO) {
);
return null;
}
TaskStatusEnum previousStatus = TaskStatusEnum.valueOf(fileTaskDTO.getStatus());
fileTaskDTO.setDownloadPath(fileTaskProgressDTO.getDownloadPath());
FileSourceTaskDTO fileSourceTaskDTO = fileSourceTaskDAO.getFileSourceTaskById(fileSourceTaskId);
if (fileSourceTaskDTO == null) {
log.error("Cannot find fileSourceTaskDTO by taskId {} filePath {}", fileSourceTaskId, filePath);
return null;
}
return updateFileSourceTask(fileTaskDTO, fileSourceTaskDTO, fileTaskProgressDTO, previousStatus);
}

@Transactional(value = "jobFileGatewayTransactionManager", rollbackFor = {Throwable.class})
public String updateFileSourceTask(FileTaskDTO fileTaskDTO,
FileSourceTaskDTO fileSourceTaskDTO,
FileTaskProgressDTO fileTaskProgressDTO,
TaskStatusEnum previousStatus) {
// 开启事务后立即加排它锁,保证读取到其他事务已提交的数据
FileSourceBatchTaskDTO fileSourceBatchTaskDTO =
fileSourceBatchTaskDAO.getBatchTaskByIdForUpdate(fileSourceTaskDTO.getBatchTaskId());
String fileSourceTaskId = fileTaskProgressDTO.getFileSourceTaskId();
String filePath = fileTaskProgressDTO.getFilePath();
TaskStatusEnum status = fileTaskProgressDTO.getStatus();
Integer progress = fileTaskProgressDTO.getProgress();
Long fileSize = fileTaskProgressDTO.getFileSize();
if (fileSourceBatchTaskDTO == null) {
log.error("Cannot find fileSourceBatchTaskDTO by batchTaskId {} fileSourceTaskId {} filePath {}",
fileSourceTaskDTO.getBatchTaskId(),
fileSourceTaskId,
filePath
);
return null;
}
FileWorkerDTO fileWorkerDTO = fileworkerDAO.getFileWorkerById(fileSourceTaskDTO.getFileWorkerId());
int affectedRowNum = -1;
if (status == TaskStatusEnum.RUNNING) {
// 已处于结束态的任务不再接受状态更新
if (!fileTaskDTO.isDone()) {
fileTaskDTO.setProgress(progress);
fileTaskDTO.setFileSize(fileSize);
fileTaskDTO.setStatus(TaskStatusEnum.RUNNING.getStatus());
affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO);
logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status);
} else {
log.info("fileTask {} already done, do not update to running", fileSourceTaskId);
}
} else if (status == TaskStatusEnum.SUCCESS) {
fileTaskDTO.setProgress(100);
fileTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus());
affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO);
logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status);
} else if (status == TaskStatusEnum.FAILED) {
fileTaskDTO.setProgress(progress);
fileTaskDTO.setStatus(TaskStatusEnum.FAILED.getStatus());
affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO);
logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status);
} else if (status == TaskStatusEnum.STOPPED) {
fileTaskDTO.setProgress(progress);
fileTaskDTO.setStatus(TaskStatusEnum.STOPPED.getStatus());
affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO);
logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status);
} else {
log.warn("fileTask {} unknown status:{}", fileSourceTaskId, status);
}
if (affectedRowNum != -1) {
log.info("{} updated, affectedRowNum={}", fileTaskDTO, affectedRowNum);
}
// 通知关注者
if (status != previousStatus) {
notifyFileTaskStatusChangeListeners(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO, previousStatus, status);
}
// 进度上报
writeLog(fileSourceTaskDTO, fileWorkerDTO, fileTaskProgressDTO);
return fileSourceTaskId;
}

private void notifyFileTaskStatusChangeListeners(FileTaskDTO fileTaskDTO,
FileSourceTaskDTO fileSourceTaskDTO,
FileWorkerDTO fileWorkerDTO,
TaskStatusEnum previousStatus,
TaskStatusEnum currentStatus) {
TaskContext context = new DefaultTaskContext(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO);
if (!fileTaskStatusChangeListenerList.isEmpty()) {
boolean stop;
for (FileTaskStatusChangeListener listener : fileTaskStatusChangeListenerList) {
stop = listener.onStatusChange(context, previousStatus, currentStatus);
if (stop) break;
}
}
}

private void logUpdatedTaskStatus(String taskId, String filePath, Integer progress, TaskStatusEnum status) {
log.info("updated fileTask:{},{},{},{}", taskId, filePath, progress, status.name());
String batchTaskId = fileSourceTaskDTO.getBatchTaskId();
Long fileTaskId = fileTaskDTO.getId();
return fileSourceTaskUpdateService.updateFileSourceTask(
batchTaskId,
fileSourceTaskId,
fileTaskId,
fileTaskProgressDTO
);
}

@Override
Expand Down
Loading

0 comments on commit 5e72128

Please sign in to comment.