diff --git a/server/odc-server/src/main/resources/log4j2.xml b/server/odc-server/src/main/resources/log4j2.xml
index ad4a87e541..32c7e56fa8 100644
--- a/server/odc-server/src/main/resources/log4j2.xml
+++ b/server/odc-server/src/main/resources/log4j2.xml
@@ -816,6 +816,10 @@
+
+
+
+
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java
index 5b4c27a74e..d97d447f69 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMJobStore.java
@@ -73,8 +73,9 @@ public void destroy() {
}
public List<DlmTableUnit> getDlmTableUnits(Long scheduleTaskId) throws SQLException {
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement("select * from dlm_table_unit where schedule_task_id = ?");
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps =
+ conn.prepareStatement("select * from dlm_table_unit where schedule_task_id = ?")) {
ps.setLong(1, scheduleTaskId);
ResultSet resultSet = ps.executeQuery();
List<DlmTableUnit> dlmTableUnits = new LinkedList<>();
@@ -108,7 +109,7 @@ public void updateDlmTableUnitStatus(String dlmTableUnitId, TaskStatus status) t
"end_time = CASE WHEN ? IN ('CANCELED', 'DONE', 'FAILED') THEN CURRENT_TIMESTAMP ELSE end_time END " +
"WHERE dlm_table_unit_id = ?";
- try (PreparedStatement pstmt = dataSource.getConnection().prepareStatement(sql)) {
+ try (Connection conn = dataSource.getConnection(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, status.name());
pstmt.setString(2, status.name());
pstmt.setString(3, status.name());
@@ -152,9 +153,9 @@ public void storeDlmTableUnit(List<DlmTableUnit> dlmTableUnits) throws SQLExcept
@Override
public TaskGenerator getTaskGenerator(String generatorId, String jobId) throws SQLException {
if (enableBreakpointRecovery) {
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement(
- "select * from dlm_task_generator where job_id = ?");
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(
+ "select * from dlm_task_generator where job_id = ?")) {
ps.setString(1, jobId);
ResultSet resultSet = ps.executeQuery();
if (resultSet.next()) {
@@ -191,8 +192,8 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException
sb.append(
"processed_row_count=values(processed_row_count),processed_data_size=values(processed_data_size),primary_key_save_point=values(primary_key_save_point)");
log.info("start to store task generator:{}", taskGenerator);
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement(sb.toString());
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sb.toString())) {
ps.setString(1, taskGenerator.getId());
ps.setString(2, taskGenerator.getJobId());
ps.setLong(3, taskGenerator.getProcessedDataSize());
@@ -247,9 +248,9 @@ public void storeJobStatistic(JobMeta jobMeta) throws JobSqlException {
@Override
public List<TaskMeta> getTaskMeta(JobMeta jobMeta) throws SQLException {
if (enableBreakpointRecovery) {
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement(
- "select * from dlm_task_unit where generator_id = ? AND status !='SUCCESS'");
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(
+ "select * from dlm_task_unit where generator_id = ? AND status !='SUCCESS'")) {
ps.setString(1, jobMeta.getGenerator().getId());
ResultSet resultSet = ps.executeQuery();
List<TaskMeta> taskMetas = new LinkedList<>();
@@ -286,8 +287,8 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {
"status=values(status),partition_name=values(partition_name),lower_bound_primary_key=values(lower_bound_primary_key),");
sb.append(
"upper_bound_primary_key=values(upper_bound_primary_key),primary_key_cursor=values(primary_key_cursor)");
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement(sb.toString());
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sb.toString())) {
ps.setLong(1, taskMeta.getTaskIndex());
ps.setString(2, taskMeta.getJobMeta().getJobId());
ps.setString(3, taskMeta.getGeneratorId());
@@ -308,13 +309,13 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {
@Override
public Long getAbnormalTaskIndex(String jobId) {
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement(
- "select count(1) from dlm_task_unit where job_id=? and (status != 'SUCCESS' or primary_key_cursor is null)");
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(
+ "select count(1) from dlm_task_unit where job_id=? and (status != 'SUCCESS' or primary_key_cursor is null)")) {
ps.setString(1, jobId);
ResultSet resultSet = ps.executeQuery();
- while (resultSet.next()) {
- Long count = resultSet.getLong(1);
+ if (resultSet.next()) {
+ long count = resultSet.getLong(1);
return count > 0 ? count : null;
}
} catch (Exception ignored) {
@@ -330,9 +331,9 @@ public void updateTableSizeInfo(TableSizeInfo tableSizeInfo, long l) {
@Override
public void updateLimiter(JobMeta jobMeta) {
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement ps = conn.prepareStatement(
- "select * from dlm_config_limiter_configuration where order_id = ?");
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(
+ "select * from dlm_config_limiter_configuration where order_id = ?")) {
ps.setString(1, DlmJobIdUtil.getJobName(jobMeta.getJobId()));
ResultSet resultSet = ps.executeQuery();
RateLimitConfiguration rateLimit;
@@ -381,10 +382,10 @@ private void setTableLimitConfig(TableMeta tableMeta, int rowLimit) {
}
private void initEnableBreakpointRecovery() {
- try (Connection conn = dataSource.getConnection()) {
- PreparedStatement preparedStatement = conn.prepareStatement(
- "select value from config_system_configuration where `key` = 'odc.task.dlm"
- + ".support-breakpoint-recovery'");
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement preparedStatement = conn.prepareStatement(
+ "select value from config_system_configuration where `key` = 'odc.task.dlm"
+ + ".support-breakpoint-recovery'")) {
ResultSet resultSet = preparedStatement.executeQuery();
if (resultSet.next()) {
this.enableBreakpointRecovery = resultSet.getBoolean(1);
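
Note: every DLMJobStore hunk above makes the same fix. Previously only the Connection (or, in updateDlmTableUnitStatus, nothing at all) was declared in the try-with-resources header, so a PreparedStatement obtained from dataSource.getConnection().prepareStatement(...) could leak a pooled connection whenever the statement path failed. A minimal sketch of the corrected pattern, reusing the dlm_table_unit table from this diff; the countUnits helper is hypothetical:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.sql.DataSource;

class JdbcResourceSketch {
    // Both resources sit in the try header, so the statement and then the
    // connection are closed in reverse order even when executeQuery throws.
    static long countUnits(DataSource dataSource, long scheduleTaskId) throws SQLException {
        try (Connection conn = dataSource.getConnection();
                PreparedStatement ps = conn.prepareStatement(
                        "select count(1) from dlm_table_unit where schedule_task_id = ?")) {
            ps.setLong(1, scheduleTaskId);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) : 0L;
            }
        }
    }
}
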
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java
index 9cbd01b799..d0d5b2b4dc 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java
@@ -18,6 +18,7 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
@@ -106,4 +107,16 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
Collectors.toList());
}
+ public TaskStatus getTaskStatus(Long scheduleTaskId) {
+ Set<TaskStatus> collect = findByScheduleTaskId(scheduleTaskId).stream().map(DlmTableUnit::getStatus).collect(
+ Collectors.toSet());
+ if (collect.contains(TaskStatus.FAILED)) {
+ return TaskStatus.FAILED;
+ }
+ if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
+ return TaskStatus.DONE;
+ }
+ return TaskStatus.CANCELED;
+ }
+
}
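
Note: the new DLMService.getTaskStatus centralizes the status aggregation that AbstractDlmJob previously did inline (removed in a later hunk of this diff): any FAILED unit fails the whole schedule task, DONE is reported only when every unit is DONE, and anything else is treated as CANCELED. A standalone sketch of that precedence rule, using a stand-in enum instead of the real TaskStatus:

import java.util.EnumSet;
import java.util.Set;

class TaskStatusAggregationSketch {
    enum Status { PREPARING, RUNNING, DONE, FAILED, CANCELED }

    // Same precedence as the method above: FAILED wins, DONE requires unanimity,
    // everything else collapses to CANCELED.
    static Status aggregate(Set<Status> unitStatuses) {
        if (unitStatuses.contains(Status.FAILED)) {
            return Status.FAILED;
        }
        if (unitStatuses.size() == 1 && unitStatuses.contains(Status.DONE)) {
            return Status.DONE;
        }
        return Status.CANCELED;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(EnumSet.of(Status.DONE)));                  // DONE
        System.out.println(aggregate(EnumSet.of(Status.DONE, Status.FAILED)));   // FAILED
        System.out.println(aggregate(EnumSet.of(Status.DONE, Status.CANCELED))); // CANCELED
    }
}
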
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java
index c43f50a825..b852f42cdb 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/model/DataArchiveTableConfig.java
@@ -18,6 +18,8 @@
import java.util.LinkedList;
import java.util.List;
+import com.oceanbase.odc.common.util.StringUtils;
+
import lombok.Data;
/**
@@ -37,4 +39,8 @@ public class DataArchiveTableConfig {
// the sql condition such as "gmt_create < '2023-01-01'"
private String conditionExpression;
+
+ public String getTargetTableName() {
+ return StringUtils.isEmpty(targetTableName) ? tableName : targetTableName;
+ }
}
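
Note: with the fallback baked into getTargetTableName(), callers receive the source table name whenever no explicit target table is configured, which is why the eager defaulting loops are deleted from DataArchivePreprocessor and DataDeletePreprocessor below. A hedged usage sketch (the table names are made up; setters come from Lombok @Data):

import com.oceanbase.odc.service.dlm.model.DataArchiveTableConfig;

class TargetTableFallbackSketch {
    public static void main(String[] args) {
        DataArchiveTableConfig config = new DataArchiveTableConfig();
        config.setTableName("order_history");            // hypothetical source table
        System.out.println(config.getTargetTableName()); // falls back to "order_history"
        config.setTargetTableName("order_history_arc");  // hypothetical archive table
        System.out.println(config.getTargetTableName()); // now "order_history_arc"
    }
}
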
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataArchivePreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataArchivePreprocessor.java
index 28ee8c9680..81194f622b 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataArchivePreprocessor.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataArchivePreprocessor.java
@@ -20,7 +20,6 @@
import org.springframework.beans.factory.annotation.Autowired;
-import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.common.util.VersionUtils;
import com.oceanbase.odc.core.session.ConnectionSession;
import com.oceanbase.odc.core.session.ConnectionSessionConstants;
@@ -188,11 +187,5 @@ private void initDefaultConfig(DataArchiveParameters parameters) {
parameters.setScanBatchSize(dlmConfiguration.getDefaultScanBatchSize());
parameters.setQueryTimeout(dlmConfiguration.getTaskConnectionQueryTimeout());
parameters.setShardingStrategy(dlmConfiguration.getShardingStrategy());
- // set default target table name.
- parameters.getTables().forEach(tableConfig -> {
- if (StringUtils.isEmpty(tableConfig.getTargetTableName())) {
- tableConfig.setTargetTableName(tableConfig.getTableName());
- }
- });
}
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataDeletePreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataDeletePreprocessor.java
index 9ffbf85adf..d48cf1a00c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataDeletePreprocessor.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DataDeletePreprocessor.java
@@ -113,13 +113,6 @@ private void initDefaultConfig(DataDeleteParameters parameters) {
.setWriteThreadCount(dlmConfiguration.getSingleTaskThreadPoolSize() - parameters.getReadThreadCount());
parameters.setScanBatchSize(dlmConfiguration.getDefaultScanBatchSize());
parameters.setQueryTimeout(dlmConfiguration.getTaskConnectionQueryTimeout());
- // if no need check before delete, or target table name is null, set default target table name
- parameters.getTables().forEach(tableConfig -> {
- if (!parameters.getNeedCheckBeforeDelete() || Objects.isNull(tableConfig.getTargetTableName())) {
- tableConfig.setTargetTableName(tableConfig.getTableName());
- }
- });
-
}
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java
index c809f0717a..51371bca48 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java
@@ -20,8 +20,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.quartz.JobExecutionContext;
@@ -109,7 +107,6 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
log.info("Task is ready,taskId={}", taskId);
for (DlmTableUnit dlmTableUnit : dlmTableUnits) {
if (isInterrupted) {
- dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED);
log.info("Task interrupted and will exit.TaskId={}", taskId);
continue;
}
@@ -118,17 +115,19 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
dlmTableUnit.getTableName());
continue;
}
- try {
- if (dlmTableUnit.getType() == JobType.MIGRATE) {
- try {
- DLMTableStructureSynchronizer.sync(dlmTableUnit.getSourceDatasourceInfo(),
- dlmTableUnit.getTargetDatasourceInfo(), dlmTableUnit.getTableName(),
- dlmTableUnit.getTargetTableName(),
- dlmTableUnit.getParameters().getSyncDBObjectType());
- } catch (SQLException e) {
- log.warn("Sync table structure failed,tableName={}", dlmTableUnit.getTableName(), e);
- }
+ if (dlmTableUnit.getType() == JobType.MIGRATE) {
+ try {
+ DLMTableStructureSynchronizer.sync(dlmTableUnit.getSourceDatasourceInfo(),
+ dlmTableUnit.getTargetDatasourceInfo(), dlmTableUnit.getTableName(),
+ dlmTableUnit.getTargetTableName(),
+ dlmTableUnit.getParameters().getSyncDBObjectType());
+ } catch (SQLException e) {
+ log.warn("Sync table structure failed,tableName={}", dlmTableUnit.getTableName(), e);
+ dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.FAILED);
+ continue;
}
+ }
+ try {
job = jobFactory.createJob(dlmTableUnit);
log.info("Create dlm job succeed,taskId={},task parameters={}", taskId, dlmTableUnit);
} catch (Exception e) {
@@ -144,7 +143,7 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
log.info("DLM job succeed,taskId={},unitId={}", taskId, dlmTableUnit.getDlmTableUnitId());
} catch (JobException e) {
// used to stop several sub-threads.
- if (job.getJobMeta().isToStop()) {
+ if (isInterrupted) {
log.info("Data archive task is Interrupted,taskId={}", taskId);
dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED);
} else {
@@ -156,19 +155,7 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
}
public TaskStatus getTaskStatus(Long scheduleTaskId) {
- Set<TaskStatus> collect =
- dlmService.findByScheduleTaskId(scheduleTaskId).stream().map(DlmTableUnit::getStatus).collect(
- Collectors.toSet());
- if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
- return TaskStatus.DONE;
- }
- if (isInterrupted) {
- return TaskStatus.CANCELED;
- }
- if (collect.contains(TaskStatus.FAILED)) {
- return TaskStatus.FAILED;
- }
- return TaskStatus.CANCELED;
+ return dlmService.getTaskStatus(scheduleTaskId);
}
public List<DlmTableUnit> getTaskUnits(ScheduleTaskEntity taskEntity) {
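
Note: the executeTask rework above flattens the nested try blocks: a MIGRATE unit whose structure sync throws SQLException is now marked FAILED and skipped instead of proceeding to data movement, and interruption is checked through the job-level flag rather than the per-job stop state. A standalone control-flow sketch with hypothetical stand-in types, not the actual ODC classes:

import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

class ExecuteTaskFlowSketch {
    enum Status { PREPARING, DONE, FAILED, CANCELED }
    enum Type { MIGRATE, DELETE }

    static class Unit {
        final String name;
        final Type type;
        Status status = Status.PREPARING;
        Unit(String name, Type type) { this.name = name; this.type = type; }
    }

    volatile boolean isInterrupted = false;

    void execute(List<Unit> units) {
        for (Unit unit : units) {
            if (isInterrupted) {
                continue;                      // skip remaining units without touching their status
            }
            if (unit.type == Type.MIGRATE) {
                try {
                    syncTableStructure(unit);  // stand-in for DLMTableStructureSynchronizer.sync
                } catch (SQLException e) {
                    unit.status = Status.FAILED;
                    continue;                  // do not move data into a table that failed to sync
                }
            }
            unit.status = Status.DONE;         // stand-in for creating and running the migrator job
        }
    }

    void syncTableStructure(Unit unit) throws SQLException {
        if (unit.name.startsWith("broken_")) { // arbitrary failure condition for the demo
            throw new SQLException("cannot sync " + unit.name);
        }
    }

    public static void main(String[] args) {
        ExecuteTaskFlowSketch sketch = new ExecuteTaskFlowSketch();
        List<Unit> units = Arrays.asList(new Unit("t1", Type.MIGRATE), new Unit("broken_t2", Type.MIGRATE));
        sketch.execute(units);
        units.forEach(u -> System.out.println(u.name + " -> " + u.status)); // t1 -> DONE, broken_t2 -> FAILED
    }
}
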
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java
index 8954ed02e0..9326dcc478 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java
@@ -23,7 +23,6 @@
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
-import com.oceanbase.odc.service.dlm.DataSourceInfoMapper;
import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
import com.oceanbase.odc.service.dlm.model.DataArchiveTableConfig;
import com.oceanbase.odc.service.dlm.model.DlmTableUnit;
@@ -91,16 +90,10 @@ private void executeInTaskFramework(JobExecutionContext context) {
parameters.setReadThreadCount(dataArchiveParameters.getReadThreadCount());
parameters.setShardingStrategy(dataArchiveParameters.getShardingStrategy());
parameters.setScanBatchSize(dataArchiveParameters.getScanBatchSize());
- parameters
- .setSourceDs(DataSourceInfoMapper.toDataSourceInfo(
- databaseService.findDataSourceForConnectById(dataArchiveParameters.getSourceDatabaseId())));
- parameters
- .setTargetDs(DataSourceInfoMapper.toDataSourceInfo(
- databaseService.findDataSourceForConnectById(dataArchiveParameters.getTargetDataBaseId())));
+ parameters.setSourceDs(getDataSourceInfo(dataArchiveParameters.getSourceDatabaseId()));
+ parameters.setTargetDs(getDataSourceInfo(dataArchiveParameters.getTargetDataBaseId()));
parameters.getSourceDs().setQueryTimeout(dataArchiveParameters.getQueryTimeout());
parameters.getTargetDs().setQueryTimeout(dataArchiveParameters.getQueryTimeout());
- parameters.getSourceDs().setDatabaseName(dataArchiveParameters.getSourceDatabaseName());
- parameters.getTargetDs().setDatabaseName(dataArchiveParameters.getTargetDatabaseName());
parameters.setSyncTableStructure(dataArchiveParameters.getSyncTableStructure());
Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis());
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java
index 9e28363a09..f72cf3ae7e 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java
@@ -25,7 +25,6 @@
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
-import com.oceanbase.odc.service.dlm.DataSourceInfoMapper;
import com.oceanbase.odc.service.dlm.model.DataArchiveTableConfig;
import com.oceanbase.odc.service.dlm.model.DataDeleteParameters;
import com.oceanbase.odc.service.dlm.model.DlmTableUnit;
@@ -33,6 +32,7 @@
import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
import com.oceanbase.odc.service.dlm.utils.DataArchiveConditionUtil;
import com.oceanbase.odc.service.dlm.utils.DlmJobIdUtil;
+import com.oceanbase.odc.service.schedule.model.DlmTableUnitStatistic;
import com.oceanbase.tools.migrator.common.enums.JobType;
import com.oceanbase.tools.migrator.task.CheckMode;
@@ -105,6 +105,7 @@ public List<DlmTableUnit> splitTask(ScheduleTaskEntity taskEntity) {
parameter.setMigratePartitions(table.getPartitions());
dlmTableUnit.setParameters(parameter);
dlmTableUnit.setStatus(TaskStatus.PREPARING);
+ dlmTableUnit.setStatistic(new DlmTableUnitStatistic());
JobType jobType = parameters.getNeedCheckBeforeDelete() ? JobType.DELETE : JobType.QUICK_DELETE;
dlmTableUnit.setType(parameters.getDeleteByUniqueKey() ? jobType : JobType.DEIRECT_DELETE);
dlmTasks.add(dlmTableUnit);
@@ -136,18 +137,12 @@ private void executeInTaskFramework(JobExecutionContext context) {
parameters.setWriteThreadCount(dataDeleteParameters.getWriteThreadCount());
parameters.setReadThreadCount(dataDeleteParameters.getReadThreadCount());
parameters.setScanBatchSize(dataDeleteParameters.getScanBatchSize());
- parameters
- .setSourceDs(DataSourceInfoMapper.toDataSourceInfo(
- databaseService.findDataSourceForConnectById(dataDeleteParameters.getDatabaseId())));
- parameters
- .setTargetDs(DataSourceInfoMapper.toDataSourceInfo(
- databaseService.findDataSourceForConnectById(dataDeleteParameters.getTargetDatabaseId() == null
- ? dataDeleteParameters.getDatabaseId()
- : dataDeleteParameters.getTargetDatabaseId())));
+ parameters.setSourceDs(getDataSourceInfo(dataDeleteParameters.getDatabaseId()));
+ parameters.setTargetDs(getDataSourceInfo(dataDeleteParameters.getTargetDatabaseId() == null
+ ? dataDeleteParameters.getDatabaseId()
+ : dataDeleteParameters.getTargetDatabaseId()));
parameters.getSourceDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout());
parameters.getTargetDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout());
- parameters.getSourceDs().setDatabaseName(dataDeleteParameters.getDatabaseName());
- parameters.getTargetDs().setDatabaseName(dataDeleteParameters.getTargetDatabaseName());
Long jobId = publishJob(parameters, dataDeleteParameters.getTimeoutMillis());
scheduleTaskRepository.updateJobIdById(taskEntity.getId(), jobId);
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/DataArchiveTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/DataArchiveTask.java
index 026e339b0f..b342ba195c 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/DataArchiveTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/executor/task/DataArchiveTask.java
@@ -84,7 +84,11 @@ protected boolean doStart(JobContext context) throws Exception {
log.info("Job is terminated,jobIdentity={}", context.getJobIdentity());
break;
}
- if (parameters.getJobType() == JobType.MIGRATE && !parameters.getSyncTableStructure().isEmpty()) {
+ if (dlmTableUnit.getStatus() == TaskStatus.DONE) {
+ log.info("The table had been completed,tableName={}", dlmTableUnit.getTableName());
+ continue;
+ }
+ if (parameters.getJobType() == JobType.MIGRATE) {
try {
DLMTableStructureSynchronizer.sync(
DataSourceInfoMapper.toConnectionConfig(parameters.getSourceDs()),
@@ -94,6 +98,7 @@ protected boolean doStart(JobContext context) throws Exception {
} catch (Exception e) {
log.warn("Failed to sync target table structure,table will be ignored,tableName={}",
dlmTableUnit.getTableName(), e);
+ jobStore.updateDlmTableUnitStatus(dlmTableUnit.getDlmTableUnitId(), TaskStatus.FAILED);
continue;
}
}
@@ -148,7 +153,7 @@ private List<DlmTableUnit> getDlmTableUnits(DLMJobReq req) throws SQLException {
dlmTableUnit.setTargetDatasourceInfo(req.getTargetDs());
dlmTableUnit.setFireTime(req.getFireTime());
dlmTableUnit.setStatus(TaskStatus.PREPARING);
- dlmTableUnit.setType(JobType.MIGRATE);
+ dlmTableUnit.setType(req.getJobType());
dlmTableUnit.setStatistic(new DlmTableUnitStatistic());
dlmTableUnits.add(dlmTableUnit);
});
@@ -159,6 +164,11 @@ private List<DlmTableUnit> getDlmTableUnits(DLMJobReq req) throws SQLException {
@Override
protected void doStop() throws Exception {
job.stop();
+ try {
+ jobStore.updateDlmTableUnitStatus(job.getJobMeta().getJobId(), TaskStatus.CANCELED);
+ } catch (Exception e) {
+ log.warn("Update dlm table unit status failed,DlmTableUnitId={}", job.getJobMeta().getJobId());
+ }
}
@Override
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java
index f6f4a9e089..135349e83f 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java
@@ -25,11 +25,12 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.event.AbstractEventListener;
import com.oceanbase.odc.common.json.JsonUtils;
+import com.oceanbase.odc.core.shared.constant.TaskStatus;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskRepository;
import com.oceanbase.odc.metadb.task.JobEntity;
import com.oceanbase.odc.metadb.task.TaskEntity;
import com.oceanbase.odc.metadb.task.TaskRepository;
-import com.oceanbase.odc.service.common.util.SpringContextUtil;
+import com.oceanbase.odc.service.dlm.DLMService;
import com.oceanbase.odc.service.schedule.ScheduleService;
import com.oceanbase.odc.service.schedule.ScheduleTaskService;
import com.oceanbase.odc.service.schedule.job.DLMJobReq;
@@ -57,16 +58,17 @@ public class DefaultJobTerminateListener extends AbstractEventListener {
- taskRepository.updateStatusById(o.getId(), event.getStatus().convertTaskStatus());
- log.info("Update schedule task status to {} succeed,scheduleTaskId={}", event.getStatus(), o.getId());
+ TaskStatus taskStatus = dlmService.getTaskStatus(o.getId());
+ scheduleTaskRepository.updateStatusById(o.getId(), taskStatus);
+ log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, o.getId());
DLMJobReq parameters = JsonUtils.fromJson(
JsonUtils
.fromJson(jobEntity.getJobParametersJson(), new TypeReference