Skip to content

Commit

Permalink
feature: Job 支持容器执行 - 脚本任务 TencentBlueKing#2631
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Feb 2, 2024
1 parent 2e1f067 commit 2539355
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import static java.lang.annotation.ElementType.TYPE;

/**
* 兼容历史版本的实现
* 用来标识兼容历史版本的实现
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.lang.annotation.Target;

/**
* 用于表示某个持久对象,该对象会被持久化到DB中,所以对于该对象的改动需要谨慎,避免破坏性的修改(比如修改字段名)
* 用于标识持久化的对象。该对象会被持久化到DB中,所以对于该对象的改动需要谨慎,避免破坏性的修改(比如修改字段名)
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

import com.tencent.bk.job.backup.archive.impl.FileSourceTaskLogArchivist;
import com.tencent.bk.job.backup.archive.impl.GseFileAgentTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseFileExecuteObjTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseScriptAgentTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseScriptExecuteObjTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseTaskIpLogArchivist;
import com.tencent.bk.job.backup.archive.impl.GseTaskLogArchivist;
import com.tencent.bk.job.backup.archive.impl.OperationLogArchivist;
import com.tencent.bk.job.backup.archive.impl.RollingConfigArchivist;
import com.tencent.bk.job.backup.archive.impl.StepInstanceArchivist;
Expand All @@ -46,9 +46,9 @@
import com.tencent.bk.job.backup.dao.ExecuteRecordDAO;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskIpLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.OperationLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.RollingConfigRecordDAO;
Expand Down Expand Up @@ -87,11 +87,11 @@ public class JobExecuteArchiveManage implements SmartLifecycle {
private final TaskInstanceVariableRecordDAO taskInstanceVariableRecordDAO;
private final OperationLogRecordDAO operationLogRecordDAO;
private final FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO;
private final GseTaskLogRecordDAO gseTaskLogRecordDAO;
private final GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO;
private final GseTaskRecordDAO gseTaskRecordDAO;
private final GseScriptAgentTaskRecordDAO gseScriptAgentTaskRecordDAO;
private final GseFileAgentTaskRecordDAO gseFileAgentTaskRecordDAO;
private final GseScriptExecuteObjTaskRecordDAO gseScriptExecuteObjTaskRecordDAO;
private final GseFileExecuteObjTaskRecordDAO gseFileExecuteObjTaskRecordDAO;
private final StepInstanceRollingTaskRecordDAO stepInstanceRollingTaskRecordDAO;
private final RollingConfigRecordDAO rollingConfigRecordDAO;
private final TaskInstanceHostRecordDAO taskInstanceHostRecordDAO;
Expand All @@ -112,18 +112,20 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO,
TaskInstanceVariableRecordDAO taskInstanceVariableRecordDAO,
OperationLogRecordDAO operationLogRecordDAO,
FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO,
GseTaskLogRecordDAO gseTaskLogRecordDAO,
GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO,
GseTaskRecordDAO gseTaskRecordDAO,
GseScriptAgentTaskRecordDAO gseScriptAgentTaskRecordDAO,
GseFileAgentTaskRecordDAO gseFileAgentTaskRecordDAO,
GseScriptExecuteObjTaskRecordDAO gseScriptExecuteObjTaskRecordDAO,
GseFileExecuteObjTaskRecordDAO gseFileExecuteObjTaskRecordDAO,
StepInstanceRollingTaskRecordDAO stepInstanceRollingTaskRecordDAO,
RollingConfigRecordDAO rollingConfigRecordDAO,
TaskInstanceHostRecordDAO taskInstanceHostRecordDAO,
ExecuteArchiveDAO executeArchiveDAO,
ArchiveProgressService archiveProgressService,
ArchiveDBProperties archiveDBProperties,
ExecutorService archiveExecutor) {
this.gseScriptExecuteObjTaskRecordDAO = gseScriptExecuteObjTaskRecordDAO;
this.gseFileExecuteObjTaskRecordDAO = gseFileExecuteObjTaskRecordDAO;
log.info("Init JobExecuteArchiveManage! archiveConfig: {}", archiveDBProperties);
this.archiveDBProperties = archiveDBProperties;
this.archiveProgressService = archiveProgressService;
Expand All @@ -137,8 +139,6 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO,
this.taskInstanceVariableRecordDAO = taskInstanceVariableRecordDAO;
this.operationLogRecordDAO = operationLogRecordDAO;
this.fileSourceTaskLogRecordDAO = fileSourceTaskLogRecordDAO;
this.gseTaskLogRecordDAO = gseTaskLogRecordDAO;
this.gseTaskIpLogRecordDAO = gseTaskIpLogRecordDAO;
this.gseTaskRecordDAO = gseTaskRecordDAO;
this.gseScriptAgentTaskRecordDAO = gseScriptAgentTaskRecordDAO;
this.gseFileAgentTaskRecordDAO = gseFileAgentTaskRecordDAO;
Expand Down Expand Up @@ -277,10 +277,6 @@ private void archive(long maxNeedArchiveTaskInstanceId, long maxNeedArchiveStepI
addStepInstanceScriptArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// file_source_task_log
addFileSourceTaskLogArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// gse_task_log
addGseTaskLogArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// gse_task_ip_log
addGseTaskIpLogArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// task_instance_variable
addTaskInstanceVariableArchiveTask(maxNeedArchiveTaskInstanceId, countDownLatch);
// step_instance_variable
Expand All @@ -299,6 +295,10 @@ private void archive(long maxNeedArchiveTaskInstanceId, long maxNeedArchiveStepI
addRollingConfigArchiveTask(maxNeedArchiveTaskInstanceId, countDownLatch);
// task_instance_host
addTaskInstanceHostArchiveTask(maxNeedArchiveTaskInstanceId, countDownLatch);
// gse_script_execute_obj_task
addGseScriptExecuteObjTaskArchiveTask(maxNeedArchiveTaskInstanceId, countDownLatch);
// gse_file_execute_obj_task
addGseFileExecuteObjTaskArchiveTask(maxNeedArchiveTaskInstanceId, countDownLatch);

log.info("Archive task submitted. Waiting for complete...");
countDownLatch.await();
Expand Down Expand Up @@ -379,30 +379,6 @@ private void addFileSourceTaskLogArchiveTask(Long maxNeedArchiveStepInstanceId,
.archive());
}

private void addGseTaskLogArchiveTask(Long maxNeedArchiveStepInstanceId, CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new GseTaskLogArchivist(
gseTaskLogRecordDAO,
executeArchiveDAO,
archiveProgressService,
archiveDBProperties,
maxNeedArchiveStepInstanceId,
countDownLatch)
.archive());
}

private void addGseTaskIpLogArchiveTask(Long maxNeedArchiveStepInstanceId, CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new GseTaskIpLogArchivist(
gseTaskIpLogRecordDAO,
executeArchiveDAO,
archiveProgressService,
archiveDBProperties,
maxNeedArchiveStepInstanceId,
countDownLatch)
.archive());
}

private void addTaskInstanceVariableArchiveTask(Long maxNeedArchiveTaskInstanceId, CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new TaskInstanceVariableArchivist(
Expand Down Expand Up @@ -475,6 +451,32 @@ private void addGseFileAgentTaskArchiveTask(Long maxNeedArchiveStepInstanceId, C
.archive());
}

private void addGseScriptExecuteObjTaskArchiveTask(Long maxNeedArchiveTaskInstanceId,
CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new GseScriptExecuteObjTaskArchivist(
gseScriptExecuteObjTaskRecordDAO,
executeArchiveDAO,
archiveProgressService,
archiveDBProperties,
maxNeedArchiveTaskInstanceId,
countDownLatch)
.archive());
}

private void addGseFileExecuteObjTaskArchiveTask(Long maxNeedArchiveTaskInstanceId,
CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new GseFileExecuteObjTaskArchivist(
gseFileExecuteObjTaskRecordDAO,
executeArchiveDAO,
archiveProgressService,
archiveDBProperties,
maxNeedArchiveTaskInstanceId,
countDownLatch)
.archive());
}

private void addStepInstanceRollingTaskArchiveTask(Long maxNeedArchiveStepInstanceId,
CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@
import com.tencent.bk.job.backup.archive.AbstractArchivist;
import com.tencent.bk.job.backup.config.ArchiveDBProperties;
import com.tencent.bk.job.backup.dao.ExecuteArchiveDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskIpLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.service.ArchiveProgressService;
import com.tencent.bk.job.execute.model.tables.records.GseTaskIpLogRecord;
import com.tencent.bk.job.execute.model.tables.records.GseFileExecuteObjTaskRecord;

import java.util.concurrent.CountDownLatch;

/**
* gse_task_ip_log 表归档
* gse_file_execute_obj_task 表归档
*/
public class GseTaskIpLogArchivist extends AbstractArchivist<GseTaskIpLogRecord> {
public class GseFileExecuteObjTaskArchivist extends AbstractArchivist<GseFileExecuteObjTaskRecord> {

public GseTaskIpLogArchivist(GseTaskIpLogRecordDAO executeRecordDAO,
ExecuteArchiveDAO executeArchiveDAO,
ArchiveProgressService archiveProgressService,
ArchiveDBProperties archiveDBProperties,
Long maxNeedArchiveId,
CountDownLatch countDownLatch) {
public GseFileExecuteObjTaskArchivist(GseFileExecuteObjTaskRecordDAO executeRecordDAO,
ExecuteArchiveDAO executeArchiveDAO,
ArchiveProgressService archiveProgressService,
ArchiveDBProperties archiveDBProperties,
Long maxNeedArchiveId,
CountDownLatch countDownLatch) {
super(executeRecordDAO,
executeArchiveDAO,
archiveProgressService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,29 @@
import com.tencent.bk.job.backup.archive.AbstractArchivist;
import com.tencent.bk.job.backup.config.ArchiveDBProperties;
import com.tencent.bk.job.backup.dao.ExecuteArchiveDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.service.ArchiveProgressService;
import com.tencent.bk.job.execute.model.tables.records.GseTaskLogRecord;
import com.tencent.bk.job.execute.model.tables.records.GseScriptExecuteObjTaskRecord;

import java.util.concurrent.CountDownLatch;

/**
* gse_task_log 表归档
* gse_script_execute_obj_task 表归档
*/
public class GseTaskLogArchivist extends AbstractArchivist<GseTaskLogRecord> {
public class GseScriptExecuteObjTaskArchivist extends AbstractArchivist<GseScriptExecuteObjTaskRecord> {

public GseTaskLogArchivist(GseTaskLogRecordDAO executeRecordDAO,
ExecuteArchiveDAO executeArchiveDAO,
ArchiveProgressService archiveProgressService,
ArchiveDBProperties archiveDBProperties,
Long maxNeedArchiveId,
CountDownLatch countDownLatch) {
public GseScriptExecuteObjTaskArchivist(GseScriptExecuteObjTaskRecordDAO executeRecordDAO,
ExecuteArchiveDAO executeArchiveDAO,
ArchiveProgressService archiveProgressService,
ArchiveDBProperties archiveDBProperties,
Long maxNeedArchiveId,
CountDownLatch countDownLatch) {
super(executeRecordDAO,
executeArchiveDAO,
archiveProgressService,
archiveDBProperties,
maxNeedArchiveId,
countDownLatch);
this.deleteIdStepSize = 10_000;
this.deleteIdStepSize = 1_000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import com.tencent.bk.job.backup.dao.impl.ExecuteArchiveDAOImpl;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskIpLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.OperationLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.RollingConfigRecordDAO;
Expand Down Expand Up @@ -139,22 +139,6 @@ public OperationLogRecordDAO operationLogRecordDAO(
return new OperationLogRecordDAO(context, archiveDBProperties);
}

@Bean(name = "gseTaskLogRecordDAO")
public GseTaskLogRecordDAO gseTaskLogRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
ArchiveDBProperties archiveDBProperties) {
log.info("Init GseTaskLogRecordDAO");
return new GseTaskLogRecordDAO(context, archiveDBProperties);
}

@Bean(name = "gseTaskIpLogRecordDAO")
public GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
ArchiveDBProperties archiveDBProperties) {
log.info("Init GseTaskIpLogRecordDAO");
return new GseTaskIpLogRecordDAO(context, archiveDBProperties);
}

@Bean(name = "fileSourceTaskLogRecordDAO")
public FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
Expand Down Expand Up @@ -187,6 +171,22 @@ public GseFileAgentTaskRecordDAO gseFileAgentTaskRecordDAO(
return new GseFileAgentTaskRecordDAO(context, archiveDBProperties);
}

@Bean(name = "gseScriptExecuteObjTaskRecordDAO")
public GseScriptExecuteObjTaskRecordDAO gseScriptExecuteObjTaskRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
ArchiveDBProperties archiveDBProperties) {
log.info("Init GseScriptExecuteObjTaskRecordDAO");
return new GseScriptExecuteObjTaskRecordDAO(context, archiveDBProperties);
}

@Bean(name = "gseFileExecuteObjTaskRecordDAO")
public GseFileExecuteObjTaskRecordDAO gseFileExecuteObjTaskRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
ArchiveDBProperties archiveDBProperties) {
log.info("Init GseFileExecuteObjTaskRecordDAO");
return new GseFileExecuteObjTaskRecordDAO(context, archiveDBProperties);
}

@Bean(name = "stepInstanceRollingTaskRecordDAO")
public StepInstanceRollingTaskRecordDAO stepInstanceRollingTaskRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
Expand Down Expand Up @@ -240,11 +240,11 @@ public JobExecuteArchiveManage jobExecuteArchiveManage(
ObjectProvider<TaskInstanceVariableRecordDAO> taskInstanceVariableRecordDAOObjectProvider,
ObjectProvider<OperationLogRecordDAO> operationLogRecordDAOObjectProvider,
ObjectProvider<FileSourceTaskLogRecordDAO> fileSourceTaskLogRecordDAOObjectProvider,
ObjectProvider<GseTaskLogRecordDAO> gseTaskLogRecordDAOObjectProvider,
ObjectProvider<GseTaskIpLogRecordDAO> gseTaskIpLogRecordDAOObjectProvider,
ObjectProvider<GseTaskRecordDAO> gseTaskRecordDAOObjectProvider,
ObjectProvider<GseScriptAgentTaskRecordDAO> gseScriptAgentTaskRecordDAOObjectProvider,
ObjectProvider<GseFileAgentTaskRecordDAO> gseFileAgentTaskRecordDAOObjectProvider,
ObjectProvider<GseScriptExecuteObjTaskRecordDAO> gseScriptExecuteObjTaskRecordDAOObjectProvider,
ObjectProvider<GseFileExecuteObjTaskRecordDAO> gseFileExecuteObjTaskRecordDAOObjectProvider,
ObjectProvider<StepInstanceRollingTaskRecordDAO> stepInstanceRollingTaskRecordDAOObjectProvider,
ObjectProvider<RollingConfigRecordDAO> rollingConfigRecordDAOObjectProvider,
ObjectProvider<TaskInstanceHostRecordDAO> taskInstanceHostRecordDAOObjectProvider,
Expand All @@ -264,11 +264,11 @@ public JobExecuteArchiveManage jobExecuteArchiveManage(
taskInstanceVariableRecordDAOObjectProvider.getIfAvailable(),
operationLogRecordDAOObjectProvider.getIfAvailable(),
fileSourceTaskLogRecordDAOObjectProvider.getIfAvailable(),
gseTaskLogRecordDAOObjectProvider.getIfAvailable(),
gseTaskIpLogRecordDAOObjectProvider.getIfAvailable(),
gseTaskRecordDAOObjectProvider.getIfAvailable(),
gseScriptAgentTaskRecordDAOObjectProvider.getIfAvailable(),
gseFileAgentTaskRecordDAOObjectProvider.getIfAvailable(),
gseScriptExecuteObjTaskRecordDAOObjectProvider.getIfAvailable(),
gseFileExecuteObjTaskRecordDAOObjectProvider.getIfAvailable(),
stepInstanceRollingTaskRecordDAOObjectProvider.getIfAvailable(),
rollingConfigRecordDAOObjectProvider.getIfAvailable(),
taskInstanceHostRecordDAOObjectProvider.getIfAvailable(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.tencent.bk.job.backup.dao.impl;

import com.tencent.bk.job.backup.config.ArchiveDBProperties;
import com.tencent.bk.job.execute.model.tables.GseFileExecuteObjTask;
import com.tencent.bk.job.execute.model.tables.records.GseFileExecuteObjTaskRecord;
import org.jooq.DSLContext;
import org.jooq.Table;
import org.jooq.TableField;

/**
* gse_file_execute_obj_task DAO
*/
public class GseFileExecuteObjTaskRecordDAO extends AbstractExecuteRecordDAO<GseFileExecuteObjTaskRecord> {

private static final GseFileExecuteObjTask TABLE = GseFileExecuteObjTask.GSE_FILE_EXECUTE_OBJ_TASK;

public GseFileExecuteObjTaskRecordDAO(DSLContext context, ArchiveDBProperties archiveDBProperties) {
super(context, archiveDBProperties);
}

@Override
public Table<GseFileExecuteObjTaskRecord> getTable() {
return TABLE;
}

@Override
public TableField<GseFileExecuteObjTaskRecord, Long> getArchiveIdField() {
return TABLE.TASK_INSTANCE_ID;
}
}
Loading

0 comments on commit 2539355

Please sign in to comment.