Skip to content

Commit

Permalink
perf: 分库分表改造-任务表以及子表加入 task_instance_id 字段 #2991
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Nov 19, 2024
1 parent 984e607 commit b981c5d
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
@Getter
public enum IdGenType {
AUTO_INCREMENT(IdGenType.Constants.AUTO_INCREMENT),
SEGMENT(IdGenType.Constants.SEGMENT);
SEGMENT(IdGenType.Constants.LEAF_SEGMENT);

public static class Constants {
public final static String AUTO_INCREMENT = "auto_increment";
public final static String SEGMENT = "segment";
public final static String LEAF_SEGMENT = "leaf_segment";
}

private final String type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ public JobInstanceArchiveTaskScheduler jobInstanceArchiveTaskScheduler(
ArchiveProperties archiveProperties,
JobInstanceArchiveTaskScheduleLock jobInstanceArchiveTaskScheduleLock,
JobInstanceSubTableArchivers jobInstanceSubTableArchivers,
JobInstanceColdDAO jobInstanceColdDAO,
ObjectProvider<JobInstanceColdDAO> jobInstanceColdDAOObjectProvider,
ArchiveTaskLock archiveTaskLock,
ArchiveErrorTaskCounter archiveErrorTaskCounter,
ArchiveTablePropsStorage archiveTablePropsStorage) {
Expand All @@ -497,7 +497,7 @@ public JobInstanceArchiveTaskScheduler jobInstanceArchiveTaskScheduler(
archiveProperties,
jobInstanceArchiveTaskScheduleLock,
jobInstanceSubTableArchivers,
jobInstanceColdDAO,
jobInstanceColdDAOObjectProvider.getIfAvailable(),
archiveTaskLock,
archiveErrorTaskCounter,
archiveTablePropsStorage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public SegmentIdGenCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnProperty(value = "idGen.type", havingValue = IdGenType.Constants.SEGMENT)
@ConditionalOnProperty(value = "idGen.type", havingValue = IdGenType.Constants.LEAF_SEGMENT)
static class IdGenTypeCondition {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public PropBasedDynamicIdGen(
candidateIdGens.put(IdGenType.Constants.AUTO_INCREMENT, autoIncrementIdGenerator);
}
if (segmentIdGenerator != null) {
candidateIdGens.put(IdGenType.Constants.SEGMENT, segmentIdGenerator);
candidateIdGens.put(IdGenType.Constants.LEAF_SEGMENT, segmentIdGenerator);
}
super.initCandidateComponents(candidateIdGens);
}
Expand Down
3 changes: 2 additions & 1 deletion support-files/kubernetes/charts/bk-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,11 @@ executeConfig:
# 分布式ID组件 leaf 配置
leaf:
enabled: false
# id生成器配置
idGen:
migration:
enabled: false
# id生成器配置。auto_increment: 自增长,segment: leaf segment id
# auto_increment: 自增长;leaf_segment: leaf segment id
type: auto_increment

## job-crontab定时任务配置
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,22 +286,40 @@ DROP PROCEDURE IF EXISTS job_add_task_instance_id;

DELIMITER <JOB_UBF>

CREATE PROCEDURE job_add_task_instance_id()
BEGIN
CREATE PROCEDURE job_add_task_instance_id(IN fromStepInstanceId BIGINT, IN endStepInstanceId BIGINT)
label:BEGIN

DECLARE minId BIGINT;
DECLARE maxId BIGINT;
DECLARE fromId BIGINT;
DECLARE endId BIGINT;

SET AUTOCOMMIT = 0;
-- 如果 step_instance 表为空,无需变更
IF NOT EXISTS (SELECT 1 FROM step_instance LIMIT 1) THEN
LEAVE label;
END IF;

-- 如果 gse_task 表中不存在task_instance_id = 0,说明已经执行过该变更
IF NOT EXISTS (SELECT 1 FROM gse_task WHERE task_instance_id = 0 LIMIT 1) THEN
LEAVE label;
END IF;


SELECT MIN(id), MAX(id) INTO minId, maxId FROM step_instance;

SET fromId = minId;
IF fromStepInstanceId > 0 THEN
SET minId = fromStepInstanceId;
END IF;

IF endStepInstanceId > 0 THEN
SET maxId = endStepInstanceId;
END IF;

SET fromId = minId - 1;

WHILE fromId <= maxId DO
SET endId = fromId + 999;
SELECT MIN(t.id),MAX(t.id) INTO fromId,endId FROM (SELECT id FROM step_instance WHERE id > fromId ORDER BY id asc LIMIT 1000) t;

UPDATE file_source_task_log t1
INNER JOIN (
Expand Down Expand Up @@ -386,14 +404,14 @@ BEGIN

COMMIT;

SET fromId = endId + 1;
SET fromId = endId;
END WHILE;

END <JOB_UBF>
DELIMITER ;
COMMIT;

CALL job_add_task_instance_id();
CALL job_add_task_instance_id(-1,-1);
DROP PROCEDURE IF EXISTS job_add_task_instance_id;


Expand All @@ -402,22 +420,37 @@ DROP PROCEDURE IF EXISTS job_update_task_instance_host_data;

DELIMITER <JOB_UBF>

CREATE PROCEDURE job_update_task_instance_host_data()
BEGIN
CREATE PROCEDURE job_update_task_instance_host_data(IN fromTaskInstanceId BIGINT, IN endTaskInstanceId BIGINT)
label:BEGIN

DECLARE minId BIGINT;
DECLARE maxId BIGINT;
DECLARE fromId BIGINT;
DECLARE endId BIGINT;

IF NOT EXISTS (SELECT 1 FROM task_instance LIMIT 1) THEN
LEAVE label;
END IF;

IF NOT EXISTS (SELECT 1 FROM task_instance_host WHERE app_id = 0 LIMIT 1) THEN
LEAVE label;
END IF;

SET AUTOCOMMIT = 0;

SELECT MIN(id), MAX(id) INTO minId, maxId FROM task_instance;
IF fromTaskInstanceId > 0 THEN
SET minId = fromTaskInstanceId;
END IF;

IF endTaskInstanceId > 0 THEN
SET maxId = endTaskInstanceId;
END IF;

SET fromId = minId;
SET fromId = minId - 1;

WHILE fromId <= maxId DO
SET endId = fromId + 999;
SELECT MIN(t.id),MAX(t.id) INTO fromId,endId FROM (SELECT id FROM task_instance where id > fromId ORDER BY id asc LIMIT 1000) t;

UPDATE task_instance_host t1
INNER JOIN (
Expand All @@ -430,13 +463,13 @@ BEGIN

COMMIT;

SET fromId = endId + 1;
SET fromId = endId;
END WHILE;

END <JOB_UBF>
DELIMITER ;
COMMIT;

CALL job_update_task_instance_host_data();
CALL job_update_task_instance_host_data(-1,-1);
DROP PROCEDURE IF EXISTS job_update_task_instance_host_data;

0 comments on commit b981c5d

Please sign in to comment.