diff --git a/impl/core/pom.xml b/impl/core/pom.xml index 940f79fa..b0cace0b 100644 --- a/impl/core/pom.xml +++ b/impl/core/pom.xml @@ -50,5 +50,10 @@ assertj-core test + + ch.qos.logback + logback-classic + test + diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java new file mode 100644 index 00000000..7c211149 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/ExecutorServiceFactory.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +@FunctionalInterface +public interface ExecutorServiceFactory extends Supplier {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java new file mode 100644 index 00000000..91b1b6c5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.function.BiFunction; + +@FunctionalInterface +public interface LongFilter extends BiFunction, Long> {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java index c6d3f141..5ad4934f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java @@ -54,7 +54,7 @@ public String jsonPointer() { @Override public String toString() { - return "ListWorkflowPosition [list=" + queue + "]"; + return "QueueWorkflowPosition [queue=" + queue + "]"; } @Override @@ -65,6 +65,6 @@ public WorkflowPosition back() { @Override public Object last() { - return queue.pollLast(); + return queue.getLast(); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index cadde89c..dde5a315 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -28,35 +28,64 @@ public class TaskContext { private final JsonNode rawInput; private final T task; private final WorkflowPosition position; - private final Instant startedAt = Instant.now(); + private final Instant startedAt; private JsonNode input; private JsonNode output; private JsonNode rawOutput; private FlowDirective flowDirective; private Map contextVariables; + private Instant completedAt; public TaskContext(JsonNode input, WorkflowPosition position) { - this.rawInput = input; - this.position = position; - this.task = null; - this.contextVariables = new HashMap<>(); - init(); + this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>()); } public TaskContext(JsonNode input, TaskContext taskContext, T task) { - this.rawInput = input; - this.position = taskContext.position.copy(); + this( + input, + task, + taskContext.position, + Instant.now(), + input, + input, + input, + task.getThen(), + new HashMap<>(taskContext.variables())); + } + + private TaskContext( + JsonNode rawInput, + T task, + WorkflowPosition position, + Instant startedAt, + JsonNode input, + JsonNode output, + JsonNode rawOutput, + FlowDirective flowDirective, + Map contextVariables) { + this.rawInput = rawInput; this.task = task; - this.flowDirective = task.getThen(); - this.contextVariables = new HashMap<>(taskContext.variables()); - init(); + this.position = position; + this.startedAt = startedAt; + this.input = input; + this.output = output; + this.rawOutput = rawOutput; + this.flowDirective = flowDirective; + this.contextVariables = contextVariables; } - private void init() { - this.input = rawInput; - this.rawOutput = rawInput; - this.output = rawInput; + public TaskContext copy() { + return new TaskContext( + rawInput, + task, + position.copy(), + startedAt, + input, + output, + rawOutput, + flowDirective, + new HashMap<>(contextVariables)); } public void input(JsonNode input) { @@ -115,4 +144,12 @@ public WorkflowPosition position() { public Instant startedAt() { return startedAt; } + + public void completedAt(Instant instant) { + this.completedAt = instant; + } + + public Instant completedAt() { + return completedAt; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 4f35de41..f36c23f6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -32,6 +32,8 @@ import java.util.HashSet; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class WorkflowApplication implements AutoCloseable { @@ -43,8 +45,11 @@ public class WorkflowApplication implements AutoCloseable { private final Collection listeners; private final Map definitions; private final WorkflowPositionFactory positionFactory; + private final ExecutorServiceFactory executorFactory; private final RuntimeDescriptorFactory runtimeDescriptorFactory; + private ExecutorService executorService; + public WorkflowApplication( TaskExecutorFactory taskFactory, ExpressionFactory exprFactory, @@ -53,6 +58,7 @@ public WorkflowApplication( WorkflowPositionFactory positionFactory, WorkflowIdFactory idFactory, RuntimeDescriptorFactory runtimeDescriptorFactory, + ExecutorServiceFactory executorFactory, Collection listeners) { this.taskFactory = taskFactory; this.exprFactory = exprFactory; @@ -61,6 +67,7 @@ public WorkflowApplication( this.positionFactory = positionFactory; this.idFactory = idFactory; this.runtimeDescriptorFactory = runtimeDescriptorFactory; + this.executorFactory = executorFactory; this.listeners = listeners; this.definitions = new ConcurrentHashMap<>(); } @@ -101,6 +108,7 @@ public static class Builder { private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get(); private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); + private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool(); private RuntimeDescriptorFactory descriptorFactory = () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); @@ -129,6 +137,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { return this; } + public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) { + this.executorFactory = executorFactory; + return this; + } + public Builder withPositionFactory(WorkflowPositionFactory positionFactory) { this.positionFactory = positionFactory; return this; @@ -158,6 +171,7 @@ public WorkflowApplication build() { positionFactory, idFactory, descriptorFactory, + executorFactory, listeners == null ? Collections.emptySet() : Collections.unmodifiableCollection(listeners)); @@ -190,4 +204,13 @@ public WorkflowPositionFactory positionFactory() { public RuntimeDescriptorFactory runtimeDescriptorFactory() { return runtimeDescriptorFactory; } + + public ExecutorService executorService() { + synchronized (executorFactory) { + if (executorService == null) { + executorService = executorFactory.get(); + } + } + return executorService; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index db8b5e4d..df5b70e1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; public class WorkflowDefinition implements AutoCloseable { @@ -48,22 +49,19 @@ public class WorkflowDefinition implements AutoCloseable { private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { - this.workflow = workflow; this.application = application; this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); this.inputSchemaValidator = - getSchemaValidator( - application.validatorFactory(), schemaToNode(resourceLoader, input.getSchema())); + getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema()); this.inputFilter = buildWorkflowFilter(application.expressionFactory(), input.getFrom()); } if (workflow.getOutput() != null) { Output output = workflow.getOutput(); this.outputSchemaValidator = - getSchemaValidator( - application.validatorFactory(), schemaToNode(resourceLoader, output.getSchema())); + getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema()); this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs()); } } @@ -134,6 +132,10 @@ public WorkflowPositionFactory positionFactory() { return application.positionFactory(); } + public ExecutorService executorService() { + return application.executorService(); + } + public RuntimeDescriptorFactory runtimeDescriptorFactory() { return application.runtimeDescriptorFactory(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java index ce72c70e..c121bb41 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowExecutionListener.java @@ -15,11 +15,11 @@ */ package io.serverlessworkflow.impl; -import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.TaskBase; public interface WorkflowExecutionListener { - void onTaskStarted(WorkflowPosition currentPos, Task task); + void onTaskStarted(WorkflowPosition currentPos, TaskBase task); - void onTaskEnded(WorkflowPosition currentPos, Task task); + void onTaskEnded(WorkflowPosition currentPos, TaskBase task); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index 0f3bd410..f81a6f24 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -19,15 +19,17 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; +import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; public class WorkflowInstance { - private WorkflowState state; - private TaskContext taskContext; + private final AtomicReference status; + private final TaskContext taskContext; private final String id; private final JsonNode input; private final Instant startedAt; - private JsonNode context = NullNode.getInstance(); + private final AtomicReference context; WorkflowInstance(WorkflowDefinition definition, JsonNode input) { this.id = definition.idFactory().get(); @@ -39,14 +41,16 @@ public class WorkflowInstance { definition .inputFilter() .ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input))); - state = WorkflowState.STARTED; - WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext); + status = new AtomicReference<>(WorkflowStatus.RUNNING); + context = new AtomicReference<>(NullNode.getInstance()); + TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext); definition .outputFilter() .ifPresent( f -> taskContext.output(f.apply(workflowContext, taskContext, taskContext.rawOutput()))); definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output())); + status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED); } public String id() { @@ -62,22 +66,26 @@ public JsonNode input() { } public JsonNode context() { - return context; + return context.get(); } - public WorkflowState state() { - return state; + public WorkflowStatus status() { + return status.get(); + } + + public void status(WorkflowStatus state) { + this.status.set(state); } public Object output() { return toJavaValue(taskContext.output()); } - public Object outputAsJsonNode() { + public JsonNode outputAsJsonNode() { return taskContext.output(); } void context(JsonNode context) { - this.context = context; + this.context.set(context); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java similarity index 88% rename from impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java index 310dbd0b..bc657839 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowState.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowStatus.java @@ -15,8 +15,11 @@ */ package io.serverlessworkflow.impl; -public enum WorkflowState { - STARTED, +public enum WorkflowStatus { + PENDING, + RUNNING, WAITING, - COMPLETED + COMPLETED, + FAULTED, + CANCELLED } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java index 01486d5f..0866ba05 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java @@ -19,14 +19,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.serverlessworkflow.api.WorkflowFormat; import io.serverlessworkflow.api.types.ExportAs; -import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.InputFrom; import io.serverlessworkflow.api.types.OutputAs; import io.serverlessworkflow.api.types.SchemaExternal; import io.serverlessworkflow.api.types.SchemaInline; import io.serverlessworkflow.api.types.SchemaUnion; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.impl.expressions.Expression; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.ExpressionUtils; @@ -38,8 +35,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; -import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.Optional; @@ -48,11 +43,12 @@ public class WorkflowUtils { private WorkflowUtils() {} public static Optional getSchemaValidator( - SchemaValidatorFactory validatorFactory, Optional node) { - return node.map(n -> validatorFactory.getValidator(n)); + SchemaValidatorFactory validatorFactory, ResourceLoader resourceLoader, SchemaUnion schema) { + return schemaToNode(resourceLoader, schema).map(n -> validatorFactory.getValidator(n)); } - public static Optional schemaToNode(ResourceLoader resourceLoader, SchemaUnion schema) { + private static Optional schemaToNode( + ResourceLoader resourceLoader, SchemaUnion schema) { if (schema != null) { if (schema.getSchemaInline() != null) { SchemaInline inline = schema.getSchemaInline(); @@ -94,18 +90,22 @@ public static Optional buildWorkflowFilter( public static StringFilter buildStringFilter( ExpressionFactory exprFactory, String expression, String literal) { - return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal); + return expression != null + ? toString(buildWorkflowFilter(exprFactory, expression)) + : toString(literal); } public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) { - return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str); + return ExpressionUtils.isExpr(str) + ? toString(buildWorkflowFilter(exprFactory, str)) + : toString(str); } - public static StringFilter from(WorkflowFilter filter) { + private static StringFilter toString(WorkflowFilter filter) { return (w, t) -> filter.apply(w, t, t.input()).asText(); } - private static StringFilter from(String literal) { + private static StringFilter toString(String literal) { return (w, t) -> literal; } @@ -124,75 +124,19 @@ private static WorkflowFilter buildWorkflowFilter( throw new IllegalStateException("Both object and str are null"); } - private static TaskItem findTaskByName(ListIterator iter, String taskName) { - int currentIndex = iter.nextIndex(); - while (iter.hasPrevious()) { - TaskItem item = iter.previous(); - if (item.getName().equals(taskName)) { - return item; - } - } - while (iter.nextIndex() < currentIndex) { - iter.next(); - } - while (iter.hasNext()) { - TaskItem item = iter.next(); - if (item.getName().equals(taskName)) { - return item; - } - } - throw new IllegalArgumentException("Cannot find task with name " + taskName); + public static LongFilter buildLongFilter( + ExpressionFactory exprFactory, String expression, Long literal) { + return expression != null + ? toLong(buildWorkflowFilter(exprFactory, expression)) + : toLong(literal); } - public static void processTaskList( - List tasks, WorkflowContext context, TaskContext parentTask) { - parentTask.position().addProperty("do"); - TaskContext currentContext = parentTask; - if (!tasks.isEmpty()) { - ListIterator iter = tasks.listIterator(); - TaskItem nextTask = iter.next(); - while (nextTask != null) { - TaskItem task = nextTask; - parentTask.position().addIndex(iter.previousIndex()).addProperty(task.getName()); - context - .definition() - .listeners() - .forEach(l -> l.onTaskStarted(parentTask.position(), task.getTask())); - currentContext = - context - .definition() - .taskExecutors() - .computeIfAbsent( - parentTask.position().jsonPointer(), - k -> - context - .definition() - .taskFactory() - .getTaskExecutor(task.getTask(), context.definition())) - .apply(context, parentTask, currentContext.output()); - FlowDirective flowDirective = currentContext.flowDirective(); - if (flowDirective.getFlowDirectiveEnum() != null) { - switch (flowDirective.getFlowDirectiveEnum()) { - case CONTINUE: - nextTask = iter.hasNext() ? iter.next() : null; - break; - case END: - case EXIT: - nextTask = null; - break; - } - } else { - nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString()); - } - context - .definition() - .listeners() - .forEach(l -> l.onTaskEnded(parentTask.position(), task.getTask())); - parentTask.position().back(); - } - } - parentTask.position().back(); - parentTask.rawOutput(currentContext.output()); + private static LongFilter toLong(WorkflowFilter filter) { + return (w, t) -> filter.apply(w, t, t.input()).asLong(); + } + + private static LongFilter toLong(Long literal) { + return (w, t) -> literal; } public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index 086c7c51..f5ee1136 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -27,6 +27,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; +import java.time.Instant; import java.util.Optional; public abstract class AbstractTaskExecutor implements TaskExecutor { @@ -53,8 +54,7 @@ private void buildInputProcessors(WorkflowDefinition definition) { this.inputProcessor = buildWorkflowFilter(definition.expressionFactory(), input.getFrom()); this.inputSchemaValidator = getSchemaValidator( - definition.validatorFactory(), - schemaToNode(definition.resourceLoader(), input.getSchema())); + definition.validatorFactory(), definition.resourceLoader(), input.getSchema()); } } @@ -64,8 +64,7 @@ private void buildOutputProcessors(WorkflowDefinition definition) { this.outputProcessor = buildWorkflowFilter(definition.expressionFactory(), output.getAs()); this.outputSchemaValidator = getSchemaValidator( - definition.validatorFactory(), - schemaToNode(definition.resourceLoader(), output.getSchema())); + definition.validatorFactory(), definition.resourceLoader(), output.getSchema()); } } @@ -77,8 +76,7 @@ private void buildContextProcessors(WorkflowDefinition definition) { } this.contextSchemaValidator = getSchemaValidator( - definition.validatorFactory(), - schemaToNode(definition.resourceLoader(), export.getSchema())); + definition.validatorFactory(), definition.resourceLoader(), export.getSchema()); } } @@ -86,18 +84,31 @@ private void buildContextProcessors(WorkflowDefinition definition) { public TaskContext apply( WorkflowContext workflowContext, TaskContext parentContext, JsonNode input) { TaskContext taskContext = new TaskContext<>(input, parentContext, task); - inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); - inputProcessor.ifPresent( - p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); - internalExecute(workflowContext, taskContext); - outputProcessor.ifPresent( - p -> taskContext.output(p.apply(workflowContext, taskContext, taskContext.rawOutput()))); - outputSchemaValidator.ifPresent(s -> s.validate(taskContext.output())); - contextProcessor.ifPresent( - p -> - workflowContext.context( - p.apply(workflowContext, taskContext, workflowContext.context()))); - contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); + if (TaskExecutorHelper.isActive(workflowContext)) { + + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskStarted(parentContext.position(), task)); + + inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); + inputProcessor.ifPresent( + p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); + internalExecute(workflowContext, taskContext); + outputProcessor.ifPresent( + p -> taskContext.output(p.apply(workflowContext, taskContext, taskContext.rawOutput()))); + outputSchemaValidator.ifPresent(s -> s.validate(taskContext.output())); + contextProcessor.ifPresent( + p -> + workflowContext.context( + p.apply(workflowContext, taskContext, workflowContext.context()))); + contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); + taskContext.completedAt(Instant.now()); + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskEnded(parentContext.position(), task)); + } return taskContext; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index 58bd18ac..e7dd07db 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -71,6 +71,10 @@ public TaskExecutor getTaskExecutor( return new RaiseExecutor(task.getRaiseTask(), definition); } else if (task.getTryTask() != null) { return new TryExecutor(task.getTryTask(), definition); + } else if (task.getForkTask() != null) { + return new ForkExecutor(task.getForkTask(), definition); + } else if (task.getWaitTask() != null) { + return new WaitExecutor(task.getWaitTask(), definition); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java index 871a77da..c5dbc4fd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -19,7 +19,6 @@ import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowUtils; public class DoExecutor extends AbstractTaskExecutor { @@ -29,6 +28,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) { @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index e74a18f9..cb4ecec0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -52,7 +52,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext ta JsonNode item = iter.next(); taskContext.variables().put(task.getFor().getEach(), item); taskContext.variables().put(task.getFor().getAt(), i++); - WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext); } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java new file mode 100644 index 00000000..e0ce3b02 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.FlowDirectiveEnum; +import io.serverlessworkflow.api.types.ForkTask; +import io.serverlessworkflow.api.types.ForkTaskConfiguration; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.json.JsonUtils; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ForkExecutor extends AbstractTaskExecutor { + + private static final Logger logger = LoggerFactory.getLogger(ForkExecutor.class); + private final ExecutorService service; + + protected ForkExecutor(ForkTask task, WorkflowDefinition definition) { + super(task, definition); + service = definition.executorService(); + } + + @Override + protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + ForkTaskConfiguration forkConfig = task.getFork(); + + if (!forkConfig.getBranches().isEmpty()) { + Map>> futures = new HashMap<>(); + int index = 0; + for (TaskItem item : forkConfig.getBranches()) { + final int i = index++; + futures.put( + item.getName(), + service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i))); + } + List>> results = new ArrayList<>(); + for (Map.Entry>> entry : futures.entrySet()) { + try { + results.add(Map.entry(entry.getKey(), entry.getValue().get())); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else { + throw new UndeclaredThrowableException(ex); + } + } catch (InterruptedException ex) { + logger.warn("Branch {} was interrupted, no result will be recorded", entry.getKey(), ex); + } + } + if (!results.isEmpty()) { + Stream>> sortedStream = + results.stream() + .sorted( + (arg1, arg2) -> + arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt())); + taskContext.rawOutput( + forkConfig.isCompete() + ? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow() + : sortedStream + .map( + e -> + JsonUtils.mapper() + .createObjectNode() + .set(e.getKey(), e.getValue().output())) + .collect(JsonUtils.arrayNodeCollector())); + } + } + } + + private TaskContext executeBranch( + WorkflowContext workflow, TaskContext taskContext, TaskItem taskItem, int index) { + taskContext.position().addIndex(index); + TaskContext result = + TaskExecutorHelper.executeTask(workflow, taskContext, taskItem, taskContext.input()); + if (result.flowDirective() != null + && result.flowDirective().getFlowDirectiveEnum() == FlowDirectiveEnum.END) { + workflow.instance().status(WorkflowStatus.COMPLETED); + } + taskContext.position().back(); + return result; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java new file mode 100644 index 00000000..e16cb085 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.FlowDirective; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowStatus; +import java.util.List; +import java.util.ListIterator; + +public class TaskExecutorHelper { + private TaskExecutorHelper() {} + + public static void processTaskList( + List tasks, WorkflowContext context, TaskContext parentTask) { + parentTask.position().addProperty("do"); + TaskContext currentContext = parentTask; + if (!tasks.isEmpty()) { + ListIterator iter = tasks.listIterator(); + TaskItem nextTask = iter.next(); + while (nextTask != null && isActive(context)) { + TaskItem task = nextTask; + parentTask.position().addIndex(iter.previousIndex()); + currentContext = executeTask(context, parentTask, task, currentContext.output()); + FlowDirective flowDirective = currentContext.flowDirective(); + if (flowDirective.getFlowDirectiveEnum() != null) { + switch (flowDirective.getFlowDirectiveEnum()) { + case CONTINUE: + nextTask = iter.hasNext() ? iter.next() : null; + break; + case END: + context.instance().status(WorkflowStatus.COMPLETED); + break; + case EXIT: + nextTask = null; + break; + } + } else { + nextTask = findTaskByName(iter, flowDirective.getString()); + } + parentTask.position().back(); + } + } + parentTask.position().back(); + parentTask.rawOutput(currentContext.output()); + } + + public static boolean isActive(WorkflowContext context) { + return isActive(context.instance().status()); + } + + public static boolean isActive(WorkflowStatus status) { + return status == WorkflowStatus.RUNNING; + } + + public static TaskContext executeTask( + WorkflowContext context, TaskContext parentTask, TaskItem task, JsonNode input) { + parentTask.position().addProperty(task.getName()); + TaskContext result = + context + .definition() + .taskExecutors() + .computeIfAbsent( + parentTask.position().jsonPointer(), + k -> + context + .definition() + .taskFactory() + .getTaskExecutor(task.getTask(), context.definition())) + .apply(context, parentTask, input); + parentTask.position().back(); + return result; + } + + private static TaskItem findTaskByName(ListIterator iter, String taskName) { + int currentIndex = iter.nextIndex(); + while (iter.hasPrevious()) { + TaskItem item = iter.previous(); + if (item.getName().equals(taskName)) { + return item; + } + } + while (iter.nextIndex() < currentIndex) { + iter.next(); + } + while (iter.hasNext()) { + TaskItem item = iter.next(); + if (item.getName().equals(taskName)) { + return item; + } + } + throw new IllegalArgumentException("Cannot find task with name " + taskName); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index 67af7995..eed2801b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -48,7 +48,7 @@ protected TryExecutor(TryTask task, WorkflowDefinition definition) { @Override protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { try { - WorkflowUtils.processTaskList(task.getTry(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getTry(), workflow, taskContext); } catch (WorkflowException exception) { if (errorFilter.map(f -> f.test(exception.getWorflowError())).orElse(true) && whenFilter @@ -58,7 +58,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext ta .map(w -> !w.apply(workflow, taskContext, taskContext.input()).asBoolean()) .orElse(true)) { if (task.getCatch().getDo() != null) { - WorkflowUtils.processTaskList(task.getCatch().getDo(), workflow, taskContext); + TaskExecutorHelper.processTaskList(task.getCatch().getDo(), workflow, taskContext); } } else { throw exception; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java new file mode 100644 index 00000000..a1fb31c5 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.DurationInline; +import io.serverlessworkflow.api.types.WaitTask; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WaitExecutor extends AbstractTaskExecutor { + + private static Logger logger = LoggerFactory.getLogger(WaitExecutor.class); + private final Duration millisToWait; + + protected WaitExecutor(WaitTask task, WorkflowDefinition definition) { + super(task, definition); + this.millisToWait = + task.getWait().getDurationInline() != null + ? toLong(task.getWait().getDurationInline()) + : Duration.parse(task.getWait().getDurationExpression()); + } + + private Duration toLong(DurationInline durationInline) { + Duration duration = Duration.ofMillis(durationInline.getMilliseconds()); + duration.plus(Duration.ofSeconds(durationInline.getSeconds())); + duration.plus(Duration.ofMinutes(durationInline.getMinutes())); + duration.plus(Duration.ofHours(durationInline.getHours())); + duration.plus(Duration.ofDays(durationInline.getDays())); + return duration; + } + + @Override + protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + try { + Thread.sleep(millisToWait.toMillis()); + } catch (InterruptedException e) { + logger.warn("Waiting thread was interrupted", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java index a13c8313..0726c2be 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/json/JsonUtils.java @@ -36,9 +36,16 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; public class JsonUtils { @@ -48,6 +55,38 @@ public static ObjectMapper mapper() { return mapper; } + public static Collector arrayNodeCollector() { + return new Collector() { + @Override + public BiConsumer accumulator() { + return (arrayNode, item) -> arrayNode.add(item); + } + + @Override + public Set characteristics() { + return Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH)); + } + + @Override + public BinaryOperator combiner() { + return (r1, r2) -> { + r1.addAll(r2); + return r1; + }; + } + + @Override + public Function finisher() { + return arrayNode -> arrayNode; + } + + @Override + public Supplier supplier() { + return () -> mapper.createArrayNode(); + } + }; + } + /* * Implementation note: * Although we can use directly ObjectMapper.convertValue for implementing fromValue and toJavaValue methods, diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java b/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java index d3ab3190..8982908f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/jsonschema/DefaultSchemaValidator.java @@ -21,33 +21,22 @@ import com.networknt.schema.SpecVersion.VersionFlag; import com.networknt.schema.ValidationMessage; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; public class DefaultSchemaValidator implements SchemaValidator { - private final JsonNode jsonNode; - private final AtomicReference schemaObject = new AtomicReference<>(); + private final JsonSchema schemaObject; public DefaultSchemaValidator(JsonNode jsonNode) { - this.jsonNode = jsonNode; + this.schemaObject = JsonSchemaFactory.getInstance(VersionFlag.V7).getSchema(jsonNode); } @Override public void validate(JsonNode node) { - Set report = getSchema().validate(node); + Set report = schemaObject.validate(node); if (!report.isEmpty()) { StringBuilder sb = new StringBuilder("There are JsonSchema validation errors:"); report.forEach(m -> sb.append(System.lineSeparator()).append(m.getMessage())); throw new IllegalArgumentException(sb.toString()); } } - - private JsonSchema getSchema() { - JsonSchema result = schemaObject.get(); - if (result == null) { - result = JsonSchemaFactory.getInstance(VersionFlag.V7).getSchema(jsonNode); - schemaObject.set(result); - } - return result; - } } diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index 6b29bac1..e2a1dbf2 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -19,6 +19,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowableOfType; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.serverlessworkflow.impl.json.JsonUtils; import java.io.IOException; import java.time.Instant; import java.util.Arrays; @@ -29,10 +33,13 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class WorkflowDefinitionTest { private static WorkflowApplication appl; + private static Logger logger = LoggerFactory.getLogger(WorkflowDefinitionTest.class); private static Instant before; @BeforeAll @@ -108,7 +115,14 @@ private static Stream provideParameters() { args( "raise-reusable.yaml", WorkflowDefinitionTest::checkWorkflowException, - WorkflowException.class)); + WorkflowException.class), + args( + "fork.yaml", + Map.of(), + o -> + assertThat(((ObjectNode) o.outputAsJsonNode()).get("patientId").asText()) + .isIn("John", "Smith")), + args("fork-no-compete.yaml", Map.of(), WorkflowDefinitionTest::checkNotCompeteOuput)); } private static Arguments args( @@ -125,6 +139,25 @@ private static Arguments args( d -> consumer.accept(catchThrowableOfType(clazz, () -> d.execute(Map.of())))); } + private static void checkNotCompeteOuput(WorkflowInstance instance) { + JsonNode out = instance.outputAsJsonNode(); + logger.debug("Output is {}", out); + assertThat(out).isInstanceOf(ArrayNode.class); + assertThat(out).hasSize(2); + ArrayNode array = (ArrayNode) out; + assertThat(array) + .containsExactlyInAnyOrder( + createObjectNode("callNurse", "patientId", "John", "room", 1), + createObjectNode("callDoctor", "patientId", "Smith", "room", 2)); + } + + private static JsonNode createObjectNode( + String parent, String key1, String value1, String key2, int value2) { + return JsonUtils.mapper() + .createObjectNode() + .set(parent, JsonUtils.mapper().createObjectNode().put(key1, value1).put(key2, value2)); + } + private static void checkWorkflowException(WorkflowException ex) { assertThat(ex.getWorflowError().type()) .isEqualTo("https://serverlessworkflow.io/errors/not-implemented"); diff --git a/impl/core/src/test/resources/fork-no-compete.yaml b/impl/core/src/test/resources/fork-no-compete.yaml new file mode 100644 index 00000000..ee882f13 --- /dev/null +++ b/impl/core/src/test/resources/fork-no-compete.yaml @@ -0,0 +1,28 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: fork-example + version: '0.1.0' +do: + - callSomeone: + fork: + compete: false + branches: + - callNurse: + do: + - waitForNurse: + wait: + milliseconds: 500 + - nurseArrived: + set: + patientId: John + room: 1 + - callDoctor: + do: + - waitForDoctor: + wait: + milliseconds: 499 + - doctorArrived: + set: + patientId: Smith + room: 2 \ No newline at end of file diff --git a/impl/core/src/test/resources/fork.yaml b/impl/core/src/test/resources/fork.yaml new file mode 100644 index 00000000..e9f3e7a3 --- /dev/null +++ b/impl/core/src/test/resources/fork.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: fork-no-compete + version: '0.1.0' +do: + - callSomeone: + fork: + compete: true + branches: + - callNurse: + set: + patientId: John + room: 1 + - callDoctor: + set: + patientId: Smith + room: 2 \ No newline at end of file