Skip to content

Commit

Permalink
merge: #11565
Browse files Browse the repository at this point in the history
11565: [Backport stable/8.1] refactor(engine): append side-effects to writer r=oleschoenburg a=oleschoenburg

Manual backport of #11502

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and lenaschoenburg authored Feb 7, 2023
2 parents 433061a + e65e527 commit 40b14c1
Show file tree
Hide file tree
Showing 39 changed files with 176 additions and 379 deletions.
8 changes: 2 additions & 6 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void replay(final TypedRecord event) {
@Override
public ProcessingResult process(
final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) {

try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) {
TypedRecordProcessor<?> currentProcessor = null;

Expand All @@ -124,12 +125,7 @@ public ProcessingResult process(

final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
if (isNotOnBlacklist) {
currentProcessor.processRecord(
record,
(sep) -> {
processingResultBuilder.resetPostCommitTasks();
processingResultBuilder.appendPostCommitTask(sep::flush);
});
currentProcessor.processRecord(record);
}
}
return processingResultBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
import io.camunda.zeebe.engine.state.KeyGenerator;
Expand Down Expand Up @@ -67,7 +66,6 @@ public static TypedRecordProcessors createEngineProcessors(

final var jobMetrics = new JobMetrics(partitionId);
final var processEngineMetrics = new ProcessEngineMetrics(zeebeState.getPartitionId());
final var sideEffectQueue = new SideEffectQueue();

final BpmnBehaviorsImpl bpmnBehaviors =
createBehaviors(
Expand All @@ -77,8 +75,7 @@ public static TypedRecordProcessors createEngineProcessors(
partitionsCount,
timerChecker,
jobMetrics,
processEngineMetrics,
sideEffectQueue);
processEngineMetrics);

addDeploymentRelatedProcessorAndServices(
bpmnBehaviors,
Expand All @@ -98,8 +95,7 @@ public static TypedRecordProcessors createEngineProcessors(
typedRecordProcessors,
subscriptionCommandSender,
writers,
timerChecker,
sideEffectQueue);
timerChecker);

JobEventProcessors.addJobProcessors(
typedRecordProcessors,
Expand All @@ -121,10 +117,8 @@ private static BpmnBehaviorsImpl createBehaviors(
final int partitionsCount,
final DueDateTimerChecker timerChecker,
final JobMetrics jobMetrics,
final ProcessEngineMetrics processEngineMetrics,
final SideEffectQueue sideEffectQueue) {
final ProcessEngineMetrics processEngineMetrics) {
return new BpmnBehaviorsImpl(
sideEffectQueue,
zeebeState,
writers,
jobMetrics,
Expand All @@ -140,16 +134,14 @@ private static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
final TypedRecordProcessors typedRecordProcessors,
final SubscriptionCommandSender subscriptionCommandSender,
final Writers writers,
final DueDateTimerChecker timerChecker,
final SideEffectQueue sideEffectQueue) {
final DueDateTimerChecker timerChecker) {
return ProcessEventProcessors.addProcessProcessors(
zeebeState,
bpmnBehaviors,
typedRecordProcessors,
subscriptionCommandSender,
timerChecker,
writers,
sideEffectQueue);
writers);
}

private static void addDeploymentRelatedProcessorAndServices(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceModificationProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.timer.CancelTimerProcessor;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
Expand Down Expand Up @@ -51,8 +50,7 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
final TypedRecordProcessors typedRecordProcessors,
final SubscriptionCommandSender subscriptionCommandSender,
final DueDateTimerChecker timerChecker,
final Writers writers,
final SideEffectQueue sideEffectQueue) {
final Writers writers) {
final MutableProcessMessageSubscriptionState subscriptionState =
zeebeState.getProcessMessageSubscriptionState();
final var keyGenerator = zeebeState.getKeyGenerator();
Expand All @@ -63,8 +61,7 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
writers, typedRecordProcessors, zeebeState.getElementInstanceState());

final var bpmnStreamProcessor =
new BpmnStreamProcessor(
bpmnBehaviors, zeebeState, writers, sideEffectQueue, processEngineMetrics);
new BpmnStreamProcessor(bpmnBehaviors, zeebeState, writers, processEngineMetrics);
addBpmnStepProcessor(typedRecordProcessors, bpmnStreamProcessor);

addMessageStreamProcessors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
Expand All @@ -25,7 +23,6 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import java.util.function.Consumer;
import org.slf4j.Logger;

public final class BpmnStreamProcessor implements TypedRecordProcessor<ProcessInstanceRecord> {
Expand All @@ -34,7 +31,6 @@ public final class BpmnStreamProcessor implements TypedRecordProcessor<ProcessIn

private final BpmnElementContextImpl context = new BpmnElementContextImpl();

private final SideEffectQueue sideEffectQueue;
private final ProcessState processState;
private final BpmnElementProcessors processors;
private final ProcessInstanceStateTransitionGuard stateTransitionGuard;
Expand All @@ -46,7 +42,6 @@ public BpmnStreamProcessor(
final BpmnBehaviors bpmnBehaviors,
final MutableZeebeState zeebeState,
final Writers writers,
final SideEffectQueue sideEffectQueue,
final ProcessEngineMetrics processEngineMetrics) {
processState = zeebeState.getProcessState();

Expand All @@ -61,7 +56,6 @@ public BpmnStreamProcessor(
this::getContainerProcessor,
writers);
processors = new BpmnElementProcessors(bpmnBehaviors, stateTransitionBehavior);
this.sideEffectQueue = sideEffectQueue;
}

private BpmnElementContainerProcessor<ExecutableFlowElement> getContainerProcessor(
Expand All @@ -70,14 +64,9 @@ private BpmnElementContainerProcessor<ExecutableFlowElement> getContainerProcess
}

@Override
public void processRecord(
final TypedRecord<ProcessInstanceRecord> record,
final Consumer<SideEffectProducer> sideEffect) {
public void processRecord(final TypedRecord<ProcessInstanceRecord> record) {

// initialize
sideEffectQueue.clear();
sideEffect.accept(sideEffectQueue);

final var intent = (ProcessInstanceIntent) record.getIntent();
final var recordValue = record.getValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
import io.camunda.zeebe.engine.processing.variable.VariableBehavior;
Expand Down Expand Up @@ -45,7 +44,6 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors {
private final ElementActivationBehavior elementActivationBehavior;

public BpmnBehaviorsImpl(
final SideEffects sideEffects,
final MutableZeebeState zeebeState,
final Writers writers,
final JobMetrics jobMetrics,
Expand All @@ -69,6 +67,7 @@ public BpmnBehaviorsImpl(
expressionBehavior,
subscriptionCommandSender,
writers.state(),
writers.sideEffect(),
timerChecker,
partitionsCount);

Expand All @@ -94,8 +93,7 @@ public BpmnBehaviorsImpl(
new BpmnVariableMappingBehavior(expressionBehavior, zeebeState, variableBehavior);

eventSubscriptionBehavior =
new BpmnEventSubscriptionBehavior(
catchEventBehavior, eventTriggerBehavior, sideEffects, zeebeState);
new BpmnEventSubscriptionBehavior(catchEventBehavior, eventTriggerBehavior, zeebeState);

incidentBehavior =
new BpmnIncidentBehavior(zeebeState, zeebeState.getKeyGenerator(), writers.state());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventSupplier;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
Expand All @@ -27,19 +26,15 @@ public final class BpmnEventSubscriptionBehavior {
private final EventScopeInstanceState eventScopeInstanceState;
private final CatchEventBehavior catchEventBehavior;

private final SideEffects sideEffects;

private final ProcessState processState;
private final EventTriggerBehavior eventTriggerBehavior;

public BpmnEventSubscriptionBehavior(
final CatchEventBehavior catchEventBehavior,
final EventTriggerBehavior eventTriggerBehavior,
final SideEffects sideEffects,
final ZeebeState zeebeState) {
this.catchEventBehavior = catchEventBehavior;
this.eventTriggerBehavior = eventTriggerBehavior;
this.sideEffects = sideEffects;

processState = zeebeState.getProcessState();
eventScopeInstanceState = zeebeState.getEventScopeInstanceState();
Expand All @@ -50,11 +45,11 @@ public BpmnEventSubscriptionBehavior(
*/
public <T extends ExecutableCatchEventSupplier> Either<Failure, Void> subscribeToEvents(
final T element, final BpmnElementContext context) {
return catchEventBehavior.subscribeToEvents(context, element, sideEffects);
return catchEventBehavior.subscribeToEvents(context, element);
}

public void unsubscribeFromEvents(final BpmnElementContext context) {
catchEventBehavior.unsubscribeFromEvents(context.getElementInstanceKey(), sideEffects);
catchEventBehavior.unsubscribeFromEvents(context.getElementInstanceKey());
}

/**
Expand Down
Loading

0 comments on commit 40b14c1

Please sign in to comment.