From e65e5278beea8e97f1fb58cb6dde9a581b1adef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Mon, 30 Jan 2023 18:17:45 +0100 Subject: [PATCH] refactor(engine): append side-effects to writer This removes difficult to understand abstractions around side effects and ensures that the engine does not re-use side-effect queues which complicate batch processing and processing of uncommitted. Adds a `SideEffectWriter` backed by the `ProcessingResultBuilder` that is used throughout like other writers. (cherry picked from commit 9eac8c32c3bbe8aa0002eaf816de1e495a88b5c9) --- .../java/io/camunda/zeebe/engine/Engine.java | 8 +-- .../engine/processing/EngineProcessors.java | 18 ++--- .../processing/ProcessEventProcessors.java | 7 +- .../processing/bpmn/BpmnStreamProcessor.java | 13 +--- .../bpmn/behavior/BpmnBehaviorsImpl.java | 6 +- .../BpmnEventSubscriptionBehavior.java | 9 +-- .../processing/common/CatchEventBehavior.java | 67 +++++++------------ .../common/ElementActivationBehavior.java | 9 +-- .../common/EventTriggerBehavior.java | 11 +-- .../deployment/DeploymentCreateProcessor.java | 27 ++------ .../DeploymentDistributionBehavior.java | 11 ++- .../incident/ResolveIncidentProcessor.java | 19 ++---- .../DefaultJobCommandPreconditionGuard.java | 11 +-- .../processing/job/JobAcceptFunction.java | 7 +- .../processing/job/JobCompleteProcessor.java | 5 +- .../processing/job/JobEventProcessors.java | 2 +- .../processing/job/JobFailProcessor.java | 19 +++--- .../job/JobThrowErrorProcessor.java | 5 +- .../processing/message/MessageCorrelator.java | 17 ++--- .../message/MessagePublishProcessor.java | 15 ++--- ...MessageSubscriptionCorrelateProcessor.java | 12 ++-- .../MessageSubscriptionCreateProcessor.java | 22 +++--- .../MessageSubscriptionDeleteProcessor.java | 11 ++- .../MessageSubscriptionRejectProcessor.java | 17 ++--- ...MessageSubscriptionCorrelateProcessor.java | 2 +- .../CreateProcessInstanceProcessor.java | 6 +- ...ateProcessInstanceWithResultProcessor.java | 7 +- .../ProcessInstanceModificationProcessor.java | 30 +++------ .../streamprocessor/CommandProcessor.java | 9 --- .../streamprocessor/CommandProcessorImpl.java | 13 +--- .../streamprocessor/TypedRecordProcessor.java | 10 --- .../sideeffect/SideEffectQueue.java | 55 --------------- .../ResultBuilderBackedSideEffectWriter.java | 26 +++++++ .../SideEffectWriter.java} | 8 ++- .../streamprocessor/writers/Writers.java | 8 ++- .../timer/TriggerTimerProcessor.java | 19 ++---- .../SkipFailingEventsTest.java | 4 +- .../StreamProcessorReplayModeTest.java | 2 +- .../StreamProcessorReprocessingTest.java | 8 +-- 39 files changed, 176 insertions(+), 379 deletions(-) delete mode 100644 engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffectQueue.java create mode 100644 engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/ResultBuilderBackedSideEffectWriter.java rename engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/{sideeffect/SideEffects.java => writers/SideEffectWriter.java} (69%) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/Engine.java b/engine/src/main/java/io/camunda/zeebe/engine/Engine.java index cc64676ecbcd..771974299a9b 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/Engine.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/Engine.java @@ -104,6 +104,7 @@ public void replay(final TypedRecord event) { @Override public ProcessingResult process( final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) { + try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) { TypedRecordProcessor currentProcessor = null; @@ -124,12 +125,7 @@ public ProcessingResult process( final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand); if (isNotOnBlacklist) { - currentProcessor.processRecord( - record, - (sep) -> { - processingResultBuilder.resetPostCommitTasks(); - processingResultBuilder.appendPostCommitTask(sep::flush); - }); + currentProcessor.processRecord(record); } } return processingResultBuilder.build(); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java index 47a84f73ee0a..82213dc881f1 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/EngineProcessors.java @@ -24,7 +24,6 @@ import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker; import io.camunda.zeebe.engine.state.KeyGenerator; @@ -67,7 +66,6 @@ public static TypedRecordProcessors createEngineProcessors( final var jobMetrics = new JobMetrics(partitionId); final var processEngineMetrics = new ProcessEngineMetrics(zeebeState.getPartitionId()); - final var sideEffectQueue = new SideEffectQueue(); final BpmnBehaviorsImpl bpmnBehaviors = createBehaviors( @@ -77,8 +75,7 @@ public static TypedRecordProcessors createEngineProcessors( partitionsCount, timerChecker, jobMetrics, - processEngineMetrics, - sideEffectQueue); + processEngineMetrics); addDeploymentRelatedProcessorAndServices( bpmnBehaviors, @@ -98,8 +95,7 @@ public static TypedRecordProcessors createEngineProcessors( typedRecordProcessors, subscriptionCommandSender, writers, - timerChecker, - sideEffectQueue); + timerChecker); JobEventProcessors.addJobProcessors( typedRecordProcessors, @@ -121,10 +117,8 @@ private static BpmnBehaviorsImpl createBehaviors( final int partitionsCount, final DueDateTimerChecker timerChecker, final JobMetrics jobMetrics, - final ProcessEngineMetrics processEngineMetrics, - final SideEffectQueue sideEffectQueue) { + final ProcessEngineMetrics processEngineMetrics) { return new BpmnBehaviorsImpl( - sideEffectQueue, zeebeState, writers, jobMetrics, @@ -140,16 +134,14 @@ private static TypedRecordProcessor addProcessProcessors( final TypedRecordProcessors typedRecordProcessors, final SubscriptionCommandSender subscriptionCommandSender, final Writers writers, - final DueDateTimerChecker timerChecker, - final SideEffectQueue sideEffectQueue) { + final DueDateTimerChecker timerChecker) { return ProcessEventProcessors.addProcessProcessors( zeebeState, bpmnBehaviors, typedRecordProcessors, subscriptionCommandSender, timerChecker, - writers, - sideEffectQueue); + writers); } private static void addDeploymentRelatedProcessorAndServices( diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java index 858abbc602b4..71e3a616581d 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java @@ -21,7 +21,6 @@ import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceModificationProcessor; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.processing.timer.CancelTimerProcessor; import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker; @@ -51,8 +50,7 @@ public static TypedRecordProcessor addProcessProcessors( final TypedRecordProcessors typedRecordProcessors, final SubscriptionCommandSender subscriptionCommandSender, final DueDateTimerChecker timerChecker, - final Writers writers, - final SideEffectQueue sideEffectQueue) { + final Writers writers) { final MutableProcessMessageSubscriptionState subscriptionState = zeebeState.getProcessMessageSubscriptionState(); final var keyGenerator = zeebeState.getKeyGenerator(); @@ -63,8 +61,7 @@ public static TypedRecordProcessor addProcessProcessors( writers, typedRecordProcessors, zeebeState.getElementInstanceState()); final var bpmnStreamProcessor = - new BpmnStreamProcessor( - bpmnBehaviors, zeebeState, writers, sideEffectQueue, processEngineMetrics); + new BpmnStreamProcessor(bpmnBehaviors, zeebeState, writers, processEngineMetrics); addBpmnStepProcessor(typedRecordProcessors, bpmnStreamProcessor); addMessageStreamProcessors( diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/BpmnStreamProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/BpmnStreamProcessor.java index 79c3d1987141..8ccd455389df 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/BpmnStreamProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/BpmnStreamProcessor.java @@ -15,8 +15,6 @@ import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.state.immutable.ProcessState; @@ -25,7 +23,6 @@ import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.BpmnElementType; -import java.util.function.Consumer; import org.slf4j.Logger; public final class BpmnStreamProcessor implements TypedRecordProcessor { @@ -34,7 +31,6 @@ public final class BpmnStreamProcessor implements TypedRecordProcessor getContainerProcessor( @@ -70,14 +64,9 @@ private BpmnElementContainerProcessor getContainerProcess } @Override - public void processRecord( - final TypedRecord record, - final Consumer sideEffect) { + public void processRecord(final TypedRecord record) { // initialize - sideEffectQueue.clear(); - sideEffect.accept(sideEffectQueue); - final var intent = (ProcessInstanceIntent) record.getIntent(); final var recordValue = record.getValue(); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java index 609f855df9cc..0d2ce3072632 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java @@ -17,7 +17,6 @@ import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior; import io.camunda.zeebe.engine.processing.common.ExpressionProcessor; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker; import io.camunda.zeebe.engine.processing.variable.VariableBehavior; @@ -45,7 +44,6 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors { private final ElementActivationBehavior elementActivationBehavior; public BpmnBehaviorsImpl( - final SideEffects sideEffects, final MutableZeebeState zeebeState, final Writers writers, final JobMetrics jobMetrics, @@ -69,6 +67,7 @@ public BpmnBehaviorsImpl( expressionBehavior, subscriptionCommandSender, writers.state(), + writers.sideEffect(), timerChecker, partitionsCount); @@ -94,8 +93,7 @@ public BpmnBehaviorsImpl( new BpmnVariableMappingBehavior(expressionBehavior, zeebeState, variableBehavior); eventSubscriptionBehavior = - new BpmnEventSubscriptionBehavior( - catchEventBehavior, eventTriggerBehavior, sideEffects, zeebeState); + new BpmnEventSubscriptionBehavior(catchEventBehavior, eventTriggerBehavior, zeebeState); incidentBehavior = new BpmnIncidentBehavior(zeebeState, zeebeState.getKeyGenerator(), writers.state()); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnEventSubscriptionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnEventSubscriptionBehavior.java index e985574dba3e..0244e680fe5e 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnEventSubscriptionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnEventSubscriptionBehavior.java @@ -13,7 +13,6 @@ import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventSupplier; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects; import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState; import io.camunda.zeebe.engine.state.immutable.ProcessState; import io.camunda.zeebe.engine.state.immutable.ZeebeState; @@ -27,19 +26,15 @@ public final class BpmnEventSubscriptionBehavior { private final EventScopeInstanceState eventScopeInstanceState; private final CatchEventBehavior catchEventBehavior; - private final SideEffects sideEffects; - private final ProcessState processState; private final EventTriggerBehavior eventTriggerBehavior; public BpmnEventSubscriptionBehavior( final CatchEventBehavior catchEventBehavior, final EventTriggerBehavior eventTriggerBehavior, - final SideEffects sideEffects, final ZeebeState zeebeState) { this.catchEventBehavior = catchEventBehavior; this.eventTriggerBehavior = eventTriggerBehavior; - this.sideEffects = sideEffects; processState = zeebeState.getProcessState(); eventScopeInstanceState = zeebeState.getEventScopeInstanceState(); @@ -50,11 +45,11 @@ public BpmnEventSubscriptionBehavior( */ public Either subscribeToEvents( final T element, final BpmnElementContext context) { - return catchEventBehavior.subscribeToEvents(context, element, sideEffects); + return catchEventBehavior.subscribeToEvents(context, element); } public void unsubscribeFromEvents(final BpmnElementContext context) { - catchEventBehavior.unsubscribeFromEvents(context.getElementInstanceKey(), sideEffects); + catchEventBehavior.unsubscribeFromEvents(context.getElementInstanceKey()); } /** diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/common/CatchEventBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/common/CatchEventBehavior.java index 61bcd8d7b378..d08446cee21f 100755 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/common/CatchEventBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/common/CatchEventBehavior.java @@ -16,7 +16,7 @@ import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMessage; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker; import io.camunda.zeebe.engine.state.KeyGenerator; @@ -46,6 +46,7 @@ public final class CatchEventBehavior { private final SubscriptionCommandSender subscriptionCommandSender; private final int partitionsCount; private final StateWriter stateWriter; + private final SideEffectWriter sideEffectWriter; private final ProcessMessageSubscriptionState processMessageSubscriptionState; private final TimerInstanceState timerInstanceState; @@ -63,11 +64,13 @@ public CatchEventBehavior( final ExpressionProcessor expressionProcessor, final SubscriptionCommandSender subscriptionCommandSender, final StateWriter stateWriter, + final SideEffectWriter sideEffectWriter, final DueDateTimerChecker timerChecker, final int partitionsCount) { this.expressionProcessor = expressionProcessor; this.subscriptionCommandSender = subscriptionCommandSender; this.stateWriter = stateWriter; + this.sideEffectWriter = sideEffectWriter; this.partitionsCount = partitionsCount; timerInstanceState = zeebeState.getTimerState(); @@ -82,10 +85,9 @@ public CatchEventBehavior( * Unsubscribe from all events in the scope of the element instance. * * @param elementInstanceKey the element instance key to subscript from - * @param sideEffects the side effects for unsubscribe actions */ - public void unsubscribeFromEvents(final long elementInstanceKey, final SideEffects sideEffects) { - unsubscribeFromEvents(elementInstanceKey, sideEffects, elementId -> true); + public void unsubscribeFromEvents(final long elementInstanceKey) { + unsubscribeFromEvents(elementInstanceKey, elementId -> true); } /** @@ -93,14 +95,10 @@ public void unsubscribeFromEvents(final long elementInstanceKey, final SideEffec * event subscriptions in the scope. * * @param context the context to subscript from - * @param sideEffects the side effects for unsubscribe actions */ - public void unsubscribeEventSubprocesses( - final BpmnElementContext context, final SideEffects sideEffects) { + public void unsubscribeEventSubprocesses(final BpmnElementContext context) { unsubscribeFromEvents( - context.getElementInstanceKey(), - sideEffects, - elementId -> isEventSubprocess(context, elementId)); + context.getElementInstanceKey(), elementId -> isEventSubprocess(context, elementId)); } private boolean isEventSubprocess( @@ -119,25 +117,20 @@ private boolean isEventSubprocess( * Ignore other event subscriptions that don't match the filter. * * @param elementInstanceKey the element instance key to subscript from - * @param sideEffects the side effects for unsubscribe actions * @param elementIdFilter the filter for events to unsubscribe */ private void unsubscribeFromEvents( - final long elementInstanceKey, - final SideEffects sideEffects, - final Predicate elementIdFilter) { + final long elementInstanceKey, final Predicate elementIdFilter) { unsubscribeFromTimerEvents(elementInstanceKey, elementIdFilter); - unsubscribeFromMessageEvents(elementInstanceKey, sideEffects, elementIdFilter); + unsubscribeFromMessageEvents(elementInstanceKey, elementIdFilter); } /** * @return either a failure or nothing */ public Either subscribeToEvents( - final BpmnElementContext context, - final ExecutableCatchEventSupplier supplier, - final SideEffects sideEffects) { + final BpmnElementContext context, final ExecutableCatchEventSupplier supplier) { final var evaluationResults = supplier.getEvents().stream() .filter(event -> event.isTimer() || event.isMessage()) @@ -146,8 +139,8 @@ public Either subscribeToEvents( evaluationResults.ifRight( results -> { - subscribeToMessageEvents(context, sideEffects, results); - subscribeToTimerEvents(context, sideEffects, results); + subscribeToMessageEvents(context, results); + subscribeToTimerEvents(context, results); }); return evaluationResults.map(r -> null); @@ -216,16 +209,13 @@ private Either evaluateTimer(final OngoingEvaluation } private void subscribeToMessageEvents( - final BpmnElementContext context, - final SideEffects sideEffects, - final List results) { + final BpmnElementContext context, final List results) { results.stream() .filter(EvalResult::isMessage) - .forEach(result -> subscribeToMessageEvent(context, sideEffects, result)); + .forEach(result -> subscribeToMessageEvent(context, result)); } - private void subscribeToMessageEvent( - final BpmnElementContext context, final SideEffects sideEffects, final EvalResult result) { + private void subscribeToMessageEvent(final BpmnElementContext context, final EvalResult result) { final var event = result.event; final var correlationKey = result.correlationKey; final var messageName = result.messageName; @@ -250,7 +240,7 @@ private void subscribeToMessageEvent( stateWriter.appendFollowUpEvent( subscriptionKey, ProcessMessageSubscriptionIntent.CREATING, subscription); - sideEffects.add( + sideEffectWriter.appendSideEffect( () -> sendOpenMessageSubscription( subscriptionPartitionId, @@ -263,9 +253,7 @@ private void subscribeToMessageEvent( } private void subscribeToTimerEvents( - final BpmnElementContext context, - final SideEffects sideEffects, - final List results) { + final BpmnElementContext context, final List results) { results.stream() .filter(EvalResult::isTimer) .forEach( @@ -277,8 +265,7 @@ private void subscribeToTimerEvents( context.getProcessInstanceKey(), context.getProcessDefinitionKey(), event.getId(), - timer, - sideEffects); + timer); }); } @@ -287,8 +274,7 @@ public void subscribeToTimerEvent( final long processInstanceKey, final long processDefinitionKey, final DirectBuffer handlerNodeId, - final Timer timer, - final SideEffects sideEffects) { + final Timer timer) { final long dueDate = timer.getDueDate(ActorClock.currentTimeMillis()); timerRecord.reset(); timerRecord @@ -299,7 +285,7 @@ public void subscribeToTimerEvent( .setTargetElementId(handlerNodeId) .setProcessDefinitionKey(processDefinitionKey); - sideEffects.add( + sideEffectWriter.appendSideEffect( () -> { /* timerChecker implements onRecovered to recover from restart, so no need to schedule this in TimerCreatedApplier.*/ @@ -335,22 +321,19 @@ public void unsubscribeFromTimerEvent(final TimerInstance timer) { } private void unsubscribeFromMessageEvents( - final long elementInstanceKey, - final SideEffects sideEffects, - final Predicate elementIdFilter) { + final long elementInstanceKey, final Predicate elementIdFilter) { processMessageSubscriptionState.visitElementSubscriptions( elementInstanceKey, subscription -> { final var elementId = subscription.getRecord().getElementIdBuffer(); if (elementIdFilter.test(elementId)) { - unsubscribeFromMessageEvent(subscription, sideEffects); + unsubscribeFromMessageEvent(subscription); } return true; }); } - private void unsubscribeFromMessageEvent( - final ProcessMessageSubscription subscription, final SideEffects sideEffects) { + private void unsubscribeFromMessageEvent(final ProcessMessageSubscription subscription) { final DirectBuffer messageName = cloneBuffer(subscription.getRecord().getMessageNameBuffer()); final int subscriptionPartitionId = subscription.getRecord().getSubscriptionPartitionId(); @@ -359,7 +342,7 @@ private void unsubscribeFromMessageEvent( stateWriter.appendFollowUpEvent( subscription.getKey(), ProcessMessageSubscriptionIntent.DELETING, subscription.getRecord()); - sideEffects.add( + sideEffectWriter.appendSideEffect( () -> sendCloseMessageSubscriptionCommand( subscriptionPartitionId, processInstanceKey, elementInstanceKey, messageName)); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/common/ElementActivationBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/common/ElementActivationBehavior.java index 377d0c39a9ce..7b6c0f641e8c 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/common/ElementActivationBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/common/ElementActivationBehavior.java @@ -11,7 +11,6 @@ import io.camunda.zeebe.engine.processing.deployment.model.element.AbstractFlowElement; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventSupplier; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; @@ -35,8 +34,6 @@ public final class ElementActivationBehavior { - private final SideEffectQueue sideEffectQueue = new SideEffectQueue(); - private final KeyGenerator keyGenerator; private final TypedCommandWriter commandWriter; private final StateWriter stateWriter; @@ -101,9 +98,6 @@ public ActivatedElementKeys activateElement( createVariablesCallback.accept(elementToActivate.getId(), elementInstanceKey); activatedElementKeys.setElementInstanceKey(elementInstanceKey); - // applying the side effects is part of creating the event subscriptions - sideEffectQueue.flush(); - return activatedElementKeys; } @@ -282,8 +276,7 @@ private void createEventSubscriptions( elementInstanceKey, elementRecord, ProcessInstanceIntent.ELEMENT_ACTIVATED); final Either subscribedOrFailure = - catchEventBehavior.subscribeToEvents( - bpmnElementContext, catchEventSupplier, sideEffectQueue); + catchEventBehavior.subscribeToEvents(bpmnElementContext, catchEventSupplier); if (subscribedOrFailure.isLeft()) { final var message = diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/common/EventTriggerBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/common/EventTriggerBehavior.java index 6a417e856cc9..564fecfa7b0b 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/common/EventTriggerBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/common/EventTriggerBehavior.java @@ -12,7 +12,6 @@ import io.camunda.zeebe.engine.processing.bpmn.ProcessInstanceLifecycle; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableStartEvent; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; @@ -59,14 +58,6 @@ public EventTriggerBehavior( new VariableBehavior(zeebeState.getVariableState(), writers.state(), keyGenerator); } - private void unsubscribeEventSubprocesses(final BpmnElementContext context) { - final var sideEffectQueue = new SideEffectQueue(); - catchEventBehavior.unsubscribeEventSubprocesses(context, sideEffectQueue); - - // side effect can immediately executed, since on restart we not reprocess anymore the commands - sideEffectQueue.flush(); - } - public void triggerEventSubProcess( final ExecutableStartEvent startEvent, final long flowScopeElementInstanceKey, @@ -98,7 +89,7 @@ public void triggerEventSubProcess( } if (startEvent.interrupting()) { - unsubscribeEventSubprocesses(flowScopeContext); + catchEventBehavior.unsubscribeEventSubprocesses(flowScopeContext); final var noActiveChildInstances = terminateChildInstances(flowScopeContext); if (!noActiveChildInstances) { diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java index 167cd7129674..d598f1d91947 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java @@ -21,9 +21,6 @@ import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableStartEvent; import io.camunda.zeebe.engine.processing.deployment.transform.DeploymentTransformer; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter; @@ -40,7 +37,6 @@ import io.camunda.zeebe.protocol.record.intent.DeploymentIntent; import io.camunda.zeebe.util.Either; import java.util.List; -import java.util.function.Consumer; import org.agrona.DirectBuffer; public final class DeploymentCreateProcessor implements TypedRecordProcessor { @@ -48,8 +44,6 @@ public final class DeploymentCreateProcessor implements TypedRecordProcessor command, final Consumer sideEffect) { - - sideEffect.accept(sideEffects); + public void processRecord(final TypedRecord command) { final DeploymentRecord deploymentEvent = command.getValue(); @@ -100,7 +91,7 @@ public void processRecord( final long key = keyGenerator.nextKey(); try { - createTimerIfTimerStartEvent(command, sideEffects); + createTimerIfTimerStartEvent(command); } catch (final RuntimeException e) { final String reason = String.format(COULD_NOT_CREATE_TIMER_MESSAGE, e.getMessage()); responseWriter.writeRejectionOnCommand(command, RejectionType.PROCESSING_ERROR, reason); @@ -112,7 +103,7 @@ public void processRecord( stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent); - deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects); + deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key); messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription( deploymentEvent, stateWriter); @@ -128,23 +119,20 @@ public void processRecord( } } - private void createTimerIfTimerStartEvent( - final TypedRecord record, final SideEffects sideEffects) { + private void createTimerIfTimerStartEvent(final TypedRecord record) { for (final ProcessMetadata processMetadata : record.getValue().processesMetadata()) { if (!processMetadata.isDuplicate()) { final List startEvents = processState.getProcessByKey(processMetadata.getKey()).getProcess().getStartEvents(); unsubscribeFromPreviousTimers(processMetadata); - subscribeToTimerStartEventIfExists(sideEffects, processMetadata, startEvents); + subscribeToTimerStartEventIfExists(processMetadata, startEvents); } } } private void subscribeToTimerStartEventIfExists( - final SideEffects sideEffects, - final ProcessMetadata processMetadata, - final List startEvents) { + final ProcessMetadata processMetadata, final List startEvents) { for (final ExecutableCatchEventElement startEvent : startEvents) { if (startEvent.isTimer()) { // There are no variables when there is no process instance yet, @@ -162,8 +150,7 @@ private void subscribeToTimerStartEventIfExists( NO_ELEMENT_INSTANCE, processMetadata.getKey(), startEvent.getId(), - timerOrError.get(), - sideEffects); + timerOrError.get()); } } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java index a3ff8d7ff680..0cd15b14e281 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java @@ -7,7 +7,7 @@ */ package io.camunda.zeebe.engine.processing.deployment.distribute; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.protocol.Protocol; @@ -31,6 +31,7 @@ public final class DeploymentDistributionBehavior { private final DeploymentDistributionCommandSender deploymentDistributionCommandSender; private final StateWriter stateWriter; + private final SideEffectWriter sideEffectWriter; public DeploymentDistributionBehavior( final Writers writers, @@ -44,12 +45,10 @@ public DeploymentDistributionBehavior( this.deploymentDistributionCommandSender = deploymentDistributionCommandSender; stateWriter = writers.state(); + sideEffectWriter = writers.sideEffect(); } - public void distributeDeployment( - final DeploymentRecord deploymentEvent, - final long key, - final SideEffectQueue sideEffectQueue) { + public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) { final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent); otherPartitions.forEach( @@ -58,7 +57,7 @@ public void distributeDeployment( stateWriter.appendFollowUpEvent( key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord); - sideEffectQueue.add( + sideEffectWriter.appendSideEffect( () -> { distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer); return true; diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/ResolveIncidentProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/ResolveIncidentProcessor.java index b4fde9d891d9..044320f45a19 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/ResolveIncidentProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/incident/ResolveIncidentProcessor.java @@ -9,8 +9,6 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter; @@ -25,7 +23,6 @@ import io.camunda.zeebe.protocol.record.intent.IncidentIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.util.Either; -import java.util.function.Consumer; public final class ResolveIncidentProcessor implements TypedRecordProcessor { @@ -35,7 +32,6 @@ public final class ResolveIncidentProcessor implements TypedRecordProcessor bpmnStreamProcessor; private final StateWriter stateWriter; @@ -58,8 +54,7 @@ public ResolveIncidentProcessor( } @Override - public void processRecord( - final TypedRecord command, final Consumer sideEffect) { + public void processRecord(final TypedRecord command) { final long key = command.getKey(); final var incident = incidentState.getIncidentRecord(key); @@ -73,7 +68,7 @@ public void processRecord( responseWriter.writeEventOnCommand(key, IncidentIntent.RESOLVED, incident, command); // if it fails, a new incident is raised - attemptToContinueProcessProcessing(command, sideEffect, incident); + attemptToContinueProcessProcessing(command, incident); } private void rejectResolveCommand( @@ -86,9 +81,7 @@ private void rejectResolveCommand( } private void attemptToContinueProcessProcessing( - final TypedRecord command, - final Consumer sideEffect, - final IncidentRecord incident) { + final TypedRecord command, final IncidentRecord incident) { final long jobKey = incident.getJobKey(); final boolean isJobIncident = jobKey > 0; @@ -99,11 +92,7 @@ private void attemptToContinueProcessProcessing( getFailedCommand(incident) .ifRightOrLeft( failedCommand -> { - sideEffects.clear(); - - bpmnStreamProcessor.processRecord(failedCommand, sideEffects::add); - - sideEffect.accept(sideEffects); + bpmnStreamProcessor.processRecord(failedCommand); }, failure -> { final var message = diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/DefaultJobCommandPreconditionGuard.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/DefaultJobCommandPreconditionGuard.java index 45cd88348dec..e7f8dbea345b 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/DefaultJobCommandPreconditionGuard.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/DefaultJobCommandPreconditionGuard.java @@ -9,12 +9,10 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor.CommandControl; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.engine.state.immutable.JobState; import io.camunda.zeebe.engine.state.immutable.JobState.State; import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; import io.camunda.zeebe.protocol.record.RejectionType; -import java.util.function.Consumer; /** * Default implementation to process JobCommands to reduce duplication in CommandProcessor @@ -40,18 +38,11 @@ public DefaultJobCommandPreconditionGuard( public boolean onCommand( final TypedRecord command, final CommandControl commandControl) { - return onCommand(command, commandControl, sideEffectProducer -> {}); - } - - public boolean onCommand( - final TypedRecord command, - final CommandControl commandControl, - final Consumer sideEffect) { final long jobKey = command.getKey(); final State jobState = state.getState(jobKey); if (jobState == State.ACTIVATABLE || jobState == State.ACTIVATED) { - acceptCommand.accept(command, commandControl, sideEffect); + acceptCommand.accept(command, commandControl); } else if (jobState == State.NOT_FOUND) { final String message = String.format(NO_JOB_FOUND_MESSAGE, intent, jobKey); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobAcceptFunction.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobAcceptFunction.java index 9e6c045a7a56..3d96105a0e60 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobAcceptFunction.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobAcceptFunction.java @@ -9,15 +9,10 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor.CommandControl; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord; -import java.util.function.Consumer; @FunctionalInterface public interface JobAcceptFunction { - void accept( - final TypedRecord record, - final CommandControl commandControl, - final Consumer sideEffect); + void accept(final TypedRecord record, final CommandControl commandControl); } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobCompleteProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobCompleteProcessor.java index 436a66b00101..d0500ae60350 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobCompleteProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobCompleteProcessor.java @@ -35,10 +35,7 @@ public JobCompleteProcessor( jobState = state.getJobState(); elementInstanceState = state.getElementInstanceState(); defaultProcessor = - new DefaultJobCommandPreconditionGuard( - "complete", - jobState, - (record, commandControl, sideEffect) -> acceptCommand(record, commandControl)); + new DefaultJobCommandPreconditionGuard("complete", jobState, this::acceptCommand); this.jobMetrics = jobMetrics; this.eventHandle = eventHandle; } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java index 2ba137a10c3b..0ade9f15a60b 100755 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobEventProcessors.java @@ -52,7 +52,7 @@ public static void addJobProcessors( ValueType.JOB, JobIntent.FAIL, new JobFailProcessor( - zeebeState, zeebeState.getKeyGenerator(), jobMetrics, jobBackoffChecker)) + zeebeState, writers, zeebeState.getKeyGenerator(), jobMetrics, jobBackoffChecker)) .onCommand( ValueType.JOB, JobIntent.THROW_ERROR, diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobFailProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobFailProcessor.java index b9ee0d71273a..fbac455b19d5 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobFailProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobFailProcessor.java @@ -12,9 +12,10 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.metrics.JobMetrics; import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.state.KeyGenerator; import io.camunda.zeebe.engine.state.immutable.JobState; import io.camunda.zeebe.engine.state.immutable.ZeebeState; @@ -24,7 +25,6 @@ import io.camunda.zeebe.protocol.record.intent.Intent; import io.camunda.zeebe.protocol.record.intent.JobIntent; import io.camunda.zeebe.protocol.record.value.ErrorType; -import java.util.function.Consumer; import org.agrona.DirectBuffer; public final class JobFailProcessor implements CommandProcessor { @@ -37,9 +37,11 @@ public final class JobFailProcessor implements CommandProcessor { private final KeyGenerator keyGenerator; private final JobMetrics jobMetrics; private final JobBackoffChecker jobBackoffChecker; + private final SideEffectWriter sideEffectWriter; public JobFailProcessor( final ZeebeState state, + final Writers writers, final KeyGenerator keyGenerator, final JobMetrics jobMetrics, final JobBackoffChecker jobBackoffChecker) { @@ -49,14 +51,13 @@ public JobFailProcessor( defaultProcessor = new DefaultJobCommandPreconditionGuard("fail", jobState, this::acceptCommand); this.jobMetrics = jobMetrics; + sideEffectWriter = writers.sideEffect(); } @Override public boolean onCommand( - final TypedRecord command, - final CommandControl commandControl, - final Consumer sideEffect) { - return defaultProcessor.onCommand(command, commandControl, sideEffect); + final TypedRecord command, final CommandControl commandControl) { + return defaultProcessor.onCommand(command, commandControl); } @Override @@ -91,9 +92,7 @@ public void afterAccept( } private void acceptCommand( - final TypedRecord command, - final CommandControl commandControl, - final Consumer sideEffect) { + final TypedRecord command, final CommandControl commandControl) { final long key = command.getKey(); final JobRecord failedJob = jobState.getJob(key); final var retries = command.getValue().getRetries(); @@ -104,7 +103,7 @@ private void acceptCommand( if (retries > 0 && retryBackOff > 0) { final long receivedTime = command.getTimestamp(); failedJob.setRecurringTime(receivedTime + retryBackOff); - sideEffect.accept( + sideEffectWriter.appendSideEffect( () -> { jobBackoffChecker.scheduleBackOff(retryBackOff + receivedTime); return true; diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobThrowErrorProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobThrowErrorProcessor.java index e75b7a7c42a0..e8308a447084 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobThrowErrorProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/job/JobThrowErrorProcessor.java @@ -64,10 +64,7 @@ public JobThrowErrorProcessor( eventScopeInstanceState = state.getEventScopeInstanceState(); defaultProcessor = - new DefaultJobCommandPreconditionGuard( - "throw an error for", - jobState, - (record, commandControl, sideEffect) -> acceptCommand(record, commandControl)); + new DefaultJobCommandPreconditionGuard("throw an error for", jobState, this::acceptCommand); stateAnalyzer = new CatchEventAnalyzer(state.getProcessState(), elementInstanceState); this.eventPublicationBehavior = eventPublicationBehavior; diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageCorrelator.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageCorrelator.java index 2aac5e236042..03f7df38432d 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageCorrelator.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageCorrelator.java @@ -8,14 +8,13 @@ package io.camunda.zeebe.engine.processing.message; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.state.immutable.MessageState; import io.camunda.zeebe.engine.state.message.StoredMessage; import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; import io.camunda.zeebe.scheduler.clock.ActorClock; -import java.util.function.Consumer; import org.agrona.collections.MutableBoolean; public final class MessageCorrelator { @@ -23,23 +22,21 @@ public final class MessageCorrelator { private final MessageState messageState; private final SubscriptionCommandSender commandSender; private final StateWriter stateWriter; - - private Consumer sideEffect; + private SideEffectWriter sideEffectWriter; public MessageCorrelator( final MessageState messageState, final SubscriptionCommandSender commandSender, - final StateWriter stateWriter) { + final StateWriter stateWriter, + final SideEffectWriter sideEffectWriter) { this.messageState = messageState; this.commandSender = commandSender; this.stateWriter = stateWriter; + this.sideEffectWriter = sideEffectWriter; } public boolean correlateNextMessage( - final long subscriptionKey, - final MessageSubscriptionRecord subscriptionRecord, - final Consumer sideEffect) { - this.sideEffect = sideEffect; + final long subscriptionKey, final MessageSubscriptionRecord subscriptionRecord) { final var isMessageCorrelated = new MutableBoolean(false); @@ -75,7 +72,7 @@ private boolean correlateMessage( stateWriter.appendFollowUpEvent( subscriptionKey, MessageSubscriptionIntent.CORRELATING, subscriptionRecord); - sideEffect.accept(() -> sendCorrelateCommand(subscriptionRecord)); + sideEffectWriter.appendSideEffect(() -> sendCorrelateCommand(subscriptionRecord)); } return correlateMessage; diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessagePublishProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessagePublishProcessor.java index a2d7b11f1133..7720fc6036e0 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessagePublishProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessagePublishProcessor.java @@ -15,7 +15,7 @@ import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter; @@ -30,7 +30,6 @@ import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.MessageIntent; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; -import java.util.function.Consumer; public final class MessagePublishProcessor implements TypedRecordProcessor { @@ -51,6 +50,7 @@ public final class MessagePublishProcessor implements TypedRecordProcessor command, final Consumer sideEffect) { + public void processRecord(final TypedRecord command) { messageRecord = command.getValue(); correlatingSubscriptions.clear(); @@ -101,12 +101,11 @@ public void processRecord( responseWriter.writeRejectionOnCommand( command, RejectionType.ALREADY_EXISTS, rejectionReason); } else { - handleNewMessage(command, sideEffect); + handleNewMessage(command); } } - private void handleNewMessage( - final TypedRecord command, final Consumer sideEffect) { + private void handleNewMessage(final TypedRecord command) { messageKey = keyGenerator.nextKey(); // calculate the deadline based on the command's timestamp @@ -119,7 +118,7 @@ private void handleNewMessage( correlateToSubscriptions(messageKey, messageRecord); correlateToMessageStartEvents(messageRecord); - sideEffect.accept(this::sendCorrelateCommand); + sideEffectWriter.appendSideEffect(this::sendCorrelateCommand); if (messageRecord.getTimeToLive() <= 0L) { // avoid that the message can be correlated again by writing the EXPIRED event as a follow-up diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCorrelateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCorrelateProcessor.java index 625a5cbf17f8..eaf321417715 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCorrelateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCorrelateProcessor.java @@ -10,7 +10,6 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; @@ -21,7 +20,6 @@ import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; import io.camunda.zeebe.util.buffer.BufferUtil; -import java.util.function.Consumer; public final class MessageSubscriptionCorrelateProcessor implements TypedRecordProcessor { @@ -43,13 +41,12 @@ public MessageSubscriptionCorrelateProcessor( this.subscriptionState = subscriptionState; stateWriter = writers.state(); rejectionWriter = writers.rejection(); - messageCorrelator = new MessageCorrelator(messageState, commandSender, stateWriter); + messageCorrelator = + new MessageCorrelator(messageState, commandSender, stateWriter, writers.sideEffect()); } @Override - public void processRecord( - final TypedRecord record, - final Consumer sideEffect) { + public void processRecord(final TypedRecord record) { final MessageSubscriptionRecord command = record.getValue(); final MessageSubscription subscription = @@ -65,8 +62,7 @@ public void processRecord( subscription.getKey(), MessageSubscriptionIntent.CORRELATED, messageSubscription); if (!messageSubscription.isInterrupting()) { - messageCorrelator.correlateNextMessage( - subscription.getKey(), messageSubscription, sideEffect); + messageCorrelator.correlateNextMessage(subscription.getKey(), messageSubscription); } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCreateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCreateProcessor.java index 21e007fc62b8..adfaa737e4c7 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCreateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionCreateProcessor.java @@ -10,7 +10,7 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; @@ -21,7 +21,6 @@ import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; import io.camunda.zeebe.util.buffer.BufferUtil; -import java.util.function.Consumer; public final class MessageSubscriptionCreateProcessor implements TypedRecordProcessor { @@ -38,6 +37,7 @@ public final class MessageSubscriptionCreateProcessor private MessageSubscriptionRecord subscriptionRecord; private final TypedRejectionWriter rejectionWriter; + private final SideEffectWriter sideEffectWriter; public MessageSubscriptionCreateProcessor( final MessageState messageState, @@ -49,19 +49,19 @@ public MessageSubscriptionCreateProcessor( this.commandSender = commandSender; stateWriter = writers.state(); rejectionWriter = writers.rejection(); + sideEffectWriter = writers.sideEffect(); this.keyGenerator = keyGenerator; - messageCorrelator = new MessageCorrelator(messageState, commandSender, stateWriter); + messageCorrelator = + new MessageCorrelator(messageState, commandSender, stateWriter, sideEffectWriter); } @Override - public void processRecord( - final TypedRecord record, - final Consumer sideEffect) { + public void processRecord(final TypedRecord record) { subscriptionRecord = record.getValue(); if (subscriptionState.existSubscriptionForElementInstance( subscriptionRecord.getElementInstanceKey(), subscriptionRecord.getMessageNameBuffer())) { - sideEffect.accept(this::sendAcknowledgeCommand); + sideEffectWriter.appendSideEffect(this::sendAcknowledgeCommand); rejectionWriter.appendRejection( record, @@ -73,20 +73,20 @@ public void processRecord( return; } - handleNewSubscription(sideEffect); + handleNewSubscription(sideEffectWriter); } - private void handleNewSubscription(final Consumer sideEffect) { + private void handleNewSubscription(final SideEffectWriter sideEffectWriter) { final var subscriptionKey = keyGenerator.nextKey(); stateWriter.appendFollowUpEvent( subscriptionKey, MessageSubscriptionIntent.CREATED, subscriptionRecord); final var isMessageCorrelated = - messageCorrelator.correlateNextMessage(subscriptionKey, subscriptionRecord, sideEffect); + messageCorrelator.correlateNextMessage(subscriptionKey, subscriptionRecord); if (!isMessageCorrelated) { - sideEffect.accept(this::sendAcknowledgeCommand); + sideEffectWriter.appendSideEffect(this::sendAcknowledgeCommand); } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionDeleteProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionDeleteProcessor.java index 126ab0379cfb..ae1ccadfe606 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionDeleteProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionDeleteProcessor.java @@ -10,7 +10,7 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; @@ -19,7 +19,6 @@ import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; import io.camunda.zeebe.util.buffer.BufferUtil; -import java.util.function.Consumer; public final class MessageSubscriptionDeleteProcessor implements TypedRecordProcessor { @@ -32,6 +31,7 @@ public final class MessageSubscriptionDeleteProcessor private final SubscriptionCommandSender commandSender; private final StateWriter stateWriter; private final TypedRejectionWriter rejectionWriter; + private final SideEffectWriter sideEffectWriter; private MessageSubscriptionRecord subscriptionRecord; @@ -43,12 +43,11 @@ public MessageSubscriptionDeleteProcessor( this.commandSender = commandSender; stateWriter = writers.state(); rejectionWriter = writers.rejection(); + sideEffectWriter = writers.sideEffect(); } @Override - public void processRecord( - final TypedRecord record, - final Consumer sideEffect) { + public void processRecord(final TypedRecord record) { subscriptionRecord = record.getValue(); final var messageSubscription = @@ -65,7 +64,7 @@ public void processRecord( rejectCommand(record); } - sideEffect.accept(this::sendAcknowledgeCommand); + sideEffectWriter.appendSideEffect(this::sendAcknowledgeCommand); } private void rejectCommand(final TypedRecord record) { diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionRejectProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionRejectProcessor.java index 916d3a02b5c8..7f30f7342e39 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionRejectProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/MessageSubscriptionRejectProcessor.java @@ -10,7 +10,7 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.SideEffectWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; @@ -20,7 +20,6 @@ import io.camunda.zeebe.protocol.impl.record.value.message.MessageSubscriptionRecord; import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; -import java.util.function.Consumer; public final class MessageSubscriptionRejectProcessor implements TypedRecordProcessor { @@ -30,6 +29,7 @@ public final class MessageSubscriptionRejectProcessor private final SubscriptionCommandSender commandSender; private final StateWriter stateWriter; private final TypedRejectionWriter rejectionWriter; + private final SideEffectWriter sideEffectWriter; public MessageSubscriptionRejectProcessor( final MessageState messageState, @@ -41,12 +41,11 @@ public MessageSubscriptionRejectProcessor( this.commandSender = commandSender; stateWriter = writers.state(); rejectionWriter = writers.rejection(); + sideEffectWriter = writers.sideEffect(); } @Override - public void processRecord( - final TypedRecord record, - final Consumer sideEffect) { + public void processRecord(final TypedRecord record) { final MessageSubscriptionRecord subscriptionRecord = record.getValue(); @@ -60,12 +59,10 @@ public void processRecord( stateWriter.appendFollowUpEvent( record.getKey(), MessageSubscriptionIntent.REJECTED, subscriptionRecord); - findSubscriptionToCorrelate(sideEffect, subscriptionRecord); + findSubscriptionToCorrelate(subscriptionRecord); } - private void findSubscriptionToCorrelate( - final Consumer sideEffect, - final MessageSubscriptionRecord subscriptionRecord) { + private void findSubscriptionToCorrelate(final MessageSubscriptionRecord subscriptionRecord) { final var messageKey = subscriptionRecord.getMessageKey(); @@ -97,7 +94,7 @@ private void findSubscriptionToCorrelate( MessageSubscriptionIntent.CORRELATING, correlatingSubscription); - sideEffect.accept(() -> sendCorrelateCommand(correlatingSubscription)); + sideEffectWriter.appendSideEffect(() -> sendCorrelateCommand(correlatingSubscription)); } return !canBeCorrelated; }); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/ProcessMessageSubscriptionCorrelateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/ProcessMessageSubscriptionCorrelateProcessor.java index 19f14193be1f..474aef1ff621 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/message/ProcessMessageSubscriptionCorrelateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/message/ProcessMessageSubscriptionCorrelateProcessor.java @@ -48,6 +48,7 @@ public final class ProcessMessageSubscriptionCorrelateProcessor private final ElementInstanceState elementInstanceState; private final StateWriter stateWriter; private final TypedRejectionWriter rejectionWriter; + private final EventHandle eventHandle; public ProcessMessageSubscriptionCorrelateProcessor( @@ -62,7 +63,6 @@ public ProcessMessageSubscriptionCorrelateProcessor( elementInstanceState = zeebeState.getElementInstanceState(); stateWriter = writers.state(); rejectionWriter = writers.rejection(); - eventHandle = new EventHandle( zeebeState.getKeyGenerator(), diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceProcessor.java index 7911a8596ac5..0612485392bb 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceProcessor.java @@ -21,7 +21,6 @@ import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableSequenceFlow; import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor.ProcessingError; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter; @@ -69,8 +68,6 @@ public final class CreateProcessInstanceProcessor private final ProcessInstanceRecord newProcessInstance = new ProcessInstanceRecord(); - private final SideEffectQueue sideEffectQueue = new SideEffectQueue(); - private final ProcessState processState; private final VariableBehavior variableBehavior; @@ -78,6 +75,7 @@ public final class CreateProcessInstanceProcessor private final TypedCommandWriter commandWriter; private final TypedRejectionWriter rejectionWriter; private final TypedResponseWriter responseWriter; + private final ProcessEngineMetrics metrics; private final ElementActivationBehavior elementActivationBehavior; @@ -102,8 +100,6 @@ public CreateProcessInstanceProcessor( public boolean onCommand( final TypedRecord command, final CommandControl controller) { - // cleanup side effects from previous command - sideEffectQueue.clear(); final ProcessInstanceCreationRecord record = command.getValue(); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceWithResultProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceWithResultProcessor.java index e6c1687f9ce4..e21e893de6d6 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceWithResultProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/CreateProcessInstanceWithResultProcessor.java @@ -9,7 +9,6 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; import io.camunda.zeebe.engine.state.instance.AwaitProcessInstanceResultMetadata; @@ -19,7 +18,6 @@ import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord; import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.Intent; -import java.util.function.Consumer; public final class CreateProcessInstanceWithResultProcessor implements CommandProcessor { @@ -44,10 +42,9 @@ public CreateProcessInstanceWithResultProcessor( @Override public boolean onCommand( final TypedRecord command, - final CommandControl controller, - final Consumer sideEffect) { + final CommandControl controller) { wrappedController.setCommand(command).setController(controller); - createProcessor.onCommand(command, wrappedController, sideEffect); + createProcessor.onCommand(command, wrappedController); return shouldRespond; } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceModificationProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceModificationProcessor.java index 9e81cc0399fb..44a6d88c226c 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceModificationProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceModificationProcessor.java @@ -23,9 +23,6 @@ import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventElement; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter; @@ -51,7 +48,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.function.Consumer; import java.util.stream.Collectors; import org.agrona.DirectBuffer; import org.agrona.Strings; @@ -137,9 +133,7 @@ public ProcessInstanceModificationProcessor( } @Override - public void processRecord( - final TypedRecord command, - final Consumer sideEffect) { + public void processRecord(final TypedRecord command) { final long commandKey = command.getKey(); final var value = command.getValue(); @@ -198,9 +192,6 @@ public void processRecord( }) .collect(Collectors.toSet()); - final var sideEffectQueue = new SideEffectQueue(); - sideEffect.accept(sideEffectQueue); - value .getTerminateInstructions() .forEach( @@ -216,8 +207,8 @@ public void processRecord( } final var flowScopeKey = elementInstance.getValue().getFlowScopeKey(); - terminateElement(elementInstance, sideEffectQueue); - terminateFlowScopes(flowScopeKey, sideEffectQueue, requiredKeysForActivation); + terminateElement(elementInstance); + terminateFlowScopes(flowScopeKey, requiredKeysForActivation); }); stateWriter.appendFollowUpEvent( @@ -547,8 +538,7 @@ public void executeVariableInstruction( variableDocument)); } - private void terminateElement( - final ElementInstance elementInstance, final SideEffects sideEffects) { + private void terminateElement(final ElementInstance elementInstance) { final var elementInstanceKey = elementInstance.getKey(); final var elementInstanceRecord = elementInstance.getValue(); final BpmnElementType elementType = elementInstance.getValue().getBpmnElementType(); @@ -559,7 +549,7 @@ private void terminateElement( jobBehavior.cancelJob(elementInstance); incidentBehavior.resolveIncidents(elementInstanceKey); - catchEventBehavior.unsubscribeFromEvents(elementInstanceKey, sideEffects); + catchEventBehavior.unsubscribeFromEvents(elementInstanceKey); // terminate all child instances if the element is an event subprocess if (elementType == BpmnElementType.EVENT_SUB_PROCESS @@ -568,12 +558,12 @@ private void terminateElement( || elementType == BpmnElementType.MULTI_INSTANCE_BODY) { elementInstanceState.getChildren(elementInstanceKey).stream() .filter(ElementInstance::canTerminate) - .forEach(childInstance -> terminateElement(childInstance, sideEffects)); + .forEach(this::terminateElement); } else if (elementType == BpmnElementType.CALL_ACTIVITY) { final var calledActivityElementInstance = elementInstanceState.getInstance(elementInstance.getCalledChildInstanceKey()); if (calledActivityElementInstance != null && calledActivityElementInstance.canTerminate()) { - terminateElement(calledActivityElementInstance, sideEffects); + terminateElement(calledActivityElementInstance); } } @@ -582,9 +572,7 @@ private void terminateElement( } private void terminateFlowScopes( - final long elementInstanceKey, - final SideEffects sideEffects, - final Set requiredKeysForActivation) { + final long elementInstanceKey, final Set requiredKeysForActivation) { var currentElementInstance = elementInstanceState.getInstance(elementInstanceKey); while (canTerminateElementInstance(currentElementInstance, requiredKeysForActivation)) { @@ -601,7 +589,7 @@ private void terminateFlowScopes( final var flowScopeKey = currentElementInstance.getValue().getFlowScopeKey(); - terminateElement(currentElementInstance, sideEffects); + terminateElement(currentElementInstance); currentElementInstance = elementInstanceState.getInstance(flowScopeKey); } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessor.java index 16d49b4c51b0..39aab4ada9a0 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessor.java @@ -9,13 +9,11 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor.ProcessingError; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue; import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.Intent; -import java.util.function.Consumer; /** * High-level record processor abstraction that implements the common behavior of most @@ -27,13 +25,6 @@ default boolean onCommand(final TypedRecord command, final CommandControl return true; } - default boolean onCommand( - final TypedRecord command, - final CommandControl commandControl, - final Consumer sideEffect) { - return onCommand(command, commandControl); - } - // TODO (#8003): clean up after refactoring; this is just a simple hook to be able to append // additional commands/events default void afterAccept( diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessorImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessorImpl.java index ae0fe8d16017..35c628aaa8b3 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessorImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/CommandProcessorImpl.java @@ -9,8 +9,6 @@ import io.camunda.zeebe.engine.api.TypedRecord; import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor.CommandControl; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; @@ -20,7 +18,6 @@ import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue; import io.camunda.zeebe.protocol.record.RejectionType; import io.camunda.zeebe.protocol.record.intent.Intent; -import java.util.function.Consumer; /** * Decorates a command processor with simple accept and reject logic. @@ -36,8 +33,6 @@ public final class CommandProcessorImpl implements TypedRecordProcessor, CommandControl { - private final SideEffectQueue sideEffectQueue = new SideEffectQueue(); - private final CommandProcessor wrappedProcessor; private final KeyGenerator keyGenerator; @@ -68,15 +63,11 @@ public CommandProcessorImpl( } @Override - public void processRecord( - final TypedRecord command, final Consumer sideEffect) { + public void processRecord(final TypedRecord command) { entityKey = command.getKey(); - sideEffect.accept(sideEffectQueue); - sideEffectQueue.clear(); - - final boolean shouldRespond = wrappedProcessor.onCommand(command, this, sideEffectQueue::add); + final boolean shouldRespond = wrappedProcessor.onCommand(command, this); final boolean respond = shouldRespond && command.hasRequestMetadata(); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecordProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecordProcessor.java index 9a43528e15d8..389424b82c05 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecordProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/TypedRecordProcessor.java @@ -8,22 +8,12 @@ package io.camunda.zeebe.engine.processing.streamprocessor; import io.camunda.zeebe.engine.api.TypedRecord; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue; -import java.util.function.Consumer; public interface TypedRecordProcessor { default void processRecord(final TypedRecord record) {} - /** - * @see #processRecord(TypedRecord, Consumer) - */ - default void processRecord( - final TypedRecord record, final Consumer sideEffect) { - processRecord(record); - } - /** * Try to handle an error that occurred during processing. * diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffectQueue.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffectQueue.java deleted file mode 100644 index 21c3443fe335..000000000000 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffectQueue.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under - * one or more contributor license agreements. See the NOTICE file distributed - * with this work for additional information regarding copyright ownership. - * Licensed under the Zeebe Community License 1.1. You may not use this file - * except in compliance with the Zeebe Community License 1.1. - */ -package io.camunda.zeebe.engine.processing.streamprocessor.sideeffect; - -import java.util.ArrayList; -import java.util.List; - -public final class SideEffectQueue implements SideEffectProducer, SideEffects { - private final List sideEffects = new ArrayList<>(); - - public void clear() { - sideEffects.clear(); - } - - @Override - public boolean flush() { - if (sideEffects.isEmpty()) { - return true; - } - - boolean flushed = true; - - // iterates once over everything, setting the side effect to null to avoid reprocessing if we - // couldn't flush and this is retried. considered lesser evil than removing from the list and - // having to shuffle elements around in the list. - for (int i = 0; i < sideEffects.size(); i++) { - final SideEffectProducer sideEffect = sideEffects.get(i); - - if (sideEffect != null) { - if (sideEffect.flush()) { - sideEffects.set(i, null); - } else { - flushed = false; - } - } - } - - // reset list size to 0 if everything was flushed - if (flushed) { - sideEffects.clear(); - } - - return flushed; - } - - @Override - public void add(final SideEffectProducer sideEffectProducer) { - sideEffects.add(sideEffectProducer); - } -} diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/ResultBuilderBackedSideEffectWriter.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/ResultBuilderBackedSideEffectWriter.java new file mode 100644 index 000000000000..8920d406ceae --- /dev/null +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/ResultBuilderBackedSideEffectWriter.java @@ -0,0 +1,26 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.engine.processing.streamprocessor.writers; + +import io.camunda.zeebe.engine.api.ProcessingResultBuilder; +import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; +import java.util.function.Supplier; + +public class ResultBuilderBackedSideEffectWriter implements SideEffectWriter { + private final Supplier resultBuilderProvider; + + public ResultBuilderBackedSideEffectWriter( + final Supplier resultBuilderProvider) { + this.resultBuilderProvider = resultBuilderProvider; + } + + @Override + public void appendSideEffect(final SideEffectProducer sideEffect) { + resultBuilderProvider.get().appendPostCommitTask(sideEffect::flush); + } +} diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffects.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/SideEffectWriter.java similarity index 69% rename from engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffects.java rename to engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/SideEffectWriter.java index 2489bf30f9f2..286eba5201d0 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/sideeffect/SideEffects.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/SideEffectWriter.java @@ -5,11 +5,13 @@ * Licensed under the Zeebe Community License 1.1. You may not use this file * except in compliance with the Zeebe Community License 1.1. */ -package io.camunda.zeebe.engine.processing.streamprocessor.sideeffect; +package io.camunda.zeebe.engine.processing.streamprocessor.writers; + +import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; /** A chain of side effects that are executed/flushed together at the end of the processing. */ -public interface SideEffects { +public interface SideEffectWriter { /** Chain the given side effect. It will be executed/flushed at the end of the processing. */ - void add(SideEffectProducer sideEffect); + void appendSideEffect(SideEffectProducer sideEffect); } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/Writers.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/Writers.java index 2a6e0b1c84bd..e664dc53e81c 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/Writers.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/streamprocessor/writers/Writers.java @@ -19,19 +19,19 @@ public final class Writers { private final StateWriter stateWriter; private final TypedResponseWriter responseWriter; - private final Supplier resultBuilderSupplier; + private final SideEffectWriter sideEffectWriter; public Writers( final Supplier resultBuilderSupplier, final EventApplier eventApplier) { - this.resultBuilderSupplier = resultBuilderSupplier; commandWriter = new ResultBuilderBackedTypedCommandWriter(resultBuilderSupplier); rejectionWriter = new ResultBuilderBackedRejectionWriter(resultBuilderSupplier); stateWriter = new ResultBuilderBackedEventApplyingStateWriter(resultBuilderSupplier, eventApplier); responseWriter = new ResultBuilderBackedTypedResponseWriter(resultBuilderSupplier); + sideEffectWriter = new ResultBuilderBackedSideEffectWriter(resultBuilderSupplier); } /** @@ -48,6 +48,10 @@ public TypedRejectionWriter rejection() { return rejectionWriter; } + public SideEffectWriter sideEffect() { + return sideEffectWriter; + } + /** * @return the writer of events that also changes state for each event it writes */ diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/timer/TriggerTimerProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/timer/TriggerTimerProcessor.java index e0be51c6948a..dc5a954a64ca 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/timer/TriggerTimerProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/timer/TriggerTimerProcessor.java @@ -15,13 +15,11 @@ import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEvent; import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; -import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.state.KeyGenerator; import io.camunda.zeebe.engine.state.immutable.ElementInstanceState; -import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState; import io.camunda.zeebe.engine.state.immutable.ProcessState; import io.camunda.zeebe.engine.state.mutable.MutableTimerInstanceState; import io.camunda.zeebe.engine.state.mutable.MutableZeebeState; @@ -34,7 +32,6 @@ import io.camunda.zeebe.util.Either; import io.camunda.zeebe.util.buffer.BufferUtil; import java.time.Instant; -import java.util.function.Consumer; import org.agrona.DirectBuffer; import org.agrona.concurrent.UnsafeBuffer; @@ -52,9 +49,9 @@ public final class TriggerTimerProcessor implements TypedRecordProcessor record, final Consumer sideEffects) { + public void processRecord(final TypedRecord record) { final var timer = record.getValue(); final var elementInstanceKey = timer.getElementInstanceKey(); final var processDefinitionKey = timer.getProcessDefinitionKey(); @@ -115,7 +110,7 @@ public void processRecord( } if (shouldReschedule(timer)) { - rescheduleTimer(timer, catchEvent, sideEffects); + rescheduleTimer(timer, catchEvent); } } @@ -134,10 +129,7 @@ private boolean shouldReschedule(final TimerRecord timer) { return timer.getRepetitions() == RepeatingInterval.INFINITE || timer.getRepetitions() > 1; } - private void rescheduleTimer( - final TimerRecord record, - final ExecutableCatchEvent event, - final Consumer sideEffects) { + private void rescheduleTimer(final TimerRecord record, final ExecutableCatchEvent event) { final Either timer = event.getTimerFactory().apply(expressionProcessor, record.getElementInstanceKey()); if (timer.isLeft()) { @@ -155,8 +147,7 @@ private void rescheduleTimer( record.getProcessInstanceKey(), record.getProcessDefinitionKey(), event.getId(), - refreshedTimer, - sideEffects::accept); + refreshedTimer); } private Timer refreshTimer(final Timer timer, final TimerRecord record) { diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest.java index 181ca19a289c..bba992cbc52c 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/SkipFailingEventsTest.java @@ -250,7 +250,7 @@ public void shouldBlacklistInstance() { new MockTypedRecord<>(0, metadata, PROCESS_INSTANCE_RECORD); Assertions.assertThat(zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isTrue(); - verify(dumpProcessorRef.get(), times(1)).processRecord(any(), any()); + verify(dumpProcessorRef.get(), times(1)).processRecord(any()); assertThat(dumpProcessorRef.get().processedInstances).containsExactly(2L); } @@ -390,7 +390,7 @@ public void processRecord(final TypedRecord record) { new MockTypedRecord<>(0, metadata, PROCESS_INSTANCE_RECORD); Assertions.assertThat(zeebeState.getBlackListState().isOnBlacklist(mockTypedRecord)).isFalse(); - verify(dumpProcessorRef.get(), timeout(1000).times(2)).processRecord(any(), any()); + verify(dumpProcessorRef.get(), timeout(1000).times(2)).processRecord(any()); assertThat(processedInstances).containsExactly(1L, 2L); } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayModeTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayModeTest.java index c2ab83c70be2..accce8d8f003 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayModeTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayModeTest.java @@ -82,7 +82,7 @@ public void shouldReplayUntilEnd() { // then final InOrder inOrder = inOrder(typedRecordProcessor, eventApplier); inOrder.verify(eventApplier, TIMEOUT).applyState(anyLong(), eq(ELEMENT_ACTIVATING), any()); - inOrder.verify(typedRecordProcessor, TIMEOUT).processRecord(any(), any()); + inOrder.verify(typedRecordProcessor, TIMEOUT).processRecord(any()); inOrder.verifyNoMoreInteractions(); } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReprocessingTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReprocessingTest.java index 09b5b0456e53..4d2494833235 100755 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReprocessingTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReprocessingTest.java @@ -104,9 +104,9 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) { streamProcessorRule.writeCommand( ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(0xcafe)); - verify(typedRecordProcessor, TIMEOUT.times(0)).processRecord(any(), any()); - verify(typedRecordProcessor, TIMEOUT.times(0)).processRecord(any(), any()); - verify(typedRecordProcessor, TIMEOUT.times(0)).processRecord(any(), any()); + verify(typedRecordProcessor, TIMEOUT.times(0)).processRecord(any()); + verify(typedRecordProcessor, TIMEOUT.times(0)).processRecord(any()); + verify(typedRecordProcessor, TIMEOUT.times(0)).processRecord(any()); } @Test @@ -156,7 +156,7 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) { streamProcessorRule.writeCommand( ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(0xcafe)); - verify(typedRecordProcessor, TIMEOUT.times(1)).processRecord(any(), any()); + verify(typedRecordProcessor, TIMEOUT.times(1)).processRecord(any()); } @Test