Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(dlm): data delete retry failed #2564

Merged
merged 2 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand Down Expand Up @@ -104,11 +105,17 @@ public void jobToBeExecuted(JobExecutionContext context) {
// For tasks that do not allow concurrent execution, if they can be successfully scheduled, it
// indicates that the existing tasks have exited. If there are still tasks in the processing state,
// then it is necessary to correct their status.
if (!taskFrameworkEnabledProperties.isEnabled() && context.getJobDetail().isConcurrentExectionDisallowed()
&& scheduleEntity.getJobType().isSync()) {
List<ScheduleTaskEntity> processingTask = taskRepository.findByJobNameAndStatusIn(
if (context.getJobDetail().isConcurrentExectionDisallowed() && scheduleEntity.getJobType().isSync()) {
List<ScheduleTaskEntity> toBeCorrectedList = taskRepository.findByJobNameAndStatusIn(
scheduleId.toString(), TaskStatus.getProcessingStatus());
processingTask.forEach(task -> {
// For the scenario where the task framework is switched from closed to open, it is necessary to
// correct
// the status of tasks that were not completed while in the closed state.
if (taskFrameworkEnabledProperties.isEnabled()) {
toBeCorrectedList =
toBeCorrectedList.stream().filter(o -> o.getJobId() == null).collect(Collectors.toList());
}
toBeCorrectedList.forEach(task -> {
taskRepository.updateStatusById(task.getId(), TaskStatus.CANCELED);
log.info("Task status correction successful,scheduleTaskId={}", task.getId());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public abstract class AbstractDlmJob implements OdcJob {
public final TaskFrameworkEnabledProperties taskFrameworkProperties;

public final TaskFrameworkService taskFrameworkService;
public Thread jobThread;
private Job job;
private boolean isInterrupted = false;


public AbstractDlmJob() {
Expand All @@ -106,7 +106,7 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits) {
scheduleTaskRepository.updateStatusById(taskId, TaskStatus.RUNNING);
log.info("Task is ready,taskId={}", taskId);
for (DlmTableUnit dlmTableUnit : dlmTableUnits) {
if (jobThread.isInterrupted()) {
if (isInterrupted) {
dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED);
log.info("Task interrupted and will exit.TaskId={}", taskId);
continue;
Expand Down Expand Up @@ -160,7 +160,7 @@ public TaskStatus getTaskStatus(Long scheduleTaskId) {
if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
return TaskStatus.DONE;
}
if (jobThread.isInterrupted()) {
if (isInterrupted) {
return TaskStatus.CANCELED;
}
if (collect.contains(TaskStatus.FAILED)) {
Expand Down Expand Up @@ -275,13 +275,10 @@ public void after(JobExecutionContext context) {

@Override
public void interrupt() {
// NOTE(review): this span is a PR diff view — the three lines below (the
// jobThread null-guard) and the trailing jobThread.interrupt() appear to be
// the REMOVED (pre-PR) lines; isInterrupted/job.stop() are the replacement.
if (jobThread == null) {
throw new IllegalStateException("Task is not executing.");
}
// Cooperative cancellation flag, polled by the task loop instead of relying
// on Thread.interrupt() — presumably to avoid losing the interrupt status
// across blocking DLM calls; TODO confirm against AbstractDlmJob.executeTask.
isInterrupted = true;
// Stop the underlying DLM job if one is currently running; job may be null
// when interrupt() arrives before the job has been created.
if (job != null) {
job.stop();
log.info("Job will be interrupted,jobId={}", job.getJobMeta().getJobId());
}
jobThread.interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class DataArchiveDeleteJob extends AbstractDlmJob {
@Override
public void executeJob(JobExecutionContext context) {

jobThread = Thread.currentThread();
ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
DataArchiveClearParameters dataArchiveClearParameters = JsonUtils.fromJson(taskEntity.getParametersJson(),
DataArchiveClearParameters.class);
Expand Down Expand Up @@ -76,20 +75,21 @@ public void executeJob(JobExecutionContext context) {
scheduleTaskRepository.updateTaskResult(taskEntity.getId(), JsonUtils.toJson(parameters));
return;
}

// prepare tasks for clear
List<DlmTableUnit> dlmTableUnits = dlmService.findByScheduleTaskId(dataArchiveTask.getId());
for (int i = 0; i < dlmTableUnits.size(); i++) {
dlmTableUnits.get(i)
.setDlmTableUnitId(
DlmJobIdUtil.generateHistoryJobId(taskEntity.getJobName(), taskEntity.getJobGroup(),
taskEntity.getId(),
i));
dlmTableUnits.get(i).setType(JobType.DELETE);
dlmTableUnits.get(i).setStatus(TaskStatus.PREPARING);
dlmTableUnits.get(i).setScheduleTaskId(taskEntity.getId());
List<DlmTableUnit> dlmTableUnits = dlmService.findByScheduleTaskId(taskEntity.getId());
if (dlmTableUnits.isEmpty()) {
dlmTableUnits = dlmService.findByScheduleTaskId(dataArchiveTask.getId());
for (int i = 0; i < dlmTableUnits.size(); i++) {
dlmTableUnits.get(i)
.setDlmTableUnitId(
DlmJobIdUtil.generateHistoryJobId(taskEntity.getJobName(), taskEntity.getJobGroup(),
taskEntity.getId(),
i));
dlmTableUnits.get(i).setType(JobType.DELETE);
dlmTableUnits.get(i).setStatus(TaskStatus.PREPARING);
dlmTableUnits.get(i).setScheduleTaskId(taskEntity.getId());
}
dlmService.createDlmTableUnits(dlmTableUnits);
}
dlmService.createDlmTableUnits(dlmTableUnits);
executeTask(taskEntity.getId(), dlmTableUnits);
TaskStatus taskStatus = getTaskStatus(taskEntity.getId());
scheduleTaskRepository.updateStatusById(taskEntity.getId(), taskStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public void executeJob(JobExecutionContext context) {
executeInTaskFramework(context);
return;
}
jobThread = Thread.currentThread();

ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ public class DataArchiveRollbackJob extends AbstractDlmJob {
@Override
public void executeJob(JobExecutionContext context) {

jobThread = Thread.currentThread();

ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
DataArchiveRollbackParameters rollbackParameters = JsonUtils.fromJson(taskEntity.getParametersJson(),
DataArchiveRollbackParameters.class);
Expand Down Expand Up @@ -92,6 +90,9 @@ public void executeJob(JobExecutionContext context) {
i));
dlmTableUnit.setSourceDatasourceInfo(dlmTableUnit.getTargetDatasourceInfo());
dlmTableUnit.setTargetDatasourceInfo(temp);
String tmp = dlmTableUnit.getTableName();
dlmTableUnit.setTableName(dlmTableUnit.getTargetTableName());
dlmTableUnit.setTargetTableName(tmp);
dlmTableUnit.setType(JobType.ROLLBACK);
dlmTableUnit.setStatus(
dlmTableUnit.getStatus() == TaskStatus.PREPARING ? TaskStatus.DONE : TaskStatus.PREPARING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class DataDeleteJob extends AbstractDlmJob {
@Override
public void executeJob(JobExecutionContext context) {

jobThread = Thread.currentThread();

ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();

// execute in task framework.
Expand Down Expand Up @@ -93,6 +91,7 @@ public List<DlmTableUnit> splitTask(ScheduleTaskEntity taskEntity) {
DlmTableUnitParameters parameter = new DlmTableUnitParameters();
parameter.setMigrateRule(condition);
parameter.setCheckMode(CheckMode.MULTIPLE_GET);
parameter.setMigratePartitions(table.getPartitions());
dlmTableUnit.setParameters(parameter);
dlmTableUnit.setStatus(TaskStatus.PREPARING);
JobType jobType = parameters.getNeedCheckBeforeDelete() ? JobType.DELETE : JobType.QUICK_DELETE;
Expand Down