Skip to content

Commit

Permalink
fix(dlm): data delete retry failed (#2564)
Browse files Browse the repository at this point in the history
* several bug fixes

* fix rollback failed
  • Loading branch information
guowl3 committed Jun 3, 2024
1 parent 0bfa172 commit 39d7fb5
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand Down Expand Up @@ -104,11 +105,17 @@ public void jobToBeExecuted(JobExecutionContext context) {
// For tasks that do not allow concurrent execution, if they can be successfully scheduled, it
// indicates that the existing tasks have exited. If there are still tasks in the processing state,
// then it is necessary to correct their status.
if (!taskFrameworkEnabledProperties.isEnabled() && context.getJobDetail().isConcurrentExectionDisallowed()
&& scheduleEntity.getJobType().isSync()) {
List<ScheduleTaskEntity> processingTask = taskRepository.findByJobNameAndStatusIn(
if (context.getJobDetail().isConcurrentExectionDisallowed() && scheduleEntity.getJobType().isSync()) {
List<ScheduleTaskEntity> toBeCorrectedList = taskRepository.findByJobNameAndStatusIn(
scheduleId.toString(), TaskStatus.getProcessingStatus());
processingTask.forEach(task -> {
// For the scenario where the task framework is switched from closed to open, it is necessary to
// correct
// the status of tasks that were not completed while in the closed state.
if (taskFrameworkEnabledProperties.isEnabled()) {
toBeCorrectedList =
toBeCorrectedList.stream().filter(o -> o.getJobId() == null).collect(Collectors.toList());
}
toBeCorrectedList.forEach(task -> {
taskRepository.updateStatusById(task.getId(), TaskStatus.CANCELED);
log.info("Task status correction successful,scheduleTaskId={}", task.getId());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public abstract class AbstractDlmJob implements OdcJob {
public final TaskFrameworkEnabledProperties taskFrameworkProperties;

public final TaskFrameworkService taskFrameworkService;
public Thread jobThread;
private Job job;
private boolean isInterrupted = false;


public AbstractDlmJob() {
Expand All @@ -106,7 +106,7 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits) {
scheduleTaskRepository.updateStatusById(taskId, TaskStatus.RUNNING);
log.info("Task is ready,taskId={}", taskId);
for (DlmTableUnit dlmTableUnit : dlmTableUnits) {
if (jobThread.isInterrupted()) {
if (isInterrupted) {
dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED);
log.info("Task interrupted and will exit.TaskId={}", taskId);
continue;
Expand Down Expand Up @@ -160,7 +160,7 @@ public TaskStatus getTaskStatus(Long scheduleTaskId) {
if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
return TaskStatus.DONE;
}
if (jobThread.isInterrupted()) {
if (isInterrupted) {
return TaskStatus.CANCELED;
}
if (collect.contains(TaskStatus.FAILED)) {
Expand Down Expand Up @@ -275,13 +275,10 @@ public void after(JobExecutionContext context) {

@Override
public void interrupt() {
if (jobThread == null) {
throw new IllegalStateException("Task is not executing.");
}
isInterrupted = true;
if (job != null) {
job.stop();
log.info("Job will be interrupted,jobId={}", job.getJobMeta().getJobId());
}
jobThread.interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class DataArchiveDeleteJob extends AbstractDlmJob {
@Override
public void executeJob(JobExecutionContext context) {

jobThread = Thread.currentThread();
ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
DataArchiveClearParameters dataArchiveClearParameters = JsonUtils.fromJson(taskEntity.getParametersJson(),
DataArchiveClearParameters.class);
Expand Down Expand Up @@ -76,20 +75,21 @@ public void executeJob(JobExecutionContext context) {
scheduleTaskRepository.updateTaskResult(taskEntity.getId(), JsonUtils.toJson(parameters));
return;
}

// prepare tasks for clear
List<DlmTableUnit> dlmTableUnits = dlmService.findByScheduleTaskId(dataArchiveTask.getId());
for (int i = 0; i < dlmTableUnits.size(); i++) {
dlmTableUnits.get(i)
.setDlmTableUnitId(
DlmJobIdUtil.generateHistoryJobId(taskEntity.getJobName(), taskEntity.getJobGroup(),
taskEntity.getId(),
i));
dlmTableUnits.get(i).setType(JobType.DELETE);
dlmTableUnits.get(i).setStatus(TaskStatus.PREPARING);
dlmTableUnits.get(i).setScheduleTaskId(taskEntity.getId());
List<DlmTableUnit> dlmTableUnits = dlmService.findByScheduleTaskId(taskEntity.getId());
if (dlmTableUnits.isEmpty()) {
dlmTableUnits = dlmService.findByScheduleTaskId(dataArchiveTask.getId());
for (int i = 0; i < dlmTableUnits.size(); i++) {
dlmTableUnits.get(i)
.setDlmTableUnitId(
DlmJobIdUtil.generateHistoryJobId(taskEntity.getJobName(), taskEntity.getJobGroup(),
taskEntity.getId(),
i));
dlmTableUnits.get(i).setType(JobType.DELETE);
dlmTableUnits.get(i).setStatus(TaskStatus.PREPARING);
dlmTableUnits.get(i).setScheduleTaskId(taskEntity.getId());
}
dlmService.createDlmTableUnits(dlmTableUnits);
}
dlmService.createDlmTableUnits(dlmTableUnits);
executeTask(taskEntity.getId(), dlmTableUnits);
TaskStatus taskStatus = getTaskStatus(taskEntity.getId());
scheduleTaskRepository.updateStatusById(taskEntity.getId(), taskStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public void executeJob(JobExecutionContext context) {
executeInTaskFramework(context);
return;
}
jobThread = Thread.currentThread();

ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public class DataArchiveRollbackJob extends AbstractDlmJob {
@Override
public void executeJob(JobExecutionContext context) {

jobThread = Thread.currentThread();

ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
DataArchiveRollbackParameters rollbackParameters = JsonUtils.fromJson(taskEntity.getParametersJson(),
DataArchiveRollbackParameters.class);
Expand Down Expand Up @@ -92,6 +90,9 @@ public void executeJob(JobExecutionContext context) {
i));
dlmTableUnit.setSourceDatasourceInfo(dlmTableUnit.getTargetDatasourceInfo());
dlmTableUnit.setTargetDatasourceInfo(temp);
String tmp = dlmTableUnit.getTableName();
dlmTableUnit.setTableName(dlmTableUnit.getTargetTableName());
dlmTableUnit.setTargetTableName(tmp);
dlmTableUnit.setType(JobType.ROLLBACK);
dlmTableUnit.setStatus(
dlmTableUnit.getStatus() == TaskStatus.PREPARING ? TaskStatus.DONE : TaskStatus.PREPARING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class DataDeleteJob extends AbstractDlmJob {
@Override
public void executeJob(JobExecutionContext context) {

jobThread = Thread.currentThread();

ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();

// execute in task framework.
Expand Down Expand Up @@ -93,6 +91,7 @@ public List<DlmTableUnit> splitTask(ScheduleTaskEntity taskEntity) {
DlmTableUnitParameters parameter = new DlmTableUnitParameters();
parameter.setMigrateRule(condition);
parameter.setCheckMode(CheckMode.MULTIPLE_GET);
parameter.setMigratePartitions(table.getPartitions());
dlmTableUnit.setParameters(parameter);
dlmTableUnit.setStatus(TaskStatus.PREPARING);
JobType jobType = parameters.getNeedCheckBeforeDelete() ? JobType.DELETE : JobType.QUICK_DELETE;
Expand Down

0 comments on commit 39d7fb5

Please sign in to comment.