From 231025df672695e1b578b1dabe037b4a71e3bea5 Mon Sep 17 00:00:00 2001 From: IceS2 Date: Mon, 18 Nov 2024 08:17:22 +0100 Subject: [PATCH] MINOR: Improve PeriodicBatchEntityTrigger Workflows performance (#18664) * Update default configurations * Refactor how we call the main Workflow in order to increase performance --- .../MainWorkflowHasFinishedListener.java | 19 --- .../governance/workflows/WorkflowHandler.java | 4 + .../workflows/elements/NodeInterface.java | 10 -- .../workflows/elements/TriggerInterface.java | 2 +- .../impl/SetEntityCertificationImpl.java | 11 ++ .../elements/nodes/endEvent/EndEvent.java | 1 - .../triggers/EventBasedEntityTrigger.java | 64 ++++++--- .../triggers/PeriodicBatchEntityTrigger.java | 80 ++++++++--- .../triggers/impl/FetchEntitiesImpl.java | 72 ++++++++++ ...orkflowImpl.java => FilterEntityImpl.java} | 29 +--- .../impl/TriggerBatchEntityWorkflowImpl.java | 124 ------------------ .../builders/CallActivityBuilder.java | 28 ++++ ...ltiInstanceLoopCharacteristicsBuilder.java | 32 +++++ .../triggers/periodicBatchEntityTrigger.json | 2 +- 14 files changed, 260 insertions(+), 218 deletions(-) delete mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowHasFinishedListener.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java rename openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/{TriggerEntityWorkflowImpl.java => FilterEntityImpl.java} (64%) delete mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerBatchEntityWorkflowImpl.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/CallActivityBuilder.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/MultiInstanceLoopCharacteristicsBuilder.java diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowHasFinishedListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowHasFinishedListener.java deleted file mode 100644 index 58dcfda50a2e..000000000000 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/MainWorkflowHasFinishedListener.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.openmetadata.service.governance.workflows; - -import java.util.UUID; -import org.flowable.engine.delegate.DelegateExecution; -import org.flowable.engine.delegate.JavaDelegate; -import org.openmetadata.service.Entity; -import org.openmetadata.service.jdbi3.WorkflowInstanceRepository; - -public class MainWorkflowHasFinishedListener implements JavaDelegate { - @Override - public void execute(DelegateExecution execution) { - WorkflowInstanceRepository workflowInstanceRepository = - (WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); - - UUID workflowInstanceId = UUID.fromString(execution.getProcessInstanceBusinessKey()); - workflowInstanceRepository.updateWorkflowInstance( - workflowInstanceId, System.currentTimeMillis()); - } -} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index 0d46ea1bcd42..74c405df77b9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -40,6 +40,10 @@ private WorkflowHandler(OpenMetadataApplicationConfig config) { ProcessEngineConfiguration processEngineConfiguration = new StandaloneProcessEngineConfiguration() .setAsyncExecutorActivate(true) + .setAsyncExecutorCorePoolSize(50) + .setAsyncExecutorMaxPoolSize(100) + .setAsyncExecutorThreadPoolQueueSize(1000) + .setAsyncExecutorMaxAsyncJobsDuePerAcquisition(20) .setJdbcUrl(config.getDataSourceFactory().getUrl()) .setJdbcUsername(config.getDataSourceFactory().getUser()) .setJdbcPassword(config.getDataSourceFactory().getPassword()) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java index 0e3ea457c79e..d2c2fbcaef3a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeInterface.java @@ -8,7 +8,6 @@ import org.flowable.bpmn.model.FlowableListener; import org.flowable.bpmn.model.Process; import org.flowable.bpmn.model.StartEvent; -import org.openmetadata.service.governance.workflows.MainWorkflowHasFinishedListener; import org.openmetadata.service.governance.workflows.MainWorkflowTerminationListener; import org.openmetadata.service.governance.workflows.WorkflowInstanceExecutionIdSetterListener; import org.openmetadata.service.governance.workflows.WorkflowInstanceStageListener; @@ -52,15 +51,6 @@ default void attachWorkflowInstanceExecutionIdSetterListener(StartEvent startEve startEvent.getExecutionListeners().add(listener); } - default void attachMainWorkflowHasFinishedListener(EndEvent endEvent) { - FlowableListener listener = - new FlowableListenerBuilder() - .event("end") - .implementation(MainWorkflowHasFinishedListener.class.getName()) - .build(); - endEvent.getExecutionListeners().add(listener); - } - default void attachMainWorkflowTerminationListener(EndEvent endEvent) { FlowableListener listener = new FlowableListenerBuilder() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerInterface.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerInterface.java index b07d0848de22..43fa49c66538 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerInterface.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/TriggerInterface.java @@ -22,7 +22,7 @@ default void attachWorkflowInstanceListeners(Process process) { private List getWorkflowInstanceListeners() { List listeners = new ArrayList<>(); - List events = List.of("start"); + List events = List.of("start", "end"); for (String event : events) { FlowableListener listener = new FlowableListenerBuilder() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java index d5b4517f6289..1d4d3df1d1a4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/SetEntityCertificationImpl.java @@ -44,10 +44,21 @@ private void setStatus( String originalJson = JsonUtils.pojoToJson(entity); Optional oCertification = Optional.ofNullable(certification); + Optional oEntityCertification = + Optional.ofNullable(entity.getCertification()); + + if (oCertification.isEmpty() && oEntityCertification.isEmpty()) { + return; + } if (oCertification.isEmpty()) { entity.setCertification(null); } else { + + if (oCertification.get().equals(oEntityCertification.get().getTagLabel().getTagFQN())) { + return; + } + AssetCertification assetCertification = new AssetCertification() .withTagLabel(EntityUtil.toTagLabel(TagLabelUtil.getTag(certification))); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java index c64df0870334..29389e3d745b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/endEvent/EndEvent.java @@ -12,7 +12,6 @@ public class EndEvent implements NodeInterface { public EndEvent(EndEventDefinition nodeDefinition) { this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build(); attachWorkflowInstanceStageListeners(endEvent); - attachMainWorkflowHasFinishedListener(endEvent); } public void addToWorkflow(BpmnModel model, Process process) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java index 18a936fd98e9..9f4a8ce47f83 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/EventBasedEntityTrigger.java @@ -1,5 +1,6 @@ package org.openmetadata.service.governance.workflows.elements.triggers; +import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; import java.util.ArrayList; @@ -7,8 +8,10 @@ import java.util.ListIterator; import lombok.Getter; import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.CallActivity; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.FieldExtension; +import org.flowable.bpmn.model.IOParameter; import org.flowable.bpmn.model.Process; import org.flowable.bpmn.model.SequenceFlow; import org.flowable.bpmn.model.ServiceTask; @@ -18,7 +21,8 @@ import org.openmetadata.schema.governance.workflows.elements.triggers.Event; import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition; import org.openmetadata.service.governance.workflows.elements.TriggerInterface; -import org.openmetadata.service.governance.workflows.elements.triggers.impl.TriggerEntityWorkflowImpl; +import org.openmetadata.service.governance.workflows.elements.triggers.impl.FilterEntityImpl; +import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder; @@ -32,6 +36,8 @@ public class EventBasedEntityTrigger implements TriggerInterface { private final List startEvents = new ArrayList<>(); private final List signals = new ArrayList<>(); + public static String PASSES_FILTER_VARIABLE = "passesFilter"; + public EventBasedEntityTrigger( String mainWorkflowName, String triggerWorkflowId, @@ -43,20 +49,34 @@ public EventBasedEntityTrigger( setStartEvents(triggerWorkflowId, triggerDefinition); - ServiceTask triggerWorkflow = - getWorkflowTriggerTask(triggerWorkflowId, mainWorkflowName, triggerDefinition); - triggerWorkflow.setAsynchronous(true); - process.addFlowElement(triggerWorkflow); + ServiceTask filterTask = getFilterTask(triggerWorkflowId, triggerDefinition); + process.addFlowElement(filterTask); + + CallActivity workflowTrigger = getWorkflowTrigger(triggerWorkflowId, mainWorkflowName); + process.addFlowElement(workflowTrigger); EndEvent endEvent = new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build(); process.addFlowElement(endEvent); + // Start Events -> FilterTask for (StartEvent startEvent : startEvents) { process.addFlowElement(startEvent); - process.addFlowElement(new SequenceFlow(startEvent.getId(), triggerWorkflow.getId())); + process.addFlowElement(new SequenceFlow(startEvent.getId(), filterTask.getId())); } - process.addFlowElement(new SequenceFlow(triggerWorkflow.getId(), endEvent.getId())); + + SequenceFlow filterPassed = new SequenceFlow(filterTask.getId(), workflowTrigger.getId()); + filterPassed.setConditionExpression(String.format("${%s}", PASSES_FILTER_VARIABLE)); + + SequenceFlow filterNotPassed = new SequenceFlow(filterTask.getId(), endEvent.getId()); + filterNotPassed.setConditionExpression(String.format("${!%s}", PASSES_FILTER_VARIABLE)); + + // FilterTask -> WorkflowTrigger (if passes filter) + process.addFlowElement(filterPassed); + // FilterTask -> End (if not passes filter) + process.addFlowElement(filterNotPassed); + // WorkflowTrigger -> End + process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), endEvent.getId())); this.process = process; this.triggerWorkflowId = triggerWorkflowId; @@ -94,16 +114,25 @@ private void setStartEvents( } } - private ServiceTask getWorkflowTriggerTask( - String workflowTriggerId, - String mainWorkflowName, - EventBasedEntityTriggerDefinition triggerDefinition) { - FieldExtension workflowNameExpr = - new FieldExtensionBuilder() - .fieldName("workflowNameExpr") - .fieldValue(mainWorkflowName) + private CallActivity getWorkflowTrigger(String triggerWorkflowId, String mainWorkflowName) { + CallActivity workflowTrigger = + new CallActivityBuilder() + .id(getFlowableElementId(triggerWorkflowId, "workflowTrigger")) + .calledElement(mainWorkflowName) + .inheritBusinessKey(true) .build(); + IOParameter inputParameter = new IOParameter(); + inputParameter.setSource(RELATED_ENTITY_VARIABLE); + inputParameter.setTarget(RELATED_ENTITY_VARIABLE); + + workflowTrigger.setInParameters(List.of(inputParameter)); + + return workflowTrigger; + } + + private ServiceTask getFilterTask( + String workflowTriggerId, EventBasedEntityTriggerDefinition triggerDefinition) { FieldExtension excludedFilterExpr = new FieldExtensionBuilder() .fieldName("excludedFilterExpr") @@ -112,10 +141,9 @@ private ServiceTask getWorkflowTriggerTask( ServiceTask serviceTask = new ServiceTaskBuilder() - .id(getFlowableElementId(workflowTriggerId, "workflowTrigger")) - .implementation(TriggerEntityWorkflowImpl.class.getName()) + .id(getFlowableElementId(workflowTriggerId, "filterTask")) + .implementation(FilterEntityImpl.class.getName()) .build(); - serviceTask.getFieldExtensions().add(workflowNameExpr); serviceTask.getFieldExtensions().add(excludedFilterExpr); return serviceTask; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java index 86a55220a181..7690decbce24 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/PeriodicBatchEntityTrigger.java @@ -1,11 +1,16 @@ package org.openmetadata.service.governance.workflows.elements.triggers; +import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; +import java.util.List; import lombok.Getter; import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.CallActivity; import org.flowable.bpmn.model.EndEvent; import org.flowable.bpmn.model.FieldExtension; +import org.flowable.bpmn.model.IOParameter; +import org.flowable.bpmn.model.MultiInstanceLoopCharacteristics; import org.flowable.bpmn.model.Process; import org.flowable.bpmn.model.SequenceFlow; import org.flowable.bpmn.model.ServiceTask; @@ -15,9 +20,11 @@ import org.openmetadata.schema.governance.workflows.elements.nodes.trigger.PeriodicBatchEntityTriggerDefinition; import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.governance.workflows.elements.TriggerInterface; -import org.openmetadata.service.governance.workflows.elements.triggers.impl.TriggerBatchEntityWorkflowImpl; +import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchEntitiesImpl; +import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.MultiInstanceLoopCharacteristicsBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder; import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder; import org.quartz.CronTrigger; @@ -26,6 +33,9 @@ public class PeriodicBatchEntityTrigger implements TriggerInterface { private final Process process; @Getter private final String triggerWorkflowId; + public static String HAS_FINISHED_VARIABLE = "hasFinished"; + public static String CARDINALITY_VARIABLE = "numberOfEntities"; + public static String COLLECTION_VARIABLE = "entityList"; public PeriodicBatchEntityTrigger( String mainWorkflowName, @@ -44,16 +54,31 @@ public PeriodicBatchEntityTrigger( startEvent.addEventDefinition(timerDefinition); process.addFlowElement(startEvent); - ServiceTask workflowTriggerTask = - getWorkflowTriggerTask(triggerWorkflowId, mainWorkflowName, triggerDefinition); - process.addFlowElement(workflowTriggerTask); + ServiceTask fetchEntitiesTask = getFetchEntitiesTask(triggerWorkflowId, triggerDefinition); + process.addFlowElement(fetchEntitiesTask); + + CallActivity workflowTrigger = + getWorkflowTriggerCallActivity(triggerWorkflowId, mainWorkflowName); + process.addFlowElement(workflowTrigger); EndEvent endEvent = new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build(); process.addFlowElement(endEvent); - process.addFlowElement(new SequenceFlow(startEvent.getId(), workflowTriggerTask.getId())); - process.addFlowElement(new SequenceFlow(workflowTriggerTask.getId(), endEvent.getId())); + SequenceFlow finished = new SequenceFlow(fetchEntitiesTask.getId(), endEvent.getId()); + finished.setConditionExpression(String.format("${%s}", HAS_FINISHED_VARIABLE)); + + SequenceFlow notFinished = new SequenceFlow(fetchEntitiesTask.getId(), workflowTrigger.getId()); + notFinished.setConditionExpression(String.format("${!%s}", HAS_FINISHED_VARIABLE)); + + // Start -> Fetch Entities + process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchEntitiesTask.getId())); + // Fetch Entities -> End + process.addFlowElement(finished); + // Fetch Entities -> WorkflowTrigger + process.addFlowElement(notFinished); + // WorkflowTrigger -> Fetch Entities (Loop Back to get next batch) + process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchEntitiesTask.getId())); this.process = process; this.triggerWorkflowId = triggerWorkflowId; @@ -69,10 +94,34 @@ private TimerEventDefinition getTimerEventDefinition(AppSchedule schedule) { return timerDefinition; } - private ServiceTask getWorkflowTriggerTask( - String workflowTriggerId, - String mainWorkflowName, - PeriodicBatchEntityTriggerDefinition triggerDefinition) { + private CallActivity getWorkflowTriggerCallActivity( + String triggerWorkflowId, String mainWorkflowName) { + MultiInstanceLoopCharacteristics multiInstance = + new MultiInstanceLoopCharacteristicsBuilder() + .loopCardinality(String.format("${%s}", CARDINALITY_VARIABLE)) + .inputDataItem(COLLECTION_VARIABLE) + .elementVariable(RELATED_ENTITY_VARIABLE) + .build(); + + CallActivity workflowTrigger = + new CallActivityBuilder() + .id(getFlowableElementId(triggerWorkflowId, "workflowTrigger")) + .calledElement(mainWorkflowName) + .inheritBusinessKey(true) + .build(); + + IOParameter inputParameter = new IOParameter(); + inputParameter.setSource(RELATED_ENTITY_VARIABLE); + inputParameter.setTarget(RELATED_ENTITY_VARIABLE); + + workflowTrigger.setInParameters(List.of(inputParameter)); + workflowTrigger.setLoopCharacteristics(multiInstance); + + return workflowTrigger; + } + + private ServiceTask getFetchEntitiesTask( + String workflowTriggerId, PeriodicBatchEntityTriggerDefinition triggerDefinition) { FieldExtension entityTypeExpr = new FieldExtensionBuilder() .fieldName("entityTypeExpr") @@ -85,12 +134,6 @@ private ServiceTask getWorkflowTriggerTask( .fieldValue(triggerDefinition.getConfig().getFilters()) .build(); - FieldExtension workflowNameExpr = - new FieldExtensionBuilder() - .fieldName("workflowNameExpr") - .fieldValue(mainWorkflowName) - .build(); - FieldExtension batchSizeExpr = new FieldExtensionBuilder() .fieldName("batchSizeExpr") @@ -99,13 +142,12 @@ private ServiceTask getWorkflowTriggerTask( ServiceTask serviceTask = new ServiceTaskBuilder() - .id(getFlowableElementId(workflowTriggerId, "workflowTrigger")) - .implementation(TriggerBatchEntityWorkflowImpl.class.getName()) + .id(getFlowableElementId(workflowTriggerId, "fetchEntityTask")) + .implementation(FetchEntitiesImpl.class.getName()) .build(); serviceTask.getFieldExtensions().add(entityTypeExpr); serviceTask.getFieldExtensions().add(searchFilterExpr); - serviceTask.getFieldExtensions().add(workflowNameExpr); serviceTask.getFieldExtensions().add(batchSizeExpr); return serviceTask; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java new file mode 100644 index 000000000000..7c110d3d5593 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FetchEntitiesImpl.java @@ -0,0 +1,72 @@ +package org.openmetadata.service.governance.workflows.elements.triggers.impl; + +import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.CARDINALITY_VARIABLE; +import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.COLLECTION_VARIABLE; +import static org.openmetadata.service.governance.workflows.elements.triggers.PeriodicBatchEntityTrigger.HAS_FINISHED_VARIABLE; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.service.Entity; +import org.openmetadata.service.resources.feeds.MessageParser; +import org.openmetadata.service.search.SearchClient; +import org.openmetadata.service.search.SearchRepository; +import org.openmetadata.service.search.SearchSortFilter; +import org.openmetadata.service.util.JsonUtils; + +public class FetchEntitiesImpl implements JavaDelegate { + private Expression entityTypeExpr; + private Expression searchFilterExpr; + private Expression batchSizeExpr; + + @Override + public void execute(DelegateExecution execution) { + String entityType = (String) entityTypeExpr.getValue(execution); + String searchFilter = + Optional.ofNullable(searchFilterExpr) + .map(expr -> (String) expr.getValue(execution)) + .orElse(null); + int batchSize = Integer.parseInt((String) batchSizeExpr.getValue(execution)); + + List searchAfter = + JsonUtils.readOrConvertValues(execution.getVariable("searchAfter"), Object.class); + + SearchClient.SearchResultListMapper response = + fetchEntities(searchAfter, entityType, searchFilter, batchSize); + + List entityList = + response.getResults().stream() + .map( + result -> + new MessageParser.EntityLink( + entityType, (String) result.get("fullyQualifiedName")) + .getLinkString()) + .toList(); + + int cardinality = entityList.size(); + boolean hasFinished = entityList.isEmpty(); + + execution.setVariable(CARDINALITY_VARIABLE, cardinality); + execution.setVariable(HAS_FINISHED_VARIABLE, hasFinished); + execution.setVariable(COLLECTION_VARIABLE, entityList); + execution.setVariable("searchAfter", JsonUtils.pojoToJson(response.getLastHitSortValues())); + } + + private SearchClient.SearchResultListMapper fetchEntities( + List searchAfterList, String entityType, String searchFilter, int batchSize) { + SearchRepository searchRepository = Entity.getSearchRepository(); + SearchSortFilter searchSortFilter = + new SearchSortFilter("fullyQualifiedName", null, null, null); + Object[] searchAfter = searchAfterList.isEmpty() ? null : searchAfterList.toArray(); + + try { + return searchRepository.listWithDeepPagination( + entityType, null, searchFilter, searchSortFilter, batchSize, searchAfter); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerEntityWorkflowImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java similarity index 64% rename from openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerEntityWorkflowImpl.java rename to openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java index 8a4fc7b4b3e6..3d8bd581415f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerEntityWorkflowImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java @@ -1,10 +1,9 @@ package org.openmetadata.service.governance.workflows.elements.triggers.impl; import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; +import static org.openmetadata.service.governance.workflows.elements.triggers.EventBasedEntityTrigger.PASSES_FILTER_VARIABLE; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import org.flowable.common.engine.api.delegate.Expression; import org.flowable.engine.delegate.DelegateExecution; @@ -14,31 +13,21 @@ import org.openmetadata.schema.type.FieldChange; import org.openmetadata.schema.type.Include; import org.openmetadata.service.Entity; -import org.openmetadata.service.governance.workflows.WorkflowHandler; -import org.openmetadata.service.governance.workflows.WorkflowInstanceListener; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.JsonUtils; -public class TriggerEntityWorkflowImpl implements JavaDelegate { - private Expression workflowNameExpr; +public class FilterEntityImpl implements JavaDelegate { private Expression excludedFilterExpr; @Override public void execute(DelegateExecution execution) { - String workflowName = (String) workflowNameExpr.getValue(execution); List excludedFilter = JsonUtils.readOrConvertValue(excludedFilterExpr.getValue(execution), List.class); String entityLinkStr = (String) execution.getVariable(RELATED_ENTITY_VARIABLE); - if (passesExcludedFilter(entityLinkStr, excludedFilter)) { - WorkflowHandler workflowHandler = WorkflowHandler.getInstance(); - triggerWorkflow( - workflowHandler, execution.getProcessInstanceBusinessKey(), entityLinkStr, workflowName); - } else { - execution.setEventName("end"); - new WorkflowInstanceListener().execute(execution); - } + execution.setVariable( + PASSES_FILTER_VARIABLE, passesExcludedFilter(entityLinkStr, excludedFilter)); } private boolean passesExcludedFilter(String entityLinkStr, List excludedFilter) { @@ -60,14 +49,4 @@ private boolean passesExcludedFilter(String entityLinkStr, List excluded || changedFields.stream() .anyMatch(changedField -> !excludedFilter.contains(changedField.getName())); } - - private void triggerWorkflow( - WorkflowHandler workflowHandler, - String businessKey, - String entityLinkStr, - String workflowName) { - Map variables = new HashMap<>(); - variables.put(RELATED_ENTITY_VARIABLE, entityLinkStr); - workflowHandler.triggerByKey(workflowName, businessKey, variables); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerBatchEntityWorkflowImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerBatchEntityWorkflowImpl.java deleted file mode 100644 index ab9984ea611f..000000000000 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/TriggerBatchEntityWorkflowImpl.java +++ /dev/null @@ -1,124 +0,0 @@ -package org.openmetadata.service.governance.workflows.elements.triggers.impl; - -import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import org.flowable.common.engine.api.delegate.Expression; -import org.flowable.engine.delegate.DelegateExecution; -import org.flowable.engine.delegate.JavaDelegate; -import org.openmetadata.service.Entity; -import org.openmetadata.service.governance.workflows.WorkflowHandler; -import org.openmetadata.service.resources.feeds.MessageParser; -import org.openmetadata.service.search.SearchClient; -import org.openmetadata.service.search.SearchRepository; -import org.openmetadata.service.search.SearchSortFilter; - -public class TriggerBatchEntityWorkflowImpl implements JavaDelegate { - private Expression entityTypeExpr; - private Expression searchFilterExpr; - private Expression workflowNameExpr; - private Expression batchSizeExpr; - - @Override - public void execute(DelegateExecution execution) { - String entityType = (String) entityTypeExpr.getValue(execution); - String searchFilter = - Optional.ofNullable(searchFilterExpr) - .map(expr -> (String) expr.getValue(execution)) - .orElse(null); - String workflowName = (String) workflowNameExpr.getValue(execution); - int batchSize = Integer.parseInt((String) batchSizeExpr.getValue(execution)); - - WorkflowHandler workflowHandler = WorkflowHandler.getInstance(); - - triggerBatchEntityWorkflow( - workflowHandler, - entityType, - execution.getProcessInstanceBusinessKey(), - searchFilter, - workflowName, - batchSize); - } - - private void triggerBatchEntityWorkflow( - WorkflowHandler workflowHandler, - String entityType, - String businessKey, - String searchFilter, - String workflowName, - int batchSize) { - SearchRepository searchRepository = Entity.getSearchRepository(); - SearchSortFilter searchSortFilter = - new SearchSortFilter("fullyQualifiedName", null, null, null); - Object[] searchAfter = null; - boolean isDone = false; - - while (!isDone) { - try { - SearchClient.SearchResultListMapper response = - searchRepository.listWithDeepPagination( - entityType, null, searchFilter, searchSortFilter, batchSize, searchAfter); - - List> results = response.getResults(); - searchAfter = response.getLastHitSortValues(); - - triggerWorkflow( - results.stream().map(result -> (String) result.get("fullyQualifiedName")).toList(), - entityType, - workflowName, - businessKey, - workflowHandler); - - if (Optional.ofNullable(searchAfter).isEmpty()) { - isDone = true; - } - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - private void triggerWorkflow( - List entityFQNs, - String entityType, - String workflowName, - String businessKey, - WorkflowHandler workflowHandler) { - List runningProcessInstanceIds = new ArrayList<>(); - - for (String entityFQN : entityFQNs) { - MessageParser.EntityLink entityLink = new MessageParser.EntityLink(entityType, entityFQN); - - Map variables = new HashMap<>(); - variables.put(RELATED_ENTITY_VARIABLE, entityLink.getLinkString()); - - runningProcessInstanceIds.add( - workflowHandler.triggerByKey(workflowName, businessKey, variables).getId()); - } - - // TODO: Improve Waiting - while (!runningProcessInstanceIds.isEmpty()) { - runningProcessInstanceIds = - getRunningProcessInstances(runningProcessInstanceIds, workflowHandler); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - private List getRunningProcessInstances( - List processInstanceIds, WorkflowHandler workflowHandler) { - return processInstanceIds.stream() - .filter( - (processInstanceId) -> !workflowHandler.hasProcessInstanceFinished(processInstanceId)) - .toList(); - } -} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/CallActivityBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/CallActivityBuilder.java new file mode 100644 index 000000000000..3168d62170c1 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/CallActivityBuilder.java @@ -0,0 +1,28 @@ +package org.openmetadata.service.governance.workflows.flowable.builders; + +import org.flowable.bpmn.model.CallActivity; + +public class CallActivityBuilder extends FlowableElementBuilder { + private String calledElement; + private boolean inheritBusinessKey; + + public CallActivityBuilder calledElement(String calledElement) { + this.calledElement = calledElement; + return this; + } + + public CallActivityBuilder inheritBusinessKey(boolean inheritBusinessKey) { + this.inheritBusinessKey = inheritBusinessKey; + return this; + } + + @Override + public CallActivity build() { + CallActivity callActivity = new CallActivity(); + callActivity.setId(id); + callActivity.setName(id); + callActivity.setCalledElement(calledElement); + callActivity.setInheritBusinessKey(inheritBusinessKey); + return callActivity; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/MultiInstanceLoopCharacteristicsBuilder.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/MultiInstanceLoopCharacteristicsBuilder.java new file mode 100644 index 000000000000..4156c9688578 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/flowable/builders/MultiInstanceLoopCharacteristicsBuilder.java @@ -0,0 +1,32 @@ +package org.openmetadata.service.governance.workflows.flowable.builders; + +import org.flowable.bpmn.model.MultiInstanceLoopCharacteristics; + +public class MultiInstanceLoopCharacteristicsBuilder { + private String loopCardinality; + private String inputDataItem; + private String elementVariable; + + public MultiInstanceLoopCharacteristicsBuilder loopCardinality(String loopCardinality) { + this.loopCardinality = loopCardinality; + return this; + } + + public MultiInstanceLoopCharacteristicsBuilder inputDataItem(String inputDataItem) { + this.inputDataItem = inputDataItem; + return this; + } + + public MultiInstanceLoopCharacteristicsBuilder elementVariable(String elementVariable) { + this.elementVariable = elementVariable; + return this; + } + + public MultiInstanceLoopCharacteristics build() { + MultiInstanceLoopCharacteristics multiInstance = new MultiInstanceLoopCharacteristics(); + multiInstance.setLoopCardinality(loopCardinality); + multiInstance.setInputDataItem(inputDataItem); + multiInstance.setElementVariable(elementVariable); + return multiInstance; + } +} diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json index 908bae843efb..0bc6418a2f57 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/triggers/periodicBatchEntityTrigger.json @@ -25,7 +25,7 @@ "batchSize": { "description": "Number of Entities to process at once.", "type": "integer", - "default": 100 + "default": 1000 } }, "required": ["schedule", "entityType", "filters"],