Skip to content

Commit

Permalink
fix(integration): uncatched exception when failed to get flow instance (
Browse files Browse the repository at this point in the history
  • Loading branch information
smallsheeeep authored Sep 5, 2023
1 parent 6d44d49 commit aff12d9
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ private FlowInstanceDetailResp buildFlowInstance(List<RiskLevel> riskLevels,
flowInstance.dealloc();
}
Map<String, Object> variables = new HashMap<>();
FlowTaskUtil.setFlowInstanceId(variables, flowInstance.getId());
FlowTaskUtil.setTemplateVariables(variables, buildTemplateVariables(flowInstanceReq, connectionConfig));
initVariables(variables, taskEntity, preCheckTaskEntity, connectionConfig,
buildRiskLevelDescriber(flowInstanceReq));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ protected FlowInstanceConfigurer next(@NonNull FlowApprovalInstance nextNode,
ExclusiveGatewayBuilder gatewayBuilder = nullSafeGetNodeBuilder(gatewayName, nextNode,
() -> new ExclusiveGatewayBuilder(gatewayName));
targetExecution.next(serviceTaskBuilder).next(gatewayBuilder);
String expr = RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS + "_" + nextNode.getId();
targetExecution.route(String.format("${!%s}", expr), this.targetProcessBuilder.endProcess());
targetExecution.route(String.format("${!%s}", RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS),
this.targetProcessBuilder.endProcess());
targetExecution.next(userTaskBuilder, new ConditionSequenceFlowBuilder(
gatewayBuilder.getGraphId() + " -> " + serviceTaskBuilder.getGraphId(),
String.format("${%s}", expr)));
String.format("${%s}", RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS)));
} else {
targetExecution.next(userTaskBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package com.oceanbase.odc.service.flow.task;

import java.util.Objects;
import java.util.Optional;

import org.flowable.engine.delegate.DelegateExecution;
import org.springframework.beans.factory.annotation.Autowired;

import com.oceanbase.odc.common.util.RetryExecutor;
import com.oceanbase.odc.core.flow.BaseFlowableDelegate;
import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.core.shared.constant.FlowStatus;
import com.oceanbase.odc.metadb.flow.FlowInstanceRepository;
import com.oceanbase.odc.service.flow.FlowableAdaptor;
Expand Down Expand Up @@ -62,31 +62,39 @@ protected void run(DelegateExecution execution) throws Exception {
try {
flowApprovalInstance = getFlowApprovalInstance(execution);
} catch (Exception e) {
log.warn("Get flow approval instance failed, activityId={}, processDefinitionId={}",
log.warn(
"Get flow approval instance failed, the flow instance is coming to an end, activityId={}, processDefinitionId={}",
execution.getCurrentActivityId(), execution.getProcessDefinitionId(), e);
try {
flowInstanceRepository.updateStatusById(FlowTaskUtil.getFlowInstanceId(execution),
FlowStatus.EXECUTION_FAILED);
} finally {
execution.setVariable(RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS, false);
}
return;
}
Long externalApprovalId = flowApprovalInstance.getExternalApprovalId();
String expr = RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS + "_" + flowApprovalInstance.getId();
if (Objects.nonNull(externalApprovalId)) {
try {
Verify.notNull(externalApprovalId, "externalApprovalId");
IntegrationConfig config = integrationService.detailWithoutPermissionCheck(externalApprovalId);
ApprovalProperties properties = ApprovalProperties.from(config);
TemplateVariables variables = FlowTaskUtil.getTemplateVariables(execution.getVariables());
String externalFlowInstanceId = approvalClient.start(properties, variables);
flowApprovalInstance.setExternalFlowInstanceId(externalFlowInstanceId);
flowApprovalInstance.update();
execution.setVariable(RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS, true);
} catch (Exception e) {
log.warn("Create external approval instance failed, the flow instance is coming to an end, "
+ "flowApprovalInstanceId={}, externalApprovalId={}",
flowApprovalInstance.getId(), externalApprovalId, e);
try {
IntegrationConfig config = integrationService.detailWithoutPermissionCheck(externalApprovalId);
ApprovalProperties properties = ApprovalProperties.from(config);
TemplateVariables variables = FlowTaskUtil.getTemplateVariables(execution.getVariables());
String externalFlowInstanceId = approvalClient.start(properties, variables);
flowApprovalInstance.setExternalFlowInstanceId(externalFlowInstanceId);
flowApprovalInstance.update();
execution.setVariable(expr, true);
} catch (Exception e) {
log.warn("Create external approval instance failed, the flow instance is coming to an end, "
+ "flowApprovalInstanceId={}, externalApprovalId={}",
flowApprovalInstance.getId(), externalApprovalId, e);
flowApprovalInstance.setStatus(FlowNodeStatus.FAILED);
flowApprovalInstance.setComment(e.getLocalizedMessage());
flowApprovalInstance.update();
flowInstanceRepository.updateStatusById(flowApprovalInstance.getFlowInstanceId(),
FlowStatus.EXECUTION_FAILED);
execution.setVariable(expr, false);
} finally {
execution.setVariable(RuntimeTaskConstants.SUCCESS_CREATE_EXT_INS, false);
}
}
}
Expand All @@ -97,8 +105,7 @@ private FlowApprovalInstance getFlowApprovalInstance(DelegateExecution execution
Optional<Optional<Long>> flowInstanceIdOpt = retryExecutor.run(
() -> flowableAdaptor.getFlowInstanceIdByProcessDefinitionId(processDefinitionId), Optional::isPresent);
if (!flowInstanceIdOpt.isPresent() || !flowInstanceIdOpt.get().isPresent()) {
log.warn("Flow instance id does not exist, activityId={}, processDefinitionId={}", activityId,
processDefinitionId);
log.warn("Flow instance id does not exist, processDefinitionId={}", processDefinitionId);
throw new IllegalStateException(
"Can not find flow instance id by process definition id " + processDefinitionId);
}
Expand All @@ -108,7 +115,7 @@ private FlowApprovalInstance getFlowApprovalInstance(DelegateExecution execution
if (!instanceOpt.isPresent()) {
log.warn("Flow approval instance does not exist, activityId={}, flowInstanceId={}", activityId,
flowInstanceId);
throw new IllegalStateException("Can not find instance by activityId " + activityId);
throw new IllegalStateException("Can not find flow approval instance by activityId " + activityId);
}
return instanceOpt.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public class RuntimeTaskConstants {
public static final String PRE_CHECK_TASK_ID = "preCheckTaskId";
public static final String TASK_ID = "taskId";
public static final String FLOW_INSTANCE_ID = "flowInstanceId";
public static final String TIMEOUT_MILLI_SECONDS = "timeOutMilliSeconds";
public static final String CONNECTION_CONFIG = "connectionConfig";
public static final String SCHEMA_NAME = "schemaName";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ public static ConnectionConfig getConnectionConfig(@NonNull DelegateExecution ex
() -> new VerifyException("ConnectionConfig is absent"));
}

public static void setFlowInstanceId(@NonNull Map<String, Object> variables, @NonNull Long flowInstanceId) {
variables.put(RuntimeTaskConstants.FLOW_INSTANCE_ID, flowInstanceId);
}

public static Long getFlowInstanceId(@NonNull DelegateExecution execution) {
Object value = execution.getVariables().get(RuntimeTaskConstants.FLOW_INSTANCE_ID);
return internalGet(value, Long.class).orElseThrow(() -> new VerifyException("FlowInstanceId is absent"));
}

public static void setTemplateVariables(@NonNull Map<String, Object> variables,
@NonNull TemplateVariables templateVariables) {
variables.put(RuntimeTaskConstants.INTEGRATION_TEMPLATE_VARIABLES, templateVariables);
Expand Down

0 comments on commit aff12d9

Please sign in to comment.