Skip to content

Commit

Permalink
fix(dlm): task status is wrong (#3491)
Browse files Browse the repository at this point in the history
* fix the issue of wrong task status

* format

* fix the issue of throw not found exception when database is deleted.

* simplify code
  • Loading branch information
guowl3 authored Sep 23, 2024
1 parent 2da088b commit acf70aa
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,28 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
Collectors.toList());
}

/**
* generate final task status by scheduleTaskId when the task is finished
*/
@SkipAuthorize("odc internal usage")
public TaskStatus getTaskStatus(Long scheduleTaskId) {
return getTaskStatus(findByScheduleTaskId(scheduleTaskId));
}

public TaskStatus getTaskStatus(List<DlmTableUnit> dlmTableUnits) {
public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
List<DlmTableUnit> dlmTableUnits = findByScheduleTaskId(scheduleTaskId);
Set<TaskStatus> collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect(
Collectors.toSet());
// If any table fails, the task is considered a failure.
if (collect.contains(TaskStatus.FAILED)) {
return TaskStatus.FAILED;
}
if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
return TaskStatus.DONE;
// If any table is canceled, the task is considered canceled.
if (collect.contains(TaskStatus.CANCELED)) {
return TaskStatus.CANCELED;
}
// The task is considered failed if any table is still preparing or running when the task is
// finished.
if (collect.contains(TaskStatus.PREPARING) || collect.contains(TaskStatus.RUNNING)) {
return TaskStatus.FAILED;
}
return TaskStatus.CANCELED;
return TaskStatus.DONE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.core.shared.exception.NotFoundException;
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.metadb.flow.UserTaskInstanceEntity;
Expand Down Expand Up @@ -195,11 +196,7 @@ public ScheduleDetailRespHist generateHistoryScheduleDetail(Schedule schedule) {
resp.setUpdateTime(schedule.getUpdateTime());
resp.setDescription(schedule.getDescription());
resp.setJobParameters(detailParameters(schedule));

List<Database> databaseByIds = getDatabaseByIds(Collections.singleton(schedule.getDatabaseId()));
if (!databaseByIds.isEmpty()) {
resp.setDatabase(databaseByIds.get(0));
}
resp.setDatabase(detailDatabaseOrNull(schedule.getDatabaseId()));

Set<Long> approvableFlowInstanceIds = approvalPermissionService.getApprovableApprovalInstances()
.stream()
Expand Down Expand Up @@ -387,29 +384,37 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) {
switch (schedule.getType()) {
case DATA_ARCHIVE: {
DataArchiveParameters parameters = (DataArchiveParameters) schedule.getParameters();
parameters.setSourceDatabase(databaseService.detail(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDataBaseId()));
parameters.setSourceDatabase(detailDatabaseOrNull(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDataBaseId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case DATA_DELETE: {
DataDeleteParameters parameters = (DataDeleteParameters) schedule.getParameters();
if (parameters.getTargetDatabaseId() != null) {
parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDatabaseId()));
parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDatabaseId()));
}
parameters.setDatabase(databaseService.detail(parameters.getDatabaseId()));
parameters.setDatabase(detailDatabaseOrNull(parameters.getDatabaseId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case SQL_PLAN: {
SqlPlanParameters parameters = (SqlPlanParameters) schedule.getParameters();
if (parameters.getDatabaseId() != null) {
parameters.setDatabaseInfo(databaseService.detail(parameters.getDatabaseId()));
parameters.setDatabaseInfo(detailDatabaseOrNull(parameters.getDatabaseId()));
}
return parameters;
}
default:
return schedule.getParameters();
}
}

private Database detailDatabaseOrNull(Long databaseId) {
try {
return databaseService.detail(databaseId);
} catch (NotFoundException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DefaultJobTerminateListener extends AbstractEventListener<JobTermin
public void onEvent(JobTerminateEvent event) {
JobEntity jobEntity = taskFrameworkService.find(event.getJi().getId());
scheduleTaskService.findByJobId(jobEntity.getId()).ifPresent(o -> {
TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getTaskStatus(o.getId())
TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getFinalTaskStatus(o.getId())
: event.getStatus().convertTaskStatus();
scheduleTaskService.updateStatusById(o.getId(), taskStatus);
log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, o.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public void process(TaskResult result) {
log.info("Create or update dlm tableUnits success,jobIdentity={},scheduleTaskId={}",
result.getJobIdentity(),
dlmTableUnits.get(0).getScheduleTaskId());
TaskStatus taskStatus = dlmService.getTaskStatus(dlmTableUnits);
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), taskStatus);
TaskStatus taskStatus = taskService.nullSafeGetById(dlmTableUnits.get(0).getScheduleTaskId()).getStatus();
if (taskStatus != TaskStatus.RUNNING) {
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), TaskStatus.RUNNING);
}
log.info("Update schedule task status to {} success", taskStatus);
} catch (Exception e) {
log.warn("Refresh result failed.", e);
Expand Down

0 comments on commit acf70aa

Please sign in to comment.