Skip to content

Commit

Permalink
MINOR: Improve PeriodicBatchEntityTrigger Workflows performance (#18664)
Browse files Browse the repository at this point in the history
* Update default configurations

* Refactor how we call the main Workflow in order to increase performance
  • Loading branch information
IceS2 authored Nov 18, 2024
1 parent 5cdde20 commit 231025d
Show file tree
Hide file tree
Showing 14 changed files with 260 additions and 218 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ private WorkflowHandler(OpenMetadataApplicationConfig config) {
ProcessEngineConfiguration processEngineConfiguration =
new StandaloneProcessEngineConfiguration()
.setAsyncExecutorActivate(true)
.setAsyncExecutorCorePoolSize(50)
.setAsyncExecutorMaxPoolSize(100)
.setAsyncExecutorThreadPoolQueueSize(1000)
.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(20)
.setJdbcUrl(config.getDataSourceFactory().getUrl())
.setJdbcUsername(config.getDataSourceFactory().getUser())
.setJdbcPassword(config.getDataSourceFactory().getPassword())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.StartEvent;
import org.openmetadata.service.governance.workflows.MainWorkflowHasFinishedListener;
import org.openmetadata.service.governance.workflows.MainWorkflowTerminationListener;
import org.openmetadata.service.governance.workflows.WorkflowInstanceExecutionIdSetterListener;
import org.openmetadata.service.governance.workflows.WorkflowInstanceStageListener;
Expand Down Expand Up @@ -52,15 +51,6 @@ default void attachWorkflowInstanceExecutionIdSetterListener(StartEvent startEve
startEvent.getExecutionListeners().add(listener);
}

default void attachMainWorkflowHasFinishedListener(EndEvent endEvent) {
FlowableListener listener =
new FlowableListenerBuilder()
.event("end")
.implementation(MainWorkflowHasFinishedListener.class.getName())
.build();
endEvent.getExecutionListeners().add(listener);
}

default void attachMainWorkflowTerminationListener(EndEvent endEvent) {
FlowableListener listener =
new FlowableListenerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ default void attachWorkflowInstanceListeners(Process process) {
private List<FlowableListener> getWorkflowInstanceListeners() {
List<FlowableListener> listeners = new ArrayList<>();

List<String> events = List.of("start");
List<String> events = List.of("start", "end");
for (String event : events) {
FlowableListener listener =
new FlowableListenerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,21 @@ private void setStatus(
String originalJson = JsonUtils.pojoToJson(entity);

Optional<String> oCertification = Optional.ofNullable(certification);
Optional<AssetCertification> oEntityCertification =
Optional.ofNullable(entity.getCertification());

if (oCertification.isEmpty() && oEntityCertification.isEmpty()) {
return;
}

if (oCertification.isEmpty()) {
entity.setCertification(null);
} else {

if (oCertification.get().equals(oEntityCertification.get().getTagLabel().getTagFQN())) {
return;
}

AssetCertification assetCertification =
new AssetCertification()
.withTagLabel(EntityUtil.toTagLabel(TagLabelUtil.getTag(certification)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public class EndEvent implements NodeInterface {
public EndEvent(EndEventDefinition nodeDefinition) {
this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build();
attachWorkflowInstanceStageListeners(endEvent);
attachMainWorkflowHasFinishedListener(endEvent);
}

public void addToWorkflow(BpmnModel model, Process process) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.openmetadata.service.governance.workflows.elements.triggers;

import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import lombok.Getter;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.CallActivity;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.IOParameter;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
Expand All @@ -18,7 +21,8 @@
import org.openmetadata.schema.governance.workflows.elements.triggers.Event;
import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.TriggerEntityWorkflowImpl;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FilterEntityImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
Expand All @@ -32,6 +36,8 @@ public class EventBasedEntityTrigger implements TriggerInterface {
private final List<StartEvent> startEvents = new ArrayList<>();
private final List<Signal> signals = new ArrayList<>();

public static String PASSES_FILTER_VARIABLE = "passesFilter";

public EventBasedEntityTrigger(
String mainWorkflowName,
String triggerWorkflowId,
Expand All @@ -43,20 +49,34 @@ public EventBasedEntityTrigger(

setStartEvents(triggerWorkflowId, triggerDefinition);

ServiceTask triggerWorkflow =
getWorkflowTriggerTask(triggerWorkflowId, mainWorkflowName, triggerDefinition);
triggerWorkflow.setAsynchronous(true);
process.addFlowElement(triggerWorkflow);
ServiceTask filterTask = getFilterTask(triggerWorkflowId, triggerDefinition);
process.addFlowElement(filterTask);

CallActivity workflowTrigger = getWorkflowTrigger(triggerWorkflowId, mainWorkflowName);
process.addFlowElement(workflowTrigger);

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build();
process.addFlowElement(endEvent);

// Start Events -> FilterTask
for (StartEvent startEvent : startEvents) {
process.addFlowElement(startEvent);
process.addFlowElement(new SequenceFlow(startEvent.getId(), triggerWorkflow.getId()));
process.addFlowElement(new SequenceFlow(startEvent.getId(), filterTask.getId()));
}
process.addFlowElement(new SequenceFlow(triggerWorkflow.getId(), endEvent.getId()));

SequenceFlow filterPassed = new SequenceFlow(filterTask.getId(), workflowTrigger.getId());
filterPassed.setConditionExpression(String.format("${%s}", PASSES_FILTER_VARIABLE));

SequenceFlow filterNotPassed = new SequenceFlow(filterTask.getId(), endEvent.getId());
filterNotPassed.setConditionExpression(String.format("${!%s}", PASSES_FILTER_VARIABLE));

// FilterTask -> WorkflowTrigger (if passes filter)
process.addFlowElement(filterPassed);
// FilterTask -> End (if not passes filter)
process.addFlowElement(filterNotPassed);
// WorkflowTrigger -> End
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), endEvent.getId()));

this.process = process;
this.triggerWorkflowId = triggerWorkflowId;
Expand Down Expand Up @@ -94,16 +114,25 @@ private void setStartEvents(
}
}

private ServiceTask getWorkflowTriggerTask(
String workflowTriggerId,
String mainWorkflowName,
EventBasedEntityTriggerDefinition triggerDefinition) {
FieldExtension workflowNameExpr =
new FieldExtensionBuilder()
.fieldName("workflowNameExpr")
.fieldValue(mainWorkflowName)
private CallActivity getWorkflowTrigger(String triggerWorkflowId, String mainWorkflowName) {
CallActivity workflowTrigger =
new CallActivityBuilder()
.id(getFlowableElementId(triggerWorkflowId, "workflowTrigger"))
.calledElement(mainWorkflowName)
.inheritBusinessKey(true)
.build();

IOParameter inputParameter = new IOParameter();
inputParameter.setSource(RELATED_ENTITY_VARIABLE);
inputParameter.setTarget(RELATED_ENTITY_VARIABLE);

workflowTrigger.setInParameters(List.of(inputParameter));

return workflowTrigger;
}

private ServiceTask getFilterTask(
String workflowTriggerId, EventBasedEntityTriggerDefinition triggerDefinition) {
FieldExtension excludedFilterExpr =
new FieldExtensionBuilder()
.fieldName("excludedFilterExpr")
Expand All @@ -112,10 +141,9 @@ private ServiceTask getWorkflowTriggerTask(

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(workflowTriggerId, "workflowTrigger"))
.implementation(TriggerEntityWorkflowImpl.class.getName())
.id(getFlowableElementId(workflowTriggerId, "filterTask"))
.implementation(FilterEntityImpl.class.getName())
.build();
serviceTask.getFieldExtensions().add(workflowNameExpr);
serviceTask.getFieldExtensions().add(excludedFilterExpr);

return serviceTask;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package org.openmetadata.service.governance.workflows.elements.triggers;

import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import java.util.List;
import lombok.Getter;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.CallActivity;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.IOParameter;
import org.flowable.bpmn.model.MultiInstanceLoopCharacteristics;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
Expand All @@ -15,9 +20,11 @@
import org.openmetadata.schema.governance.workflows.elements.nodes.trigger.PeriodicBatchEntityTriggerDefinition;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.TriggerBatchEntityWorkflowImpl;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchEntitiesImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.MultiInstanceLoopCharacteristicsBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.quartz.CronTrigger;
Expand All @@ -26,6 +33,9 @@ public class PeriodicBatchEntityTrigger implements TriggerInterface {
private final Process process;

@Getter private final String triggerWorkflowId;
public static String HAS_FINISHED_VARIABLE = "hasFinished";
public static String CARDINALITY_VARIABLE = "numberOfEntities";
public static String COLLECTION_VARIABLE = "entityList";

public PeriodicBatchEntityTrigger(
String mainWorkflowName,
Expand All @@ -44,16 +54,31 @@ public PeriodicBatchEntityTrigger(
startEvent.addEventDefinition(timerDefinition);
process.addFlowElement(startEvent);

ServiceTask workflowTriggerTask =
getWorkflowTriggerTask(triggerWorkflowId, mainWorkflowName, triggerDefinition);
process.addFlowElement(workflowTriggerTask);
ServiceTask fetchEntitiesTask = getFetchEntitiesTask(triggerWorkflowId, triggerDefinition);
process.addFlowElement(fetchEntitiesTask);

CallActivity workflowTrigger =
getWorkflowTriggerCallActivity(triggerWorkflowId, mainWorkflowName);
process.addFlowElement(workflowTrigger);

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build();
process.addFlowElement(endEvent);

process.addFlowElement(new SequenceFlow(startEvent.getId(), workflowTriggerTask.getId()));
process.addFlowElement(new SequenceFlow(workflowTriggerTask.getId(), endEvent.getId()));
SequenceFlow finished = new SequenceFlow(fetchEntitiesTask.getId(), endEvent.getId());
finished.setConditionExpression(String.format("${%s}", HAS_FINISHED_VARIABLE));

SequenceFlow notFinished = new SequenceFlow(fetchEntitiesTask.getId(), workflowTrigger.getId());
notFinished.setConditionExpression(String.format("${!%s}", HAS_FINISHED_VARIABLE));

// Start -> Fetch Entities
process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchEntitiesTask.getId()));
// Fetch Entities -> End
process.addFlowElement(finished);
// Fetch Entities -> WorkflowTrigger
process.addFlowElement(notFinished);
// WorkflowTrigger -> Fetch Entities (Loop Back to get next batch)
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchEntitiesTask.getId()));

this.process = process;
this.triggerWorkflowId = triggerWorkflowId;
Expand All @@ -69,10 +94,34 @@ private TimerEventDefinition getTimerEventDefinition(AppSchedule schedule) {
return timerDefinition;
}

private ServiceTask getWorkflowTriggerTask(
String workflowTriggerId,
String mainWorkflowName,
PeriodicBatchEntityTriggerDefinition triggerDefinition) {
private CallActivity getWorkflowTriggerCallActivity(
String triggerWorkflowId, String mainWorkflowName) {
MultiInstanceLoopCharacteristics multiInstance =
new MultiInstanceLoopCharacteristicsBuilder()
.loopCardinality(String.format("${%s}", CARDINALITY_VARIABLE))
.inputDataItem(COLLECTION_VARIABLE)
.elementVariable(RELATED_ENTITY_VARIABLE)
.build();

CallActivity workflowTrigger =
new CallActivityBuilder()
.id(getFlowableElementId(triggerWorkflowId, "workflowTrigger"))
.calledElement(mainWorkflowName)
.inheritBusinessKey(true)
.build();

IOParameter inputParameter = new IOParameter();
inputParameter.setSource(RELATED_ENTITY_VARIABLE);
inputParameter.setTarget(RELATED_ENTITY_VARIABLE);

workflowTrigger.setInParameters(List.of(inputParameter));
workflowTrigger.setLoopCharacteristics(multiInstance);

return workflowTrigger;
}

private ServiceTask getFetchEntitiesTask(
String workflowTriggerId, PeriodicBatchEntityTriggerDefinition triggerDefinition) {
FieldExtension entityTypeExpr =
new FieldExtensionBuilder()
.fieldName("entityTypeExpr")
Expand All @@ -85,12 +134,6 @@ private ServiceTask getWorkflowTriggerTask(
.fieldValue(triggerDefinition.getConfig().getFilters())
.build();

FieldExtension workflowNameExpr =
new FieldExtensionBuilder()
.fieldName("workflowNameExpr")
.fieldValue(mainWorkflowName)
.build();

FieldExtension batchSizeExpr =
new FieldExtensionBuilder()
.fieldName("batchSizeExpr")
Expand All @@ -99,13 +142,12 @@ private ServiceTask getWorkflowTriggerTask(

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(workflowTriggerId, "workflowTrigger"))
.implementation(TriggerBatchEntityWorkflowImpl.class.getName())
.id(getFlowableElementId(workflowTriggerId, "fetchEntityTask"))
.implementation(FetchEntitiesImpl.class.getName())
.build();

serviceTask.getFieldExtensions().add(entityTypeExpr);
serviceTask.getFieldExtensions().add(searchFilterExpr);
serviceTask.getFieldExtensions().add(workflowNameExpr);
serviceTask.getFieldExtensions().add(batchSizeExpr);

return serviceTask;
Expand Down
Loading

0 comments on commit 231025d

Please sign in to comment.