diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java index 5d3571eaf4..20da69850c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/dlm/DLMService.java @@ -116,21 +116,28 @@ public List findByScheduleTaskId(Long scheduleTaskId) { Collectors.toList()); } + /** + * generate final task status by scheduleTaskId when the task is finished + */ @SkipAuthorize("odc internal usage") - public TaskStatus getTaskStatus(Long scheduleTaskId) { - return getTaskStatus(findByScheduleTaskId(scheduleTaskId)); - } - - public TaskStatus getTaskStatus(List dlmTableUnits) { + public TaskStatus getFinalTaskStatus(Long scheduleTaskId) { + List dlmTableUnits = findByScheduleTaskId(scheduleTaskId); Set collect = dlmTableUnits.stream().map(DlmTableUnit::getStatus).collect( Collectors.toSet()); + // If any table fails, the task is considered a failure. if (collect.contains(TaskStatus.FAILED)) { return TaskStatus.FAILED; } - if (collect.contains(TaskStatus.DONE) && collect.size() == 1) { - return TaskStatus.DONE; + // If any table is canceled, the task is considered canceled. + if (collect.contains(TaskStatus.CANCELED)) { + return TaskStatus.CANCELED; + } + // The task is considered failed if any table is still preparing or running when the task is + // finished. + if (collect.contains(TaskStatus.PREPARING) || collect.contains(TaskStatus.RUNNING)) { + return TaskStatus.FAILED; } - return TaskStatus.CANCELED; + return TaskStatus.DONE; } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java index 0884bd1161..d5e7a01177 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java @@ -23,7 +23,7 @@ import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; -import com.oceanbase.odc.service.schedule.model.CreateScheduleReq; +import com.oceanbase.odc.service.schedule.model.ScheduleChangeParams; /** * @Author:tinker @@ -51,15 +51,15 @@ public static void generateDescription(CreateFlowInstanceReq req) { } } - public static void generateScheduleDescription(CreateScheduleReq req) { - if (StringUtils.isEmpty(req.getDescription())) { + public static void generateScheduleDescription(ScheduleChangeParams req) { + if (StringUtils.isEmpty(req.getCreateScheduleReq().getDescription())) { String environmentName = req.getEnvironmentName(); String connectionName = req.getConnectionName(); String databaseName = req.getDatabaseName(); String description = StringUtils.isEmpty(connectionName) ? String.format("[%s]%s", environmentName, databaseName) : String.format("[%s]%s.%s", environmentName, connectionName, databaseName); - req.setDescription(description); + req.getCreateScheduleReq().setDescription(description); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java index 8731d205af..e06e6dbbc2 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/ScheduleService.java @@ -220,7 +220,7 @@ public List dispatchCreateSchedule(CreateFlowInstanceReq createScheduleReq.setParameters(parameters.getScheduleTaskParameters()); createScheduleReq.setTriggerConfig(parameters.getTriggerConfig()); createScheduleReq.setType(parameters.getType()); - createScheduleReq.setDescription(parameters.getDescription()); + createScheduleReq.setDescription(createReq.getDescription()); scheduleChangeParams = ScheduleChangeParams.with(createScheduleReq); break; } @@ -229,7 +229,7 @@ public List dispatchCreateSchedule(CreateFlowInstanceReq updateScheduleReq.setParameters(parameters.getScheduleTaskParameters()); updateScheduleReq.setTriggerConfig(parameters.getTriggerConfig()); updateScheduleReq.setType(parameters.getType()); - updateScheduleReq.setDescription(parameters.getDescription()); + updateScheduleReq.setDescription(createReq.getDescription()); scheduleChangeParams = ScheduleChangeParams.with(parameters.getTaskId(), updateScheduleReq); break; } @@ -257,8 +257,8 @@ public ChangeScheduleResp changeSchedule(ScheduleChangeParams req) { ScheduleEntity entity = new ScheduleEntity(); entity.setName(req.getCreateScheduleReq().getName()); - entity.setProjectId(req.getCreateScheduleReq().getProjectId()); - DescriptionGenerator.generateScheduleDescription(req.getCreateScheduleReq()); + entity.setProjectId(req.getProjectId()); + DescriptionGenerator.generateScheduleDescription(req); entity.setDescription(req.getCreateScheduleReq().getDescription()); entity.setJobParametersJson(JsonUtils.toJson(req.getCreateScheduleReq().getParameters())); entity.setTriggerConfigJson(JsonUtils.toJson(req.getCreateScheduleReq().getTriggerConfig())); @@ -270,9 +270,9 @@ public ChangeScheduleResp changeSchedule(ScheduleChangeParams req) { entity.setOrganizationId(authenticationFacade.currentOrganizationId()); entity.setCreatorId(authenticationFacade.currentUserId()); entity.setModifierId(authenticationFacade.currentUserId()); - entity.setDatabaseId(req.getCreateScheduleReq().getDatabaseId()); - entity.setDatabaseName(req.getCreateScheduleReq().getDatabaseName()); - entity.setDataSourceId(req.getCreateScheduleReq().getConnectionId()); + entity.setDatabaseId(req.getDatabaseId()); + entity.setDatabaseName(req.getDatabaseName()); + entity.setDataSourceId(req.getConnectionId()); targetSchedule = scheduleMapper.entityToModel(scheduleRepository.save(entity)); req.setScheduleId(targetSchedule.getId()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/factory/ScheduleResponseMapperFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/factory/ScheduleResponseMapperFactory.java index b38e637d12..69e8544350 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/factory/ScheduleResponseMapperFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/factory/ScheduleResponseMapperFactory.java @@ -34,6 +34,7 @@ import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.core.shared.constant.FlowStatus; import com.oceanbase.odc.core.shared.constant.TaskType; +import com.oceanbase.odc.core.shared.exception.NotFoundException; import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; import com.oceanbase.odc.metadb.flow.FlowInstanceRepository; import com.oceanbase.odc.metadb.flow.UserTaskInstanceEntity; @@ -195,11 +196,7 @@ public ScheduleDetailRespHist generateHistoryScheduleDetail(Schedule schedule) { resp.setUpdateTime(schedule.getUpdateTime()); resp.setDescription(schedule.getDescription()); resp.setJobParameters(detailParameters(schedule)); - - List databaseByIds = getDatabaseByIds(Collections.singleton(schedule.getDatabaseId())); - if (!databaseByIds.isEmpty()) { - resp.setDatabase(databaseByIds.get(0)); - } + resp.setDatabase(detailDatabaseOrNull(schedule.getDatabaseId())); Set approvableFlowInstanceIds = approvalPermissionService.getApprovableApprovalInstances() .stream() @@ -388,24 +385,24 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) { switch (schedule.getType()) { case DATA_ARCHIVE: { DataArchiveParameters parameters = (DataArchiveParameters) schedule.getParameters(); - parameters.setSourceDatabase(databaseService.detail(parameters.getSourceDatabaseId())); - parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDataBaseId())); + parameters.setSourceDatabase(detailDatabaseOrNull(parameters.getSourceDatabaseId())); + parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDataBaseId())); limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit); return parameters; } case DATA_DELETE: { DataDeleteParameters parameters = (DataDeleteParameters) schedule.getParameters(); if (parameters.getTargetDatabaseId() != null) { - parameters.setTargetDatabase(databaseService.detail(parameters.getTargetDatabaseId())); + parameters.setTargetDatabase(detailDatabaseOrNull(parameters.getTargetDatabaseId())); } - parameters.setDatabase(databaseService.detail(parameters.getDatabaseId())); + parameters.setDatabase(detailDatabaseOrNull(parameters.getDatabaseId())); limiterService.findByScheduleId(schedule.getId()).ifPresent(parameters::setRateLimit); return parameters; } case SQL_PLAN: { SqlPlanParameters parameters = (SqlPlanParameters) schedule.getParameters(); if (parameters.getDatabaseId() != null) { - parameters.setDatabaseInfo(databaseService.detail(parameters.getDatabaseId())); + parameters.setDatabaseInfo(detailDatabaseOrNull(parameters.getDatabaseId())); } return parameters; } @@ -413,4 +410,12 @@ private ScheduleTaskParameters detailParameters(Schedule schedule) { return schedule.getParameters(); } } + + private Database detailDatabaseOrNull(Long databaseId) { + try { + return databaseService.detail(databaseId); + } catch (NotFoundException e) { + return null; + } + } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DefaultApprovalFlowClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DefaultApprovalFlowClient.java index a6a8f7b870..7861dc8c3f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DefaultApprovalFlowClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/flowtask/DefaultApprovalFlowClient.java @@ -45,6 +45,13 @@ public Long create(ScheduleChangeParams params) { : params.getUpdateScheduleReq().getTriggerConfig()); } req.setParameters(alterScheduleParameters); + req.setProjectId(params.getProjectId()); + req.setProjectName(params.getProjectName()); + req.setDatabaseName(params.getDatabaseName()); + req.setConnectionId(params.getConnectionId()); + req.setConnectionName(params.getConnectionName()); + req.setEnvironmentId(params.getEnvironmentId()); + req.setEnvironmentName(params.getEnvironmentName()); return SpringContextUtil.getBean(FlowInstanceService.class).createAlterSchedule(req); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/CreateScheduleReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/CreateScheduleReq.java index 7dbe966971..754292698d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/CreateScheduleReq.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/CreateScheduleReq.java @@ -18,8 +18,6 @@ import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonProperty.Access; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo.As; @@ -27,7 +25,6 @@ import com.oceanbase.odc.service.dlm.model.DataArchiveParameters; import com.oceanbase.odc.service.dlm.model.DataDeleteParameters; import com.oceanbase.odc.service.loaddata.model.LoadDataParameters; -import com.oceanbase.odc.service.schedule.processor.ScheduleChangePreprocessor; import com.oceanbase.odc.service.sqlplan.model.SqlPlanParameters; import lombok.Data; @@ -66,25 +63,5 @@ public class CreateScheduleReq { private String description; - /** - * Followings are filled by aspect {@link ScheduleChangePreprocessor} - */ - @JsonProperty(access = Access.READ_ONLY) - private Long projectId; - @JsonProperty(access = Access.READ_ONLY) - private String projectName; - @JsonProperty(access = Access.READ_ONLY) - private Long databaseId; - @JsonProperty(access = Access.READ_ONLY) - private String databaseName; - @JsonProperty(access = Access.READ_ONLY) - private Long connectionId; - @JsonProperty(access = Access.READ_ONLY) - private String connectionName; - @JsonProperty(access = Access.READ_ONLY) - private Long environmentId; - @JsonProperty(access = Access.READ_ONLY) - private String environmentName; - } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleChangeParams.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleChangeParams.java index 39d353664d..80d1b7040f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleChangeParams.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/model/ScheduleChangeParams.java @@ -15,6 +15,9 @@ */ package com.oceanbase.odc.service.schedule.model; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonProperty.Access; + import lombok.Data; /** @@ -36,6 +39,23 @@ public class ScheduleChangeParams { private UpdateScheduleReq updateScheduleReq; + @JsonProperty(access = Access.READ_ONLY) + private Long projectId; + @JsonProperty(access = Access.READ_ONLY) + private String projectName; + @JsonProperty(access = Access.READ_ONLY) + private Long databaseId; + @JsonProperty(access = Access.READ_ONLY) + private String databaseName; + @JsonProperty(access = Access.READ_ONLY) + private Long connectionId; + @JsonProperty(access = Access.READ_ONLY) + private String connectionName; + @JsonProperty(access = Access.READ_ONLY) + private Long environmentId; + @JsonProperty(access = Access.READ_ONLY) + private String environmentName; + public static ScheduleChangeParams with(Long id, OperationType type) { ScheduleChangeParams req = new ScheduleChangeParams(); req.setScheduleId(id); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/ScheduleChangePreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/ScheduleChangePreprocessor.java index 236dbc2a60..fad0339aa4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/ScheduleChangePreprocessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/schedule/processor/ScheduleChangePreprocessor.java @@ -65,7 +65,7 @@ public void process(ScheduleChangeParams params) { ScheduleType type; if (params.getOperationType() == OperationType.CREATE) { type = params.getCreateScheduleReq().getType(); - adaptCreateScheduleReq(params.getCreateScheduleReq()); + adaptScheduleChangeParams(params); } else { type = scheduleService.nullSafeGetModelById(params.getScheduleId()).getType(); } @@ -93,7 +93,7 @@ public void afterPropertiesSet() throws Exception { }); } - private void adaptCreateScheduleReq(CreateScheduleReq req) { + private void adaptScheduleChangeParams(ScheduleChangeParams req) { Database srcDb = databaseService.detail(getTargetDatabaseId(req)); req.setProjectId(srcDb.getProject().getId()); req.setProjectName(srcDb.getProject().getName()); @@ -105,7 +105,11 @@ private void adaptCreateScheduleReq(CreateScheduleReq req) { req.setDatabaseId(srcDb.getId()); } - private Long getTargetDatabaseId(CreateScheduleReq req) { + private Long getTargetDatabaseId(ScheduleChangeParams params) { + if (params.getOperationType() != OperationType.CREATE) { + return scheduleService.nullSafeGetById(params.getScheduleId()).getDatabaseId(); + } + CreateScheduleReq req = params.getCreateScheduleReq(); switch (req.getType()) { case DATA_ARCHIVE: { DataArchiveParameters parameters = (DataArchiveParameters) req.getParameters(); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java index 886f68cbe7..c68d31a22d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/listener/DefaultJobTerminateListener.java @@ -64,7 +64,7 @@ public class DefaultJobTerminateListener extends AbstractEventListener { - TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getTaskStatus(o.getId()) + TaskStatus taskStatus = "DLM".equals(jobEntity.getJobType()) ? dlmService.getFinalTaskStatus(o.getId()) : event.getStatus().convertTaskStatus(); scheduleTaskService.updateStatusById(o.getId(), taskStatus); log.info("Update schedule task status to {} succeed,scheduleTaskId={}", taskStatus, o.getId()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/DLMResultProcessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/DLMResultProcessor.java index ae44a56626..0bfb1cae6e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/DLMResultProcessor.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/processor/DLMResultProcessor.java @@ -60,8 +60,10 @@ public void process(TaskResult result) { log.info("Create or update dlm tableUnits success,jobIdentity={},scheduleTaskId={}", result.getJobIdentity(), dlmTableUnits.get(0).getScheduleTaskId()); - TaskStatus taskStatus = dlmService.getTaskStatus(dlmTableUnits); - taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), taskStatus); + TaskStatus taskStatus = taskService.nullSafeGetById(dlmTableUnits.get(0).getScheduleTaskId()).getStatus(); + if (taskStatus != TaskStatus.RUNNING) { + taskService.updateStatusById(dlmTableUnits.get(0).getScheduleTaskId(), TaskStatus.RUNNING); + } log.info("Update schedule task status to {} success", taskStatus); } catch (Exception e) { log.warn("Refresh result failed.", e);