Skip to content

Commit

Permalink
feat(broker-core): add support for event-based gateway
Browse files Browse the repository at this point in the history
* validate and transform event-based gateways
* open subscriptions when gateway is activated
* close subscriptions when first event is triggered or scope is
terminated
* uniform the sequence flow step handlers
* register the BPMN step processor for all workflow instance events
* rename BPMN element transformers from *handler to *transformer
  • Loading branch information
saig0 authored and npepinpe committed Nov 27, 2018
1 parent 1461422 commit 425d6ca
Show file tree
Hide file tree
Showing 50 changed files with 853 additions and 413 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.model.bpmn.validation.zeebe;

import io.zeebe.model.bpmn.instance.EventBasedGateway;
import io.zeebe.model.bpmn.instance.EventDefinition;
import io.zeebe.model.bpmn.instance.FlowNode;
import io.zeebe.model.bpmn.instance.IntermediateCatchEvent;
import io.zeebe.model.bpmn.instance.MessageEventDefinition;
import io.zeebe.model.bpmn.instance.SequenceFlow;
import io.zeebe.model.bpmn.instance.TimerEventDefinition;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.camunda.bpm.model.xml.validation.ModelElementValidator;
import org.camunda.bpm.model.xml.validation.ValidationResultCollector;

public class EventBasedGatewayValidator implements ModelElementValidator<EventBasedGateway> {

private static final List<Class<? extends EventDefinition>> SUPPORTED_EVENTS =
Arrays.asList(TimerEventDefinition.class, MessageEventDefinition.class);

private static final String ERROR_UNSUPPORTED_TARGET_NODE =
"Event-based gateway must not have an outgoing sequence flow to other elements than message/timer intermediate catch events.";

@Override
public Class<EventBasedGateway> getElementType() {
return EventBasedGateway.class;
}

@Override
public void validate(
EventBasedGateway element, ValidationResultCollector validationResultCollector) {

final Collection<SequenceFlow> outgoingSequenceFlows = element.getOutgoing();

if (outgoingSequenceFlows.size() < 2) {
validationResultCollector.addError(
0, "Event-based gateway must have at least 2 outgoing sequence flows.");
}

final boolean isValid =
outgoingSequenceFlows.stream().allMatch(this::isValidOutgoingSequenceFlow);
if (!isValid) {
validationResultCollector.addError(0, ERROR_UNSUPPORTED_TARGET_NODE);
}
}

private boolean isValidOutgoingSequenceFlow(SequenceFlow flow) {
final FlowNode targetNode = flow.getTarget();

if (targetNode instanceof IntermediateCatchEvent) {
return isValidEvent((IntermediateCatchEvent) targetNode);
} else {
return false;
}
}

private boolean isValidEvent(final IntermediateCatchEvent event) {
final Collection<EventDefinition> eventDefinitions = event.getEventDefinitions();

if (eventDefinitions.size() != 1) {
return false;

} else {
final EventDefinition eventDefinition = eventDefinitions.iterator().next();
return SUPPORTED_EVENTS
.stream()
.anyMatch(e -> e.isAssignableFrom(eventDefinition.getClass()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.zeebe.model.bpmn.instance.BoundaryEvent;
import io.zeebe.model.bpmn.instance.EndEvent;
import io.zeebe.model.bpmn.instance.EventBasedGateway;
import io.zeebe.model.bpmn.instance.ExclusiveGateway;
import io.zeebe.model.bpmn.instance.FlowElement;
import io.zeebe.model.bpmn.instance.IntermediateCatchEvent;
Expand All @@ -38,6 +39,7 @@ public class FlowElementValidator implements ModelElementValidator<FlowElement>
static {
SUPPORTED_ELEMENT_TYPES.add(BoundaryEvent.class);
SUPPORTED_ELEMENT_TYPES.add(EndEvent.class);
SUPPORTED_ELEMENT_TYPES.add(EventBasedGateway.class);
SUPPORTED_ELEMENT_TYPES.add(ExclusiveGateway.class);
SUPPORTED_ELEMENT_TYPES.add(IntermediateCatchEvent.class);
SUPPORTED_ELEMENT_TYPES.add(ParallelGateway.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,17 @@ public class ZeebeDesignTimeValidators {
VALIDATORS.add(new DefinitionsValidator());
VALIDATORS.add(new EndEventValidator());
VALIDATORS.add(new EventDefinitionValidator());
VALIDATORS.add(new EventBasedGatewayValidator());
VALIDATORS.add(new ExclusiveGatewayValidator());
VALIDATORS.add(new FlowElementValidator());
VALIDATORS.add(new FlowNodeValidator());
VALIDATORS.add(new IntermediateCatchEventValidator());
VALIDATORS.add(new MessageValidator());
VALIDATORS.add(new ProcessValidator());
VALIDATORS.add(new SequenceFlowValidator());
VALIDATORS.add(new ServiceTaskValidator());
VALIDATORS.add(new ReceiveTaskValidator());
VALIDATORS.add(new StartEventValidator());
VALIDATORS.add(new IntermediateCatchEventValidator());
VALIDATORS.add(new SubProcessValidator());
VALIDATORS.add(new ZeebeTaskDefinitionValidator());
VALIDATORS.add(new ZeebeIoMappingValidator());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.model.bpmn.validation;

import static io.zeebe.model.bpmn.validation.ExpectedValidationResult.expect;
import static java.util.Collections.singletonList;

import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.instance.EventBasedGateway;
import org.junit.runners.Parameterized.Parameters;

public class ZeebeEventBasedGatewayValidationTest extends AbstractZeebeValidationTest {

@Parameters(name = "{index}: {1}")
public static Object[][] parameters() {
return new Object[][] {
{
Bpmn.createExecutableProcess("process")
.startEvent()
.eventBasedGateway()
.intermediateCatchEvent()
.timerWithDuration("PT1M")
.done(),
singletonList(
expect(
EventBasedGateway.class,
"Event-based gateway must have at least 2 outgoing sequence flows."))
},
{
Bpmn.createExecutableProcess("process")
.startEvent()
.eventBasedGateway()
.receiveTask()
.message(m -> m.name("this").zeebeCorrelationKey("$.foo"))
.moveToLastGateway()
.receiveTask()
.message(m -> m.name("that").zeebeCorrelationKey("$.foo"))
.done(),
singletonList(
expect(
EventBasedGateway.class,
"Event-based gateway must not have an outgoing sequence flow to other elements than message/timer intermediate catch events."))
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,44 @@
package io.zeebe.broker.workflow.model;

public enum BpmnStep {
NONE,

// exactly one outgoing sequence flow
TAKE_SEQUENCE_FLOW,

// end event, no outgoing sequence flow
CONSUME_TOKEN,

// xor-gateway
EXCLUSIVE_SPLIT,

// parallel gateway
PARALLEL_SPLIT,
PARALLEL_MERGE,
// flow element container (process, sub process)
TRIGGER_START_EVENT,
TRIGGER_END_EVENT,
COMPLETE_PROCESS,
TERMINATE_CONTAINED_INSTANCES,

// flow node
START_FLOW_NODE,
ACTIVATE_FLOW_NODE,
COMPLETE_FLOW_NODE,
TERMINATE_FLOW_NODE,
PROPAGATE_TERMINATION,

CONSUME_TOKEN,
TAKE_SEQUENCE_FLOW,

// activity
ACTIVATE_ACTIVITY,
COMPLETE_ACTIVITY,
TERMINATE_ACTIVITY,

// boundary events
TRIGGER_BOUNDARY_EVENT,

// service task
CREATE_JOB,
TERMINATE_JOB_TASK,

// exclusive gateway
ACTIVATE_GATEWAY,
EXCLUSIVE_SPLIT,

SUBSCRIBE_TO_INTERMEDIATE_MESSAGE,

CREATE_TIMER,

TRIGGER_END_EVENT,
TRIGGER_START_EVENT,
// parallel gateway
PARALLEL_SPLIT,
PARALLEL_MERGE,

TERMINATE_CONTAINED_INSTANCES,
TERMINATE_JOB_TASK,
TERMINATE_INTERMEDIATE_MESSAGE,
TERMINATE_TIMER,
TERMINATE_ELEMENT,
PROPAGATE_TERMINATION,
// event-based gateway
TRIGGER_EVENT_BASED_GATEWAY,

CANCEL_PROCESS,
COMPLETE_PROCESS,
// events
SUBSCRIBE_TO_EVENTS,
TRIGGER_EVENT,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.workflow.model.element;

import java.time.Duration;

public interface ExecutableCatchEvent extends ExecutableFlowElement {

boolean isTimer();

boolean isMessage();

ExecutableMessage getMessage();

Duration getDuration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package io.zeebe.broker.workflow.model.element;

public interface ExecutableMessageCatchElement extends ExecutableFlowElement {
import java.util.List;

ExecutableMessage getMessage();
public interface ExecutableCatchEventSupplier extends ExecutableFlowElement {

List<? extends ExecutableCatchEvent> getEvents();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.workflow.processor.message;
package io.zeebe.broker.workflow.model.element;

import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.workflow.model.element.ExecutableIntermediateCatchElement;
import io.zeebe.broker.workflow.processor.BpmnStepContext;
import io.zeebe.broker.workflow.processor.flownode.TerminateFlowNodeHandler;
import java.util.List;

public class TerminateIntermediateMessageHandler
extends TerminateFlowNodeHandler<ExecutableIntermediateCatchElement> {
public class ExecutableEventBasedGateway extends ExecutableFlowNode
implements ExecutableCatchEventSupplier {

public TerminateIntermediateMessageHandler(final ZeebeState zeebeState) {
super(zeebeState.getIncidentState());
private List<ExecutableIntermediateCatchElement> events;

public ExecutableEventBasedGateway(String id) {
super(id);
}

@Override
protected void terminate(BpmnStepContext<ExecutableIntermediateCatchElement> context) {
context.getCatchEventOutput().unsubscribeFromMessageEvents(context);
public List<ExecutableIntermediateCatchElement> getEvents() {
return events;
}

public void setEvents(List<ExecutableIntermediateCatchElement> events) {
this.events = events;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
package io.zeebe.broker.workflow.model.element;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class ExecutableIntermediateCatchElement extends ExecutableFlowNode
implements ExecutableMessageCatchElement {
implements ExecutableCatchEvent, ExecutableCatchEventSupplier {

private final List<ExecutableCatchEvent> events = Collections.singletonList(this);

private ExecutableMessage message;
private Duration duration;
Expand All @@ -38,11 +42,27 @@ public void setMessage(ExecutableMessage message) {
this.message = message;
}

@Override
public Duration getDuration() {
return duration;
}

public void setDuration(Duration duration) {
this.duration = duration;
}

@Override
public boolean isTimer() {
return duration != null;
}

@Override
public boolean isMessage() {
return message != null;
}

@Override
public List<ExecutableCatchEvent> getEvents() {
return events;
}
}
Loading

0 comments on commit 425d6ca

Please sign in to comment.