Skip to content

Commit

Permalink
fix(dlm): cherry pick from 4.3.x (#3541)
Browse files Browse the repository at this point in the history
* fix(schedule): risk level mismatch when operating a schedule (#3529)

* fix select risklevel failed

* fix description

* fix(dlm): task status is wrong (#3491)

* fix the issue of wrong task status

* format

* fix the issue of throw not found exception when database is deleted.

* simplify code
  • Loading branch information
guowl3 committed Sep 24, 2024
1 parent 15a732e commit 4b4d16b
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,28 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
Collectors.toList());
}

/**
* generate final task status by scheduleTaskId when the task is finished
*/
@SkipAuthorize("odc internal usage")
public TaskStatus getTaskStatus(Long scheduleTaskId) {
return getTaskStatus(findByScheduleTaskId(scheduleTaskId));
}

public TaskStatus getTaskStatus(List<DlmTableUnit> dlmTableUnits) {
public TaskStatus getFinalTaskStatus(Long scheduleTaskId) {
List<DlmTableUnit> dlmTableUnits = findByScheduleTaskId(scheduleTaskId);
Set<TaskStatus> collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect(
Collectors.toSet());
// If any table fails, the task is considered a failure.
if (collect.contains(TaskStatus.FAILED)) {
return TaskStatus.FAILED;
}
if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
return TaskStatus.DONE;
// If any table is canceled, the task is considered canceled.
if (collect.contains(TaskStatus.CANCELED)) {
return TaskStatus.CANCELED;
}
// The task is considered failed if any table is still preparing or running when the task is
// finished.
if (collect.contains(TaskStatus.PREPARING) || collect.contains(TaskStatus.RUNNING)) {
return TaskStatus.FAILED;
}
return TaskStatus.CANCELED;
return TaskStatus.DONE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase;
import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters;
import com.oceanbase.odc.service.schedule.model.CreateScheduleReq;
import com.oceanbase.odc.service.schedule.model.ScheduleChangeParams;

/**
* @Author:tinker
Expand Down Expand Up @@ -51,15 +51,15 @@ public static void generateDescription(CreateFlowInstanceReq req) {
}
}

public static void generateScheduleDescription(CreateScheduleReq req) {
if (StringUtils.isEmpty(req.getDescription())) {
public static void generateScheduleDescription(ScheduleChangeParams req) {
if (StringUtils.isEmpty(req.getCreateScheduleReq().getDescription())) {
String environmentName = req.getEnvironmentName();
String connectionName = req.getConnectionName();
String databaseName = req.getDatabaseName();
String description =
StringUtils.isEmpty(connectionName) ? String.format("[%s]%s", environmentName, databaseName)
: String.format("[%s]%s.%s", environmentName, connectionName, databaseName);
req.setDescription(description);
req.getCreateScheduleReq().setDescription(description);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public List<FlowInstanceDetailResp> dispatchCreateSchedule(CreateFlowInstanceReq
createScheduleReq.setParameters(parameters.getScheduleTaskParameters());
createScheduleReq.setTriggerConfig(parameters.getTriggerConfig());
createScheduleReq.setType(parameters.getType());
createScheduleReq.setDescription(parameters.getDescription());
createScheduleReq.setDescription(createReq.getDescription());
scheduleChangeParams = ScheduleChangeParams.with(createScheduleReq);
break;
}
Expand All @@ -229,7 +229,7 @@ public List<FlowInstanceDetailResp> dispatchCreateSchedule(CreateFlowInstanceReq
updateScheduleReq.setParameters(parameters.getScheduleTaskParameters());
updateScheduleReq.setTriggerConfig(parameters.getTriggerConfig());
updateScheduleReq.setType(parameters.getType());
updateScheduleReq.setDescription(parameters.getDescription());
updateScheduleReq.setDescription(createReq.getDescription());
scheduleChangeParams = ScheduleChangeParams.with(parameters.getTaskId(), updateScheduleReq);
break;
}
Expand Down Expand Up @@ -257,8 +257,8 @@ public ChangeScheduleResp changeSchedule(ScheduleChangeParams req) {
ScheduleEntity entity = new ScheduleEntity();

entity.setName(req.getCreateScheduleReq().getName());
entity.setProjectId(req.getCreateScheduleReq().getProjectId());
DescriptionGenerator.generateScheduleDescription(req.getCreateScheduleReq());
entity.setProjectId(req.getProjectId());
DescriptionGenerator.generateScheduleDescription(req);
entity.setDescription(req.getCreateScheduleReq().getDescription());
entity.setJobParametersJson(JsonUtils.toJson(req.getCreateScheduleReq().getParameters()));
entity.setTriggerConfigJson(JsonUtils.toJson(req.getCreateScheduleReq().getTriggerConfig()));
Expand All @@ -270,9 +270,9 @@ public ChangeScheduleResp changeSchedule(ScheduleChangeParams req) {
entity.setOrganizationId(authenticationFacade.currentOrganizationId());
entity.setCreatorId(authenticationFacade.currentUserId());
entity.setModifierId(authenticationFacade.currentUserId());
entity.setDatabaseId(req.getCreateScheduleReq().getDatabaseId());
entity.setDatabaseName(req.getCreateScheduleReq().getDatabaseName());
entity.setDataSourceId(req.getCreateScheduleReq().getConnectionId());
entity.setDatabaseId(req.getDatabaseId());
entity.setDatabaseName(req.getDatabaseName());
entity.setDataSourceId(req.getConnectionId());

targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity));
req.setScheduleId(targetSchedule.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.core.shared.exception.NotFoundException;
import com.oceanbase.odc.metadb.flow.FlowInstanceEntity;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.metadb.flow.UserTaskInstanceEntity;
Expand Down Expand Up @@ -195,11 +196,7 @@ public ScheduleDetailRespHist generateHistoryScheduleDetail(Schedule schedule) {
resp.setUpdateTime(schedule.getUpdateTime());
resp.setDescription(schedule.getDescription());
resp.setJobParameters(detailParameters(schedule));

List<Database> databaseByIds = getDatabaseByIds(Collections.singleton(schedule.getDatabaseId()));
if (!databaseByIds.isEmpty()) {
resp.setDatabase(databaseByIds.get(0));
}
resp.setDatabase(detailDatabaseOrNull(schedule.getDatabaseId()));

Set<Long> approvableFlowInstanceIds = approvalPermissionService.getApprovableApprovalInstances()
.stream()
Expand Down Expand Up @@ -388,29 +385,37 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) {
switch (schedule.getType()) {
case DATA_ARCHIVE: {
DataArchiveParameters parameters = (DataArchiveParameters) schedule.getParameters();
parameters.setSourceDatabase(databaseService.detail(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDataBaseId()));
parameters.setSourceDatabase(detailDatabaseOrNull(parameters.getSourceDatabaseId()));
parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDataBaseId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case DATA_DELETE: {
DataDeleteParameters parameters = (DataDeleteParameters) schedule.getParameters();
if (parameters.getTargetDatabaseId() != null) {
parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDatabaseId()));
parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDatabaseId()));
}
parameters.setDatabase(databaseService.detail(parameters.getDatabaseId()));
parameters.setDatabase(detailDatabaseOrNull(parameters.getDatabaseId()));
limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit);
return parameters;
}
case SQL_PLAN: {
SqlPlanParameters parameters = (SqlPlanParameters) schedule.getParameters();
if (parameters.getDatabaseId() != null) {
parameters.setDatabaseInfo(databaseService.detail(parameters.getDatabaseId()));
parameters.setDatabaseInfo(detailDatabaseOrNull(parameters.getDatabaseId()));
}
return parameters;
}
default:
return schedule.getParameters();
}
}

private Database detailDatabaseOrNull(Long databaseId) {
try {
return databaseService.detail(databaseId);
} catch (NotFoundException e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public Long create(ScheduleChangeParams params) {
: params.getUpdateScheduleReq().getTriggerConfig());
}
req.setParameters(alterScheduleParameters);
req.setProjectId(params.getProjectId());
req.setProjectName(params.getProjectName());
req.setDatabaseName(params.getDatabaseName());
req.setConnectionId(params.getConnectionId());
req.setConnectionName(params.getConnectionName());
req.setEnvironmentId(params.getEnvironmentId());
req.setEnvironmentName(params.getEnvironmentName());
return SpringContextUtil.getBean(FlowInstanceService.class).createAlterSchedule(req);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonProperty.Access;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
import com.oceanbase.odc.service.dlm.model.DataDeleteParameters;
import com.oceanbase.odc.service.loaddata.model.LoadDataParameters;
import com.oceanbase.odc.service.schedule.processor.ScheduleChangePreprocessor;
import com.oceanbase.odc.service.sqlplan.model.SqlPlanParameters;

import lombok.Data;
Expand Down Expand Up @@ -66,25 +63,5 @@ public class CreateScheduleReq {

private String description;

/**
* Followings are filled by aspect {@link ScheduleChangePreprocessor}
*/
@JsonProperty(access = Access.READ_ONLY)
private Long projectId;
@JsonProperty(access = Access.READ_ONLY)
private String projectName;
@JsonProperty(access = Access.READ_ONLY)
private Long databaseId;
@JsonProperty(access = Access.READ_ONLY)
private String databaseName;
@JsonProperty(access = Access.READ_ONLY)
private Long connectionId;
@JsonProperty(access = Access.READ_ONLY)
private String connectionName;
@JsonProperty(access = Access.READ_ONLY)
private Long environmentId;
@JsonProperty(access = Access.READ_ONLY)
private String environmentName;

}

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.oceanbase.odc.service.schedule.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonProperty.Access;

import lombok.Data;

/**
Expand All @@ -36,6 +39,23 @@ public class ScheduleChangeParams {

private UpdateScheduleReq updateScheduleReq;

@JsonProperty(access = Access.READ_ONLY)
private Long projectId;
@JsonProperty(access = Access.READ_ONLY)
private String projectName;
@JsonProperty(access = Access.READ_ONLY)
private Long databaseId;
@JsonProperty(access = Access.READ_ONLY)
private String databaseName;
@JsonProperty(access = Access.READ_ONLY)
private Long connectionId;
@JsonProperty(access = Access.READ_ONLY)
private String connectionName;
@JsonProperty(access = Access.READ_ONLY)
private Long environmentId;
@JsonProperty(access = Access.READ_ONLY)
private String environmentName;

public static ScheduleChangeParams with(Long id, OperationType type) {
ScheduleChangeParams req = new ScheduleChangeParams();
req.setScheduleId(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void process(ScheduleChangeParams params) {
ScheduleType type;
if (params.getOperationType() == OperationType.CREATE) {
type = params.getCreateScheduleReq().getType();
adaptCreateScheduleReq(params.getCreateScheduleReq());
adaptScheduleChangeParams(params);
} else {
type = scheduleService.nullSafeGetModelById(params.getScheduleId()).getType();
}
Expand Down Expand Up @@ -93,7 +93,7 @@ public void afterPropertiesSet() throws Exception {
});
}

private void adaptCreateScheduleReq(CreateScheduleReq req) {
private void adaptScheduleChangeParams(ScheduleChangeParams req) {
Database srcDb = databaseService.detail(getTargetDatabaseId(req));
req.setProjectId(srcDb.getProject().getId());
req.setProjectName(srcDb.getProject().getName());
Expand All @@ -105,7 +105,11 @@ private void adaptCreateScheduleReq(CreateScheduleReq req) {
req.setDatabaseId(srcDb.getId());
}

private Long getTargetDatabaseId(CreateScheduleReq req) {
private Long getTargetDatabaseId(ScheduleChangeParams params) {
if (params.getOperationType() != OperationType.CREATE) {
return scheduleService.nullSafeGetById(params.getScheduleId()).getDatabaseId();
}
CreateScheduleReq req = params.getCreateScheduleReq();
switch (req.getType()) {
case DATA_ARCHIVE: {
DataArchiveParameters parameters = (DataArchiveParameters) req.getParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class DefaultJobTerminateListener extends AbstractEventListener<JobTermin
public void onEvent(JobTerminateEvent event) {
JobEntity jobEntity = taskFrameworkService.find(event.getJi().getId());
scheduleTaskService.findByJobId(jobEntity.getId()).ifPresent(o -> {
TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getTaskStatus(o.getId())
TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getFinalTaskStatus(o.getId())
: event.getStatus().convertTaskStatus();
scheduleTaskService.updateStatusById(o.getId(), taskStatus);
log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, o.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public void process(TaskResult result) {
log.info("Create or update dlm tableUnits success,jobIdentity={},scheduleTaskId={}",
result.getJobIdentity(),
dlmTableUnits.get(0).getScheduleTaskId());
TaskStatus taskStatus = dlmService.getTaskStatus(dlmTableUnits);
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), taskStatus);
TaskStatus taskStatus = taskService.nullSafeGetById(dlmTableUnits.get(0).getScheduleTaskId()).getStatus();
if (taskStatus != TaskStatus.RUNNING) {
taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), TaskStatus.RUNNING);
}
log.info("Update schedule task status to {} success", taskStatus);
} catch (Exception e) {
log.warn("Refresh result failed.", e);
Expand Down

0 comments on commit 4b4d16b

Please sign in to comment.