From a446337af9e646e50474deafda78177f41ad0c1a Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Mon, 5 Sep 2022 11:05:55 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feature:=20=E6=BB=9A=E5=8A=A8=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=96=B0=E5=A2=9EDB=E8=A1=A8=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BD=92=E6=A1=A3=20#1196?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修改归档任务的分布式锁过期时间为24h --- .../java/com/tencent/bk/job/backup/archive/ArchiveTaskLock.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/ArchiveTaskLock.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/ArchiveTaskLock.java index 90a020e40e..b0f64a8a8f 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/ArchiveTaskLock.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/ArchiveTaskLock.java @@ -36,7 +36,7 @@ @Slf4j public class ArchiveTaskLock { private final String ARCHIVE_LOCK_KEY_PREFIX = "JOB_EXECUTE_LOG_ARCHIVE_LOCK"; - private final Long LOCK_TIME = 12 * 3600 * 1000L; + private final Long LOCK_TIME = 24 * 3600 * 1000L; /** * 最小获取锁间隔时间;为了保证在分布式系统中多个节点都能均匀获取到任务,会优先让空闲的节点获取到任务 */ From 182d044a704bb2983b17409af65850b40e74f7fb Mon Sep 17 00:00:00 2001 From: wangyu096 Date: Mon, 5 Sep 2022 12:05:59 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feature:=20=E6=BB=9A=E5=8A=A8=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=96=B0=E5=A2=9EDB=E8=A1=A8=E7=9A=84=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BD=92=E6=A1=A3=20#1196?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 计算归档的task_instance_id/step_instance_id优化 --- .../archive/JobExecuteArchiveManage.java | 24 ++++++++++++++++--- .../archive/impl/StepInstanceArchivist.java | 10 ++++++-- .../archive/impl/TaskInstanceArchivist.java | 4 ++-- .../dao/impl/StepInstanceRecordDAO.java | 18 +++++++++----- .../dao/impl/TaskInstanceRecordDAO.java | 12 +++++----- 5 files changed, 49 insertions(+), 19 deletions(-) diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java index 5331e92279..2738ace866 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/JobExecuteArchiveManage.java @@ -59,6 +59,7 @@ import com.tencent.bk.job.backup.dao.impl.StepInstanceVariableRecordDAO; import com.tencent.bk.job.backup.dao.impl.TaskInstanceRecordDAO; import com.tencent.bk.job.backup.dao.impl.TaskInstanceVariableRecordDAO; +import com.tencent.bk.job.backup.model.dto.ArchiveProgressDTO; import com.tencent.bk.job.backup.service.ArchiveProgressService; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTime; @@ -97,6 +98,7 @@ public class JobExecuteArchiveManage implements SmartLifecycle { private final GseFileAgentTaskArchivist gseFileAgentTaskArchivist; private final StepInstanceRollingTaskArchivist stepInstanceRollingTaskArchivist; private final RollingConfigArchivist rollingConfigArchivist; + private final ArchiveProgressService archiveProgressService; /** @@ -125,6 +127,7 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO, ArchiveConfig archiveConfig) { log.info("Init JobExecuteArchiveManage! archiveConfig: {}", archiveConfig); this.archiveConfig = archiveConfig; + this.archiveProgressService = archiveProgressService; this.fileSourceTaskLogArchivist = new FileSourceTaskLogArchivist(fileSourceTaskRecordDAO, executeArchiveDAO, archiveProgressService); this.stepInstanceArchivist = new StepInstanceArchivist(stepInstanceRecordDAO, executeArchiveDAO, @@ -230,9 +233,8 @@ private void doArchive(Long endTime) throws InterruptedException { try { log.info("Start job execute archive before {} at {}", endTime, System.currentTimeMillis()); - long maxNeedArchiveTaskInstanceId = taskInstanceArchivist.getMaxNeedArchiveTaskInstanceId(endTime); - long maxNeedArchiveStepInstanceId = - stepInstanceArchivist.getMaxNeedArchiveStepInstanceId(maxNeedArchiveTaskInstanceId); + long maxNeedArchiveTaskInstanceId = computeMaxNeedArchiveTaskInstanceId(endTime); + long maxNeedArchiveStepInstanceId = computeMaxNeedArchiveStepInstanceId(maxNeedArchiveTaskInstanceId); log.info("Compute archive instance id range, maxNeedArchiveTaskInstanceId: {}, " + "maxNeedArchiveStepInstanceId: {}", maxNeedArchiveTaskInstanceId, maxNeedArchiveStepInstanceId); @@ -249,6 +251,22 @@ private void doArchive(Long endTime) throws InterruptedException { } } + public long computeMaxNeedArchiveTaskInstanceId(Long endTime) { + ArchiveProgressDTO archiveProgress = + archiveProgressService.queryArchiveProgress(taskInstanceArchivist.getTableName()); + long lastArchivedId = archiveProgress != null ? archiveProgress.getLastArchivedId() : 0L; + long maxId = taskInstanceArchivist.getMaxId(endTime); + return Math.max(lastArchivedId, maxId); + } + + public long computeMaxNeedArchiveStepInstanceId(Long taskInstanceId) { + ArchiveProgressDTO archiveProgress = + archiveProgressService.queryArchiveProgress(stepInstanceArchivist.getTableName()); + long lastArchivedId = archiveProgress != null ? archiveProgress.getLastArchivedId() : 0L; + long maxId = stepInstanceArchivist.getMaxId(taskInstanceId); + return Math.max(lastArchivedId, maxId); + } + private void archive(long maxNeedArchiveTaskInstanceId, long maxNeedArchiveStepInstanceId) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(15); diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/StepInstanceArchivist.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/StepInstanceArchivist.java index 7d9f5eda65..1af5ceaaaf 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/StepInstanceArchivist.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/StepInstanceArchivist.java @@ -42,8 +42,14 @@ public StepInstanceArchivist(StepInstanceRecordDAO executeRecordDAO, this.deleteIdStepSize = 10_000; } - public Long getMaxNeedArchiveStepInstanceId(Long taskInstanceId) { + /** + * 获取作业实例ID范围内的步骤实例ID最大值 + * + * @param taskInstanceId 作业实例ID + * @return 步骤实例ID 最大值 + */ + public Long getMaxId(Long taskInstanceId) { StepInstanceRecordDAO stepInstanceRecordDAO = (StepInstanceRecordDAO) executeRecordDAO; - return stepInstanceRecordDAO.getMaxNeedArchiveStepInstanceId(taskInstanceId); + return stepInstanceRecordDAO.getMaxId(taskInstanceId); } } diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/TaskInstanceArchivist.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/TaskInstanceArchivist.java index 9c6cfcfbc4..dff75b38ec 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/TaskInstanceArchivist.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/archive/impl/TaskInstanceArchivist.java @@ -42,8 +42,8 @@ public TaskInstanceArchivist(TaskInstanceRecordDAO executeRecordDAO, this.deleteIdStepSize = 10_000; } - public Long getMaxNeedArchiveTaskInstanceId(Long endTime) { + public Long getMaxId(Long endTime) { TaskInstanceRecordDAO taskInstanceRecordDAO = (TaskInstanceRecordDAO) executeRecordDAO; - return taskInstanceRecordDAO.getMaxNeedArchiveTaskInstanceId(endTime); + return taskInstanceRecordDAO.getMaxId(endTime); } } diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/StepInstanceRecordDAO.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/StepInstanceRecordDAO.java index d3854e7a43..212cd45a19 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/StepInstanceRecordDAO.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/StepInstanceRecordDAO.java @@ -26,16 +26,22 @@ public Table getTable() { return TABLE; } - public Long getMaxNeedArchiveStepInstanceId(Long taskInstanceId) { - Record1 maxNeedStepInstanceIdRecord = + /** + * 获取作业实例ID范围内的步骤实例ID最大值 + * + * @param taskInstanceId 作业实例ID + * @return 步骤实例ID 最大值 + */ + public Long getMaxId(Long taskInstanceId) { + Record1 record = context.select(max(TABLE.ID)) .from(TABLE) .where(TABLE.TASK_INSTANCE_ID.lessOrEqual(taskInstanceId)) .fetchOne(); - if (maxNeedStepInstanceIdRecord != null) { - Long maxNeedStepInstanceId = (Long) maxNeedStepInstanceIdRecord.get(0); - if (maxNeedStepInstanceId != null) { - return maxNeedStepInstanceId; + if (record != null) { + Long maxId = (Long) record.get(0); + if (maxId != null) { + return maxId; } } return 0L; diff --git a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/TaskInstanceRecordDAO.java b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/TaskInstanceRecordDAO.java index b06c50b688..67c5155049 100644 --- a/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/TaskInstanceRecordDAO.java +++ b/src/backend/job-backup/service-job-backup/src/main/java/com/tencent/bk/job/backup/dao/impl/TaskInstanceRecordDAO.java @@ -31,16 +31,16 @@ public TableField getArchiveIdField() { return TABLE.ID; } - public Long getMaxNeedArchiveTaskInstanceId(Long endTime) { - Record1 maxNeedTaskInstanceIdRecord = + public Long getMaxId(Long endTime) { + Record1 record = context.select(max(TABLE.ID)) .from(TABLE) .where(TABLE.CREATE_TIME.lessOrEqual(endTime)) .fetchOne(); - if (maxNeedTaskInstanceIdRecord != null) { - Long maxNeedTaskInstanceId = (Long) maxNeedTaskInstanceIdRecord.get(0); - if (maxNeedTaskInstanceId != null) { - return maxNeedTaskInstanceId; + if (record != null) { + Long maxId = (Long) record.get(0); + if (maxId != null) { + return maxId; } } return 0L;