From 5e72128dc5d0e0113425dd802ca92f91832057b5 Mon Sep 17 00:00:00 2001 From: jsonwan Date: Mon, 25 Sep 2023 22:36:13 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20file-worker=E4=BB=BB=E5=8A=A1=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E6=9B=B4=E6=96=B0=E8=AF=B7=E6=B1=82=E6=97=A0=E5=BA=8F?= =?UTF-8?q?=E5=88=B0=E8=BE=BE=E5=AF=BC=E8=87=B4=E7=AC=AC=E4=B8=89=E6=96=B9?= =?UTF-8?q?=E6=BA=90=E6=96=87=E4=BB=B6=E5=81=B6=E7=8E=B0=E5=88=86=E5=8F=91?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=20#2434?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1.修复Trace数据断链; 2.优化重调度逻辑,支持参数可配置; 3.重构部分代码。 --- .../common/util/http/JobHttpClientImpl.java | 2 +- .../dao/filesource/FileSourceTaskDAO.java | 2 + .../dao/filesource/FileTaskDAO.java | 2 + .../impl/FileSourceTaskDAOImpl.java | 12 + .../dao/filesource/impl/FileTaskDAOImpl.java | 14 + .../service/FileSourceTaskUpdateService.java | 36 +++ .../impl/FileSourceTaskServiceImpl.java | 196 ++------------ .../impl/FileSourceTaskUpdateServiceImpl.java | 248 ++++++++++++++++++ .../task/dispatch/ReDispatchTimeoutTask.java | 26 +- .../file/worker/api/FileTaskResourceImpl.java | 9 +- .../bk/job/file/worker/api/IFileResource.java | 2 +- .../job/file/worker/api/OpResourceImpl.java | 2 +- .../file/worker/api/RemoteClientAccess.java | 2 +- .../file/worker/config/GracefulShutdown.java | 2 +- .../{cos => }/service/DownloadFileTask.java | 23 +- .../{cos => }/service/EnvironmentService.java | 2 +- .../service/FileProgressWatchingTask.java | 33 ++- .../{cos => }/service/FileTaskService.java | 42 ++- .../{cos => }/service/GatewayInfoService.java | 2 +- .../{cos => }/service/MetaDataService.java | 2 +- .../worker/{cos => }/service/OpService.java | 2 +- .../{cos => }/service/RemoteClient.java | 2 +- .../{cos => }/service/TaskReporter.java | 30 ++- .../{cos => }/service/TaskReporterImpl.java | 50 ++-- .../{cos => }/service/ThreadCommandBus.java | 3 +- .../worker/task/heartbeat/HeartBeatTask.java | 6 +- .../file/worker/api/FileResourceProxy.java | 7 +- .../worker/api/FileTaskResourceProxy.java | 4 +- .../impl/ArtifactoryFileResourceImpl.java | 4 +- .../service/ArtifactoryRemoteClient.java | 2 +- .../worker/cos/impl/COSFileResourceImpl.java | 8 +- .../{cos => }/service/COSBaseService.java | 2 +- .../{cos => }/service/COSRemoteClient.java | 2 +- .../kubernetes/charts/bk-job/VALUES_LOG.md | 15 ++ .../templates/job-file-gateway/configmap.yaml | 2 + .../kubernetes/charts/bk-job/values.yaml | 10 +- ...#job#job-file-gateway#job-file-gateway.yml | 10 + 37 files changed, 544 insertions(+), 274 deletions(-) create mode 100644 src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/FileSourceTaskUpdateService.java create mode 100644 src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskUpdateServiceImpl.java rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/DownloadFileTask.java (92%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/EnvironmentService.java (98%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/FileProgressWatchingTask.java (77%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/FileTaskService.java (80%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/GatewayInfoService.java (97%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/MetaDataService.java (99%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/OpService.java (99%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/RemoteClient.java (97%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/TaskReporter.java (60%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/TaskReporterImpl.java (77%) rename src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/ThreadCommandBus.java (98%) rename src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/COSBaseService.java (97%) rename src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/{cos => }/service/COSRemoteClient.java (98%) diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/http/JobHttpClientImpl.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/http/JobHttpClientImpl.java index 9d74d2316c..25e93d99d1 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/http/JobHttpClientImpl.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/http/JobHttpClientImpl.java @@ -85,7 +85,7 @@ private void logRespStr(String respStr) { } private void logAndThrow(ResponseEntity respEntity) { - log.error("Fail to request fileWorker, status={}, msg={}", respEntity.getStatusCode(), respEntity.getBody()); + log.error("Fail to request, status={}, msg={}", respEntity.getStatusCode(), respEntity.getBody()); throw new ServiceException( ErrorType.INTERNAL, ErrorCode.FAIL_TO_REQUEST_FILE_WORKER_WITH_REASON, diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java index a21d3b0dde..7f2738a892 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileSourceTaskDAO.java @@ -39,6 +39,8 @@ public interface FileSourceTaskDAO { FileSourceTaskDTO getFileSourceTaskById(String id); + FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id); + Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status); List listByBatchTaskId(String batchTaskId); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java index 34854276eb..f7485d9445 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/FileTaskDAO.java @@ -38,6 +38,8 @@ public interface FileTaskDAO { int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId); + FileTaskDTO getFileTaskByIdForUpdate(Long id); + FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath); List listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java index 0592d3f6e4..33d82ac79b 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileSourceTaskDAOImpl.java @@ -177,6 +177,18 @@ val record = dslContext.selectFrom(defaultTable).where( } } + @Override + public FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id) { + val record = dslContext.selectFrom(defaultTable).where( + defaultTable.ID.eq(id) + ).forUpdate().fetchOne(); + if (record == null) { + return null; + } else { + return convertRecordToDto(record); + } + } + @Override public Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status) { List conditions = new ArrayList<>(); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java index 9bc765e4a2..2416b9d090 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/dao/filesource/impl/FileTaskDAOImpl.java @@ -134,6 +134,20 @@ public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) { ).execute(); } + @Override + public FileTaskDTO getFileTaskByIdForUpdate(Long id) { + List conditions = new ArrayList<>(); + conditions.add(defaultTable.ID.eq(id)); + val record = dslContext.selectFrom(defaultTable).where( + conditions + ).forUpdate().fetchOne(); + if (record == null) { + return null; + } else { + return convertRecordToDto(record); + } + } + @Override public FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath) { List conditions = new ArrayList<>(); diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/FileSourceTaskUpdateService.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/FileSourceTaskUpdateService.java new file mode 100644 index 0000000000..3538e1ae26 --- /dev/null +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/FileSourceTaskUpdateService.java @@ -0,0 +1,36 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.file_gateway.service; + +import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO; + +public interface FileSourceTaskUpdateService { + + String updateFileSourceTask(String batchTaskId, + String fileSourceTaskId, + Long fileTaskId, + FileTaskProgressDTO fileTaskProgressDTO); + +} diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java index 523568e85d..54b20f20b0 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskServiceImpl.java @@ -27,38 +27,28 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.tencent.bk.job.common.constant.ErrorCode; import com.tencent.bk.job.common.exception.InternalException; -import com.tencent.bk.job.common.exception.NotFoundException; import com.tencent.bk.job.common.exception.ServiceException; import com.tencent.bk.job.common.model.Response; import com.tencent.bk.job.common.model.http.HttpReq; -import com.tencent.bk.job.common.util.ArrayUtil; -import com.tencent.bk.job.common.util.file.FileSizeUtil; -import com.tencent.bk.job.common.util.file.PathUtil; import com.tencent.bk.job.common.util.http.JobHttpClient; import com.tencent.bk.job.common.util.json.JsonUtils; -import com.tencent.bk.job.execute.common.constants.FileDistStatusEnum; import com.tencent.bk.job.file_gateway.consts.TaskCommandEnum; import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum; -import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceBatchTaskDAO; import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceDAO; import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceTaskDAO; import com.tencent.bk.job.file_gateway.dao.filesource.FileTaskDAO; import com.tencent.bk.job.file_gateway.dao.filesource.FileWorkerDAO; -import com.tencent.bk.job.file_gateway.model.dto.FileSourceBatchTaskDTO; import com.tencent.bk.job.file_gateway.model.dto.FileSourceDTO; import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO; import com.tencent.bk.job.file_gateway.model.dto.FileTaskDTO; import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO; import com.tencent.bk.job.file_gateway.model.dto.FileWorkerDTO; -import com.tencent.bk.job.file_gateway.model.resp.inner.FileLogPieceDTO; import com.tencent.bk.job.file_gateway.model.resp.inner.FileSourceTaskStatusDTO; import com.tencent.bk.job.file_gateway.model.resp.inner.TaskInfoDTO; import com.tencent.bk.job.file_gateway.model.resp.inner.ThirdFileSourceTaskLogDTO; import com.tencent.bk.job.file_gateway.service.DispatchService; import com.tencent.bk.job.file_gateway.service.FileSourceTaskService; -import com.tencent.bk.job.file_gateway.service.context.TaskContext; -import com.tencent.bk.job.file_gateway.service.context.impl.DefaultTaskContext; -import com.tencent.bk.job.file_gateway.service.listener.FileTaskStatusChangeListener; +import com.tencent.bk.job.file_gateway.service.FileSourceTaskUpdateService; import com.tencent.bk.job.file_gateway.service.remote.FileSourceTaskReqGenService; import lombok.extern.slf4j.Slf4j; import org.slf4j.helpers.MessageFormatter; @@ -70,7 +60,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,7 +71,7 @@ public class FileSourceTaskServiceImpl implements FileSourceTaskService { public static final String PREFIX_REDIS_TASK_LOG = "job:file-gateway:taskLog:"; - private final FileSourceBatchTaskDAO fileSourceBatchTaskDAO; + private final FileSourceTaskUpdateService fileSourceTaskUpdateService; private final FileSourceTaskDAO fileSourceTaskDAO; private final FileTaskDAO fileTaskDAO; private final FileWorkerDAO fileworkerDAO; @@ -91,21 +80,18 @@ public class FileSourceTaskServiceImpl implements FileSourceTaskService { private final FileSourceTaskReqGenService fileSourceTaskReqGenService; private final RedisTemplate redisTemplate; private final JobHttpClient jobHttpClient; - private final List fileTaskStatusChangeListenerList = new ArrayList<>(); @Autowired - public FileSourceTaskServiceImpl( - FileSourceBatchTaskDAO fileSourceBatchTaskDAO, - FileSourceTaskDAO fileSourceTaskDAO, - FileTaskDAO fileTaskDAO, - FileWorkerDAO fileworkerDAO, - FileSourceDAO fileSourceDAO, - DispatchService dispatchService, - FileSourceTaskReqGenService fileSourceTaskReqGenService, - @Qualifier("jsonRedisTemplate") RedisTemplate redisTemplate, - FileTaskStatusChangeListener fileTaskStatusChangeListener, - JobHttpClient jobHttpClient) { - this.fileSourceBatchTaskDAO = fileSourceBatchTaskDAO; + public FileSourceTaskServiceImpl(FileSourceTaskUpdateService fileSourceTaskUpdateService, + FileSourceTaskDAO fileSourceTaskDAO, + FileTaskDAO fileTaskDAO, + FileWorkerDAO fileworkerDAO, + FileSourceDAO fileSourceDAO, + DispatchService dispatchService, + FileSourceTaskReqGenService fileSourceTaskReqGenService, + @Qualifier("jsonRedisTemplate") RedisTemplate redisTemplate, + JobHttpClient jobHttpClient) { + this.fileSourceTaskUpdateService = fileSourceTaskUpdateService; this.fileSourceTaskDAO = fileSourceTaskDAO; this.fileTaskDAO = fileTaskDAO; this.fileworkerDAO = fileworkerDAO; @@ -114,11 +100,6 @@ public FileSourceTaskServiceImpl( this.fileSourceTaskReqGenService = fileSourceTaskReqGenService; this.redisTemplate = redisTemplate; this.jobHttpClient = jobHttpClient; - addFileTaskStatusChangeListener(fileTaskStatusChangeListener); - } - - public void addFileTaskStatusChangeListener(FileTaskStatusChangeListener listener) { - this.fileTaskStatusChangeListenerList.add(listener); } @Override @@ -197,65 +178,6 @@ public TaskInfoDTO startFileSourceDownloadTaskWithId(String username, Long appId fileWorkerDTO.getCloudAreaId(), fileWorkerDTO.getInnerIp()); } - private void writeLog(FileSourceTaskDTO fileSourceTaskDTO, - FileWorkerDTO fileWorkerDTO, - FileTaskProgressDTO fileTaskProgressDTO) { - String taskId = fileSourceTaskDTO.getId(); - String fileSizeStr = FileSizeUtil.getFileSizeStr(fileTaskProgressDTO.getFileSize()); - ThirdFileSourceTaskLogDTO thirdFileSourceTaskLog = new ThirdFileSourceTaskLogDTO(); - String sourceCloudIp = fileWorkerDTO.getCloudIp(); - thirdFileSourceTaskLog.setIp(sourceCloudIp); - // 追加文件源名称 - // 日志定位坐标:(文件源,文件路径),需要区分不同文件源下相同文件路径的日志 - FileSourceDTO fileSourceDTO = fileSourceDAO.getFileSourceById(fileSourceTaskDTO.getFileSourceId()); - if (fileSourceDTO == null) { - throw new NotFoundException(ErrorCode.FILE_SOURCE_NOT_EXIST, - ArrayUtil.toArray("fileSourceId:" + fileSourceTaskDTO.getFileSourceId())); - } - String filePathWithSourceAlias = PathUtil.joinFilePath( - fileSourceDTO.getAlias(), - fileTaskProgressDTO.getFilePath() - ); - List fileLogPieceList = new ArrayList<>(); - FileLogPieceDTO fileLogPiece = new FileLogPieceDTO(); - fileLogPiece.setContent(buildFileLogContent(fileTaskProgressDTO, filePathWithSourceAlias, fileSizeStr)); - fileLogPiece.setDisplaySrcFile(filePathWithSourceAlias); - fileLogPiece.setProcess(buildProcessStr(fileTaskProgressDTO)); - fileLogPiece.setSize(fileSizeStr); - fileLogPiece.setSrcIp(sourceCloudIp); - fileLogPiece.setStatus(FileDistStatusEnum.PULLING.getValue()); - fileLogPiece.setStatusDesc(FileDistStatusEnum.PULLING.getName()); - fileLogPieceList.add(fileLogPiece); - thirdFileSourceTaskLog.setFileTaskLogs(fileLogPieceList); - // 写入Redis - redisTemplate.opsForList().rightPush(PREFIX_REDIS_TASK_LOG + taskId, thirdFileSourceTaskLog); - // 一小时后过期 - redisTemplate.expireAt(PREFIX_REDIS_TASK_LOG + taskId, new Date(System.currentTimeMillis() + 3600 * 1000)); - } - - @SuppressWarnings("StringBufferReplaceableByString") - private String buildFileLogContent(FileTaskProgressDTO fileTaskProgressDTO, - String filePathWithSourceAlias, - String fileSizeStr) { - StringBuilder sb = new StringBuilder(); - sb.append("FileName: "); - sb.append(filePathWithSourceAlias); - sb.append(" FileSize: "); - sb.append(fileSizeStr); - sb.append(" "); - sb.append("Speed: "); - sb.append(fileTaskProgressDTO.getSpeed()); - sb.append(" Progress: "); - sb.append(fileTaskProgressDTO.getProgress()); - sb.append("% Detail: "); - sb.append(fileTaskProgressDTO.getContent()); - return sb.toString(); - } - - private String buildProcessStr(FileTaskProgressDTO fileTaskProgressDTO) { - return "" + fileTaskProgressDTO.getProgress() + "%"; - } - @Override public String updateFileSourceTask(FileTaskProgressDTO fileTaskProgressDTO) { String fileSourceTaskId = fileTaskProgressDTO.getFileSourceTaskId(); @@ -272,97 +194,19 @@ public String updateFileSourceTask(FileTaskProgressDTO fileTaskProgressDTO) { ); return null; } - TaskStatusEnum previousStatus = TaskStatusEnum.valueOf(fileTaskDTO.getStatus()); - fileTaskDTO.setDownloadPath(fileTaskProgressDTO.getDownloadPath()); FileSourceTaskDTO fileSourceTaskDTO = fileSourceTaskDAO.getFileSourceTaskById(fileSourceTaskId); if (fileSourceTaskDTO == null) { log.error("Cannot find fileSourceTaskDTO by taskId {} filePath {}", fileSourceTaskId, filePath); return null; } - return updateFileSourceTask(fileTaskDTO, fileSourceTaskDTO, fileTaskProgressDTO, previousStatus); - } - - @Transactional(value = "jobFileGatewayTransactionManager", rollbackFor = {Throwable.class}) - public String updateFileSourceTask(FileTaskDTO fileTaskDTO, - FileSourceTaskDTO fileSourceTaskDTO, - FileTaskProgressDTO fileTaskProgressDTO, - TaskStatusEnum previousStatus) { - // 开启事务后立即加排它锁,保证读取到其他事务已提交的数据 - FileSourceBatchTaskDTO fileSourceBatchTaskDTO = - fileSourceBatchTaskDAO.getBatchTaskByIdForUpdate(fileSourceTaskDTO.getBatchTaskId()); - String fileSourceTaskId = fileTaskProgressDTO.getFileSourceTaskId(); - String filePath = fileTaskProgressDTO.getFilePath(); - TaskStatusEnum status = fileTaskProgressDTO.getStatus(); - Integer progress = fileTaskProgressDTO.getProgress(); - Long fileSize = fileTaskProgressDTO.getFileSize(); - if (fileSourceBatchTaskDTO == null) { - log.error("Cannot find fileSourceBatchTaskDTO by batchTaskId {} fileSourceTaskId {} filePath {}", - fileSourceTaskDTO.getBatchTaskId(), - fileSourceTaskId, - filePath - ); - return null; - } - FileWorkerDTO fileWorkerDTO = fileworkerDAO.getFileWorkerById(fileSourceTaskDTO.getFileWorkerId()); - int affectedRowNum = -1; - if (status == TaskStatusEnum.RUNNING) { - // 已处于结束态的任务不再接受状态更新 - if (!fileTaskDTO.isDone()) { - fileTaskDTO.setProgress(progress); - fileTaskDTO.setFileSize(fileSize); - fileTaskDTO.setStatus(TaskStatusEnum.RUNNING.getStatus()); - affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); - logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); - } else { - log.info("fileTask {} already done, do not update to running", fileSourceTaskId); - } - } else if (status == TaskStatusEnum.SUCCESS) { - fileTaskDTO.setProgress(100); - fileTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus()); - affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); - logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); - } else if (status == TaskStatusEnum.FAILED) { - fileTaskDTO.setProgress(progress); - fileTaskDTO.setStatus(TaskStatusEnum.FAILED.getStatus()); - affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); - logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); - } else if (status == TaskStatusEnum.STOPPED) { - fileTaskDTO.setProgress(progress); - fileTaskDTO.setStatus(TaskStatusEnum.STOPPED.getStatus()); - affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); - logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); - } else { - log.warn("fileTask {} unknown status:{}", fileSourceTaskId, status); - } - if (affectedRowNum != -1) { - log.info("{} updated, affectedRowNum={}", fileTaskDTO, affectedRowNum); - } - // 通知关注者 - if (status != previousStatus) { - notifyFileTaskStatusChangeListeners(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO, previousStatus, status); - } - // 进度上报 - writeLog(fileSourceTaskDTO, fileWorkerDTO, fileTaskProgressDTO); - return fileSourceTaskId; - } - - private void notifyFileTaskStatusChangeListeners(FileTaskDTO fileTaskDTO, - FileSourceTaskDTO fileSourceTaskDTO, - FileWorkerDTO fileWorkerDTO, - TaskStatusEnum previousStatus, - TaskStatusEnum currentStatus) { - TaskContext context = new DefaultTaskContext(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO); - if (!fileTaskStatusChangeListenerList.isEmpty()) { - boolean stop; - for (FileTaskStatusChangeListener listener : fileTaskStatusChangeListenerList) { - stop = listener.onStatusChange(context, previousStatus, currentStatus); - if (stop) break; - } - } - } - - private void logUpdatedTaskStatus(String taskId, String filePath, Integer progress, TaskStatusEnum status) { - log.info("updated fileTask:{},{},{},{}", taskId, filePath, progress, status.name()); + String batchTaskId = fileSourceTaskDTO.getBatchTaskId(); + Long fileTaskId = fileTaskDTO.getId(); + return fileSourceTaskUpdateService.updateFileSourceTask( + batchTaskId, + fileSourceTaskId, + fileTaskId, + fileTaskProgressDTO + ); } @Override diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskUpdateServiceImpl.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskUpdateServiceImpl.java new file mode 100644 index 0000000000..0b0e315443 --- /dev/null +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/service/impl/FileSourceTaskUpdateServiceImpl.java @@ -0,0 +1,248 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.file_gateway.service.impl; + +import com.tencent.bk.job.common.constant.ErrorCode; +import com.tencent.bk.job.common.exception.NotFoundException; +import com.tencent.bk.job.common.util.ArrayUtil; +import com.tencent.bk.job.common.util.file.FileSizeUtil; +import com.tencent.bk.job.common.util.file.PathUtil; +import com.tencent.bk.job.execute.common.constants.FileDistStatusEnum; +import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum; +import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceBatchTaskDAO; +import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceDAO; +import com.tencent.bk.job.file_gateway.dao.filesource.FileSourceTaskDAO; +import com.tencent.bk.job.file_gateway.dao.filesource.FileTaskDAO; +import com.tencent.bk.job.file_gateway.dao.filesource.FileWorkerDAO; +import com.tencent.bk.job.file_gateway.model.dto.FileSourceBatchTaskDTO; +import com.tencent.bk.job.file_gateway.model.dto.FileSourceDTO; +import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO; +import com.tencent.bk.job.file_gateway.model.dto.FileTaskDTO; +import com.tencent.bk.job.file_gateway.model.dto.FileTaskProgressDTO; +import com.tencent.bk.job.file_gateway.model.dto.FileWorkerDTO; +import com.tencent.bk.job.file_gateway.model.resp.inner.FileLogPieceDTO; +import com.tencent.bk.job.file_gateway.model.resp.inner.ThirdFileSourceTaskLogDTO; +import com.tencent.bk.job.file_gateway.service.FileSourceTaskUpdateService; +import com.tencent.bk.job.file_gateway.service.context.TaskContext; +import com.tencent.bk.job.file_gateway.service.context.impl.DefaultTaskContext; +import com.tencent.bk.job.file_gateway.service.listener.FileTaskStatusChangeListener; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@Slf4j +@Service +public class FileSourceTaskUpdateServiceImpl implements FileSourceTaskUpdateService { + + public static final String PREFIX_REDIS_TASK_LOG = "job:file-gateway:taskLog:"; + + private final FileSourceBatchTaskDAO fileSourceBatchTaskDAO; + private final FileSourceTaskDAO fileSourceTaskDAO; + private final FileTaskDAO fileTaskDAO; + private final FileWorkerDAO fileworkerDAO; + private final FileSourceDAO fileSourceDAO; + private final RedisTemplate redisTemplate; + private final List fileTaskStatusChangeListenerList = new ArrayList<>(); + + @Autowired + public FileSourceTaskUpdateServiceImpl(FileSourceBatchTaskDAO fileSourceBatchTaskDAO, + FileSourceTaskDAO fileSourceTaskDAO, + FileTaskDAO fileTaskDAO, + FileWorkerDAO fileworkerDAO, + FileSourceDAO fileSourceDAO, + @Qualifier("jsonRedisTemplate") RedisTemplate redisTemplate, + FileTaskStatusChangeListener fileTaskStatusChangeListener) { + this.fileSourceBatchTaskDAO = fileSourceBatchTaskDAO; + this.fileSourceTaskDAO = fileSourceTaskDAO; + this.fileTaskDAO = fileTaskDAO; + this.fileworkerDAO = fileworkerDAO; + this.fileSourceDAO = fileSourceDAO; + this.redisTemplate = redisTemplate; + addFileTaskStatusChangeListener(fileTaskStatusChangeListener); + } + + public void addFileTaskStatusChangeListener(FileTaskStatusChangeListener listener) { + this.fileTaskStatusChangeListenerList.add(listener); + } + + @Override + @Transactional(value = "jobFileGatewayTransactionManager", rollbackFor = {Throwable.class}) + public String updateFileSourceTask(String batchTaskId, + String fileSourceTaskId, + Long fileTaskId, + FileTaskProgressDTO fileTaskProgressDTO) { + // 开启事务后立即加排它锁,保证读取到其他事务已提交的数据 + FileSourceBatchTaskDTO fileSourceBatchTaskDTO = + fileSourceBatchTaskDAO.getBatchTaskByIdForUpdate(batchTaskId); + // 查出加锁后的最新数据 + FileSourceTaskDTO fileSourceTaskDTO = fileSourceTaskDAO.getFileSourceTaskByIdForUpdate(fileSourceTaskId); + FileTaskDTO fileTaskDTO = fileTaskDAO.getFileTaskByIdForUpdate(fileTaskId); + if (log.isDebugEnabled()) { + log.debug("fileTaskDTO={}", fileTaskDTO); + } + fileTaskDTO.setDownloadPath(fileTaskProgressDTO.getDownloadPath()); + + String filePath = fileTaskProgressDTO.getFilePath(); + TaskStatusEnum previousStatus = TaskStatusEnum.valueOf(fileTaskDTO.getStatus()); + TaskStatusEnum status = fileTaskProgressDTO.getStatus(); + Integer progress = fileTaskProgressDTO.getProgress(); + Long fileSize = fileTaskProgressDTO.getFileSize(); + if (fileSourceBatchTaskDTO == null) { + log.error("Cannot find fileSourceBatchTaskDTO by batchTaskId {} fileSourceTaskId {} filePath {}", + fileSourceTaskDTO.getBatchTaskId(), + fileSourceTaskId, + filePath + ); + return null; + } + FileWorkerDTO fileWorkerDTO = fileworkerDAO.getFileWorkerById(fileSourceTaskDTO.getFileWorkerId()); + int affectedRowNum = -1; + if (status == TaskStatusEnum.RUNNING) { + // 已处于结束态的任务不再接受状态更新 + if (!fileTaskDTO.isDone()) { + fileTaskDTO.setProgress(progress); + fileTaskDTO.setFileSize(fileSize); + fileTaskDTO.setStatus(TaskStatusEnum.RUNNING.getStatus()); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); + logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); + } else { + log.info("fileTask {} already done, do not update to running", fileSourceTaskId); + } + } else if (status == TaskStatusEnum.SUCCESS) { + fileTaskDTO.setProgress(100); + fileTaskDTO.setStatus(TaskStatusEnum.SUCCESS.getStatus()); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); + logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); + } else if (status == TaskStatusEnum.FAILED) { + fileTaskDTO.setProgress(progress); + fileTaskDTO.setStatus(TaskStatusEnum.FAILED.getStatus()); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); + logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); + } else if (status == TaskStatusEnum.STOPPED) { + fileTaskDTO.setProgress(progress); + fileTaskDTO.setStatus(TaskStatusEnum.STOPPED.getStatus()); + affectedRowNum = fileTaskDAO.updateFileTask(fileTaskDTO); + logUpdatedTaskStatus(fileSourceTaskId, filePath, progress, status); + } else { + log.warn("fileTask {} unknown status:{}", fileSourceTaskId, status); + } + if (affectedRowNum != -1) { + log.info("{} updated, affectedRowNum={}", fileTaskDTO, affectedRowNum); + } + // 通知关注者 + if (status != previousStatus) { + notifyFileTaskStatusChangeListeners(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO, previousStatus, status); + } + // 进度上报 + writeLog(fileSourceTaskDTO, fileWorkerDTO, fileTaskProgressDTO); + return fileSourceTaskId; + } + + private void notifyFileTaskStatusChangeListeners(FileTaskDTO fileTaskDTO, + FileSourceTaskDTO fileSourceTaskDTO, + FileWorkerDTO fileWorkerDTO, + TaskStatusEnum previousStatus, + TaskStatusEnum currentStatus) { + TaskContext context = new DefaultTaskContext(fileTaskDTO, fileSourceTaskDTO, fileWorkerDTO); + if (!fileTaskStatusChangeListenerList.isEmpty()) { + boolean stop; + for (FileTaskStatusChangeListener listener : fileTaskStatusChangeListenerList) { + stop = listener.onStatusChange(context, previousStatus, currentStatus); + if (stop) break; + } + } + } + + private void logUpdatedTaskStatus(String taskId, String filePath, Integer progress, TaskStatusEnum status) { + log.info("updated fileTask:{},{},{},{}", taskId, filePath, progress, status.name()); + } + + private void writeLog(FileSourceTaskDTO fileSourceTaskDTO, + FileWorkerDTO fileWorkerDTO, + FileTaskProgressDTO fileTaskProgressDTO) { + String taskId = fileSourceTaskDTO.getId(); + String fileSizeStr = FileSizeUtil.getFileSizeStr(fileTaskProgressDTO.getFileSize()); + ThirdFileSourceTaskLogDTO thirdFileSourceTaskLog = new ThirdFileSourceTaskLogDTO(); + String sourceCloudIp = fileWorkerDTO.getCloudIp(); + thirdFileSourceTaskLog.setIp(sourceCloudIp); + // 追加文件源名称 + // 日志定位坐标:(文件源,文件路径),需要区分不同文件源下相同文件路径的日志 + FileSourceDTO fileSourceDTO = fileSourceDAO.getFileSourceById(fileSourceTaskDTO.getFileSourceId()); + if (fileSourceDTO == null) { + throw new NotFoundException(ErrorCode.FILE_SOURCE_NOT_EXIST, + ArrayUtil.toArray("fileSourceId:" + fileSourceTaskDTO.getFileSourceId())); + } + String filePathWithSourceAlias = PathUtil.joinFilePath( + fileSourceDTO.getAlias(), + fileTaskProgressDTO.getFilePath() + ); + List fileLogPieceList = new ArrayList<>(); + FileLogPieceDTO fileLogPiece = new FileLogPieceDTO(); + fileLogPiece.setContent(buildFileLogContent(fileTaskProgressDTO, filePathWithSourceAlias, fileSizeStr)); + fileLogPiece.setDisplaySrcFile(filePathWithSourceAlias); + fileLogPiece.setProcess(buildProcessStr(fileTaskProgressDTO)); + fileLogPiece.setSize(fileSizeStr); + fileLogPiece.setSrcIp(sourceCloudIp); + fileLogPiece.setStatus(FileDistStatusEnum.PULLING.getValue()); + fileLogPiece.setStatusDesc(FileDistStatusEnum.PULLING.getName()); + fileLogPieceList.add(fileLogPiece); + thirdFileSourceTaskLog.setFileTaskLogs(fileLogPieceList); + // 写入Redis + redisTemplate.opsForList().rightPush(PREFIX_REDIS_TASK_LOG + taskId, thirdFileSourceTaskLog); + // 一小时后过期 + redisTemplate.expireAt(PREFIX_REDIS_TASK_LOG + taskId, new Date(System.currentTimeMillis() + 3600 * 1000)); + } + + @SuppressWarnings("StringBufferReplaceableByString") + private String buildFileLogContent(FileTaskProgressDTO fileTaskProgressDTO, + String filePathWithSourceAlias, + String fileSizeStr) { + StringBuilder sb = new StringBuilder(); + sb.append("FileName: "); + sb.append(filePathWithSourceAlias); + sb.append(" FileSize: "); + sb.append(fileSizeStr); + sb.append(" "); + sb.append("Speed: "); + sb.append(fileTaskProgressDTO.getSpeed()); + sb.append(" Progress: "); + sb.append(fileTaskProgressDTO.getProgress()); + sb.append("% Detail: "); + sb.append(fileTaskProgressDTO.getContent()); + return sb.toString(); + } + + private String buildProcessStr(FileTaskProgressDTO fileTaskProgressDTO) { + return "" + fileTaskProgressDTO.getProgress() + "%"; + } + +} diff --git a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java index 1b449d1ffb..0aeaad527c 100644 --- a/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java +++ b/src/backend/job-file-gateway/service-job-file-gateway/src/main/java/com/tencent/bk/job/file_gateway/task/dispatch/ReDispatchTimeoutTask.java @@ -41,8 +41,11 @@ public class ReDispatchTimeoutTask { private final FileTaskDAO fileTaskDAO; private final ReDispatchService reDispatchService; - @Value("${job.file-gateway.task.timeout.reDispatch.enabled:true}") - private final boolean enableTimeoutRedispatch = true; + @Value("${job.file-gateway.reDispatch.timeoutTask.enabled:true}") + private boolean reDispatchTimeoutTaskEnabled = true; + + @Value("${job.file-gateway.reDispatch.timeoutTask.timeoutSeconds:10}") + private int reDispatchTaskTimeoutSeconds = 10; @Autowired public ReDispatchTimeoutTask(FileTaskDAO fileTaskDAO, @@ -52,12 +55,13 @@ public ReDispatchTimeoutTask(FileTaskDAO fileTaskDAO, } public void run() { - if (!enableTimeoutRedispatch) { - log.info("Timeout task reDispatch not enabled, you can config it in configuration file by set job" + - ".file-gateway.task.timeout.reDispatch.enable=true"); + if (!reDispatchTimeoutTaskEnabled) { + log.info("reDispatch timeout task not enabled, you can config it in configuration file by set job" + + ".file-gateway.reDispatch.timeoutTask.enabled=true"); + return; } - // 找出未结束且长时间无响应的任务,10s无响应且未结束的任务就应当被重调度了 - long fileSourceTaskStatusExpireTimeMills = 10 * 1000L; + // 找出未结束且长时间无响应的任务,无响应且未结束的任务就应当被重调度了 + long fileSourceTaskStatusExpireTimeMills = reDispatchTaskTimeoutSeconds * 1000L; List timeoutFileSourceTaskIdList = fileTaskDAO.listTimeoutFileSourceTaskIds( fileSourceTaskStatusExpireTimeMills, TaskStatusEnum.getRunningStatusSet(), @@ -66,9 +70,13 @@ public void run() { ); // 进行超时重调度 for (String fileSourceTaskId : timeoutFileSourceTaskIdList) { - log.info("reDispatch fileSourceTask by timeout:{}", fileSourceTaskId); boolean result = reDispatchService.reDispatchByGateway(fileSourceTaskId, 0L, 5000L); - log.info("result={}", result); + log.info( + "reDispatch fileSourceTask by timeout({}s):{}, result={}", + reDispatchTaskTimeoutSeconds, + fileSourceTaskId, + result + ); } } } diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceImpl.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceImpl.java index 29f0258c7d..5e7f6e3a71 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceImpl.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceImpl.java @@ -26,13 +26,13 @@ import com.tencent.bk.job.common.model.Response; import com.tencent.bk.job.common.util.json.JsonUtils; -import com.tencent.bk.job.file.worker.cos.service.FileTaskService; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; -import com.tencent.bk.job.file.worker.cos.service.ThreadCommandBus; import com.tencent.bk.job.file.worker.model.req.BaseReq; import com.tencent.bk.job.file.worker.model.req.ClearTaskFilesReq; import com.tencent.bk.job.file.worker.model.req.DownloadFilesTaskReq; import com.tencent.bk.job.file.worker.model.req.StopTasksReq; +import com.tencent.bk.job.file.worker.service.FileTaskService; +import com.tencent.bk.job.file.worker.service.RemoteClient; +import com.tencent.bk.job.file.worker.service.ThreadCommandBus; import com.tencent.bk.job.file_gateway.consts.TaskCommandEnum; import lombok.extern.slf4j.Slf4j; @@ -71,8 +71,7 @@ public Response stopTasks(StopTasksReq req) { @Override public Response clearFiles(ClearTaskFilesReq req) { List taskIdList = req.getTaskIdList(); - Integer count = 0; - count = fileTaskService.clearTaskFilesAtOnce(taskIdList); + Integer count = fileTaskService.clearTaskFilesAtOnce(taskIdList); return Response.buildSuccessResp(count); } } diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/IFileResource.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/IFileResource.java index dbc6831e53..db0203470d 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/IFileResource.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/IFileResource.java @@ -25,10 +25,10 @@ package com.tencent.bk.job.file.worker.api; import com.tencent.bk.job.common.model.InternalResponse; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.req.BaseReq; import com.tencent.bk.job.file.worker.model.req.ExecuteActionReq; import com.tencent.bk.job.file.worker.model.req.ListFileNodeReq; +import com.tencent.bk.job.file.worker.service.RemoteClient; import com.tencent.bk.job.file_gateway.model.resp.common.FileNodesDTO; public interface IFileResource { diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/OpResourceImpl.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/OpResourceImpl.java index 75b78511a5..98af366c35 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/OpResourceImpl.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/OpResourceImpl.java @@ -25,8 +25,8 @@ package com.tencent.bk.job.file.worker.api; import com.tencent.bk.job.common.model.Response; -import com.tencent.bk.job.file.worker.cos.service.OpService; import com.tencent.bk.job.file.worker.model.req.WorkerOffLineReq; +import com.tencent.bk.job.file.worker.service.OpService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RestController; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/RemoteClientAccess.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/RemoteClientAccess.java index dd6d9b7390..b09ed1fd65 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/RemoteClientAccess.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/api/RemoteClientAccess.java @@ -24,8 +24,8 @@ package com.tencent.bk.job.file.worker.api; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.req.BaseReq; +import com.tencent.bk.job.file.worker.service.RemoteClient; public interface RemoteClientAccess { RemoteClient getRemoteClient(BaseReq req); diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/GracefulShutdown.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/GracefulShutdown.java index 23a92cb4f2..0346378107 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/GracefulShutdown.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/config/GracefulShutdown.java @@ -24,7 +24,7 @@ package com.tencent.bk.job.file.worker.config; -import com.tencent.bk.job.file.worker.cos.service.OpService; +import com.tencent.bk.job.file.worker.service.OpService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/DownloadFileTask.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/DownloadFileTask.java similarity index 92% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/DownloadFileTask.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/DownloadFileTask.java index bfbd697ec4..4129e84dda 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/DownloadFileTask.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/DownloadFileTask.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.constant.ErrorCode; import com.tencent.bk.job.common.exception.InternalException; @@ -31,6 +31,7 @@ import com.tencent.bk.job.common.util.file.PathUtil; import com.tencent.bk.job.file.worker.model.FileMetaData; import com.tencent.bk.job.file_gateway.consts.TaskCommandEnum; +import lombok.Builder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.http.client.methods.HttpRequestBase; @@ -52,14 +53,22 @@ class DownloadFileTask extends Thread { AtomicLong fileSize; AtomicInteger speed; AtomicInteger process; + FileProgressWatchingTask watchingTask; TaskReporter taskReporter; DownloadFileTaskEventListener taskEventListener; - FileProgressWatchingTask watchingTask; - public DownloadFileTask(RemoteClient remoteClient, String taskId, String filePath, String downloadFileDir, - String filePrefix, AtomicLong fileSize, AtomicInteger speed, AtomicInteger process, + @Builder + public DownloadFileTask(RemoteClient remoteClient, + String taskId, + String filePath, + String downloadFileDir, + String filePrefix, + AtomicLong fileSize, + AtomicInteger speed, + AtomicInteger process, FileProgressWatchingTask watchingTask, - TaskReporter taskReporter, DownloadFileTaskEventListener taskEventListener) { + TaskReporter taskReporter, + DownloadFileTaskEventListener taskEventListener) { this.remoteClient = remoteClient; this.taskId = taskId; this.filePath = filePath; @@ -94,8 +103,8 @@ public void downloadFileToLocal( InputStream ins = null; HttpRequestBase req = null; try { - String fileMd5 = ""; - String currentMd5 = ""; + String fileMd5; + String currentMd5; int count = 0; boolean downloadSuccess = false; do { diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/EnvironmentService.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/EnvironmentService.java similarity index 98% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/EnvironmentService.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/EnvironmentService.java index b7ce5ab230..9d7044ee9c 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/EnvironmentService.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/EnvironmentService.java @@ -1,4 +1,4 @@ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.constant.ErrorCode; import com.tencent.bk.job.common.constant.JobConstants; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileProgressWatchingTask.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/FileProgressWatchingTask.java similarity index 77% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileProgressWatchingTask.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/FileProgressWatchingTask.java index 98a5b6792c..f431f2da29 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileProgressWatchingTask.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/FileProgressWatchingTask.java @@ -22,9 +22,10 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.util.file.PathUtil; +import lombok.Builder; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicInteger; @@ -33,18 +34,24 @@ @Slf4j class FileProgressWatchingTask extends Thread { - String taskId; - String filePath; - String downloadFileDir; - AtomicLong fileSize; - AtomicInteger speed; - AtomicInteger process; - TaskReporter taskReporter; - FileProgressWatchingTaskEventListener watchingTaskEventListener; + private final String taskId; + private final String filePath; + private final String downloadFileDir; + private final AtomicLong fileSize; + private final AtomicInteger speed; + private final AtomicInteger process; + private final TaskReporter taskReporter; + private final FileProgressWatchingTaskEventListener watchingTaskEventListener; volatile boolean runFlag = true; - public FileProgressWatchingTask(String taskId, String filePath, String downloadFileDir, AtomicLong fileSize, - AtomicInteger speed, AtomicInteger process, TaskReporter taskReporter, + @Builder + public FileProgressWatchingTask(String taskId, + String filePath, + String downloadFileDir, + AtomicLong fileSize, + AtomicInteger speed, + AtomicInteger process, + TaskReporter taskReporter, FileProgressWatchingTaskEventListener watchingTaskEventListener) { this.taskId = taskId; this.filePath = filePath; @@ -60,6 +67,7 @@ public void stopWatching() { this.runFlag = false; } + @SuppressWarnings("BusyWait") @Override public void run() { String fileTaskKey = taskId + "_" + filePath; @@ -76,8 +84,7 @@ public void run() { sleep(1000); } } - } catch (InterruptedException e) { - log.info("watching interrupted", e); + } catch (InterruptedException ignore) { } finally { if (watchingTaskEventListener != null) { watchingTaskEventListener.onWatchingTaskFinally(fileTaskKey); diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/FileTaskService.java similarity index 80% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/FileTaskService.java index 8d8c96a2af..2ad974e832 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/FileTaskService.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/FileTaskService.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.util.file.PathUtil; import com.tencent.bk.job.file.worker.config.WorkerConfig; @@ -81,14 +81,38 @@ public Integer downloadFiles(RemoteClient client, String taskId, List fi AtomicLong fileSize = new AtomicLong(0L); AtomicInteger speed = new AtomicInteger(0); AtomicInteger process = new AtomicInteger(0); - FileProgressWatchingTask progressWatchingTask = new FileProgressWatchingTask(taskId, filePath, - workerConfig.getWorkspaceDirPath(), fileSize, speed, process, taskReporter, watchingTaskMap::remove); - DownloadFileTask downloadFileTask = new DownloadFileTask(client, taskId, filePath, - workerConfig.getWorkspaceDirPath(), filePrefix, fileSize, speed, process, progressWatchingTask, - taskReporter, tmpfileTaskKey -> { - fileTaskMap.remove(tmpfileTaskKey); - ThreadCommandBus.destroyCommandQueue(tmpfileTaskKey); - }); + FileProgressWatchingTask progressWatchingTask = FileProgressWatchingTask.builder() + .taskId(taskId) + .filePath(filePath) + .downloadFileDir(workerConfig.getWorkspaceDirPath()) + .fileSize(fileSize) + .speed(speed) + .process(process) + .taskReporter(taskReporter) + .watchingTaskEventListener(pFileTaskKey -> { + Future future = watchingTaskMap.get(pFileTaskKey); + if (future != null) { + future.cancel(true); + watchingTaskMap.remove(pFileTaskKey); + } + }) + .build(); + DownloadFileTask downloadFileTask = DownloadFileTask.builder() + .remoteClient(client) + .taskId(taskId) + .filePath(filePath) + .downloadFileDir(workerConfig.getWorkspaceDirPath()) + .filePrefix(filePrefix) + .fileSize(fileSize) + .speed(speed) + .process(process) + .watchingTask(progressWatchingTask) + .taskReporter(taskReporter) + .taskEventListener(tmpFileTaskKey -> { + fileTaskMap.remove(tmpFileTaskKey); + ThreadCommandBus.destroyCommandQueue(tmpFileTaskKey); + }) + .build(); Future fileTaskFuture = fileTaskExecutor.submit(downloadFileTask); Future watchingTaskFuture = watchingTaskExecutor.submit(progressWatchingTask); fileTaskMap.put(fileTaskKey, fileTaskFuture); diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/GatewayInfoService.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/GatewayInfoService.java similarity index 97% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/GatewayInfoService.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/GatewayInfoService.java index 259b83a088..9547a71218 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/GatewayInfoService.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/GatewayInfoService.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.file.worker.config.WorkerConfig; import lombok.extern.slf4j.Slf4j; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/MetaDataService.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/MetaDataService.java similarity index 99% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/MetaDataService.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/MetaDataService.java index 1056aab4db..05613cbf93 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/MetaDataService.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/MetaDataService.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Sets; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/OpService.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/OpService.java similarity index 99% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/OpService.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/OpService.java index 785d00876a..70101885e4 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/OpService.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/OpService.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.model.http.HttpReq; import com.tencent.bk.job.common.util.http.ExtHttpHelper; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/RemoteClient.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/RemoteClient.java similarity index 97% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/RemoteClient.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/RemoteClient.java index 01b34824ce..33e1cd5da2 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/RemoteClient.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/RemoteClient.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.exception.ServiceException; import com.tencent.bk.job.file.worker.model.FileMetaData; diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/TaskReporter.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/TaskReporter.java similarity index 60% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/TaskReporter.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/TaskReporter.java index ad82a43807..1d198f4089 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/TaskReporter.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/TaskReporter.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import java.util.List; @@ -30,18 +30,34 @@ public interface TaskReporter { void reportFileDownloadStart(String taskId, String filePath, String downloadPath); - void reportFileDownloadProgress(String taskId, String filePath, String downloadPath, Long fileSize, Integer speed - , Integer progress); + void reportFileDownloadProgress(String taskId, + String filePath, + String downloadPath, + Long fileSize, + Integer speed, + Integer progress); - void reportFileDownloadProgressWithContent(String taskId, String filePath, String downloadPath, Long fileSize, Integer speed - , Integer progress, String content); + void reportFileDownloadProgressWithContent(String taskId, + String filePath, + String downloadPath, + Long fileSize, + Integer speed, + Integer progress, + String content); - void reportFileDownloadSuccess(String taskId, String filePath, String downloadPath, Long fileSize, Integer speed, + void reportFileDownloadSuccess(String taskId, + String filePath, + String downloadPath, + Long fileSize, + Integer speed, Integer progress); void reportFileDownloadFailure(String taskId, String filePath, String downloadPath); - void reportFileDownloadStopped(String taskId, String filePath, String downloadPath, Long fileSize, + void reportFileDownloadStopped(String taskId, + String filePath, + String downloadPath, + Long fileSize, Integer progress); void reportFileDownloadFailure(String taskId, String filePath, String downloadPath, String content); diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/TaskReporterImpl.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/TaskReporterImpl.java similarity index 77% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/TaskReporterImpl.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/TaskReporterImpl.java index cc4f2e51cb..51b841ab70 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/TaskReporterImpl.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/TaskReporterImpl.java @@ -22,13 +22,11 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.model.http.HttpReq; -import com.tencent.bk.job.common.util.http.ExtHttpHelper; -import com.tencent.bk.job.common.util.http.HttpHelperFactory; import com.tencent.bk.job.common.util.http.HttpReqGenUtil; -import com.tencent.bk.job.common.util.json.JsonUtils; +import com.tencent.bk.job.common.util.http.JobHttpClient; import com.tencent.bk.job.file_gateway.consts.TaskStatusEnum; import com.tencent.bk.job.file_gateway.model.req.inner.UpdateFileSourceTaskReq; import lombok.extern.slf4j.Slf4j; @@ -41,12 +39,14 @@ @Service public class TaskReporterImpl implements TaskReporter { - private final ExtHttpHelper httpHelper = HttpHelperFactory.getDefaultHttpHelper(); private final GatewayInfoService gatewayInfoService; + private final JobHttpClient jobHttpClient; @Autowired - public TaskReporterImpl(GatewayInfoService gatewayInfoService) { + public TaskReporterImpl(GatewayInfoService gatewayInfoService, + JobHttpClient jobHttpClient) { this.gatewayInfoService = gatewayInfoService; + this.jobHttpClient = jobHttpClient; } public void reportFileDownloadStart(String taskId, String filePath, String downloadPath) { @@ -61,8 +61,12 @@ public void reportFileDownloadStart(String taskId, String filePath, String downl reportTaskStatus(req); } - public void reportFileDownloadProgress(String taskId, String filePath, String downloadPath, Long fileSize, - Integer speed, Integer progress) { + public void reportFileDownloadProgress(String taskId, + String filePath, + String downloadPath, + Long fileSize, + Integer speed, + Integer progress) { UpdateFileSourceTaskReq req = new UpdateFileSourceTaskReq(); req.setFileSourceTaskId(taskId); req.setFilePath(filePath); @@ -75,8 +79,13 @@ public void reportFileDownloadProgress(String taskId, String filePath, String do reportTaskStatus(req); } - public void reportFileDownloadProgressWithContent(String taskId, String filePath, String downloadPath, Long fileSize, - Integer speed, Integer progress, String content) { + public void reportFileDownloadProgressWithContent(String taskId, + String filePath, + String downloadPath, + Long fileSize, + Integer speed, + Integer progress, + String content) { UpdateFileSourceTaskReq req = new UpdateFileSourceTaskReq(); req.setFileSourceTaskId(taskId); req.setFilePath(filePath); @@ -89,8 +98,12 @@ public void reportFileDownloadProgressWithContent(String taskId, String filePath reportTaskStatus(req); } - public void reportFileDownloadSuccess(String taskId, String filePath, String downloadPath, Long fileSize, - Integer speed, Integer progress) { + public void reportFileDownloadSuccess(String taskId, + String filePath, + String downloadPath, + Long fileSize, + Integer speed, + Integer progress) { UpdateFileSourceTaskReq req = new UpdateFileSourceTaskReq(); req.setFileSourceTaskId(taskId); req.setFilePath(filePath); @@ -108,7 +121,10 @@ public void reportFileDownloadFailure(String taskId, String filePath, String dow } @Override - public void reportFileDownloadStopped(String taskId, String filePath, String downloadPath, Long fileSize, + public void reportFileDownloadStopped(String taskId, + String filePath, + String downloadPath, + Long fileSize, Integer progress) { UpdateFileSourceTaskReq req = new UpdateFileSourceTaskReq(); req.setFileSourceTaskId(taskId); @@ -139,16 +155,12 @@ public void reportWorkerOffLine(List taskIdList, String content) { // TODO } - public void reportTaskStatus(UpdateFileSourceTaskReq updateFileSourceTaskReq) { String url = gatewayInfoService.getReportTaskStatusUrl(); HttpReq req = HttpReqGenUtil.genSimpleJsonReq(url, updateFileSourceTaskReq); - String respStr = null; try { - log.info(String.format("url=%s,body=%s,headers=%s", url, req.getBody(), - JsonUtils.toJson(req.getHeaders()))); - respStr = httpHelper.post(url, req.getBody(), req.getHeaders()); - log.info(String.format("respStr=%s", respStr)); + log.info("url={},body={}", url, req.getBody()); + jobHttpClient.post(req); } catch (Exception e) { log.error("Fail to request file-gateway:", e); } diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/ThreadCommandBus.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/ThreadCommandBus.java similarity index 98% rename from src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/ThreadCommandBus.java rename to src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/ThreadCommandBus.java index 6e155b9fff..f84bcfbb85 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/cos/service/ThreadCommandBus.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/service/ThreadCommandBus.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.file_gateway.consts.TaskCommandEnum; import lombok.AllArgsConstructor; @@ -37,7 +37,6 @@ @Slf4j public class ThreadCommandBus { - private static final ConcurrentHashMap> map = new ConcurrentHashMap<>(); public static void destroyCommandQueue(String key) { diff --git a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/task/heartbeat/HeartBeatTask.java b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/task/heartbeat/HeartBeatTask.java index 888c88b6ea..472b911a20 100644 --- a/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/task/heartbeat/HeartBeatTask.java +++ b/src/backend/job-file-worker-sdk/service-job-file-worker-sdk/src/main/java/com/tencent/bk/job/file/worker/task/heartbeat/HeartBeatTask.java @@ -30,9 +30,9 @@ import com.tencent.bk.job.common.util.json.JsonUtils; import com.tencent.bk.job.common.util.machine.MachineUtil; import com.tencent.bk.job.file.worker.config.WorkerConfig; -import com.tencent.bk.job.file.worker.cos.service.EnvironmentService; -import com.tencent.bk.job.file.worker.cos.service.GatewayInfoService; -import com.tencent.bk.job.file.worker.cos.service.MetaDataService; +import com.tencent.bk.job.file.worker.service.EnvironmentService; +import com.tencent.bk.job.file.worker.service.GatewayInfoService; +import com.tencent.bk.job.file.worker.service.MetaDataService; import com.tencent.bk.job.file_gateway.model.req.inner.HeartBeatReq; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileResourceProxy.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileResourceProxy.java index 404e8431b3..c0bd47a4f7 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileResourceProxy.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileResourceProxy.java @@ -28,10 +28,10 @@ import com.tencent.bk.job.common.exception.InvalidParamException; import com.tencent.bk.job.common.model.InternalResponse; import com.tencent.bk.job.file.worker.consts.FileSourceTypeEnum; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.req.BaseReq; import com.tencent.bk.job.file.worker.model.req.ExecuteActionReq; import com.tencent.bk.job.file.worker.model.req.ListFileNodeReq; +import com.tencent.bk.job.file.worker.service.RemoteClient; import com.tencent.bk.job.file_gateway.model.resp.common.FileNodesDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -60,7 +60,10 @@ public IFileResource chooseFileResource(BaseReq req) { } else if (FileSourceTypeEnum.BLUEKING_ARTIFACTORY.name().equals(req.getFileSourceTypeCode())) { return artifactoryFileResource; } else { - throw new InvalidParamException(ErrorCode.ILLEGAL_PARAM_WITH_PARAM_NAME, new String[]{"fileSourceTypeCode"}); + throw new InvalidParamException( + ErrorCode.ILLEGAL_PARAM_WITH_PARAM_NAME, + new String[]{"fileSourceTypeCode"} + ); } } diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceProxy.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceProxy.java index 9817169b0a..4ee7c3bc6b 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceProxy.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/api/FileTaskResourceProxy.java @@ -24,9 +24,9 @@ package com.tencent.bk.job.file.worker.api; -import com.tencent.bk.job.file.worker.cos.service.FileTaskService; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.req.BaseReq; +import com.tencent.bk.job.file.worker.service.FileTaskService; +import com.tencent.bk.job.file.worker.service.RemoteClient; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RestController; diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/impl/ArtifactoryFileResourceImpl.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/impl/ArtifactoryFileResourceImpl.java index d9658ef5bf..a5058aed21 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/impl/ArtifactoryFileResourceImpl.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/impl/ArtifactoryFileResourceImpl.java @@ -40,11 +40,11 @@ import com.tencent.bk.job.file.worker.artifactory.consts.ArtifactoryNodeTypeEnum; import com.tencent.bk.job.file.worker.artifactory.service.ArtifactoryBaseService; import com.tencent.bk.job.file.worker.artifactory.service.ArtifactoryRemoteClient; -import com.tencent.bk.job.file.worker.cos.service.MetaDataService; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.req.BaseReq; import com.tencent.bk.job.file.worker.model.req.ExecuteActionReq; import com.tencent.bk.job.file.worker.model.req.ListFileNodeReq; +import com.tencent.bk.job.file.worker.service.MetaDataService; +import com.tencent.bk.job.file.worker.service.RemoteClient; import com.tencent.bk.job.file_gateway.model.resp.common.FileNodesDTO; import com.tencent.bk.job.file_gateway.model.resp.common.FileTreeNodeDef; import io.micrometer.core.instrument.util.StringUtils; diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/service/ArtifactoryRemoteClient.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/service/ArtifactoryRemoteClient.java index 1b0ec79002..6868cb7656 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/service/ArtifactoryRemoteClient.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/artifactory/service/ArtifactoryRemoteClient.java @@ -27,8 +27,8 @@ import com.tencent.bk.job.common.artifactory.model.dto.NodeDTO; import com.tencent.bk.job.common.artifactory.sdk.ArtifactoryClient; import com.tencent.bk.job.common.exception.ServiceException; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.FileMetaData; +import com.tencent.bk.job.file.worker.service.RemoteClient; import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/impl/COSFileResourceImpl.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/impl/COSFileResourceImpl.java index a6ffc839dd..eef6cbf12e 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/impl/COSFileResourceImpl.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/impl/COSFileResourceImpl.java @@ -35,15 +35,15 @@ import com.tencent.bk.job.file.worker.cos.JobTencentInnerCOSClient; import com.tencent.bk.job.file.worker.cos.consts.COSActionCodeEnum; import com.tencent.bk.job.file.worker.cos.consts.COSNodeTypeEnum; -import com.tencent.bk.job.file.worker.cos.service.COSBaseService; -import com.tencent.bk.job.file.worker.cos.service.COSRemoteClient; -import com.tencent.bk.job.file.worker.cos.service.MetaDataService; -import com.tencent.bk.job.file.worker.cos.service.RemoteClient; import com.tencent.bk.job.file.worker.model.BucketDTO; import com.tencent.bk.job.file.worker.model.FileDTO; import com.tencent.bk.job.file.worker.model.req.BaseReq; import com.tencent.bk.job.file.worker.model.req.ExecuteActionReq; import com.tencent.bk.job.file.worker.model.req.ListFileNodeReq; +import com.tencent.bk.job.file.worker.service.COSBaseService; +import com.tencent.bk.job.file.worker.service.COSRemoteClient; +import com.tencent.bk.job.file.worker.service.MetaDataService; +import com.tencent.bk.job.file.worker.service.RemoteClient; import com.tencent.bk.job.file_gateway.model.resp.common.FileNodesDTO; import com.tencent.bk.job.file_gateway.model.resp.common.FileTreeNodeDef; import com.tencent.cos.model.Bucket; diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/service/COSBaseService.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/service/COSBaseService.java similarity index 97% rename from src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/service/COSBaseService.java rename to src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/service/COSBaseService.java index 7b2225988d..b14024dfdf 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/service/COSBaseService.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/service/COSBaseService.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.common.model.dto.CommonCredential; import com.tencent.bk.job.file.worker.cos.JobTencentInnerCOSClient; diff --git a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/service/COSRemoteClient.java b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/service/COSRemoteClient.java similarity index 98% rename from src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/service/COSRemoteClient.java rename to src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/service/COSRemoteClient.java index 4e1d15878b..9f5ec9fd92 100644 --- a/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/cos/service/COSRemoteClient.java +++ b/src/backend/job-file-worker/service-job-file-worker/src/main/java/com/tencent/bk/job/file/worker/service/COSRemoteClient.java @@ -22,7 +22,7 @@ * IN THE SOFTWARE. */ -package com.tencent.bk.job.file.worker.cos.service; +package com.tencent.bk.job.file.worker.service; import com.tencent.bk.job.file.worker.cos.JobTencentInnerCOSClient; import com.tencent.bk.job.file.worker.model.FileMetaData; diff --git a/support-files/kubernetes/charts/bk-job/VALUES_LOG.md b/support-files/kubernetes/charts/bk-job/VALUES_LOG.md index a56a43e6ad..3bac091367 100644 --- a/support-files/kubernetes/charts/bk-job/VALUES_LOG.md +++ b/support-files/kubernetes/charts/bk-job/VALUES_LOG.md @@ -13,6 +13,21 @@ fileDistribute: 3.临时文件存储根路径`persistence.localStorage.path`默认值修改为`/data/job_temp_file` +4.新增job-file-gateway文件网关任务重调度相关配置 +```yaml +## job-file-gateway文件网关服务配置 +fileGatewayConfig: + # 任务重调度相关配置 + reDispatch: + # 超时任务 + timeoutTask: + # 是否开启重调度 + enabled: true + # 超时时间(秒) + timeoutSeconds: 10 + +``` + ## 0.5.1 1.增加轻量化部署配置 diff --git a/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml b/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml index 80f54ab8b1..f3a0c9142c 100644 --- a/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml +++ b/support-files/kubernetes/charts/bk-job/templates/job-file-gateway/configmap.yaml @@ -69,6 +69,8 @@ data: readTimeout: 20000 job: file-gateway: + reDispatch: + {{- toYaml .Values.fileGatewayConfig.reDispatch | nindent 10 }} worker-tags: white: {{ .Values.fileGatewayConfig.workerTags.white }} black: {{ .Values.fileGatewayConfig.workerTags.black }} diff --git a/support-files/kubernetes/charts/bk-job/values.yaml b/support-files/kubernetes/charts/bk-job/values.yaml index 6b4a80f10d..878614f84e 100644 --- a/support-files/kubernetes/charts/bk-job/values.yaml +++ b/support-files/kubernetes/charts/bk-job/values.yaml @@ -1179,8 +1179,16 @@ fileGatewayConfig: white: "k8s" # 不能调度的worker标签黑名单,逗号分隔 black: "" + # 任务重调度相关配置 + reDispatch: + # 超时任务 + timeoutTask: + # 是否开启重调度 + enabled: true + # 超时时间(秒) + timeoutSeconds: 10 -## job-file-worker文件源接入点配置 + ## job-file-worker文件源接入点配置 fileWorkerConfig: # 模块是否启用,默认启用 enabled: true diff --git a/support-files/templates/#etc#job#job-file-gateway#job-file-gateway.yml b/support-files/templates/#etc#job#job-file-gateway#job-file-gateway.yml index 427f062030..265cbb933b 100644 --- a/support-files/templates/#etc#job#job-file-gateway#job-file-gateway.yml +++ b/support-files/templates/#etc#job#job-file-gateway#job-file-gateway.yml @@ -63,3 +63,13 @@ spring: multipart: max-file-size: 5GB max-request-size: 5GB +job: + file-gateway: + # 任务重调度相关配置 + reDispatch: + # 超时任务 + timeoutTask: + # 是否开启重调度 + enabled: true + # 超时时间(秒) + timeoutSeconds: 10