diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowApprovalInstanceTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowApprovalInstanceTest.java index ebf58515d0..65e94ec921 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowApprovalInstanceTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowApprovalInstanceTest.java @@ -204,7 +204,7 @@ public void approve_approveWithComment_approveSucceed() { private FlowApprovalInstance createApprovalInstance(boolean startEndPoint, boolean endEndPoint, String name, String activityId, Long flowInstanceId) { - FlowApprovalInstance instance = new FlowApprovalInstance(1L, flowInstanceId, null, null, 12, startEndPoint, + FlowApprovalInstance instance = new FlowApprovalInstance(1L, flowInstanceId, null, 12, startEndPoint, endEndPoint, false, flowableAdaptor, taskService, formService, new LocalEventPublisher(), authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); instance.setName(name); diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowFactoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowFactoryTest.java index cc13b6575c..fc55073199 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowFactoryTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowFactoryTest.java @@ -123,7 +123,7 @@ public void generateFlowGatewayInstance_createNewOne_returnFlowGatewayInstance() public void generateFlowApprovalInstance_createNewOne_returnFlowApprovalInstance() { FlowInstance flowInstance = flowFactory.generateFlowInstance("Test FlowInstance", "Test"); FlowApprovalInstance approvalInstance = - flowFactory.generateFlowApprovalInstance(flowInstance.getId(), true, false, false, 15, null, null); + flowFactory.generateFlowApprovalInstance(flowInstance.getId(), true, false, false, 15, null); Assert.assertEquals((long) flowInstance.getId(), approvalInstance.getFlowInstanceId()); } @@ -190,7 +190,7 @@ private FlowTaskInstance createTaskInstance(Long flowInstanceId, ExecutionStrate private FlowApprovalInstance createApprovalInstance(Long flowInstanceId) { return new FlowApprovalInstance(authenticationFacade.currentOrganizationId(), flowInstanceId, null, - null, 10, false, false, false, flowAdaptor, taskService, formService, new LocalEventPublisher(), + 10, false, false, false, flowAdaptor, taskService, formService, new LocalEventPublisher(), authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceServiceTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceServiceTest.java index 16c5abeadd..281632363f 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceServiceTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceServiceTest.java @@ -551,7 +551,7 @@ public TestFlowApprovalInstance(@NonNull Long organizationId, @NonNull Long flow @NonNull NodeInstanceEntityRepository nodeRepository, @NonNull SequenceInstanceRepository sequenceRepository, @NonNull UserTaskInstanceRepository userTaskInstanceRepository) { - super(organizationId, flowInstanceId, null, null, expireIntervalSeconds, startEndpoint, endEndPoint, false, + super(organizationId, flowInstanceId, null, expireIntervalSeconds, startEndpoint, endEndPoint, false, flowableAdaptor, taskService, formService, eventPublisher, authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceTest.java index aeaf428a1d..2b596027be 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceTest.java @@ -296,7 +296,7 @@ private FlowTaskInstance createTaskInstance(Long flowInstanceId, TaskType taskTy } private FlowApprovalInstance createApprovalInstance(Long flowInstanceId, Integer expireIntervalSeconds) { - return new FlowApprovalInstance(authenticationFacade.currentOrganizationId(), flowInstanceId, null, null, + return new FlowApprovalInstance(authenticationFacade.currentOrganizationId(), flowInstanceId, null, expireIntervalSeconds, false, false, false, flowableAdaptor, taskService, formService, new LocalEventPublisher(), authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceUtilTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceUtilTest.java index 13ebcc8d58..8205a910e4 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceUtilTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowInstanceUtilTest.java @@ -164,7 +164,7 @@ private FlowTaskInstance createTaskInstance(Long flowInstanceId, ExecutionStrate private FlowApprovalInstance createApprovalInstance(Long flowInstanceId) { return new FlowApprovalInstance(authenticationFacade.currentOrganizationId(), flowInstanceId, null, - null, 10, false, false, false, flowAdaptor, taskService, formService, new LocalEventPublisher(), + 10, false, false, false, flowAdaptor, taskService, formService, new LocalEventPublisher(), authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowResponseMapperFactoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowResponseMapperFactoryTest.java index 8a97025903..49fdf10b70 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowResponseMapperFactoryTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowResponseMapperFactoryTest.java @@ -259,7 +259,7 @@ private Database getDatabase() { private FlowApprovalInstance createApprovalInstance(Long flowInstanceId, Integer expireIntervalSeconds, boolean startEndPoint, boolean endEndPoint) { return new FlowApprovalInstance(authenticationFacade.currentOrganizationId(), flowInstanceId, null, - null, expireIntervalSeconds, startEndPoint, endEndPoint, false, flowAdaptor, taskService, formService, + expireIntervalSeconds, startEndPoint, endEndPoint, false, flowAdaptor, taskService, formService, eventPublisher, authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowableAdaptorTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowableAdaptorTest.java index df9e0cd864..340eb3313f 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowableAdaptorTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/flow/FlowableAdaptorTest.java @@ -207,7 +207,7 @@ private FlowGatewayInstance createGatewayInstance(Long flowInstanceId) { } private FlowApprovalInstance createApprovalInstance(Long flowInstanceId) { - return new FlowApprovalInstance(1L, flowInstanceId, null, null, 12, true, true, false, flowableAdaptor, + return new FlowApprovalInstance(1L, flowInstanceId, null, 12, true, true, false, flowableAdaptor, taskService, formService, new LocalEventPublisher(), authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/integration/IntegrationTestUtil.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/integration/IntegrationTestUtil.java index d378437950..3ece702d96 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/integration/IntegrationTestUtil.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/integration/IntegrationTestUtil.java @@ -18,6 +18,7 @@ import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; @@ -70,7 +71,7 @@ public static IntegrationConfig createApprovalConfig() { } public static TemplateVariables createTemplateVariables() { - Map variables = new HashMap<>(); + Map variables = new HashMap<>(); variables.put(Variable.USER_NAME.key(), "Jack"); variables.put(Variable.USER_ACCOUNT.key(), "Jack"); variables.put(Variable.USER_ID.key(), 10001L); diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/model/FlowableElement.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/model/FlowableElement.java index 3fca41964e..204e3ea443 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/model/FlowableElement.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/flow/model/FlowableElement.java @@ -16,6 +16,7 @@ package com.oceanbase.odc.core.flow.model; import com.oceanbase.odc.core.flow.builder.ErrorBoundaryEventBuilder; +import com.oceanbase.odc.core.flow.builder.ServiceTaskBuilder; import com.oceanbase.odc.core.flow.builder.SignalCatchEventBuilder; import com.oceanbase.odc.core.flow.builder.TimerBoundaryEventBuilder; import com.oceanbase.odc.core.flow.builder.UserTaskBuilder; @@ -64,4 +65,8 @@ public FlowableElement(@NonNull UserTaskBuilder userTaskBuilder) { this(userTaskBuilder.getGraphId(), userTaskBuilder.getName(), FlowableElementType.USER_TASK); } + public FlowableElement(@NonNull ServiceTaskBuilder serviceTaskBuilder) { + this(serviceTaskBuilder.getGraphId(), serviceTaskBuilder.getName(), FlowableElementType.SERVICE_TASK); + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/UserTaskInstanceRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/UserTaskInstanceRepository.java index 8afd2da5b2..d04ea6723d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/UserTaskInstanceRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/flow/UserTaskInstanceRepository.java @@ -72,7 +72,8 @@ List findByResourceRoleIdentifierIn( @Transactional @Query("update UserTaskInstanceEntity as ut set ut.userTaskId=:#{#param.userTaskId},ut.status=:#{#param.status}," + "ut.approved=:#{#param.approved},ut.operatorId=:#{#param.operatorId},ut.comment=:#{#param.comment}," - + "ut.expireIntervalSeconds=:#{#param.expireIntervalSeconds} where ut.id=:#{#param.id}") + + "ut.expireIntervalSeconds=:#{#param.expireIntervalSeconds},ut.externalFlowInstanceId=:#{#param.externalFlowInstanceId}" + + " where ut.id=:#{#param.id}") @Modifying int update(@Param("param") UserTaskInstanceEntity entity); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java index 3fa2723357..b80ebba52b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java @@ -212,7 +212,6 @@ public class FlowInstanceService { private final List> shadowTableComparingTaskHooks = new ArrayList<>(); private static final long MAX_EXPORT_OBJECT_COUNT = 10000; private static final String ODC_SITE_URL = "odc.site.url"; - private static final String INVALID_EXTERNAL_INSTANCE_ID = "N/A"; @PostConstruct public void init() { @@ -273,11 +272,6 @@ public List create(@NotNull @Valid CreateFlowInstanceReq PreConditions.maxLength(taskParameters.getSqlContent(), "sql content", flowTaskProperties.getSqlContentMaxLength()); } - - Long connId = createReq.getConnectionId(); - ConnectionConfig conn = connectionService.getForConnectionSkipPermissionCheck(connId); - - // acquire export masking policy if (createReq.getTaskType() == TaskType.EXPORT) { DataTransferConfig dataTransferConfig = (DataTransferConfig) createReq.getParameters(); if (dataTransferConfig.getExportDbObjects().size() > MAX_EXPORT_OBJECT_COUNT) { @@ -288,6 +282,7 @@ public List create(@NotNull @Valid CreateFlowInstanceReq } List riskLevels = riskLevelService.list(); Verify.notEmpty(riskLevels, "riskLevels"); + ConnectionConfig conn = connectionService.getForConnectionSkipPermissionCheck(createReq.getConnectionId()); return Collections.singletonList(buildFlowInstance(riskLevels, createReq, conn)); } @@ -488,11 +483,11 @@ private FlowInstanceDetailResp cancel(@NotNull FlowInstance flowInstance, Boolea Verify.singleton(approvalInstances, "FlowApprovalInstance"); FlowApprovalInstance instance = approvalInstances.get(0); Verify.verify(instance.isPresentOnThisMachine(), "Approval instance is not on this machine"); + // Cancel external process instance when related ODC flow instance is cancelled + cancelAllRelatedExternalInstance(flowInstance); instance.disApprove(null, !skipAuth); flowInstanceRepository.updateStatusById(instance.getFlowInstanceId(), FlowStatus.CANCELLED); userTaskInstanceRepository.updateStatusById(instance.getId(), FlowNodeStatus.CANCELLED); - // Cancel external process instance when related ODC approval node is cancelled - cancelAllRelatedExternalInstance(flowInstance); return FlowInstanceDetailResp.withIdAndType(id, taskTypeHolder.getValue()); } @@ -691,8 +686,8 @@ private FlowInstanceDetailResp buildFlowInstance(List riskLevels, flowInstance.newFlowInstance().next(riskDetectInstance).next(riskLevelGateway); for (int i = 0; i < riskLevels.size(); i++) { FlowInstanceConfigurer targetConfigurer = buildConfigurer(riskLevels.get(i).getApprovalFlowConfig(), - flowInstance, flowInstanceReq.getTaskType(), connectionConfig, - taskEntity.getId(), flowInstanceReq.getParameters(), flowInstanceReq); + flowInstance, flowInstanceReq.getTaskType(), taskEntity.getId(), + flowInstanceReq.getParameters(), flowInstanceReq); startConfigurer.route( String.format("${%s == %d}", RuntimeTaskConstants.RISKLEVEL, riskLevels.get(i).getLevel()), targetConfigurer); @@ -733,7 +728,6 @@ private FlowInstanceConfigurer buildConfigurer( @NonNull ApprovalFlowConfig approvalFlowConfig, @NonNull FlowInstance flowInstance, @NonNull TaskType taskType, - @NonNull ConnectionConfig connectionConfig, @NonNull Long targetTaskId, @NonNull TaskParameters parameters, @NonNull CreateFlowInstanceReq flowInstanceReq) { @@ -744,23 +738,10 @@ private FlowInstanceConfigurer buildConfigurer( FlowInstanceConfigurer configurer; ApprovalNodeConfig nodeConfig = nodeConfigs.get(nodeSequence); Long resourceRoleId = nodeConfig.getResourceRoleId(); - String externalFlowInstanceId = null; - Long externalApprovalId = nodeConfig.getExternalApprovalId(); - if (Objects.nonNull(externalApprovalId)) { - IntegrationConfig config = integrationService.detailWithoutPermissionCheck(externalApprovalId); - ApprovalProperties properties = ApprovalProperties.from(config); - TemplateVariables variables = buildTemplateVariables(flowInstanceReq, connectionConfig); - try { - externalFlowInstanceId = approvalClient.start(properties, variables); - } catch (Exception e) { - externalFlowInstanceId = INVALID_EXTERNAL_INSTANCE_ID; - log.warn("Create external approval instance failed, the instance will be force closed!"); - } - } FlowApprovalInstance approvalInstance = flowFactory.generateFlowApprovalInstance(flowInstance.getId(), false, false, nodeConfig.getAutoApproval(), approvalFlowConfig.getApprovalExpirationIntervalSeconds(), - externalApprovalId, externalFlowInstanceId); + nodeConfig.getExternalApprovalId()); if (Objects.nonNull(resourceRoleId)) { approvalPermissionService.setCandidateResourceRole(approvalInstance.getId(), StringUtils.join(flowInstanceReq.getProjectId(), ":", resourceRoleId)); @@ -861,11 +842,10 @@ private TemplateVariables buildTemplateVariables(CreateFlowInstanceReq flowInsta variables.setAttribute(Variable.TASK_TYPE, taskType.getLocalizedMessage()); variables.setAttribute(Variable.TASK_DETAILS, JsonUtils.toJson(flowInstanceReq.getParameters())); // set connection related variables - ConnectionConfig connection = connectionService.getWithoutPermissionCheck(flowInstanceReq.getConnectionId()); - if (Objects.nonNull(connection)) { - variables.setAttribute(Variable.CONNECTION_NAME, connection.getName()); - variables.setAttribute(Variable.CONNECTION_TENANT, connection.getTenantName()); - for (Entry entry : connection.getProperties().entrySet()) { + if (Objects.nonNull(config)) { + variables.setAttribute(Variable.CONNECTION_NAME, config.getName()); + variables.setAttribute(Variable.CONNECTION_TENANT, config.getTenantName()); + for (Entry entry : config.getProperties().entrySet()) { variables.setAttribute(Variable.CONNECTION_PROPERTIES, entry.getKey(), entry.getValue()); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowFactory.java index fa3cd5615f..dcdb70b4df 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowFactory.java @@ -126,11 +126,10 @@ public FlowGatewayInstance generateFlowGatewayInstance(@NonNull Long flowInstanc } public FlowApprovalInstance generateFlowApprovalInstance(@NonNull Long flowInstanceId, boolean isStartEndPoint, - boolean isEndEndPoint, boolean autoApprove, int expireIntervalSeconds, Long externalApprovalId, - String externalFlowInstanceId) { + boolean isEndEndPoint, boolean autoApprove, int expireIntervalSeconds, Long externalApprovalId) { Verify.verify(expireIntervalSeconds > 0, "ApprovalExpirationInterval can not be negative"); return new FlowApprovalInstance(authenticationFacade.currentOrganizationId(), flowInstanceId, - externalApprovalId, externalFlowInstanceId, expireIntervalSeconds, isStartEndPoint, isEndEndPoint, + externalApprovalId, expireIntervalSeconds, isStartEndPoint, isEndEndPoint, autoApprove, flowableAdaptor, flowableTaskService, formService, eventPublisher, authenticationFacade, nodeRepository, sequenceRepository, userTaskInstanceRepository); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java index 6f183dc02f..89e51e7cd7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/factory/FlowResponseMapperFactory.java @@ -57,6 +57,7 @@ import com.oceanbase.odc.metadb.iam.UserRoleRepository; import com.oceanbase.odc.metadb.iam.resourcerole.UserResourceRoleEntity; import com.oceanbase.odc.metadb.iam.resourcerole.UserResourceRoleRepository; +import com.oceanbase.odc.metadb.integration.IntegrationEntity; import com.oceanbase.odc.metadb.regulation.risklevel.RiskLevelRepository; import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.metadb.task.TaskRepository; @@ -235,6 +236,10 @@ private FlowNodeInstanceMapper generateNodeMapper(@NonNull Collection flow .withGetTaskById(taskId2TaskEntity::get) .withGetUserById(userId2User::get) .withGetRolesByUserId(userId2Roles::get) + .withGetExternalApprovalNameById(externalApprovalId -> { + IntegrationEntity config = integrationService.nullSafeGet(externalApprovalId); + return config.getName(); + }) .withGetExternalUrlByExternalId(externalApproval -> { IntegrationConfig config = integrationService.detailWithoutPermissionCheck(externalApproval.getApprovalId()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowApprovalInstance.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowApprovalInstance.java index 3626a57a65..cb0c87046c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowApprovalInstance.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowApprovalInstance.java @@ -40,6 +40,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** @@ -56,12 +57,14 @@ public class FlowApprovalInstance extends BaseFlowUserTaskInstance { public static final String APPROVAL_VARIABLE_NAME = "approved"; + @Setter private String comment; private Long operatorId; private boolean approved; private final Integer expireIntervalSeconds; private final boolean autoApprove; - private final String externalFlowInstanceId; + @Setter + private String externalFlowInstanceId; private final Long externalApprovalId; private boolean waitForConfirm; @@ -94,7 +97,7 @@ public FlowApprovalInstance(@NonNull UserTaskInstanceEntity entity, @NonNull Flo } public FlowApprovalInstance(@NonNull Long organizationId, @NonNull Long flowInstanceId, - Long externalApprovalId, String externalFlowInstanceId, + Long externalApprovalId, @NonNull Integer expireIntervalSeconds, boolean startEndpoint, boolean endEndPoint, boolean autoApprove, @NonNull FlowableAdaptor flowableAdaptor, @NonNull TaskService taskService, @NonNull FormService formService, @@ -109,7 +112,6 @@ public FlowApprovalInstance(@NonNull Long organizationId, @NonNull Long flowInst this.authenticationFacade = authenticationFacade; this.expireIntervalSeconds = expireIntervalSeconds; this.autoApprove = autoApprove; - this.externalFlowInstanceId = externalFlowInstanceId; this.externalApprovalId = externalApprovalId; alloc(); create(); @@ -135,7 +137,6 @@ public FlowApprovalInstance(@NonNull Long organizationId, @NonNull Long flowInst this.expireIntervalSeconds = expireIntervalSeconds; this.autoApprove = autoApprove; this.waitForConfirm = waitForConfirm; - this.externalFlowInstanceId = null; this.externalApprovalId = null; alloc(); create(); @@ -188,6 +189,7 @@ public void update() { entity.setOperatorId(getOperatorId()); entity.setComment(getComment()); entity.setExpireIntervalSeconds(getExpireIntervalSeconds()); + entity.setExternalFlowInstanceId(getExternalFlowInstanceId()); int affectRows = userTaskInstanceRepository.update(entity); log.info("Update approval task instance successfully, affectRows={}, approvalTask={}", affectRows, entity); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java index 4c095a299f..0e721a356b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/instance/FlowInstanceConfigurer.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Date; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Supplier; @@ -54,6 +55,8 @@ import com.oceanbase.odc.service.flow.model.FlowNodeType; import com.oceanbase.odc.service.flow.model.FlowTaskExecutionStrategy; import com.oceanbase.odc.service.flow.task.BaseRuntimeFlowableDelegate; +import com.oceanbase.odc.service.flow.task.CreateExternalApprovalTask; +import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; import lombok.Getter; import lombok.NonNull; @@ -208,10 +211,28 @@ protected FlowInstanceConfigurer next(@NonNull FlowApprovalInstance nextNode, userTaskConsumer.accept(builder); return builder; }); - targetExecution.next(userTaskBuilder); + if (Objects.nonNull(nextNode.getExternalApprovalId())) { + String serviceTaskName = FlowNodeType.APPROVAL_TASK.name() + "_external_approval_task_" + nextNode.getId(); + ServiceTaskBuilder serviceTaskBuilder = nullSafeGetNodeBuilder(serviceTaskName, nextNode, + () -> new ServiceTaskBuilder(serviceTaskName, CreateExternalApprovalTask.class)); + if (this.requiresActivityIdAndName) { + flowableAdaptor.setFlowableElement(nextNode, new FlowableElement(serviceTaskBuilder)); + } + String gatewayName = FlowNodeType.APPROVAL_TASK.name() + "_external_approval_gateway_" + nextNode.getId(); + ExclusiveGatewayBuilder gatewayBuilder = nullSafeGetNodeBuilder(gatewayName, nextNode, + () -> new ExclusiveGatewayBuilder(gatewayName)); + targetExecution.next(serviceTaskBuilder).next(gatewayBuilder); + String expr = RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS + "_" + nextNode.getId(); + targetExecution.route(String.format("${!%s}", expr), this.targetProcessBuilder.endProcess()); + targetExecution.next(userTaskBuilder, new ConditionSequenceFlowBuilder( + gatewayBuilder.getGraphId() + " -> " + serviceTaskBuilder.getGraphId(), + String.format("${%s}", expr))); + } else { + targetExecution.next(userTaskBuilder); + } if (log.isDebugEnabled()) { log.debug( - "Successfully set up the approval task node instance, instanceId={}, intanceType={}, activityId={}, name={}", + "Successfully set up the approval task node instance, instanceId={}, instanceType={}, activityId={}, name={}", nextNode.getId(), nextNode.getNodeType(), userTaskBuilder.getGraphId(), userTaskBuilder.getName()); } return next(userTaskBuilder, nextNode); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java index 8704b4672d..2c4301be37 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java @@ -15,6 +15,7 @@ */ package com.oceanbase.odc.service.flow.model; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Objects; @@ -59,7 +60,7 @@ public class FlowNodeInstanceDetailResp { private Date createTime; private Date completeTime; private Date deadlineTime; - private Boolean externalApprove; + private String externalApprovalName; private String externalFlowInstanceUrl; private Integer issueCount; private List unauthorizedDatabaseNames; @@ -74,6 +75,13 @@ public static class FlowNodeInstanceMapper { private Function> getRolesByUserId = null; private Function> getCandidatesByApprovalId = null; private Function getExternalUrlByExternalId = null; + private Function getExternalApprovalNameById = null; + + public FlowNodeInstanceMapper withGetExternalApprovalNameById( + @NonNull Function getExternalApprovalNameById) { + this.getExternalApprovalNameById = getExternalApprovalNameById; + return this; + } public FlowNodeInstanceMapper withGetExternalUrlByExternalId( @NonNull Function getExternalUrlByExternalId) { @@ -142,8 +150,7 @@ public FlowNodeInstanceDetailResp map(@NonNull FlowTaskInstance instance) { } if (Objects.nonNull(result.getPermissionCheckResult())) { resp.setUnauthorizedDatabaseNames( - result.getPermissionCheckResult().getUnauthorizedDatabaseNames().stream().collect( - Collectors.toList())); + new ArrayList<>(result.getPermissionCheckResult().getUnauthorizedDatabaseNames())); } } } @@ -153,12 +160,13 @@ public FlowNodeInstanceDetailResp map(@NonNull FlowTaskInstance instance) { public FlowNodeInstanceDetailResp map(@NonNull FlowApprovalInstance instance) { FlowNodeInstanceDetailResp resp = commonMap(instance); resp.setAutoApprove(instance.isAutoApprove()); - if (StringUtils.isNotBlank(instance.getExternalFlowInstanceId())) { - resp.setExternalApprove(true); - ExternalApproval externalApproval = ExternalApproval.builder() - .approvalId(instance.getExternalApprovalId()) - .instanceId(instance.getExternalFlowInstanceId()).build(); - resp.setExternalFlowInstanceUrl(getExternalUrlByExternalId.apply(externalApproval)); + if (Objects.nonNull(instance.getExternalApprovalId())) { + resp.setExternalApprovalName(getExternalApprovalNameById.apply(instance.getExternalApprovalId())); + if (StringUtils.isNotBlank(instance.getExternalFlowInstanceId())) { + resp.setExternalFlowInstanceUrl(getExternalUrlByExternalId.apply(ExternalApproval.builder() + .approvalId(instance.getExternalApprovalId()) + .instanceId(instance.getExternalFlowInstanceId()).build())); + } } if (instance.getStatus().isFinalStatus()) { if (!instance.isApproved() && instance.getStatus() == FlowNodeStatus.COMPLETED) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java new file mode 100644 index 0000000000..d5715b44a6 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/CreateExternalApprovalTask.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.odc.service.flow.task; + +import java.util.Objects; +import java.util.Optional; + +import org.flowable.engine.delegate.DelegateExecution; +import org.springframework.beans.factory.annotation.Autowired; + +import com.oceanbase.odc.common.util.RetryExecutor; +import com.oceanbase.odc.core.flow.BaseFlowableDelegate; +import com.oceanbase.odc.core.shared.constant.FlowStatus; +import com.oceanbase.odc.metadb.flow.FlowInstanceRepository; +import com.oceanbase.odc.service.flow.FlowableAdaptor; +import com.oceanbase.odc.service.flow.instance.FlowApprovalInstance; +import com.oceanbase.odc.service.flow.model.FlowNodeStatus; +import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; +import com.oceanbase.odc.service.flow.util.FlowTaskUtil; +import com.oceanbase.odc.service.integration.IntegrationService; +import com.oceanbase.odc.service.integration.client.ApprovalClient; +import com.oceanbase.odc.service.integration.model.ApprovalProperties; +import com.oceanbase.odc.service.integration.model.IntegrationConfig; +import com.oceanbase.odc.service.integration.model.TemplateVariables; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author gaoda.xy + * @date 2023/9/1 14:00 + */ +@Slf4j +public class CreateExternalApprovalTask extends BaseFlowableDelegate { + + @Autowired + private FlowableAdaptor flowableAdaptor; + @Autowired + private IntegrationService integrationService; + @Autowired + private ApprovalClient approvalClient; + @Autowired + private FlowInstanceRepository flowInstanceRepository; + private final RetryExecutor retryExecutor = RetryExecutor.builder().retryIntervalMillis(1000).retryTimes(3).build(); + + @Override + protected void run(DelegateExecution execution) throws Exception { + FlowApprovalInstance flowApprovalInstance; + try { + flowApprovalInstance = getFlowApprovalInstance(execution); + } catch (Exception e) { + log.warn("Get flow approval instance failed, activityId={}, processDefinitionId={}", + execution.getCurrentActivityId(), execution.getProcessDefinitionId(), e); + return; + } + Long externalApprovalId = flowApprovalInstance.getExternalApprovalId(); + String expr = RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS + "_" + flowApprovalInstance.getId(); + if (Objects.nonNull(externalApprovalId)) { + try { + IntegrationConfig config = integrationService.detailWithoutPermissionCheck(externalApprovalId); + ApprovalProperties properties = ApprovalProperties.from(config); + TemplateVariables variables = FlowTaskUtil.getTemplateVariables(execution.getVariables()); + String externalFlowInstanceId = approvalClient.start(properties, variables); + flowApprovalInstance.setExternalFlowInstanceId(externalFlowInstanceId); + flowApprovalInstance.update(); + execution.setVariable(expr, true); + } catch (Exception e) { + log.warn("Create external approval instance failed, the flow instance is coming to an end, " + + "flowApprovalInstanceId={}, externalApprovalId={}", + flowApprovalInstance.getId(), externalApprovalId, e); + flowApprovalInstance.setStatus(FlowNodeStatus.FAILED); + flowApprovalInstance.setComment(e.getLocalizedMessage()); + flowApprovalInstance.update(); + flowInstanceRepository.updateStatusById(flowApprovalInstance.getFlowInstanceId(), + FlowStatus.EXECUTION_FAILED); + execution.setVariable(expr, false); + } + } + } + + private FlowApprovalInstance getFlowApprovalInstance(DelegateExecution execution) { + String activityId = execution.getCurrentActivityId(); + String processDefinitionId = execution.getProcessDefinitionId(); + Optional> flowInstanceIdOpt = retryExecutor.run( + () -> flowableAdaptor.getFlowInstanceIdByProcessDefinitionId(processDefinitionId), Optional::isPresent); + if (!flowInstanceIdOpt.isPresent() || !flowInstanceIdOpt.get().isPresent()) { + log.warn("Flow instance id does not exist, activityId={}, processDefinitionId={}", activityId, + processDefinitionId); + throw new IllegalStateException( + "Can not find flow instance id by process definition id " + processDefinitionId); + } + Long flowInstanceId = flowInstanceIdOpt.get().get(); + Optional instanceOpt = + flowableAdaptor.getApprovalInstanceByActivityId(activityId, flowInstanceId); + if (!instanceOpt.isPresent()) { + log.warn("Flow approval instance does not exist, activityId={}, flowInstanceId={}", activityId, + flowInstanceId); + throw new IllegalStateException("Can not find instance by activityId " + activityId); + } + return instanceOpt.get(); + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/RuntimeTaskConstants.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/RuntimeTaskConstants.java index c3319033a0..84d184f4bd 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/RuntimeTaskConstants.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/RuntimeTaskConstants.java @@ -34,5 +34,5 @@ public class RuntimeTaskConstants { public static final Integer DEFAULT_TASK_CHECK_INTERVAL_SECONDS = 5; public static final String RISKLEVEL_DESCRIBER = "riskLevelDescriber"; public static final String RISKLEVEL = "riskLevel"; - public static final String SQL_CHECK_RESULT = "sql_check_result"; + public static final String SUCCESS_CREATE_EXT_INS = "successCreateExternalApprovalInstance"; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ApprovalSynchronizer.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ApprovalSynchronizer.java index 734880a337..cac72a2289 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ApprovalSynchronizer.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/ApprovalSynchronizer.java @@ -104,8 +104,7 @@ private void syncApprovalStatus() throws InterruptedException { processInstance = flowInstanceId2ProcessInstance.get(entity.getFlowInstanceId()); variables = FlowTaskUtil.getTemplateVariables(processInstance.getProcessVariables()); variables.setAttribute(Variable.PROCESS_INSTANCE_ID, entity.getExternalFlowInstanceId()); - update(entity.getFlowInstanceId(), entity.getExternalApprovalId(), processInstance, - entity.getExternalFlowInstanceId(), variables); + update(entity.getFlowInstanceId(), entity.getExternalApprovalId(), processInstance, variables); } catch (Exception e) { log.warn( "Failed to synchronize external approval status, flowInstanceId={}, integrationId={}, externalProcessInstanceId={}, variables={}", @@ -119,10 +118,7 @@ private void syncApprovalStatus() throws InterruptedException { } private void update(Long flowInstanceId, Long externalApprovalId, ProcessInstance processInstance, - String externalInstanceId, TemplateVariables variables) throws IOException { - if ("N/A".equals(externalInstanceId)) { - flowInstanceService.cancel(flowInstanceId, true); - } + TemplateVariables variables) throws IOException { ApprovalProperties properties = (ApprovalProperties) integrationService.getIntegrationProperties(externalApprovalId); ApprovalStatus status = approvalClient.status(properties, variables); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/ApprovalClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/ApprovalClient.java index 2bbc16857b..6a13073da8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/ApprovalClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/ApprovalClient.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Component; import com.oceanbase.odc.core.shared.Verify; +import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.core.shared.exception.ExternalServiceError; import com.oceanbase.odc.core.shared.exception.UnexpectedException; import com.oceanbase.odc.service.integration.HttpOperationService; @@ -96,7 +97,8 @@ public String start(@NonNull ApprovalProperties properties, TemplateVariables va try { response = httpClient.execute(request, new BasicResponseHandler()); } catch (Exception e) { - throw new ExternalServiceError("Request execute failed: " + e.getMessage()); + throw new ExternalServiceError(ErrorCodes.ExternalServiceError, + "Request execute failed: " + e.getMessage()); } String decrypt = EncryptionUtil.decrypt(response, encryption); checkResponse(decrypt, start.getRequestSuccessExpression()); @@ -128,7 +130,8 @@ public ApprovalStatus status(@NonNull ApprovalProperties properties, TemplateVar try { response = httpClient.execute(request, new BasicResponseHandler()); } catch (Exception e) { - throw new ExternalServiceError("Request execute failed: " + e.getMessage()); + throw new ExternalServiceError(ErrorCodes.ExternalServiceError, + "Request execute failed: " + e.getMessage()); } String decrypt = EncryptionUtil.decrypt(response, encryption); checkResponse(decrypt, status.getRequestSuccessExpression()); @@ -169,7 +172,8 @@ public void cancel(@NonNull ApprovalProperties properties, TemplateVariables var try { response = httpClient.execute(request, new BasicResponseHandler()); } catch (Exception e) { - throw new ExternalServiceError("Request execute failed: " + e.getMessage()); + throw new ExternalServiceError(ErrorCodes.ExternalServiceError, + "Request execute failed: " + e.getMessage()); } String decrypt = EncryptionUtil.decrypt(response, encryption); checkResponse(decrypt, cancel.getRequestSuccessExpression()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/SqlInterceptorClient.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/SqlInterceptorClient.java index 3b3835d3b0..a25a2d72a7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/SqlInterceptorClient.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/client/SqlInterceptorClient.java @@ -27,6 +27,7 @@ import org.springframework.stereotype.Component; import com.oceanbase.odc.core.shared.Verify; +import com.oceanbase.odc.core.shared.constant.ErrorCodes; import com.oceanbase.odc.core.shared.exception.ExternalServiceError; import com.oceanbase.odc.core.shared.exception.UnexpectedException; import com.oceanbase.odc.service.integration.HttpOperationService; @@ -95,7 +96,8 @@ public SqlCheckStatus check(@NonNull SqlInterceptorProperties properties, Templa try { response = httpClient.execute(request, new BasicResponseHandler()); } catch (Exception e) { - throw new ExternalServiceError("Request execute failed: " + e.getMessage()); + throw new ExternalServiceError(ErrorCodes.ExternalServiceError, + "Request execute failed: " + e.getMessage()); } String decrypt = EncryptionUtil.decrypt(response, encryption); try { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/model/TemplateVariables.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/model/TemplateVariables.java index 08f9030063..e5554b8140 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/model/TemplateVariables.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/integration/model/TemplateVariables.java @@ -26,21 +26,21 @@ * @date 2023/3/28 19:52 */ public class TemplateVariables implements Serializable { - private final Map variables; + private final Map variables; public TemplateVariables() { this.variables = new HashMap<>(); } - public TemplateVariables(Map variables) { + public TemplateVariables(Map variables) { this.variables = new HashMap<>(variables); } - public void setAttribute(Variable variable, Object value) { + public void setAttribute(Variable variable, Serializable value) { this.variables.put(variable.key, value); } - public void setAttribute(Variable variable, String subKey, Object value) { + public void setAttribute(Variable variable, String subKey, Serializable value) { this.variables.put(variable.key + "." + subKey, value); } @@ -63,7 +63,7 @@ public enum Variable { SQL_CONTENT("sql.content"), SQL_CONTENT_JSON_ARRAY("sql.content.json.array"); - private String key; + private final String key; public String key() { return key;