Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dlm): cherry-pick from 4.3.x #3518

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,28 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
Collectors.toList());
}

/**
* generate final task status by scheduleTaskId when the task is finished
*/
@SkipAuthorize("odc internal usage")
public TaskStatus getTaskStatus(Long scheduleTaskId) {
return getTaskStatus(findByScheduleTaskId(scheduleTaskId));
}

public TaskStatus getTaskStatus(List<DlmTableUnit> dlmTableUnits) {
public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
List<DlmTableUnit> dlmTableUnits = findByScheduleTaskId(scheduleTaskId);
Set<TaskStatus> collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect(
Collectors.toSet());
// If any table fails, the task is considered a failure.
if (collect.contains(TaskStatus.FAILED)) {
return TaskStatus.FAILED;
}
if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
return TaskStatus.DONE;
// If any table is canceled, the task is considered canceled.
if (collect.contains(TaskStatus.CANCELED)) {
return TaskStatus.CANCELED;
}
// The task is considered failed if any table is still preparing or running when the task is
// finished.
if (collect.contains(TaskStatus.PREPARING) || collect.contains(TaskStatus.RUNNING)) {
return TaskStatus.FAILED;
}
return TaskStatus.CANCELED;
return TaskStatus.DONE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.core.shared.exception.NotFoundException;
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.metadb.flow.UserTaskInstanceEntity;
Expand Down Expand Up @@ -195,11 +196,7 @@ public ScheduleDetailRespHist generateHistoryScheduleDetail(Schedule schedule) {
resp.setUpdateTime(schedule.getUpdateTime());
resp.setDescription(schedule.getDescription());
resp.setJobParameters(detailParameters(schedule));

List<Database> databaseByIds = getDatabaseByIds(Collections.singleton(schedule.getDatabaseId()));
if (!databaseByIds.isEmpty()) {
resp.setDatabase(databaseByIds.get(0));
}
resp.setDatabase(detailDatabaseOrNull(schedule.getDatabaseId()));

Set<Long> approvableFlowInstanceIds = approvalPermissionService.getApprovableApprovalInstances()
.stream()
Expand Down Expand Up @@ -387,29 +384,37 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) {
switch (schedule.getType()) {
case DATA_ARCHIVE: {
DataArchiveParameters parameters = (DataArchiveParameters) schedule.getParameters();
parameters.setSourceDatabase(databaseService.detail(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDataBaseId()));
parameters.setSourceDatabase(detailDatabaseOrNull(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDataBaseId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case DATA_DELETE: {
DataDeleteParameters parameters = (DataDeleteParameters) schedule.getParameters();
if (parameters.getTargetDatabaseId() != null) {
parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDatabaseId()));
parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDatabaseId()));
}
parameters.setDatabase(databaseService.detail(parameters.getDatabaseId()));
parameters.setDatabase(detailDatabaseOrNull(parameters.getDatabaseId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case SQL_PLAN: {
SqlPlanParameters parameters = (SqlPlanParameters) schedule.getParameters();
if (parameters.getDatabaseId() != null) {
parameters.setDatabaseInfo(databaseService.detail(parameters.getDatabaseId()));
parameters.setDatabaseInfo(detailDatabaseOrNull(parameters.getDatabaseId()));
}
return parameters;
}
default:
return schedule.getParameters();
}
}

private Database detailDatabaseOrNull(Long databaseId) {
try {
return databaseService.detail(databaseId);
} catch (NotFoundException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DefaultJobTerminateListener extends AbstractEventListener<JobTermin
public void onEvent(JobTerminateEvent event) {
JobEntity jobEntity = taskFrameworkService.find(event.getJi().getId());
scheduleTaskService.findByJobId(jobEntity.getId()).ifPresent(o -> {
TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getTaskStatus(o.getId())
TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getFinalTaskStatus(o.getId())
: event.getStatus().convertTaskStatus();
scheduleTaskService.updateStatusById(o.getId(), taskStatus);
log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, o.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public void process(TaskResult result) {
log.info("Create or update dlm tableUnits success,jobIdentity={},scheduleTaskId={}",
result.getJobIdentity(),
dlmTableUnits.get(0).getScheduleTaskId());
TaskStatus taskStatus = dlmService.getTaskStatus(dlmTableUnits);
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), taskStatus);
TaskStatus taskStatus = taskService.nullSafeGetById(dlmTableUnits.get(0).getScheduleTaskId()).getStatus();
if (taskStatus != TaskStatus.RUNNING) {
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), TaskStatus.RUNNING);
}
log.info("Update schedule task status to {} success", taskStatus);
} catch (Exception e) {
log.warn("Refresh result failed.", e);
Expand Down