diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java index 1ff3d60d65..8d4a5ae6cb 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -104,11 +105,17 @@ public void jobToBeExecuted(JobExecutionContext context) { // For tasks that do not allow concurrent execution, if they can be successfully scheduled, it // indicates that the existing tasks have exited. If there are still tasks in the processing state, // then it is necessary to correct their status. - if (!taskFrameworkEnabledProperties.isEnabled() && context.getJobDetail().isConcurrentExectionDisallowed() - && scheduleEntity.getJobType().isSync()) { - List processingTask = taskRepository.findByJobNameAndStatusIn( + if (context.getJobDetail().isConcurrentExectionDisallowed() && scheduleEntity.getJobType().isSync()) { + List toBeCorrectedList = taskRepository.findByJobNameAndStatusIn( scheduleId.toString(), TaskStatus.getProcessingStatus()); - processingTask.forEach(task -> { + // For the scenario where the task framework is switched from closed to open, it is necessary to + // correct + // the status of tasks that were not completed while in the closed state. + if (taskFrameworkEnabledProperties.isEnabled()) { + toBeCorrectedList = + toBeCorrectedList.stream().filter(o -> o.getJobId() == null).collect(Collectors.toList()); + } + toBeCorrectedList.forEach(task -> { taskRepository.updateStatusById(task.getId(), TaskStatus.CANCELED); log.info("Task status correction successful,scheduleTaskId={}", task.getId()); }); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java index 8abff94f64..962b44ab3d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/AbstractDlmJob.java @@ -84,8 +84,8 @@ public abstract class AbstractDlmJob implements OdcJob { public final TaskFrameworkEnabledProperties taskFrameworkProperties; public final TaskFrameworkService taskFrameworkService; - public Thread jobThread; private Job job; + private boolean isInterrupted = false; public AbstractDlmJob() { @@ -106,7 +106,7 @@ public void executeTask(Long taskId, List dlmTableUnits) { scheduleTaskRepository.updateStatusById(taskId, TaskStatus.RUNNING); log.info("Task is ready,taskId={}", taskId); for (DlmTableUnit dlmTableUnit : dlmTableUnits) { - if (jobThread.isInterrupted()) { + if (isInterrupted) { dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED); log.info("Task interrupted and will exit.TaskId={}", taskId); continue; @@ -160,7 +160,7 @@ public TaskStatus getTaskStatus(Long scheduleTaskId) { if (collect.contains(TaskStatus.DONE) && collect.size() == 1) { return TaskStatus.DONE; } - if (jobThread.isInterrupted()) { + if (isInterrupted) { return TaskStatus.CANCELED; } if (collect.contains(TaskStatus.FAILED)) { @@ -275,13 +275,10 @@ public void after(JobExecutionContext context) { @Override public void interrupt() { - if (jobThread == null) { - throw new IllegalStateException("Task is not executing."); - } + isInterrupted = true; if (job != null) { job.stop(); log.info("Job will be interrupted,jobId={}", job.getJobMeta().getJobId()); } - jobThread.interrupt(); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java index 180f8c98bf..6981a1ab50 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveDeleteJob.java @@ -40,7 +40,6 @@ public class DataArchiveDeleteJob extends AbstractDlmJob { @Override public void executeJob(JobExecutionContext context) { - jobThread = Thread.currentThread(); ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult(); DataArchiveClearParameters dataArchiveClearParameters = JsonUtils.fromJson(taskEntity.getParametersJson(), DataArchiveClearParameters.class); @@ -76,20 +75,21 @@ public void executeJob(JobExecutionContext context) { scheduleTaskRepository.updateTaskResult(taskEntity.getId(), JsonUtils.toJson(parameters)); return; } - - // prepare tasks for clear - List dlmTableUnits = dlmService.findByScheduleTaskId(dataArchiveTask.getId()); - for (int i = 0; i < dlmTableUnits.size(); i++) { - dlmTableUnits.get(i) - .setDlmTableUnitId( - DlmJobIdUtil.generateHistoryJobId(taskEntity.getJobName(), taskEntity.getJobGroup(), - taskEntity.getId(), - i)); - dlmTableUnits.get(i).setType(JobType.DELETE); - dlmTableUnits.get(i).setStatus(TaskStatus.PREPARING); - dlmTableUnits.get(i).setScheduleTaskId(taskEntity.getId()); + List dlmTableUnits = dlmService.findByScheduleTaskId(taskEntity.getId()); + if (dlmTableUnits.isEmpty()) { + dlmTableUnits = dlmService.findByScheduleTaskId(dataArchiveTask.getId()); + for (int i = 0; i < dlmTableUnits.size(); i++) { + dlmTableUnits.get(i) + .setDlmTableUnitId( + DlmJobIdUtil.generateHistoryJobId(taskEntity.getJobName(), taskEntity.getJobGroup(), + taskEntity.getId(), + i)); + dlmTableUnits.get(i).setType(JobType.DELETE); + dlmTableUnits.get(i).setStatus(TaskStatus.PREPARING); + dlmTableUnits.get(i).setScheduleTaskId(taskEntity.getId()); + } + dlmService.createDlmTableUnits(dlmTableUnits); } - dlmService.createDlmTableUnits(dlmTableUnits); executeTask(taskEntity.getId(), dlmTableUnits); TaskStatus taskStatus = getTaskStatus(taskEntity.getId()); scheduleTaskRepository.updateStatusById(taskEntity.getId(), taskStatus); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java index cd90bf5f53..7cff4e51f3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveJob.java @@ -46,7 +46,6 @@ public void executeJob(JobExecutionContext context) { executeInTaskFramework(context); return; } - jobThread = Thread.currentThread(); ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java index 9658e1b3fb..eccb50d125 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataArchiveRollbackJob.java @@ -42,8 +42,6 @@ public class DataArchiveRollbackJob extends AbstractDlmJob { @Override public void executeJob(JobExecutionContext context) { - jobThread = Thread.currentThread(); - ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult(); DataArchiveRollbackParameters rollbackParameters = JsonUtils.fromJson(taskEntity.getParametersJson(), DataArchiveRollbackParameters.class); @@ -92,6 +90,9 @@ public void executeJob(JobExecutionContext context) { i)); dlmTableUnit.setSourceDatasourceInfo(dlmTableUnit.getTargetDatasourceInfo()); dlmTableUnit.setTargetDatasourceInfo(temp); + String tmp = dlmTableUnit.getTableName(); + dlmTableUnit.setTableName(dlmTableUnit.getTargetTableName()); + dlmTableUnit.setTargetTableName(tmp); dlmTableUnit.setType(JobType.ROLLBACK); dlmTableUnit.setStatus( dlmTableUnit.getStatus() == TaskStatus.PREPARING ? TaskStatus.DONE : TaskStatus.PREPARING); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java index 94f3d6ffea..2a9fc7b575 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/job/DataDeleteJob.java @@ -49,8 +49,6 @@ public class DataDeleteJob extends AbstractDlmJob { @Override public void executeJob(JobExecutionContext context) { - jobThread = Thread.currentThread(); - ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult(); // execute in task framework. @@ -93,6 +91,7 @@ public List splitTask(ScheduleTaskEntity taskEntity) { DlmTableUnitParameters parameter = new DlmTableUnitParameters(); parameter.setMigrateRule(condition); parameter.setCheckMode(CheckMode.MULTIPLE_GET); + parameter.setMigratePartitions(table.getPartitions()); dlmTableUnit.setParameters(parameter); dlmTableUnit.setStatus(TaskStatus.PREPARING); JobType jobType = parameters.getNeedCheckBeforeDelete() ? JobType.DELETE : JobType.QUICK_DELETE;