Skip to content

Commit

Permalink
fix(broker-core): fix xor incident resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisKujawa committed Nov 21, 2018
1 parent 858f090 commit ab59268
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.zeebe.logstreams.state.StateController;
import io.zeebe.logstreams.state.StateLifecycleListener;
import java.util.List;
import java.util.function.ObjLongConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.agrona.ExpandableArrayBuffer;
Expand Down Expand Up @@ -165,4 +166,15 @@ public long getJobIncidentKey(long jobKey) {
public boolean isJobIncident(IncidentRecord record) {
return record.getJobKey() > 0;
}

public void forExistingWorkflowIncident(
long elementInstanceKey, ObjLongConsumer<IncidentRecord> resolver) {
final long workflowIncidentKey = getWorkflowInstanceIncidentKey(elementInstanceKey);

final boolean hasIncident = workflowIncidentKey != IncidentState.MISSING_INCIDENT;
if (hasIncident) {
final IncidentRecord incidentRecord = getIncidentRecord(workflowIncidentKey);
resolver.accept(incidentRecord, workflowIncidentKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public BpmnStepHandlers(WorkflowState workflowState, ZeebeState zeebeState) {
BpmnStep.TERMINATE_INTERMEDIATE_MESSAGE,
new TerminateIntermediateMessageHandler(zeebeState));
stepHandlers.put(
BpmnStep.TERMINATE_CONTAINED_INSTANCES,
new TerminateContainedElementsHandler(workflowState));
BpmnStep.TERMINATE_CONTAINED_INSTANCES, new TerminateContainedElementsHandler(zeebeState));
stepHandlers.put(BpmnStep.PROPAGATE_TERMINATION, new PropagateTerminationHandler());

// intermediate catch event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,22 @@ public class TerminateFlowNodeHandler<T extends ExecutableFlowElement>
implements BpmnStepHandler<T> {

protected final IncidentState incidentState;
private BpmnStepContext<T> context;

public TerminateFlowNodeHandler(IncidentState incidentState) {
this.incidentState = incidentState;
}

@Override
public void handle(BpmnStepContext<T> context) {
this.context = context;
final EventOutput output = context.getOutput();
final ElementInstance elementInstance = context.getElementInstance();
terminate(context);

resolveExistingIncident(context);
final long elementInstanceKey = elementInstance.getKey();
incidentState.forExistingWorkflowIncident(elementInstanceKey, this::resolveExistingIncident);

if (elementInstance.isInterrupted()) {
context
.getCatchEventOutput()
Expand All @@ -61,23 +65,7 @@ public void handle(BpmnStepContext<T> context) {
*/
protected void terminate(BpmnStepContext<T> context) {}

public void resolveExistingIncident(BpmnStepContext<T> context) {
ElementInstance elementInstance = context.getElementInstance();
if (elementInstance == null) {
// only elements with multi state/lifecycle are represented in the zeebe state
// and have an corresponding element instance
elementInstance = context.getFlowScopeInstance();
}

final long elementInstanceKey = elementInstance.getKey();

final long workflowIncidentKey =
incidentState.getWorkflowInstanceIncidentKey(elementInstanceKey);

final boolean hasIncident = workflowIncidentKey != IncidentState.MISSING_INCIDENT;
if (hasIncident) {
final IncidentRecord incidentRecord = incidentState.getIncidentRecord(workflowIncidentKey);
context.getOutput().appendResolvedIncidentEvent(workflowIncidentKey, incidentRecord);
}
private void resolveExistingIncident(IncidentRecord incidentRecord, long incidentKey) {
context.getOutput().appendResolvedIncidentEvent(incidentKey, incidentRecord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,39 @@
*/
package io.zeebe.broker.workflow.processor.subprocess;

import io.zeebe.broker.incident.data.IncidentRecord;
import io.zeebe.broker.incident.processor.IncidentState;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer;
import io.zeebe.broker.workflow.processor.BpmnStepContext;
import io.zeebe.broker.workflow.processor.BpmnStepHandler;
import io.zeebe.broker.workflow.processor.EventOutput;
import io.zeebe.broker.workflow.state.ElementInstance;
import io.zeebe.broker.workflow.state.ElementInstanceState;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import java.util.List;

public class TerminateContainedElementsHandler
implements BpmnStepHandler<ExecutableFlowElementContainer> {

private final WorkflowState workflowState;
private final IncidentState incidentState;
private BpmnStepContext<ExecutableFlowElementContainer> context;

public TerminateContainedElementsHandler(final WorkflowState workflowState) {
this.workflowState = workflowState;
public TerminateContainedElementsHandler(final ZeebeState zeebeState) {
incidentState = zeebeState.getIncidentState();
this.workflowState = zeebeState.getWorkflowState();
}

@Override
public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {
this.context = context;
final ElementInstance elementInstance = context.getElementInstance();
final EventOutput output = context.getOutput();
final ElementInstanceState elementInstanceState = workflowState.getElementInstanceState();
final List<ElementInstance> children =
workflowState.getElementInstanceState().getChildren(elementInstance.getKey());
elementInstanceState.getChildren(elementInstance.getKey());

context.getCatchEventOutput().unsubscribeFromCatchEvents(context);

Expand All @@ -50,6 +60,13 @@ public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {
.triggerBoundaryEventFromInterruptedElement(elementInstance, output.getStreamWriter());
}

elementInstanceState.visitFailedTokens(
elementInstance.getKey(),
(token) -> {
incidentState.forExistingWorkflowIncident(
token.getKey(), this::resolveExistingIncident);
});

output.appendFollowUpEvent(
context.getRecord().getKey(),
WorkflowInstanceIntent.ELEMENT_TERMINATED,
Expand All @@ -63,4 +80,8 @@ public void handle(BpmnStepContext<ExecutableFlowElementContainer> context) {
}
}
}

private void resolveExistingIncident(IncidentRecord incidentRecord, long workflowIncidentKey) {
context.getOutput().appendResolvedIncidentEvent(workflowIncidentKey, incidentRecord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void removeStoredRecord(long scopeKey, long recordKey, Purpose purpose) {
}

public List<IndexedRecord> getDeferredTokens(long scopeKey) {
return getTokenEvents(scopeKey, Purpose.DEFERRED_TOKEN);
return collectTokenEvents(scopeKey, Purpose.DEFERRED_TOKEN);
}

public IndexedRecord getFailedToken(long key) {
Expand Down Expand Up @@ -308,23 +308,38 @@ public void updateFailedToken(IndexedRecord indexedRecord) {
}

public List<IndexedRecord> getFinishedTokens(long scopeKey) {
return getTokenEvents(scopeKey, Purpose.FINISHED_TOKEN);
return collectTokenEvents(scopeKey, Purpose.FINISHED_TOKEN);
}

private List<IndexedRecord> getTokenEvents(long scopeKey, Purpose purpose) {
private List<IndexedRecord> collectTokenEvents(long scopeKey, Purpose purpose) {
final List<IndexedRecord> records = new ArrayList<>();
visitTokens(scopeKey, purpose, records::add);
return records;
}

@FunctionalInterface
public interface TokenVisitor {
void visitToken(IndexedRecord indexedRecord);
}

public void visitFailedTokens(long scopeKey, TokenVisitor visitor) {
visitTokens(scopeKey, Purpose.FAILED_TOKEN, visitor);
}

private void visitTokens(long scopeKey, Purpose purpose, TokenVisitor visitor) {
longKeyPurposeBuffer.putLong(0, scopeKey, STATE_BYTE_ORDER);
longKeyPurposeBuffer.putByte(Long.BYTES, (byte) purpose.ordinal());

final List<IndexedRecord> records = new ArrayList<>();
rocksDbWrapper.whileEqualPrefix(
tokenParentChildHandle,
longKeyPurposeBuffer.byteArray(),
(key, value) -> {
final StoredRecord tokenEvent =
getTokenEvent(getLong(key, Long.BYTES + BitUtil.SIZE_OF_BYTE));
records.add(tokenEvent.getRecord());
if (tokenEvent != null) {
visitor.visitToken(tokenEvent.getRecord());
}
});
return records;
}

private static long getLong(byte[] array, int offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ public class ExpressionIncidentTest {
public void init() {
testClient = apiRule.partitionClient();
apiRule.waitForPartition(1);
}

@Test
public void shouldCreateIncidentIfExclusiveGatewayHasNoMatchingCondition() {
// given
testClient.deploy(
Bpmn.createExecutableProcess("workflow")
.startEvent()
Expand All @@ -88,6 +84,11 @@ public void shouldCreateIncidentIfExclusiveGatewayHasNoMatchingCondition() {
.condition("$.foo >= 5 && $.foo < 10")
.endEvent()
.done());
}

@Test
public void shouldCreateIncidentIfExclusiveGatewayHasNoMatchingCondition() {
// given

// when
testClient.createWorkflowInstance("workflow", asMsgPack("foo", 12));
Expand All @@ -111,18 +112,6 @@ public void shouldCreateIncidentIfExclusiveGatewayHasNoMatchingCondition() {
@Test
public void shouldCreateIncidentIfConditionFailsToEvaluate() {
// given
testClient.deploy(
Bpmn.createExecutableProcess("workflow")
.startEvent()
.exclusiveGateway("xor")
.sequenceFlowId("s1")
.condition("$.foo < 5")
.endEvent()
.moveToLastGateway()
.sequenceFlowId("s2")
.condition("$.foo >= 5 && $.foo < 10")
.endEvent()
.done());

// when
testClient.createWorkflowInstance("workflow", asMsgPack("foo", "bar"));
Expand All @@ -141,18 +130,6 @@ public void shouldCreateIncidentIfConditionFailsToEvaluate() {
@Test
public void shouldResolveIncidentForFailedCondition() {
// given
testClient.deploy(
Bpmn.createExecutableProcess("workflow")
.startEvent()
.exclusiveGateway("xor")
.sequenceFlowId("s1")
.condition("$.foo < 5")
.endEvent()
.moveToLastGateway()
.sequenceFlowId("s2")
.condition("$.foo >= 5 && $.foo < 10")
.endEvent()
.done());

// when
testClient.createWorkflowInstance("workflow", asMsgPack("foo", "bar"));
Expand Down Expand Up @@ -215,18 +192,6 @@ public void shouldResolveIncidentForFailedCondition() {
@Test
public void shouldResolveIncidentForFailedConditionAfterUploadingWrongPayload() {
// given
testClient.deploy(
Bpmn.createExecutableProcess("workflow")
.startEvent()
.exclusiveGateway("xor")
.sequenceFlowId("s1")
.condition("$.foo < 5")
.endEvent()
.moveToLastGateway()
.sequenceFlowId("s2")
.condition("$.foo >= 5 && $.foo < 10")
.endEvent()
.done());

// when
testClient.createWorkflowInstance("workflow", asMsgPack("foo", "bar"));
Expand Down Expand Up @@ -322,18 +287,6 @@ public void shouldResolveIncidentForFailedConditionAfterUploadingWrongPayload()
@Test
public void shouldResolveIncidentForExclusiveGatewayWithoutMatchingCondition() {
// given
testClient.deploy(
Bpmn.createExecutableProcess("workflow")
.startEvent()
.exclusiveGateway("xor")
.sequenceFlowId("s1")
.condition("$.foo < 5")
.endEvent()
.moveToLastGateway()
.sequenceFlowId("s2")
.condition("$.foo >= 5 && $.foo < 10")
.endEvent()
.done());

// when
testClient.createWorkflowInstance("workflow", asMsgPack("foo", 12));
Expand All @@ -351,4 +304,26 @@ public void shouldResolveIncidentForExclusiveGatewayWithoutMatchingCondition() {
testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);
testClient.receiveElementInState("workflow", ELEMENT_COMPLETED);
}

@Test
public void shouldResolveIncidentIfInstanceCanceled() {
// given

final long workflowInstance =
testClient.createWorkflowInstance("workflow", asMsgPack("foo", "bar"));

// when
testClient.receiveFirstIncidentEvent(IncidentIntent.CREATED);
testClient.cancelWorkflowInstance(workflowInstance);

// then incident is resolved
final Record incidentEvent = testClient.receiveFirstIncidentEvent(IncidentIntent.RESOLVED);

assertThat(incidentEvent.getKey()).isGreaterThan(0);
assertIncidentRecordValue(
ErrorType.CONDITION_ERROR.name(),
"Cannot compare values of different types: STRING and INTEGER",
"xor",
incidentEvent);
}
}

0 comments on commit ab59268

Please sign in to comment.