From 74fc9589d8df501d7de784db28362963e440aac0 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Mon, 2 Dec 2024 21:24:37 +0100 Subject: [PATCH 1/2] [Fix #484] Execute Fork task Signed-off-by: Francisco Javier Tirado Sarti --- .../impl/ExecutorServiceFactory.java | 22 ++++ .../impl/QueueWorkflowPosition.java | 4 +- .../serverlessworkflow/impl/TaskContext.java | 71 ++++++++--- .../impl/WorkflowApplication.java | 23 ++++ .../impl/WorkflowDefinition.java | 5 + .../impl/WorkflowExecutionListener.java | 6 +- .../impl/WorkflowInstance.java | 22 ++-- .../impl/WorkflowUtils.java | 43 +++---- .../impl/executors/AbstractTaskExecutor.java | 15 +++ .../executors/DefaultTaskExecutorFactory.java | 2 + .../impl/executors/ForkExecutor.java | 114 ++++++++++++++++++ .../impl/generic/SortedArrayList.java | 67 ++++++++++ .../impl/SortedListTest.java | 64 ++++++++++ .../impl/WorkflowDefinitionTest.java | 31 ++++- .../src/test/resources/fork-no-compete.yaml | 18 +++ impl/core/src/test/resources/fork.yaml | 18 +++ 16 files changed, 473 insertions(+), 52 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java create mode 100644 impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java create mode 100644 impl/core/src/test/resources/fork-no-compete.yaml create mode 100644 impl/core/src/test/resources/fork.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java new file mode 100644 index 00000000..7c211149 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +@FunctionalInterface +public interface ExecutorServiceFactory extends Supplier {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java index c6d3f141..5ad4934f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java @@ -54,7 +54,7 @@ public String jsonPointer() { @Override public String toString() { - return "ListWorkflowPosition [list=" + queue + "]"; + return "QueueWorkflowPosition [queue=" + queue + "]"; } @Override @@ -65,6 +65,6 @@ public WorkflowPosition back() { @Override public Object last() { - return queue.pollLast(); + return queue.getLast(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index cadde89c..8015c687 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -28,35 +28,64 @@ public class TaskContext { private final JsonNode rawInput; private final T task; private final WorkflowPosition position; - private final Instant startedAt = Instant.now(); + private final Instant startedAt; private JsonNode input; private JsonNode output; private JsonNode rawOutput; private FlowDirective flowDirective; private Map contextVariables; + private Instant completedAt; public TaskContext(JsonNode input, WorkflowPosition position) { - this.rawInput = input; - this.position = position; - this.task = null; - this.contextVariables = new HashMap<>(); - init(); + this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>()); } - public TaskContext(JsonNode input, TaskContext taskContext, T task) { - this.rawInput = input; - this.position = taskContext.position.copy(); - this.task = task; - this.flowDirective = task.getThen(); - this.contextVariables = new HashMap<>(taskContext.variables()); - init(); + public TaskContext copy() { + return new TaskContext( + rawInput, + task, + position.copy(), + startedAt, + input, + output, + rawOutput, + flowDirective, + new HashMap<>(contextVariables)); } - private void init() { - this.input = rawInput; - this.rawOutput = rawInput; - this.output = rawInput; + public TaskContext(JsonNode input, TaskContext taskContext, T task) { + this( + input, + task, + taskContext.position, + Instant.now(), + input, + input, + input, + task.getThen(), + new HashMap<>(taskContext.variables())); + } + + private TaskContext( + JsonNode rawInput, + T task, + WorkflowPosition position, + Instant startedAt, + JsonNode input, + JsonNode output, + JsonNode rawOutput, + FlowDirective flowDirective, + Map contextVariables) { + this.rawInput = rawInput; + this.task = task; + this.position = position; + this.startedAt = startedAt; + this.input = input; + this.output = output; + this.rawOutput = rawOutput; + this.flowDirective = flowDirective; + this.contextVariables = contextVariables; } public void input(JsonNode input) { @@ -115,4 +144,12 @@ public WorkflowPosition position() { public Instant startedAt() { return startedAt; } + + public void completedAt(Instant instant) { + this.completedAt = instant; + } + + public Instant completedAt() { + return completedAt; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 4f35de41..f36c23f6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -32,6 +32,8 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class WorkflowApplication implements AutoCloseable { @@ -43,8 +45,11 @@ public class WorkflowApplication implements AutoCloseable { private final Collection listeners; private final Map definitions; private final WorkflowPositionFactory positionFactory; + private final ExecutorServiceFactory executorFactory; private final RuntimeDescriptorFactory runtimeDescriptorFactory; + private ExecutorService executorService; + public WorkflowApplication( TaskExecutorFactory taskFactory, ExpressionFactory exprFactory, @@ -53,6 +58,7 @@ public WorkflowApplication( WorkflowPositionFactory positionFactory, WorkflowIdFactory idFactory, RuntimeDescriptorFactory runtimeDescriptorFactory, + ExecutorServiceFactory executorFactory, Collection listeners) { this.taskFactory = taskFactory; this.exprFactory = exprFactory; @@ -61,6 +67,7 @@ public WorkflowApplication( this.positionFactory = positionFactory; this.idFactory = idFactory; this.runtimeDescriptorFactory = runtimeDescriptorFactory; + this.executorFactory = executorFactory; this.listeners = listeners; this.definitions = new ConcurrentHashMap<>(); } @@ -101,6 +108,7 @@ public static class Builder { private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get(); private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); + private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool(); private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); @@ -129,6 +137,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { return this; } + public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) { + this.executorFactory = executorFactory; + return this; + } + public Builder withPositionFactory(WorkflowPositionFactory positionFactory) { this.positionFactory = positionFactory; return this; @@ -158,6 +171,7 @@ public WorkflowApplication build() { positionFactory, idFactory, descriptorFactory, + executorFactory, listeners == null ? Collections.emptySet() : Collections.unmodifiableCollection(listeners)); @@ -190,4 +204,13 @@ public WorkflowPositionFactory positionFactory() { public RuntimeDescriptorFactory runtimeDescriptorFactory() { return runtimeDescriptorFactory; } + + public ExecutorService executorService() { + synchronized (executorFactory) { + if (executorService == null) { + executorService = executorFactory.get(); + } + } + return executorService; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index db8b5e4d..39d03809 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; public class WorkflowDefinition implements AutoCloseable { @@ -134,6 +135,10 @@ public WorkflowPositionFactory positionFactory() { return application.positionFactory(); } + public ExecutorService executorService() { + return application.executorService(); + } + public RuntimeDescriptorFactory runtimeDescriptorFactory() { return application.runtimeDescriptorFactory(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java index ce72c70e..c121bb41 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java @@ -15,11 +15,11 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.TaskBase; public interface WorkflowExecutionListener { - void onTaskStarted(WorkflowPosition currentPos, Task task); + void onTaskStarted(WorkflowPosition currentPos, TaskBase task); - void onTaskEnded(WorkflowPosition currentPos, Task task); + void onTaskEnded(WorkflowPosition currentPos, TaskBase task); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index 0f3bd410..8225439f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -20,14 +20,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; public class WorkflowInstance { - private WorkflowState state; - private TaskContext taskContext; + private final AtomicReference state; + private final TaskContext taskContext; private final String id; private final JsonNode input; private final Instant startedAt; - private JsonNode context = NullNode.getInstance(); + private final AtomicReference context; WorkflowInstance(WorkflowDefinition definition, JsonNode input) { this.id = definition.idFactory().get(); @@ -39,7 +40,8 @@ public class WorkflowInstance { definition .inputFilter() .ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input))); - state = WorkflowState.STARTED; + state = new AtomicReference<>(WorkflowState.STARTED); + context = new AtomicReference<>(NullNode.getInstance()); WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext); definition .outputFilter() @@ -62,22 +64,26 @@ public JsonNode input() { } public JsonNode context() { - return context; + return context.get(); } public WorkflowState state() { - return state; + return state.get(); + } + + public void state(WorkflowState state) { + this.state.set(state); } public Object output() { return toJavaValue(taskContext.output()); } - public Object outputAsJsonNode() { + public JsonNode outputAsJsonNode() { return taskContext.output(); } void context(JsonNode context) { - this.context = context; + this.context.set(context); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 01486d5f..56b3499a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -153,23 +153,8 @@ public static void processTaskList( TaskItem nextTask = iter.next(); while (nextTask != null) { TaskItem task = nextTask; - parentTask.position().addIndex(iter.previousIndex()).addProperty(task.getName()); - context - .definition() - .listeners() - .forEach(l -> l.onTaskStarted(parentTask.position(), task.getTask())); - currentContext = - context - .definition() - .taskExecutors() - .computeIfAbsent( - parentTask.position().jsonPointer(), - k -> - context - .definition() - .taskFactory() - .getTaskExecutor(task.getTask(), context.definition())) - .apply(context, parentTask, currentContext.output()); + parentTask.position().addIndex(iter.previousIndex()); + currentContext = executeTask(context, parentTask, task, currentContext.output()); FlowDirective flowDirective = currentContext.flowDirective(); if (flowDirective.getFlowDirectiveEnum() != null) { switch (flowDirective.getFlowDirectiveEnum()) { @@ -177,6 +162,7 @@ public static void processTaskList( nextTask = iter.hasNext() ? iter.next() : null; break; case END: + context.instance().state(WorkflowState.COMPLETED); case EXIT: nextTask = null; break; @@ -184,10 +170,6 @@ public static void processTaskList( } else { nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString()); } - context - .definition() - .listeners() - .forEach(l -> l.onTaskEnded(parentTask.position(), task.getTask())); parentTask.position().back(); } } @@ -195,6 +177,25 @@ public static void processTaskList( parentTask.rawOutput(currentContext.output()); } + public static TaskContext executeTask( + WorkflowContext context, TaskContext parentTask, TaskItem task, JsonNode input) { + parentTask.position().addProperty(task.getName()); + TaskContext result = + context + .definition() + .taskExecutors() + .computeIfAbsent( + parentTask.position().jsonPointer(), + k -> + context + .definition() + .taskFactory() + .getTaskExecutor(task.getTask(), context.definition())) + .apply(context, parentTask, input); + parentTask.position().back(); + return result; + } + public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) { assert str != null; Expression expression = exprFactory.getExpression(str); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 086c7c51..44c6a7fd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -26,7 +26,9 @@ import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowState; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; +import java.time.Instant; import java.util.Optional; public abstract class AbstractTaskExecutor implements TaskExecutor { @@ -86,6 +88,14 @@ private void buildContextProcessors(WorkflowDefinition definition) { public TaskContext apply( WorkflowContext workflowContext, TaskContext parentContext, JsonNode input) { TaskContext taskContext = new TaskContext<>(input, parentContext, task); + if (workflowContext.instance().state() == WorkflowState.COMPLETED) { + return taskContext; + } + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskStarted(parentContext.position(), task)); + inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); inputProcessor.ifPresent( p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); @@ -98,6 +108,11 @@ public TaskContext apply( workflowContext.context( p.apply(workflowContext, taskContext, workflowContext.context()))); contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); + taskContext.completedAt(Instant.now()); + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskEnded(parentContext.position(), task)); return taskContext; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index 58bd18ac..89c099b6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -71,6 +71,8 @@ public TaskExecutor getTaskExecutor( return new RaiseExecutor(task.getRaiseTask(), definition); } else if (task.getTryTask() != null) { return new TryExecutor(task.getTryTask(), definition); + } else if (task.getForkTask() != null) { + return new ForkExecutor(task.getForkTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java new file mode 100644 index 00000000..484a6b9f --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java @@ -0,0 +1,114 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.FlowDirectiveEnum; +import io.serverlessworkflow.api.types.ForkTask; +import io.serverlessworkflow.api.types.ForkTaskConfiguration; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowState; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.generic.SortedArrayList; +import io.serverlessworkflow.impl.json.JsonUtils; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ForkExecutor extends AbstractTaskExecutor { + + private static final Logger logger = LoggerFactory.getLogger(ForkExecutor.class); + private final ExecutorService service; + + protected ForkExecutor(ForkTask task, WorkflowDefinition definition) { + super(task, definition); + service = definition.executorService(); + } + + private record BranchContext(String taskName, TaskContext taskContext) {} + + @Override + protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + ForkTaskConfiguration forkConfig = task.getFork(); + + if (!forkConfig.getBranches().isEmpty()) { + Map>> futures = new HashMap<>(); + int index = 0; + for (TaskItem item : forkConfig.getBranches()) { + final int i = index++; + futures.put( + item.getName(), + service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i))); + } + List results = + new SortedArrayList<>( + (arg1, arg2) -> + arg1.taskContext.completedAt().compareTo(arg2.taskContext.completedAt())); + for (Map.Entry>> entry : futures.entrySet()) { + try { + results.add(new BranchContext(entry.getKey(), entry.getValue().get())); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new UndeclaredThrowableException(ex); + } + } catch (InterruptedException ex) { + logger.warn( + "Thred executing branch {} was interrupted, this branch will be ignored", + entry.getKey(), + ex); + } + } + if (!results.isEmpty()) { + taskContext.rawOutput( + forkConfig.isCompete() + ? results.get(0).taskContext().output() + : JsonUtils.fromValue( + results.stream() + .map( + e -> + JsonUtils.mapper() + .createObjectNode() + .set(e.taskName(), e.taskContext().output())) + .collect(Collectors.toList()))); + } + } + } + + private TaskContext executeBranch( + WorkflowContext workflow, TaskContext taskContext, TaskItem taskItem, int index) { + taskContext.position().addIndex(index); + TaskContext result = + WorkflowUtils.executeTask(workflow, taskContext, taskItem, taskContext.input()); + if (result.flowDirective() != null + && result.flowDirective().getFlowDirectiveEnum() == FlowDirectiveEnum.END) { + workflow.instance().state(WorkflowState.COMPLETED); + } + taskContext.position().back(); + return result; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java b/impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java new file mode 100644 index 00000000..e0647d1f --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.generic; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; + +public class SortedArrayList extends ArrayList { + + private static final long serialVersionUID = 1L; + private final Comparator comparator; + + public SortedArrayList() { + this(SortedArrayList::defaultCompare); + } + + @SuppressWarnings("unchecked") + private static int defaultCompare(V a, V b) { + return a instanceof Comparable ? ((Comparable) a).compareTo(b) : 0; + } + + public SortedArrayList(Comparator comparator) { + this.comparator = comparator; + } + + public SortedArrayList(Collection collection) { + this(collection, SortedArrayList::defaultCompare); + } + + public SortedArrayList(Collection collection, Comparator comparator) { + super(collection.size()); + this.comparator = comparator; + addAll(collection); + } + + @Override + public boolean add(T object) { + int i; + for (i = 0; i < size() && comparator.compare(object, get(i)) >= 0; i++) {} + super.add(i, object); + return true; + } + + public boolean addAll(Collection c) { + ensureCapacity(size() + c.size()); + c.forEach(this::add); + return !c.isEmpty(); + } + + public T set(int index, T element) { + throw new UnsupportedOperationException("Do not allow adding in a particular index"); + } +} diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java new file mode 100644 index 00000000..1ad0ed98 --- /dev/null +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java @@ -0,0 +1,64 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.serverlessworkflow.impl.generic.SortedArrayList; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class SortedListTest { + + private record Person(String name, int age) {} + + @Test + void testConstructor() { + assertThat(new SortedArrayList<>(Arrays.asList(3, 2, 1))).isEqualTo(Arrays.asList(1, 2, 3)); + } + + @Test + void testAdd() { + List list = new SortedArrayList<>(); + list.add(1); + list.add(4); + list.add(3); + list.add(2); + assertThat(list).isEqualTo(Arrays.asList(1, 2, 3, 4)); + } + + @Test + void testAddPojo() { + List list = new SortedArrayList<>((a, b) -> b.age() - a.age()); + list.add(new Person("Mariam", 5)); + list.add(new Person("Belen", 12)); + list.add(new Person("Alejandro", 7)); + list.add(new Person("Vicente", 16)); + list.add(new Person("Daniel", 14)); + assertThat(list.stream().map(Person::name)) + .isEqualTo(Arrays.asList("Vicente", "Daniel", "Belen", "Alejandro", "Mariam")); + } + + @Test + void testAddAll() { + List list = new SortedArrayList<>(); + Instant now = Instant.now(); + list.addAll(Arrays.asList(now.plusMillis(1000), now)); + assertThat(list.get(0)).isEqualTo(now); + } +} diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index 6b29bac1..e324de2a 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -19,6 +19,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowableOfType; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.serverlessworkflow.impl.json.JsonUtils; import java.io.IOException; import java.time.Instant; import java.util.Arrays; @@ -108,7 +112,14 @@ private static Stream provideParameters() { args( "raise-reusable.yaml", WorkflowDefinitionTest::checkWorkflowException, - WorkflowException.class)); + WorkflowException.class), + args( + "fork.yaml", + Map.of(), + o -> + assertThat(((ObjectNode) o.outputAsJsonNode()).get("patientId").asText()) + .isIn("John", "Smith")), + args("fork-no-compete.yaml", Map.of(), WorkflowDefinitionTest::checkNotCompeteOuput)); } private static Arguments args( @@ -125,6 +136,24 @@ private static Arguments args( d -> consumer.accept(catchThrowableOfType(clazz, () -> d.execute(Map.of())))); } + private static void checkNotCompeteOuput(WorkflowInstance instance) { + JsonNode out = instance.outputAsJsonNode(); + assertThat(out).isInstanceOf(ArrayNode.class); + assertThat(out).hasSize(2); + ArrayNode array = (ArrayNode) out; + assertThat(array) + .containsExactlyInAnyOrder( + createObjectNode("callNurse", "patientId", "John", "room", 1), + createObjectNode("callDoctor", "patientId", "Smith", "room", 2)); + } + + private static JsonNode createObjectNode( + String parent, String key1, String value1, String key2, int value2) { + return JsonUtils.mapper() + .createObjectNode() + .set(parent, JsonUtils.mapper().createObjectNode().put(key1, value1).put(key2, value2)); + } + private static void checkWorkflowException(WorkflowException ex) { assertThat(ex.getWorflowError().type()) .isEqualTo("https://serverlessworkflow.io/errors/not-implemented"); diff --git a/impl/core/src/test/resources/fork-no-compete.yaml b/impl/core/src/test/resources/fork-no-compete.yaml new file mode 100644 index 00000000..5e78acf2 --- /dev/null +++ b/impl/core/src/test/resources/fork-no-compete.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: fork-example + version: '0.1.0' +do: + - callSomeone: + fork: + compete: false + branches: + - callNurse: + set: + patientId: John + room: 1 + - callDoctor: + set: + patientId: Smith + room: 2 \ No newline at end of file diff --git a/impl/core/src/test/resources/fork.yaml b/impl/core/src/test/resources/fork.yaml new file mode 100644 index 00000000..dfde183b --- /dev/null +++ b/impl/core/src/test/resources/fork.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: fork-no-compete + version: '0.1.0' +do: + - callSomeone: + fork: + compete: true + branches: + - callNurse: + set: + patientId: John + room: 1 + - callDoctor: + set: + patientId: Smith + room: 2 \ No newline at end of file From 202d485116c2b8a61b7dfc055d0addfd3bdd6367 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Tue, 3 Dec 2024 19:20:47 +0100 Subject: [PATCH 2/2] [Fix #484] Status related changes Signed-off-by: Francisco Javier Tirado Sarti --- impl/core/pom.xml | 5 + .../serverlessworkflow/impl/LongFilter.java | 21 ++++ .../serverlessworkflow/impl/TaskContext.java | 26 ++-- .../impl/WorkflowDefinition.java | 7 +- .../impl/WorkflowInstance.java | 16 +-- ...WorkflowState.java => WorkflowStatus.java} | 9 +- .../impl/WorkflowUtils.java | 99 ++++------------ .../impl/executors/AbstractTaskExecutor.java | 58 +++++---- .../executors/DefaultTaskExecutorFactory.java | 2 + .../impl/executors/DoExecutor.java | 3 +- .../impl/executors/ForExecutor.java | 2 +- .../impl/executors/ForkExecutor.java | 48 ++++---- .../impl/executors/TaskExecutorHelper.java | 111 ++++++++++++++++++ .../impl/executors/TryExecutor.java | 4 +- .../impl/executors/WaitExecutor.java | 58 +++++++++ .../impl/generic/SortedArrayList.java | 67 ----------- .../impl/json/JsonUtils.java | 39 ++++++ .../jsonschema/DefaultSchemaValidator.java | 17 +-- .../impl/SortedListTest.java | 64 ---------- .../impl/WorkflowDefinitionTest.java | 4 + .../src/test/resources/fork-no-compete.yaml | 22 +++- impl/core/src/test/resources/fork.yaml | 2 +- 22 files changed, 364 insertions(+), 320 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java rename impl/core/src/main/java/io/serverlessworkflow/impl/{WorkflowState.java => WorkflowStatus.java} (88%) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java delete mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java delete mode 100644 impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java diff --git a/impl/core/pom.xml b/impl/core/pom.xml index 940f79fa..b0cace0b 100644 --- a/impl/core/pom.xml +++ b/impl/core/pom.xml @@ -50,5 +50,10 @@ assertj-core test + + ch.qos.logback + logback-classic + test + diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java new file mode 100644 index 00000000..91b1b6c5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.function.BiFunction; + +@FunctionalInterface +public interface LongFilter extends BiFunction, Long> {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 8015c687..dde5a315 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -41,19 +41,6 @@ public TaskContext(JsonNode input, WorkflowPosition position) { this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>()); } - public TaskContext copy() { - return new TaskContext( - rawInput, - task, - position.copy(), - startedAt, - input, - output, - rawOutput, - flowDirective, - new HashMap<>(contextVariables)); - } - public TaskContext(JsonNode input, TaskContext taskContext, T task) { this( input, @@ -88,6 +75,19 @@ private TaskContext( this.contextVariables = contextVariables; } + public TaskContext copy() { + return new TaskContext( + rawInput, + task, + position.copy(), + startedAt, + input, + output, + rawOutput, + flowDirective, + new HashMap<>(contextVariables)); + } + public void input(JsonNode input) { this.input = input; this.rawOutput = input; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 39d03809..df5b70e1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -49,22 +49,19 @@ public class WorkflowDefinition implements AutoCloseable { private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { - this.workflow = workflow; this.application = application; this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); this.inputSchemaValidator = - getSchemaValidator( - application.validatorFactory(), schemaToNode(resourceLoader, input.getSchema())); + getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema()); this.inputFilter = buildWorkflowFilter(application.expressionFactory(), input.getFrom()); } if (workflow.getOutput() != null) { Output output = workflow.getOutput(); this.outputSchemaValidator = - getSchemaValidator( - application.validatorFactory(), schemaToNode(resourceLoader, output.getSchema())); + getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema()); this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs()); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index 8225439f..f81a6f24 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -19,11 +19,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; +import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import java.time.Instant; import java.util.concurrent.atomic.AtomicReference; public class WorkflowInstance { - private final AtomicReference state; + private final AtomicReference status; private final TaskContext taskContext; private final String id; private final JsonNode input; @@ -40,15 +41,16 @@ public class WorkflowInstance { definition .inputFilter() .ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input))); - state = new AtomicReference<>(WorkflowState.STARTED); + status = new AtomicReference<>(WorkflowStatus.RUNNING); context = new AtomicReference<>(NullNode.getInstance()); - WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext); + TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext); definition .outputFilter() .ifPresent( f -> taskContext.output(f.apply(workflowContext, taskContext, taskContext.rawOutput()))); definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output())); + status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED); } public String id() { @@ -67,12 +69,12 @@ public JsonNode context() { return context.get(); } - public WorkflowState state() { - return state.get(); + public WorkflowStatus status() { + return status.get(); } - public void state(WorkflowState state) { - this.state.set(state); + public void status(WorkflowStatus state) { + this.status.set(state); } public Object output() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java similarity index 88% rename from impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java index 310dbd0b..bc657839 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java @@ -15,8 +15,11 @@ */ package io.serverlessworkflow.impl; -public enum WorkflowState { - STARTED, +public enum WorkflowStatus { + PENDING, + RUNNING, WAITING, - COMPLETED + COMPLETED, + FAULTED, + CANCELLED } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 56b3499a..0866ba05 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -19,14 +19,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.serverlessworkflow.api.WorkflowFormat; import io.serverlessworkflow.api.types.ExportAs; -import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.InputFrom; import io.serverlessworkflow.api.types.OutputAs; import io.serverlessworkflow.api.types.SchemaExternal; import io.serverlessworkflow.api.types.SchemaInline; import io.serverlessworkflow.api.types.SchemaUnion; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.impl.expressions.Expression; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.ExpressionUtils; @@ -38,8 +35,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; -import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.Optional; @@ -48,11 +43,12 @@ public class WorkflowUtils { private WorkflowUtils() {} public static Optional getSchemaValidator( - SchemaValidatorFactory validatorFactory, Optional node) { - return node.map(n -> validatorFactory.getValidator(n)); + SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) { + return schemaToNode(resourceLoader, schema).map(n -> validatorFactory.getValidator(n)); } - public static Optional schemaToNode(ResourceLoader resourceLoader, SchemaUnion schema) { + private static Optional schemaToNode( + ResourceLoader resourceLoader, SchemaUnion schema) { if (schema != null) { if (schema.getSchemaInline() != null) { SchemaInline inline = schema.getSchemaInline(); @@ -94,18 +90,22 @@ public static Optional buildWorkflowFilter( public static StringFilter buildStringFilter( ExpressionFactory exprFactory, String expression, String literal) { - return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal); + return expression != null + ? toString(buildWorkflowFilter(exprFactory, expression)) + : toString(literal); } public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) { - return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str); + return ExpressionUtils.isExpr(str) + ? toString(buildWorkflowFilter(exprFactory, str)) + : toString(str); } - public static StringFilter from(WorkflowFilter filter) { + private static StringFilter toString(WorkflowFilter filter) { return (w, t) -> filter.apply(w, t, t.input()).asText(); } - private static StringFilter from(String literal) { + private static StringFilter toString(String literal) { return (w, t) -> literal; } @@ -124,76 +124,19 @@ private static WorkflowFilter buildWorkflowFilter( throw new IllegalStateException("Both object and str are null"); } - private static TaskItem findTaskByName(ListIterator iter, String taskName) { - int currentIndex = iter.nextIndex(); - while (iter.hasPrevious()) { - TaskItem item = iter.previous(); - if (item.getName().equals(taskName)) { - return item; - } - } - while (iter.nextIndex() < currentIndex) { - iter.next(); - } - while (iter.hasNext()) { - TaskItem item = iter.next(); - if (item.getName().equals(taskName)) { - return item; - } - } - throw new IllegalArgumentException("Cannot find task with name " + taskName); + public static LongFilter buildLongFilter( + ExpressionFactory exprFactory, String expression, Long literal) { + return expression != null + ? toLong(buildWorkflowFilter(exprFactory, expression)) + : toLong(literal); } - public static void processTaskList( - List tasks, WorkflowContext context, TaskContext parentTask) { - parentTask.position().addProperty("do"); - TaskContext currentContext = parentTask; - if (!tasks.isEmpty()) { - ListIterator iter = tasks.listIterator(); - TaskItem nextTask = iter.next(); - while (nextTask != null) { - TaskItem task = nextTask; - parentTask.position().addIndex(iter.previousIndex()); - currentContext = executeTask(context, parentTask, task, currentContext.output()); - FlowDirective flowDirective = currentContext.flowDirective(); - if (flowDirective.getFlowDirectiveEnum() != null) { - switch (flowDirective.getFlowDirectiveEnum()) { - case CONTINUE: - nextTask = iter.hasNext() ? iter.next() : null; - break; - case END: - context.instance().state(WorkflowState.COMPLETED); - case EXIT: - nextTask = null; - break; - } - } else { - nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString()); - } - parentTask.position().back(); - } - } - parentTask.position().back(); - parentTask.rawOutput(currentContext.output()); + private static LongFilter toLong(WorkflowFilter filter) { + return (w, t) -> filter.apply(w, t, t.input()).asLong(); } - public static TaskContext executeTask( - WorkflowContext context, TaskContext parentTask, TaskItem task, JsonNode input) { - parentTask.position().addProperty(task.getName()); - TaskContext result = - context - .definition() - .taskExecutors() - .computeIfAbsent( - parentTask.position().jsonPointer(), - k -> - context - .definition() - .taskFactory() - .getTaskExecutor(task.getTask(), context.definition())) - .apply(context, parentTask, input); - parentTask.position().back(); - return result; + private static LongFilter toLong(Long literal) { + return (w, t) -> literal; } public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 44c6a7fd..f5ee1136 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -26,7 +26,6 @@ import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; -import io.serverlessworkflow.impl.WorkflowState; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; import java.time.Instant; import java.util.Optional; @@ -55,8 +54,7 @@ private void buildInputProcessors(WorkflowDefinition definition) { this.inputProcessor = buildWorkflowFilter(definition.expressionFactory(), input.getFrom()); this.inputSchemaValidator = getSchemaValidator( - definition.validatorFactory(), - schemaToNode(definition.resourceLoader(), input.getSchema())); + definition.validatorFactory(), definition.resourceLoader(), input.getSchema()); } } @@ -66,8 +64,7 @@ private void buildOutputProcessors(WorkflowDefinition definition) { this.outputProcessor = buildWorkflowFilter(definition.expressionFactory(), output.getAs()); this.outputSchemaValidator = getSchemaValidator( - definition.validatorFactory(), - schemaToNode(definition.resourceLoader(), output.getSchema())); + definition.validatorFactory(), definition.resourceLoader(), output.getSchema()); } } @@ -79,8 +76,7 @@ private void buildContextProcessors(WorkflowDefinition definition) { } this.contextSchemaValidator = getSchemaValidator( - definition.validatorFactory(), - schemaToNode(definition.resourceLoader(), export.getSchema())); + definition.validatorFactory(), definition.resourceLoader(), export.getSchema()); } } @@ -88,31 +84,31 @@ private void buildContextProcessors(WorkflowDefinition definition) { public TaskContext apply( WorkflowContext workflowContext, TaskContext parentContext, JsonNode input) { TaskContext taskContext = new TaskContext<>(input, parentContext, task); - if (workflowContext.instance().state() == WorkflowState.COMPLETED) { - return taskContext; - } - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskStarted(parentContext.position(), task)); + if (TaskExecutorHelper.isActive(workflowContext)) { + + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskStarted(parentContext.position(), task)); - inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); - inputProcessor.ifPresent( - p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); - internalExecute(workflowContext, taskContext); - outputProcessor.ifPresent( - p -> taskContext.output(p.apply(workflowContext, taskContext, taskContext.rawOutput()))); - outputSchemaValidator.ifPresent(s -> s.validate(taskContext.output())); - contextProcessor.ifPresent( - p -> - workflowContext.context( - p.apply(workflowContext, taskContext, workflowContext.context()))); - contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); - taskContext.completedAt(Instant.now()); - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskEnded(parentContext.position(), task)); + inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); + inputProcessor.ifPresent( + p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); + internalExecute(workflowContext, taskContext); + outputProcessor.ifPresent( + p -> taskContext.output(p.apply(workflowContext, taskContext, taskContext.rawOutput()))); + outputSchemaValidator.ifPresent(s -> s.validate(taskContext.output())); + contextProcessor.ifPresent( + p -> + workflowContext.context( + p.apply(workflowContext, taskContext, workflowContext.context()))); + contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); + taskContext.completedAt(Instant.now()); + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskEnded(parentContext.position(), task)); + } return taskContext; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index 89c099b6..e7dd07db 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -73,6 +73,8 @@ public TaskExecutor getTaskExecutor( return new TryExecutor(task.getTryTask(), definition); } else if (task.getForkTask() != null) { return new ForkExecutor(task.getForkTask(), definition); + } else if (task.getWaitTask() != null) { + return new WaitExecutor(task.getWaitTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java index 871a77da..c5dbc4fd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -19,7 +19,6 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowUtils; public class DoExecutor extends AbstractTaskExecutor { @@ -29,6 +28,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) { @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index e74a18f9..cb4ecec0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -52,7 +52,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext ta JsonNode item = iter.next(); taskContext.variables().put(task.getFor().getEach(), item); taskContext.variables().put(task.getFor().getAt(), i++); - WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext); } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java index 484a6b9f..e0ce3b02 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.ForkTask; import io.serverlessworkflow.api.types.ForkTaskConfiguration; @@ -22,18 +23,17 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowState; -import io.serverlessworkflow.impl.WorkflowUtils; -import io.serverlessworkflow.impl.generic.SortedArrayList; +import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.json.JsonUtils; import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +47,6 @@ protected ForkExecutor(ForkTask task, WorkflowDefinition definition) { service = definition.executorService(); } - private record BranchContext(String taskName, TaskContext taskContext) {} - @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { ForkTaskConfiguration forkConfig = task.getFork(); @@ -62,13 +60,10 @@ protected void internalExecute(WorkflowContext workflow, TaskContext t item.getName(), service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i))); } - List results = - new SortedArrayList<>( - (arg1, arg2) -> - arg1.taskContext.completedAt().compareTo(arg2.taskContext.completedAt())); + List>> results = new ArrayList<>(); for (Map.Entry>> entry : futures.entrySet()) { try { - results.add(new BranchContext(entry.getKey(), entry.getValue().get())); + results.add(Map.entry(entry.getKey(), entry.getValue().get())); } catch (ExecutionException ex) { Throwable cause = ex.getCause(); if (cause instanceof RuntimeException) { @@ -77,24 +72,25 @@ protected void internalExecute(WorkflowContext workflow, TaskContext t throw new UndeclaredThrowableException(ex); } } catch (InterruptedException ex) { - logger.warn( - "Thred executing branch {} was interrupted, this branch will be ignored", - entry.getKey(), - ex); + logger.warn("Branch {} was interrupted, no result will be recorded", entry.getKey(), ex); } } if (!results.isEmpty()) { + Stream>> sortedStream = + results.stream() + .sorted( + (arg1, arg2) -> + arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt())); taskContext.rawOutput( forkConfig.isCompete() - ? results.get(0).taskContext().output() - : JsonUtils.fromValue( - results.stream() - .map( - e -> - JsonUtils.mapper() - .createObjectNode() - .set(e.taskName(), e.taskContext().output())) - .collect(Collectors.toList()))); + ? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow() + : sortedStream + .map( + e -> + JsonUtils.mapper() + .createObjectNode() + .set(e.getKey(), e.getValue().output())) + .collect(JsonUtils.arrayNodeCollector())); } } } @@ -103,10 +99,10 @@ private TaskContext executeBranch( WorkflowContext workflow, TaskContext taskContext, TaskItem taskItem, int index) { taskContext.position().addIndex(index); TaskContext result = - WorkflowUtils.executeTask(workflow, taskContext, taskItem, taskContext.input()); + TaskExecutorHelper.executeTask(workflow, taskContext, taskItem, taskContext.input()); if (result.flowDirective() != null && result.flowDirective().getFlowDirectiveEnum() == FlowDirectiveEnum.END) { - workflow.instance().state(WorkflowState.COMPLETED); + workflow.instance().status(WorkflowStatus.COMPLETED); } taskContext.position().back(); return result; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java new file mode 100644 index 00000000..e16cb085 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.FlowDirective; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.List; +import java.util.ListIterator; + +public class TaskExecutorHelper { + private TaskExecutorHelper() {} + + public static void processTaskList( + List tasks, WorkflowContext context, TaskContext parentTask) { + parentTask.position().addProperty("do"); + TaskContext currentContext = parentTask; + if (!tasks.isEmpty()) { + ListIterator iter = tasks.listIterator(); + TaskItem nextTask = iter.next(); + while (nextTask != null && isActive(context)) { + TaskItem task = nextTask; + parentTask.position().addIndex(iter.previousIndex()); + currentContext = executeTask(context, parentTask, task, currentContext.output()); + FlowDirective flowDirective = currentContext.flowDirective(); + if (flowDirective.getFlowDirectiveEnum() != null) { + switch (flowDirective.getFlowDirectiveEnum()) { + case CONTINUE: + nextTask = iter.hasNext() ? iter.next() : null; + break; + case END: + context.instance().status(WorkflowStatus.COMPLETED); + break; + case EXIT: + nextTask = null; + break; + } + } else { + nextTask = findTaskByName(iter, flowDirective.getString()); + } + parentTask.position().back(); + } + } + parentTask.position().back(); + parentTask.rawOutput(currentContext.output()); + } + + public static boolean isActive(WorkflowContext context) { + return isActive(context.instance().status()); + } + + public static boolean isActive(WorkflowStatus status) { + return status == WorkflowStatus.RUNNING; + } + + public static TaskContext executeTask( + WorkflowContext context, TaskContext parentTask, TaskItem task, JsonNode input) { + parentTask.position().addProperty(task.getName()); + TaskContext result = + context + .definition() + .taskExecutors() + .computeIfAbsent( + parentTask.position().jsonPointer(), + k -> + context + .definition() + .taskFactory() + .getTaskExecutor(task.getTask(), context.definition())) + .apply(context, parentTask, input); + parentTask.position().back(); + return result; + } + + private static TaskItem findTaskByName(ListIterator iter, String taskName) { + int currentIndex = iter.nextIndex(); + while (iter.hasPrevious()) { + TaskItem item = iter.previous(); + if (item.getName().equals(taskName)) { + return item; + } + } + while (iter.nextIndex() < currentIndex) { + iter.next(); + } + while (iter.hasNext()) { + TaskItem item = iter.next(); + if (item.getName().equals(taskName)) { + return item; + } + } + throw new IllegalArgumentException("Cannot find task with name " + taskName); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index 67af7995..eed2801b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -48,7 +48,7 @@ protected TryExecutor(TryTask task, WorkflowDefinition definition) { @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { try { - WorkflowUtils.processTaskList(task.getTry(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getTry(), workflow, taskContext); } catch (WorkflowException exception) { if (errorFilter.map(f -> f.test(exception.getWorflowError())).orElse(true) && whenFilter @@ -58,7 +58,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext ta .map(w -> !w.apply(workflow, taskContext, taskContext.input()).asBoolean()) .orElse(true)) { if (task.getCatch().getDo() != null) { - WorkflowUtils.processTaskList(task.getCatch().getDo(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getCatch().getDo(), workflow, taskContext); } } else { throw exception; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java new file mode 100644 index 00000000..a1fb31c5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.DurationInline; +import io.serverlessworkflow.api.types.WaitTask; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WaitExecutor extends AbstractTaskExecutor { + + private static Logger logger = LoggerFactory.getLogger(WaitExecutor.class); + private final Duration millisToWait; + + protected WaitExecutor(WaitTask task, WorkflowDefinition definition) { + super(task, definition); + this.millisToWait = + task.getWait().getDurationInline() != null + ? toLong(task.getWait().getDurationInline()) + : Duration.parse(task.getWait().getDurationExpression()); + } + + private Duration toLong(DurationInline durationInline) { + Duration duration = Duration.ofMillis(durationInline.getMilliseconds()); + duration.plus(Duration.ofSeconds(durationInline.getSeconds())); + duration.plus(Duration.ofMinutes(durationInline.getMinutes())); + duration.plus(Duration.ofHours(durationInline.getHours())); + duration.plus(Duration.ofDays(durationInline.getDays())); + return duration; + } + + @Override + protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + try { + Thread.sleep(millisToWait.toMillis()); + } catch (InterruptedException e) { + logger.warn("Waiting thread was interrupted", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java b/impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java deleted file mode 100644 index e0647d1f..00000000 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/generic/SortedArrayList.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl.generic; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; - -public class SortedArrayList extends ArrayList { - - private static final long serialVersionUID = 1L; - private final Comparator comparator; - - public SortedArrayList() { - this(SortedArrayList::defaultCompare); - } - - @SuppressWarnings("unchecked") - private static int defaultCompare(V a, V b) { - return a instanceof Comparable ? ((Comparable) a).compareTo(b) : 0; - } - - public SortedArrayList(Comparator comparator) { - this.comparator = comparator; - } - - public SortedArrayList(Collection collection) { - this(collection, SortedArrayList::defaultCompare); - } - - public SortedArrayList(Collection collection, Comparator comparator) { - super(collection.size()); - this.comparator = comparator; - addAll(collection); - } - - @Override - public boolean add(T object) { - int i; - for (i = 0; i < size() && comparator.compare(object, get(i)) >= 0; i++) {} - super.add(i, object); - return true; - } - - public boolean addAll(Collection c) { - ensureCapacity(size() + c.size()); - c.forEach(this::add); - return !c.isEmpty(); - } - - public T set(int index, T element) { - throw new UnsupportedOperationException("Do not allow adding in a particular index"); - } -} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java index a13c8313..0726c2be 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java @@ -36,9 +36,16 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; public class JsonUtils { @@ -48,6 +55,38 @@ public static ObjectMapper mapper() { return mapper; } + public static Collector arrayNodeCollector() { + return new Collector() { + @Override + public BiConsumer accumulator() { + return (arrayNode, item) -> arrayNode.add(item); + } + + @Override + public Set characteristics() { + return Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH)); + } + + @Override + public BinaryOperator combiner() { + return (r1, r2) -> { + r1.addAll(r2); + return r1; + }; + } + + @Override + public Function finisher() { + return arrayNode -> arrayNode; + } + + @Override + public Supplier supplier() { + return () -> mapper.createArrayNode(); + } + }; + } + /* * Implementation note: * Although we can use directly ObjectMapper.convertValue for implementing fromValue and toJavaValue methods, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java b/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java index d3ab3190..8982908f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java @@ -21,33 +21,22 @@ import com.networknt.schema.SpecVersion.VersionFlag; import com.networknt.schema.ValidationMessage; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; public class DefaultSchemaValidator implements SchemaValidator { - private final JsonNode jsonNode; - private final AtomicReference schemaObject = new AtomicReference<>(); + private final JsonSchema schemaObject; public DefaultSchemaValidator(JsonNode jsonNode) { - this.jsonNode = jsonNode; + this.schemaObject = JsonSchemaFactory.getInstance(VersionFlag.V7).getSchema(jsonNode); } @Override public void validate(JsonNode node) { - Set report = getSchema().validate(node); + Set report = schemaObject.validate(node); if (!report.isEmpty()) { StringBuilder sb = new StringBuilder("There are JsonSchema validation errors:"); report.forEach(m -> sb.append(System.lineSeparator()).append(m.getMessage())); throw new IllegalArgumentException(sb.toString()); } } - - private JsonSchema getSchema() { - JsonSchema result = schemaObject.get(); - if (result == null) { - result = JsonSchemaFactory.getInstance(VersionFlag.V7).getSchema(jsonNode); - schemaObject.set(result); - } - return result; - } } diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java deleted file mode 100644 index 1ad0ed98..00000000 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/SortedListTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.impl; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.serverlessworkflow.impl.generic.SortedArrayList; -import java.time.Instant; -import java.util.Arrays; -import java.util.List; -import org.junit.jupiter.api.Test; - -public class SortedListTest { - - private record Person(String name, int age) {} - - @Test - void testConstructor() { - assertThat(new SortedArrayList<>(Arrays.asList(3, 2, 1))).isEqualTo(Arrays.asList(1, 2, 3)); - } - - @Test - void testAdd() { - List list = new SortedArrayList<>(); - list.add(1); - list.add(4); - list.add(3); - list.add(2); - assertThat(list).isEqualTo(Arrays.asList(1, 2, 3, 4)); - } - - @Test - void testAddPojo() { - List list = new SortedArrayList<>((a, b) -> b.age() - a.age()); - list.add(new Person("Mariam", 5)); - list.add(new Person("Belen", 12)); - list.add(new Person("Alejandro", 7)); - list.add(new Person("Vicente", 16)); - list.add(new Person("Daniel", 14)); - assertThat(list.stream().map(Person::name)) - .isEqualTo(Arrays.asList("Vicente", "Daniel", "Belen", "Alejandro", "Mariam")); - } - - @Test - void testAddAll() { - List list = new SortedArrayList<>(); - Instant now = Instant.now(); - list.addAll(Arrays.asList(now.plusMillis(1000), now)); - assertThat(list.get(0)).isEqualTo(now); - } -} diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index e324de2a..e2a1dbf2 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -33,10 +33,13 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkflowDefinitionTest { private static WorkflowApplication appl; + private static Logger logger = LoggerFactory.getLogger(WorkflowDefinitionTest.class); private static Instant before; @BeforeAll @@ -138,6 +141,7 @@ private static Arguments args( private static void checkNotCompeteOuput(WorkflowInstance instance) { JsonNode out = instance.outputAsJsonNode(); + logger.debug("Output is {}", out); assertThat(out).isInstanceOf(ArrayNode.class); assertThat(out).hasSize(2); ArrayNode array = (ArrayNode) out; diff --git a/impl/core/src/test/resources/fork-no-compete.yaml b/impl/core/src/test/resources/fork-no-compete.yaml index 5e78acf2..ee882f13 100644 --- a/impl/core/src/test/resources/fork-no-compete.yaml +++ b/impl/core/src/test/resources/fork-no-compete.yaml @@ -9,10 +9,20 @@ do: compete: false branches: - callNurse: - set: - patientId: John - room: 1 + do: + - waitForNurse: + wait: + milliseconds: 500 + - nurseArrived: + set: + patientId: John + room: 1 - callDoctor: - set: - patientId: Smith - room: 2 \ No newline at end of file + do: + - waitForDoctor: + wait: + milliseconds: 499 + - doctorArrived: + set: + patientId: Smith + room: 2 \ No newline at end of file diff --git a/impl/core/src/test/resources/fork.yaml b/impl/core/src/test/resources/fork.yaml index dfde183b..e9f3e7a3 100644 --- a/impl/core/src/test/resources/fork.yaml +++ b/impl/core/src/test/resources/fork.yaml @@ -9,7 +9,7 @@ do: compete: true branches: - callNurse: - set: + set: patientId: John room: 1 - callDoctor: