diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java index 91b1b6c5..ec52d251 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/LongFilter.java @@ -18,4 +18,4 @@ import java.util.function.BiFunction; @FunctionalInterface -public interface LongFilter extends BiFunction, Long> {} +public interface LongFilter extends BiFunction {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java index 5d0a648e..3ededc3f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/StringFilter.java @@ -18,4 +18,4 @@ import java.util.function.BiFunction; @FunctionalInterface -public interface StringFilter extends BiFunction, String> {} +public interface StringFilter extends BiFunction {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index dde5a315..4fc3d1f4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -16,76 +16,64 @@ package io.serverlessworkflow.impl; import com.fasterxml.jackson.databind.JsonNode; -import io.serverlessworkflow.api.types.FlowDirective; -import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.impl.executors.TransitionInfo; import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.Optional; -public class TaskContext { +public class TaskContext { private final JsonNode rawInput; - private final T task; + private final TaskBase task; private final WorkflowPosition position; private final Instant startedAt; + private final String taskName; + private final Map contextVariables; + private final Optional parentContext; private JsonNode input; private JsonNode output; private JsonNode rawOutput; - private FlowDirective flowDirective; - private Map contextVariables; private Instant completedAt; + private TransitionInfo transition; - public TaskContext(JsonNode input, WorkflowPosition position) { - this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>()); - } - - public TaskContext(JsonNode input, TaskContext taskContext, T task) { - this( - input, - task, - taskContext.position, - Instant.now(), - input, - input, - input, - task.getThen(), - new HashMap<>(taskContext.variables())); + public TaskContext( + JsonNode input, + WorkflowPosition position, + Optional parentContext, + String taskName, + TaskBase task) { + this(input, parentContext, taskName, task, position, Instant.now(), input, input, input); } private TaskContext( JsonNode rawInput, - T task, + Optional parentContext, + String taskName, + TaskBase task, WorkflowPosition position, Instant startedAt, JsonNode input, JsonNode output, - JsonNode rawOutput, - FlowDirective flowDirective, - Map contextVariables) { + JsonNode rawOutput) { this.rawInput = rawInput; + this.parentContext = parentContext; + this.taskName = taskName; this.task = task; this.position = position; this.startedAt = startedAt; this.input = input; this.output = output; this.rawOutput = rawOutput; - this.flowDirective = flowDirective; - this.contextVariables = contextVariables; + this.contextVariables = + parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new); } - public TaskContext copy() { - return new TaskContext( - rawInput, - task, - position.copy(), - startedAt, - input, - output, - rawOutput, - flowDirective, - new HashMap<>(contextVariables)); + public TaskContext copy() { + return new TaskContext( + rawInput, parentContext, taskName, task, position, startedAt, input, output, rawOutput); } public void input(JsonNode input) { @@ -102,54 +90,64 @@ public JsonNode rawInput() { return rawInput; } - public T task() { + public TaskBase task() { return task; } - public void rawOutput(JsonNode output) { + public TaskContext rawOutput(JsonNode output) { this.rawOutput = output; this.output = output; + return this; } public JsonNode rawOutput() { return rawOutput; } - public void output(JsonNode output) { + public TaskContext output(JsonNode output) { this.output = output; + return this; } public JsonNode output() { return output; } - public void flowDirective(FlowDirective flowDirective) { - this.flowDirective = flowDirective; - } - - public FlowDirective flowDirective() { - return flowDirective == null - ? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE) - : flowDirective; + public WorkflowPosition position() { + return position; } public Map variables() { return contextVariables; } - public WorkflowPosition position() { - return position; - } - public Instant startedAt() { return startedAt; } - public void completedAt(Instant instant) { + public Optional parent() { + return parentContext; + } + + public String taskName() { + return taskName; + } + + public TaskContext completedAt(Instant instant) { this.completedAt = instant; + return this; } public Instant completedAt() { return completedAt; } + + public TransitionInfo transition() { + return transition; + } + + public TaskContext transition(TransitionInfo transition) { + this.transition = transition; + return this; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java index f45f1b84..96890c8b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java @@ -20,6 +20,7 @@ public class WorkflowContext { private final WorkflowDefinition definition; private final WorkflowInstance instance; + private JsonNode context; WorkflowContext(WorkflowDefinition definition, WorkflowInstance instance) { this.definition = definition; @@ -31,11 +32,11 @@ public WorkflowInstance instance() { } public JsonNode context() { - return instance.context(); + return context; } public void context(JsonNode context) { - this.instance.context(context); + this.context = context; } public WorkflowDefinition definition() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 5596f87e..c872a80c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -19,21 +19,15 @@ import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; -import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.executors.TaskExecutor; -import io.serverlessworkflow.impl.executors.TaskExecutorFactory; -import io.serverlessworkflow.impl.expressions.ExpressionFactory; +import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.json.JsonUtils; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; -import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; import io.serverlessworkflow.impl.resources.ResourceLoader; import java.nio.file.Path; import java.util.Collection; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; public class WorkflowDefinition implements AutoCloseable { @@ -42,16 +36,13 @@ public class WorkflowDefinition implements AutoCloseable { private Optional outputSchemaValidator = Optional.empty(); private Optional inputFilter = Optional.empty(); private Optional outputFilter = Optional.empty(); - private final Map> taskExecutors = - new ConcurrentHashMap<>(); - private final ResourceLoader resourceLoader; private final WorkflowApplication application; + private final TaskExecutor taskExecutor; private WorkflowDefinition( WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { this.workflow = workflow; this.application = application; - this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); this.inputSchemaValidator = @@ -64,6 +55,13 @@ private WorkflowDefinition( getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema()); this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs()); } + this.taskExecutor = + TaskExecutorHelper.createExecutorList( + application.positionFactory().get(), + workflow.getDo(), + workflow, + application, + resourceLoader); } static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) { @@ -83,6 +81,10 @@ public Optional inputSchemaValidator() { return inputSchemaValidator; } + public TaskExecutor startTask() { + return taskExecutor; + } + public Optional inputFilter() { return inputFilter; } @@ -95,14 +97,6 @@ public Collection listeners() { return application.listeners(); } - public Map> taskExecutors() { - return taskExecutors; - } - - public TaskExecutorFactory taskFactory() { - return application.taskFactory(); - } - public Optional outputFilter() { return outputFilter; } @@ -115,26 +109,6 @@ public Optional outputSchemaValidator() { return outputSchemaValidator; } - public ExpressionFactory expressionFactory() { - return application.expressionFactory(); - } - - public SchemaValidatorFactory validatorFactory() { - return application.validatorFactory(); - } - - public ResourceLoader resourceLoader() { - return resourceLoader; - } - - public WorkflowPositionFactory positionFactory() { - return application.positionFactory(); - } - - public ExecutorService executorService() { - return application.executorService(); - } - public RuntimeDescriptorFactory runtimeDescriptorFactory() { return application.runtimeDescriptorFactory(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java index 1823be94..b72cdbb0 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java @@ -26,13 +26,13 @@ public static Builder error(String type, int status) { return new Builder(type, status); } - public static Builder communication(int status, TaskContext context, Exception ex) { + public static Builder communication(int status, TaskContext context, Exception ex) { return new Builder(COMM_TYPE, status) .instance(context.position().jsonPointer()) .title(ex.getMessage()); } - public static Builder runtime(int status, TaskContext context, Exception ex) { + public static Builder runtime(int status, TaskContext context, Exception ex) { return new Builder(RUNTIME_TYPE, status) .instance(context.position().jsonPointer()) .title(ex.getMessage()); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java index 7d25df48..4475cacd 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowFilter.java @@ -19,5 +19,5 @@ @FunctionalInterface public interface WorkflowFilter { - JsonNode apply(WorkflowContext workflow, TaskContext task, JsonNode node); + JsonNode apply(WorkflowContext workflow, TaskContext task, JsonNode node); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index f81a6f24..3692132d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -15,42 +15,52 @@ */ package io.serverlessworkflow.impl; -import static io.serverlessworkflow.impl.json.JsonUtils.toJavaValue; - import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.NullNode; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; +import io.serverlessworkflow.impl.json.JsonUtils; import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; public class WorkflowInstance { private final AtomicReference status; - private final TaskContext taskContext; private final String id; private final JsonNode input; + private final Instant startedAt; - private final AtomicReference context; + private CompletableFuture completableFuture; + private final WorkflowContext workflowContext; WorkflowInstance(WorkflowDefinition definition, JsonNode input) { this.id = definition.idFactory().get(); this.input = input; definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); this.startedAt = Instant.now(); - WorkflowContext workflowContext = new WorkflowContext(definition, this); - taskContext = new TaskContext<>(input, definition.positionFactory().get()); - definition - .inputFilter() - .ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input))); - status = new AtomicReference<>(WorkflowStatus.RUNNING); - context = new AtomicReference<>(NullNode.getInstance()); - TaskExecutorHelper.processTaskList(definition.workflow().getDo(), workflowContext, taskContext); - definition - .outputFilter() - .ifPresent( - f -> - taskContext.output(f.apply(workflowContext, taskContext, taskContext.rawOutput()))); - definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output())); + this.workflowContext = new WorkflowContext(definition, this); + this.status = new AtomicReference<>(WorkflowStatus.RUNNING); + this.completableFuture = + TaskExecutorHelper.processTaskList( + definition.startTask(), + workflowContext, + Optional.empty(), + definition + .inputFilter() + .map(f -> f.apply(workflowContext, null, input)) + .orElse(input)) + .thenApply(this::whenCompleted); + } + + private JsonNode whenCompleted(JsonNode node) { + JsonNode model = + workflowContext + .definition() + .outputFilter() + .map(f -> f.apply(workflowContext, null, node)) + .orElse(node); + workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(model)); status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED); + return model; } public String id() { @@ -65,10 +75,6 @@ public JsonNode input() { return input; } - public JsonNode context() { - return context.get(); - } - public WorkflowStatus status() { return status.get(); } @@ -77,15 +83,11 @@ public void status(WorkflowStatus state) { this.status.set(state); } - public Object output() { - return toJavaValue(taskContext.output()); - } - - public JsonNode outputAsJsonNode() { - return taskContext.output(); + public CompletableFuture output() { + return outputAsJsonNode().thenApply(JsonUtils::toJavaValue); } - void context(JsonNode context) { - this.context.set(context); + public CompletableFuture outputAsJsonNode() { + return completableFuture.thenApply(this::whenCompleted); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java index f5ee1136..f51b7a01 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java @@ -19,98 +19,198 @@ import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.Export; +import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.jsonschema.SchemaValidator; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.time.Instant; +import java.util.Iterator; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; public abstract class AbstractTaskExecutor implements TaskExecutor { protected final T task; + protected final String taskName; + protected final WorkflowPosition position; + private final Optional inputProcessor; + private final Optional outputProcessor; + private final Optional contextProcessor; + private final Optional inputSchemaValidator; + private final Optional outputSchemaValidator; + private final Optional contextSchemaValidator; - private Optional inputProcessor = Optional.empty(); - private Optional outputProcessor = Optional.empty(); - private Optional contextProcessor = Optional.empty(); - private Optional inputSchemaValidator = Optional.empty(); - private Optional outputSchemaValidator = Optional.empty(); - private Optional contextSchemaValidator = Optional.empty(); + public abstract static class AbstractTaskExecutorBuilder + implements TaskExecutorBuilder { + private Optional inputProcessor = Optional.empty(); + private Optional outputProcessor = Optional.empty(); + private Optional contextProcessor = Optional.empty(); + private Optional inputSchemaValidator = Optional.empty(); + private Optional outputSchemaValidator = Optional.empty(); + private Optional contextSchemaValidator = Optional.empty(); + protected final WorkflowPosition position; + protected final T task; + protected final String taskName; + protected final WorkflowApplication application; + protected final Workflow workflow; + protected final ResourceLoader resourceLoader; - protected AbstractTaskExecutor(T task, WorkflowDefinition definition) { - this.task = task; - buildInputProcessors(definition); - buildOutputProcessors(definition); - buildContextProcessors(definition); - } + private TaskExecutor instance; - private void buildInputProcessors(WorkflowDefinition definition) { - if (task.getInput() != null) { - Input input = task.getInput(); - this.inputProcessor = buildWorkflowFilter(definition.expressionFactory(), input.getFrom()); - this.inputSchemaValidator = - getSchemaValidator( - definition.validatorFactory(), definition.resourceLoader(), input.getSchema()); + protected AbstractTaskExecutorBuilder( + WorkflowPosition position, + T task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + this.workflow = workflow; + this.taskName = position.last().toString(); + this.position = position; + this.task = task; + this.application = application; + this.resourceLoader = resourceLoader; + if (task.getInput() != null) { + Input input = task.getInput(); + this.inputProcessor = buildWorkflowFilter(application.expressionFactory(), input.getFrom()); + this.inputSchemaValidator = + getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema()); + } + if (task.getOutput() != null) { + Output output = task.getOutput(); + this.outputProcessor = buildWorkflowFilter(application.expressionFactory(), output.getAs()); + this.outputSchemaValidator = + getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema()); + } + if (task.getExport() != null) { + Export export = task.getExport(); + if (export.getAs() != null) { + this.contextProcessor = + buildWorkflowFilter(application.expressionFactory(), export.getAs()); + } + this.contextSchemaValidator = + getSchemaValidator(application.validatorFactory(), resourceLoader, export.getSchema()); + } } - } - private void buildOutputProcessors(WorkflowDefinition definition) { - if (task.getOutput() != null) { - Output output = task.getOutput(); - this.outputProcessor = buildWorkflowFilter(definition.expressionFactory(), output.getAs()); - this.outputSchemaValidator = - getSchemaValidator( - definition.validatorFactory(), definition.resourceLoader(), output.getSchema()); + protected final TransitionInfoBuilder next( + FlowDirective flowDirective, Map> connections) { + if (flowDirective == null) { + return TransitionInfoBuilder.of(next(connections)); + } + if (flowDirective.getFlowDirectiveEnum() != null) { + switch (flowDirective.getFlowDirectiveEnum()) { + case CONTINUE: + return TransitionInfoBuilder.of(next(connections)); + case END: + return TransitionInfoBuilder.end(); + case EXIT: + return TransitionInfoBuilder.exit(); + } + } + return TransitionInfoBuilder.of(connections.get(flowDirective.getString())); } - } - private void buildContextProcessors(WorkflowDefinition definition) { - if (task.getExport() != null) { - Export export = task.getExport(); - if (export.getAs() != null) { - this.contextProcessor = buildWorkflowFilter(definition.expressionFactory(), export.getAs()); + private TaskExecutorBuilder next(Map> connections) { + Iterator> iter = connections.values().iterator(); + TaskExecutorBuilder next = null; + while (iter.hasNext()) { + TaskExecutorBuilder item = iter.next(); + if (item == this) { + next = iter.hasNext() ? iter.next() : null; + break; + } } - this.contextSchemaValidator = - getSchemaValidator( - definition.validatorFactory(), definition.resourceLoader(), export.getSchema()); + return next; } + + public TaskExecutor build() { + if (instance == null) { + instance = buildInstance(); + } + return instance; + } + + protected abstract TaskExecutor buildInstance(); } - @Override - public TaskContext apply( - WorkflowContext workflowContext, TaskContext parentContext, JsonNode input) { - TaskContext taskContext = new TaskContext<>(input, parentContext, task); - if (TaskExecutorHelper.isActive(workflowContext)) { + protected AbstractTaskExecutor(AbstractTaskExecutorBuilder builder) { + this.task = builder.task; + this.taskName = builder.taskName; + this.position = builder.position; + this.inputProcessor = builder.inputProcessor; + this.outputProcessor = builder.outputProcessor; + this.contextProcessor = builder.contextProcessor; + this.inputSchemaValidator = builder.inputSchemaValidator; + this.outputSchemaValidator = builder.outputSchemaValidator; + this.contextSchemaValidator = builder.contextSchemaValidator; + } - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskStarted(parentContext.position(), task)); + protected final CompletableFuture executeNext( + CompletableFuture future, WorkflowContext workflow) { + return future.thenCompose( + t -> { + TransitionInfo transition = t.transition(); + if (transition.isEndNode()) { + workflow.instance().status(WorkflowStatus.COMPLETED); + } else if (transition.next() != null) { + return transition.next().apply(workflow, t.parent(), t.output()); + } + return CompletableFuture.completedFuture(t); + }); + } - inputSchemaValidator.ifPresent(s -> s.validate(taskContext.rawInput())); - inputProcessor.ifPresent( - p -> taskContext.input(p.apply(workflowContext, taskContext, taskContext.rawInput()))); - internalExecute(workflowContext, taskContext); - outputProcessor.ifPresent( - p -> taskContext.output(p.apply(workflowContext, taskContext, taskContext.rawOutput()))); - outputSchemaValidator.ifPresent(s -> s.validate(taskContext.output())); - contextProcessor.ifPresent( - p -> - workflowContext.context( - p.apply(workflowContext, taskContext, workflowContext.context()))); - contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); - taskContext.completedAt(Instant.now()); - workflowContext - .definition() - .listeners() - .forEach(l -> l.onTaskEnded(parentContext.position(), task)); + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, Optional parentContext, JsonNode input) { + TaskContext taskContext = new TaskContext(input, position, parentContext, taskName, task); + CompletableFuture completable = CompletableFuture.completedFuture(taskContext); + if (!TaskExecutorHelper.isActive(workflowContext)) { + return completable; } - return taskContext; + return executeNext( + completable + .thenApply( + t -> { + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskStarted(position, task)); + inputSchemaValidator.ifPresent(s -> s.validate(t.rawInput())); + inputProcessor.ifPresent( + p -> taskContext.input(p.apply(workflowContext, t, t.rawInput()))); + return t; + }) + .thenCompose(t -> execute(workflowContext, t)) + .thenApply( + t -> { + outputProcessor.ifPresent( + p -> t.output(p.apply(workflowContext, t, t.rawOutput()))); + outputSchemaValidator.ifPresent(s -> s.validate(t.output())); + contextProcessor.ifPresent( + p -> + workflowContext.context( + p.apply(workflowContext, t, workflowContext.context()))); + contextSchemaValidator.ifPresent(s -> s.validate(workflowContext.context())); + t.completedAt(Instant.now()); + workflowContext + .definition() + .listeners() + .forEach(l -> l.onTaskEnded(position, task)); + return t; + }), + workflowContext); } - protected abstract void internalExecute(WorkflowContext workflow, TaskContext taskContext); + protected abstract CompletableFuture execute( + WorkflowContext workflow, TaskContext taskContext); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java index 535057fa..2a3d1ae9 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java @@ -15,23 +15,51 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.concurrent.CompletableFuture; -public class CallTaskExecutor extends AbstractTaskExecutor { +public class CallTaskExecutor extends RegularTaskExecutor { private final CallableTask callable; - protected CallTaskExecutor(T task, WorkflowDefinition definition, CallableTask callable) { - super(task, definition); - this.callable = callable; - callable.init(task, definition); + public static class CallTaskExecutorBuilder + extends RegularTaskExecutorBuilder { + private CallableTask callable; + + protected CallTaskExecutorBuilder( + WorkflowPosition position, + T task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader, + CallableTask callable) { + super(position, task, workflow, application, resourceLoader); + this.callable = callable; + callable.init(task, application, resourceLoader); + } + + @Override + public TaskExecutor buildInstance() { + return new CallTaskExecutor(this); + } + } + + protected CallTaskExecutor(CallTaskExecutorBuilder builder) { + super(builder); + this.callable = builder.callable; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - taskContext.rawOutput(callable.apply(workflow, taskContext, taskContext.input())); + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return callable.apply(workflow, taskContext, taskContext.input()); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java index ffb94912..ecff0662 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTask.java @@ -18,13 +18,16 @@ import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.concurrent.CompletableFuture; public interface CallableTask { - void init(T task, WorkflowDefinition definition); + void init(T task, WorkflowApplication application, ResourceLoader loader); - JsonNode apply(WorkflowContext workflowContext, TaskContext taskContext, JsonNode input); + CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, JsonNode input); boolean accept(Class clazz); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java index e7dd07db..1aac152c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DefaultTaskExecutorFactory.java @@ -23,7 +23,19 @@ import io.serverlessworkflow.api.types.CallTask; import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.executors.CallTaskExecutor.CallTaskExecutorBuilder; +import io.serverlessworkflow.impl.executors.DoExecutor.DoExecutorBuilder; +import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder; +import io.serverlessworkflow.impl.executors.ForkExecutor.ForkExecutorBuilder; +import io.serverlessworkflow.impl.executors.RaiseExecutor.RaiseExecutorBuilder; +import io.serverlessworkflow.impl.executors.SetExecutor.SetExecutorBuilder; +import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder; +import io.serverlessworkflow.impl.executors.TryExecutor.TryExecutorBuilder; +import io.serverlessworkflow.impl.executors.WaitExecutor.WaitExecutorBuilder; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.ServiceLoader; import java.util.ServiceLoader.Provider; @@ -39,42 +51,80 @@ protected DefaultTaskExecutorFactory() {} private ServiceLoader callTasks = ServiceLoader.load(CallableTask.class); - public TaskExecutor getTaskExecutor( - Task task, WorkflowDefinition definition) { + @Override + public TaskExecutorBuilder getTaskExecutor( + WorkflowPosition position, + Task task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { if (task.getCallTask() != null) { CallTask callTask = task.getCallTask(); if (callTask.getCallHTTP() != null) { - return new CallTaskExecutor<>( - callTask.getCallHTTP(), definition, findCallTask(CallHTTP.class)); + return new CallTaskExecutorBuilder<>( + position, + callTask.getCallHTTP(), + workflow, + application, + resourceLoader, + findCallTask(CallHTTP.class)); } else if (callTask.getCallAsyncAPI() != null) { - return new CallTaskExecutor<>( - callTask.getCallAsyncAPI(), definition, findCallTask(CallAsyncAPI.class)); + return new CallTaskExecutorBuilder<>( + position, + callTask.getCallAsyncAPI(), + workflow, + application, + resourceLoader, + findCallTask(CallAsyncAPI.class)); } else if (callTask.getCallGRPC() != null) { - return new CallTaskExecutor<>( - callTask.getCallGRPC(), definition, findCallTask(CallGRPC.class)); + return new CallTaskExecutorBuilder<>( + position, + callTask.getCallGRPC(), + workflow, + application, + resourceLoader, + findCallTask(CallGRPC.class)); } else if (callTask.getCallOpenAPI() != null) { - return new CallTaskExecutor<>( - callTask.getCallOpenAPI(), definition, findCallTask(CallOpenAPI.class)); + return new CallTaskExecutorBuilder<>( + position, + callTask.getCallOpenAPI(), + workflow, + application, + resourceLoader, + findCallTask(CallOpenAPI.class)); } else if (callTask.getCallFunction() != null) { - return new CallTaskExecutor<>( - callTask.getCallFunction(), definition, findCallTask(CallFunction.class)); + return new CallTaskExecutorBuilder<>( + position, + callTask.getCallFunction(), + workflow, + application, + resourceLoader, + findCallTask(CallFunction.class)); } } else if (task.getSwitchTask() != null) { - return new SwitchExecutor(task.getSwitchTask(), definition); + return new SwitchExecutorBuilder( + position, task.getSwitchTask(), workflow, application, resourceLoader); } else if (task.getDoTask() != null) { - return new DoExecutor(task.getDoTask(), definition); + return new DoExecutorBuilder( + position, task.getDoTask(), workflow, application, resourceLoader); } else if (task.getSetTask() != null) { - return new SetExecutor(task.getSetTask(), definition); + return new SetExecutorBuilder( + position, task.getSetTask(), workflow, application, resourceLoader); } else if (task.getForTask() != null) { - return new ForExecutor(task.getForTask(), definition); + return new ForExecutorBuilder( + position, task.getForTask(), workflow, application, resourceLoader); } else if (task.getRaiseTask() != null) { - return new RaiseExecutor(task.getRaiseTask(), definition); + return new RaiseExecutorBuilder( + position, task.getRaiseTask(), workflow, application, resourceLoader); } else if (task.getTryTask() != null) { - return new TryExecutor(task.getTryTask(), definition); + return new TryExecutorBuilder( + position, task.getTryTask(), workflow, application, resourceLoader); } else if (task.getForkTask() != null) { - return new ForkExecutor(task.getForkTask(), definition); + return new ForkExecutorBuilder( + position, task.getForkTask(), workflow, application, resourceLoader); } else if (task.getWaitTask() != null) { - return new WaitExecutor(task.getWaitTask(), definition); + return new WaitExecutorBuilder( + position, task.getWaitTask(), workflow, application, resourceLoader); } throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet"); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java index c5dbc4fd..a35e4a87 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/DoExecutor.java @@ -15,19 +15,51 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.DoTask; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; -public class DoExecutor extends AbstractTaskExecutor { +public class DoExecutor extends RegularTaskExecutor { - protected DoExecutor(DoTask task, WorkflowDefinition definition) { - super(task, definition); + private final TaskExecutor taskExecutor; + + public static class DoExecutorBuilder extends RegularTaskExecutorBuilder { + private TaskExecutor taskExecutor; + + protected DoExecutorBuilder( + WorkflowPosition position, + DoTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + taskExecutor = + TaskExecutorHelper.createExecutorList( + position, task.getDo(), workflow, application, resourceLoader); + } + + @Override + public TaskExecutor buildInstance() { + return new DoExecutor(this); + } + } + + private DoExecutor(DoExecutorBuilder builder) { + super(builder); + this.taskExecutor = builder.taskExecutor; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext); + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), taskContext.input()); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index cb4ecec0..8f7e04f1 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -18,32 +18,67 @@ import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.ForTask; import io.serverlessworkflow.api.types.ForTaskConfiguration; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Iterator; import java.util.Optional; +import java.util.concurrent.CompletableFuture; -public class ForExecutor extends AbstractTaskExecutor { +public class ForExecutor extends RegularTaskExecutor { private final WorkflowFilter collectionExpr; private final Optional whileExpr; + private final TaskExecutor taskExecutor; - protected ForExecutor(ForTask task, WorkflowDefinition definition) { - super(task, definition); - ForTaskConfiguration forConfig = task.getFor(); - this.collectionExpr = - WorkflowUtils.buildWorkflowFilter(definition.expressionFactory(), forConfig.getIn()); - this.whileExpr = WorkflowUtils.optionalFilter(definition.expressionFactory(), task.getWhile()); + public static class ForExecutorBuilder extends RegularTaskExecutorBuilder { + private WorkflowFilter collectionExpr; + private Optional whileExpr; + private TaskExecutor taskExecutor; + + protected ForExecutorBuilder( + WorkflowPosition position, + ForTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + ForTaskConfiguration forConfig = task.getFor(); + this.collectionExpr = + WorkflowUtils.buildWorkflowFilter(application.expressionFactory(), forConfig.getIn()); + this.whileExpr = + WorkflowUtils.optionalFilter(application.expressionFactory(), task.getWhile()); + this.taskExecutor = + TaskExecutorHelper.createExecutorList( + position, task.getDo(), workflow, application, resourceLoader); + } + + @Override + public TaskExecutor buildInstance() { + return new ForExecutor(this); + } + } + + protected ForExecutor(ForExecutorBuilder builder) { + super(builder); + this.collectionExpr = builder.collectionExpr; + this.whileExpr = builder.whileExpr; + this.taskExecutor = builder.taskExecutor; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { Iterator iter = collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(); int i = 0; + CompletableFuture future = CompletableFuture.completedFuture(taskContext.input()); while (iter.hasNext() && whileExpr .map(w -> w.apply(workflow, taskContext, taskContext.rawOutput())) @@ -52,7 +87,12 @@ protected void internalExecute(WorkflowContext workflow, TaskContext ta JsonNode item = iter.next(); taskContext.variables().put(task.getFor().getEach(), item); taskContext.variables().put(task.getFor().getAt(), i++); - TaskExecutorHelper.processTaskList(task.getDo(), workflow, taskContext); + future = + future.thenCompose( + input -> + TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), input)); } + return future; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java index e0ce3b02..85bd3f22 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForkExecutor.java @@ -16,95 +16,95 @@ package io.serverlessworkflow.impl.executors; import com.fasterxml.jackson.databind.JsonNode; -import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.ForkTask; import io.serverlessworkflow.api.types.ForkTaskConfiguration; -import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; -import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; import io.serverlessworkflow.impl.json.JsonUtils; -import java.lang.reflect.UndeclaredThrowableException; -import java.util.ArrayList; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.stream.Collectors; import java.util.stream.Stream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ForkExecutor extends AbstractTaskExecutor { +public class ForkExecutor extends RegularTaskExecutor { - private static final Logger logger = LoggerFactory.getLogger(ForkExecutor.class); private final ExecutorService service; + private final Map> taskExecutors; + private final boolean compete; - protected ForkExecutor(ForkTask task, WorkflowDefinition definition) { - super(task, definition); - service = definition.executorService(); - } + public static class ForkExecutorBuilder extends RegularTaskExecutorBuilder { - @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - ForkTaskConfiguration forkConfig = task.getFork(); + private final Map> taskExecutors; + private final boolean compete; - if (!forkConfig.getBranches().isEmpty()) { - Map>> futures = new HashMap<>(); - int index = 0; - for (TaskItem item : forkConfig.getBranches()) { - final int i = index++; - futures.put( - item.getName(), - service.submit(() -> executeBranch(workflow, taskContext.copy(), item, i))); - } - List>> results = new ArrayList<>(); - for (Map.Entry>> entry : futures.entrySet()) { - try { - results.add(Map.entry(entry.getKey(), entry.getValue().get())); - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else { - throw new UndeclaredThrowableException(ex); - } - } catch (InterruptedException ex) { - logger.warn("Branch {} was interrupted, no result will be recorded", entry.getKey(), ex); - } - } - if (!results.isEmpty()) { - Stream>> sortedStream = - results.stream() - .sorted( - (arg1, arg2) -> - arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt())); - taskContext.rawOutput( - forkConfig.isCompete() - ? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow() - : sortedStream - .map( - e -> - JsonUtils.mapper() - .createObjectNode() - .set(e.getKey(), e.getValue().output())) - .collect(JsonUtils.arrayNodeCollector())); - } + protected ForkExecutorBuilder( + WorkflowPosition position, + ForkTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + ForkTaskConfiguration forkConfig = task.getFork(); + this.taskExecutors = + TaskExecutorHelper.createBranchList( + position, forkConfig.getBranches(), workflow, application, resourceLoader); + this.compete = forkConfig.isCompete(); } + + @Override + public TaskExecutor buildInstance() { + return new ForkExecutor(this); + } + } + + protected ForkExecutor(ForkExecutorBuilder builder) { + super(builder); + service = builder.application.executorService(); + this.taskExecutors = builder.taskExecutors; + this.compete = builder.compete; } - private TaskContext executeBranch( - WorkflowContext workflow, TaskContext taskContext, TaskItem taskItem, int index) { - taskContext.position().addIndex(index); - TaskContext result = - TaskExecutorHelper.executeTask(workflow, taskContext, taskItem, taskContext.input()); - if (result.flowDirective() != null - && result.flowDirective().getFlowDirectiveEnum() == FlowDirectiveEnum.END) { - workflow.instance().status(WorkflowStatus.COMPLETED); + @Override + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + Map> futures = new HashMap<>(); + CompletableFuture initial = CompletableFuture.completedFuture(taskContext); + for (Map.Entry> entry : taskExecutors.entrySet()) { + futures.put( + entry.getKey(), + initial.thenComposeAsync( + t -> entry.getValue().apply(workflow, Optional.of(t), t.input()), service)); } - taskContext.position().back(); - return result; + return CompletableFuture.allOf( + futures.values().toArray(new CompletableFuture[futures.size()])) + .thenApply( + i -> + combine( + futures.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, e -> e.getValue().join())))); + } + + private JsonNode combine(Map futures) { + + Stream> sortedStream = + futures.entrySet().stream() + .sorted( + (arg1, arg2) -> + arg1.getValue().completedAt().compareTo(arg2.getValue().completedAt())); + return compete + ? sortedStream.map(e -> e.getValue().output()).findFirst().orElseThrow() + : sortedStream + .map( + e -> JsonUtils.mapper().createObjectNode().set(e.getKey(), e.getValue().output())) + .collect(JsonUtils.arrayNodeCollector()); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java index 1ddc315f..6dd43c2b 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RaiseExecutor.java @@ -15,90 +15,116 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.Error; import io.serverlessworkflow.api.types.ErrorInstance; import io.serverlessworkflow.api.types.ErrorType; import io.serverlessworkflow.api.types.RaiseTask; import io.serverlessworkflow.api.types.RaiseTaskError; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.StringFilter; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; import io.serverlessworkflow.impl.expressions.ExpressionFactory; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; -public class RaiseExecutor extends AbstractTaskExecutor { +public class RaiseExecutor extends RegularTaskExecutor { - private final BiFunction, WorkflowError> errorBuilder; + private final BiFunction errorBuilder; - private final StringFilter typeFilter; - private final Optional instanceFilter; - private final StringFilter titleFilter; - private final StringFilter detailFilter; + public static class RaiseExecutorBuilder extends RegularTaskExecutorBuilder { - protected RaiseExecutor(RaiseTask task, WorkflowDefinition definition) { - super(task, definition); - RaiseTaskError raiseError = task.getRaise().getError(); - Error error = - raiseError.getRaiseErrorDefinition() != null - ? raiseError.getRaiseErrorDefinition() - : findError(definition, raiseError.getRaiseErrorReference()); - this.typeFilter = getTypeFunction(definition.expressionFactory(), error.getType()); - this.instanceFilter = getInstanceFunction(definition.expressionFactory(), error.getInstance()); - this.titleFilter = - WorkflowUtils.buildStringFilter(definition.expressionFactory(), error.getTitle()); - this.detailFilter = - WorkflowUtils.buildStringFilter(definition.expressionFactory(), error.getDetail()); - this.errorBuilder = (w, t) -> buildError(error, w, t); - } + private final BiFunction errorBuilder; + private final StringFilter typeFilter; + private final Optional instanceFilter; + private final StringFilter titleFilter; + private final StringFilter detailFilter; - private static Error findError(WorkflowDefinition definition, String raiseErrorReference) { - Map errorsMap = - definition.workflow().getUse().getErrors().getAdditionalProperties(); - Error error = errorsMap.get(raiseErrorReference); - if (error == null) { - throw new IllegalArgumentException("Error " + error + "is not defined in " + errorsMap); + protected RaiseExecutorBuilder( + WorkflowPosition position, + RaiseTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + RaiseTaskError raiseError = task.getRaise().getError(); + Error error = + raiseError.getRaiseErrorDefinition() != null + ? raiseError.getRaiseErrorDefinition() + : findError(raiseError.getRaiseErrorReference()); + this.typeFilter = getTypeFunction(application.expressionFactory(), error.getType()); + this.instanceFilter = + getInstanceFunction(application.expressionFactory(), error.getInstance()); + this.titleFilter = + WorkflowUtils.buildStringFilter(application.expressionFactory(), error.getTitle()); + this.detailFilter = + WorkflowUtils.buildStringFilter(application.expressionFactory(), error.getDetail()); + this.errorBuilder = (w, t) -> buildError(error, w, t); } - return error; - } - private WorkflowError buildError( - Error error, WorkflowContext context, TaskContext taskContext) { - return WorkflowError.error(typeFilter.apply(context, taskContext), error.getStatus()) - .instance( - instanceFilter - .map(f -> f.apply(context, taskContext)) - .orElseGet(() -> taskContext.position().jsonPointer())) - .title(titleFilter.apply(context, taskContext)) - .details(detailFilter.apply(context, taskContext)) - .build(); - } + private WorkflowError buildError( + Error error, WorkflowContext context, TaskContext taskContext) { + return WorkflowError.error(typeFilter.apply(context, taskContext), error.getStatus()) + .instance( + instanceFilter + .map(f -> f.apply(context, taskContext)) + .orElseGet(() -> taskContext.position().jsonPointer())) + .title(titleFilter.apply(context, taskContext)) + .details(detailFilter.apply(context, taskContext)) + .build(); + } + + private Optional getInstanceFunction( + ExpressionFactory expressionFactory, ErrorInstance errorInstance) { + return errorInstance != null + ? Optional.of( + WorkflowUtils.buildStringFilter( + expressionFactory, + errorInstance.getExpressionErrorInstance(), + errorInstance.getLiteralErrorInstance())) + : Optional.empty(); + } - private Optional getInstanceFunction( - ExpressionFactory expressionFactory, ErrorInstance errorInstance) { - return errorInstance != null - ? Optional.of( - WorkflowUtils.buildStringFilter( - expressionFactory, - errorInstance.getExpressionErrorInstance(), - errorInstance.getLiteralErrorInstance())) - : Optional.empty(); + private StringFilter getTypeFunction(ExpressionFactory expressionFactory, ErrorType type) { + return WorkflowUtils.buildStringFilter( + expressionFactory, + type.getExpressionErrorType(), + type.getLiteralErrorType().get().toString()); + } + + private Error findError(String raiseErrorReference) { + Map errorsMap = workflow.getUse().getErrors().getAdditionalProperties(); + Error error = errorsMap.get(raiseErrorReference); + if (error == null) { + throw new IllegalArgumentException("Error " + error + "is not defined in " + errorsMap); + } + return error; + } + + @Override + public TaskExecutor buildInstance() { + return new RaiseExecutor(this); + } } - private StringFilter getTypeFunction(ExpressionFactory expressionFactory, ErrorType type) { - return WorkflowUtils.buildStringFilter( - expressionFactory, - type.getExpressionErrorType(), - type.getLiteralErrorType().get().toString()); + protected RaiseExecutor(RaiseExecutorBuilder builder) { + super(builder); + this.errorBuilder = builder.errorBuilder; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { throw new WorkflowException(errorBuilder.apply(workflow, taskContext)); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java new file mode 100644 index 00000000..24c1e841 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/RegularTaskExecutor.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public abstract class RegularTaskExecutor extends AbstractTaskExecutor { + + protected final TransitionInfo transition; + + protected RegularTaskExecutor(RegularTaskExecutorBuilder builder) { + super(builder); + this.transition = TransitionInfo.build(builder.transition); + } + + public abstract static class RegularTaskExecutorBuilder + extends AbstractTaskExecutorBuilder { + + private TransitionInfoBuilder transition; + + protected RegularTaskExecutorBuilder( + WorkflowPosition position, + T task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + } + + public void connect(Map> connections) { + this.transition = next(task.getThen(), connections); + } + } + + protected CompletableFuture execute( + WorkflowContext workflow, TaskContext taskContext) { + CompletableFuture future = + internalExecute(workflow, taskContext) + .thenApply(node -> taskContext.rawOutput(node).transition(transition)); + return future; + } + + protected abstract CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext task); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java index b06153f3..c5600891 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SetExecutor.java @@ -15,28 +15,54 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.SetTask; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.expressions.ExpressionUtils; import io.serverlessworkflow.impl.json.JsonUtils; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.util.Map; +import java.util.concurrent.CompletableFuture; -public class SetExecutor extends AbstractTaskExecutor { +public class SetExecutor extends RegularTaskExecutor { - private Map toBeSet; + private final Map toBeSet; - protected SetExecutor(SetTask task, WorkflowDefinition definition) { - super(task, definition); - this.toBeSet = - ExpressionUtils.buildExpressionMap( - task.getSet().getAdditionalProperties(), definition.expressionFactory()); + public static class SetExecutorBuilder extends RegularTaskExecutorBuilder { + + private final Map toBeSet; + + protected SetExecutorBuilder( + WorkflowPosition position, + SetTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + this.toBeSet = + ExpressionUtils.buildExpressionMap( + task.getSet().getAdditionalProperties(), application.expressionFactory()); + } + + @Override + public TaskExecutor buildInstance() { + return new SetExecutor(this); + } + } + + private SetExecutor(SetExecutorBuilder builder) { + super(builder); + this.toBeSet = builder.toBeSet; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - taskContext.rawOutput( + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return CompletableFuture.completedFuture( JsonUtils.fromValue( ExpressionUtils.evaluateExpressionMap( toBeSet, workflow, taskContext, taskContext.input()))); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java index dee0cee7..70b127c4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/SwitchExecutor.java @@ -19,43 +19,83 @@ import io.serverlessworkflow.api.types.SwitchCase; import io.serverlessworkflow.api.types.SwitchItem; import io.serverlessworkflow.api.types.SwitchTask; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public class SwitchExecutor extends AbstractTaskExecutor { - private Map workflowFilters = new ConcurrentHashMap<>(); - private FlowDirective defaultDirective; - - protected SwitchExecutor(SwitchTask task, WorkflowDefinition definition) { - super(task, definition); - for (SwitchItem item : task.getSwitch()) { - SwitchCase switchCase = item.getSwitchCase(); - if (switchCase.getWhen() != null) { - workflowFilters.put( - switchCase, - WorkflowUtils.buildWorkflowFilter( - definition.expressionFactory(), switchCase.getWhen())); - } else { - defaultDirective = switchCase.getThen(); + private final Map workflowFilters; + private final TransitionInfo defaultTask; + + public static class SwitchExecutorBuilder extends AbstractTaskExecutorBuilder { + private final Map workflowFilters = new HashMap<>(); + private Map switchFilters = new HashMap<>(); + private FlowDirective defaultDirective; + private TransitionInfoBuilder defaultTask; + + public SwitchExecutorBuilder( + WorkflowPosition position, + SwitchTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + for (SwitchItem item : task.getSwitch()) { + SwitchCase switchCase = item.getSwitchCase(); + if (switchCase.getWhen() != null) { + workflowFilters.put( + switchCase, + WorkflowUtils.buildWorkflowFilter( + application.expressionFactory(), switchCase.getWhen())); + } else { + defaultDirective = switchCase.getThen(); + } } } + + @Override + public void connect(Map> connections) { + this.switchFilters = + this.workflowFilters.entrySet().stream() + .collect( + Collectors.toMap(Entry::getValue, e -> next(e.getKey().getThen(), connections))); + this.defaultTask = next(defaultDirective, connections); + } + + @Override + protected TaskExecutor buildInstance() { + return new SwitchExecutor(this); + } + } + + private SwitchExecutor(SwitchExecutorBuilder builder) { + super(builder); + this.defaultTask = TransitionInfo.build(builder.defaultTask); + this.workflowFilters = + builder.switchFilters.entrySet().stream() + .collect(Collectors.toMap(Entry::getKey, e -> TransitionInfo.build(e.getValue()))); } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - for (Entry entry : workflowFilters.entrySet()) { - if (entry.getValue().apply(workflow, taskContext, taskContext.input()).asBoolean()) { - taskContext.flowDirective(entry.getKey().getThen()); - return; + protected CompletableFuture execute( + WorkflowContext workflow, TaskContext taskContext) { + CompletableFuture future = CompletableFuture.completedFuture(taskContext); + for (Entry entry : workflowFilters.entrySet()) { + if (entry.getKey().apply(workflow, taskContext, taskContext.input()).asBoolean()) { + return future.thenApply(t -> t.transition(entry.getValue())); } } - taskContext.flowDirective(defaultDirective); + return future.thenApply(t -> t.transition(defaultTask)); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java index b4b66a9a..b77398c3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutor.java @@ -19,9 +19,11 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; @FunctionalInterface public interface TaskExecutor { - TaskContext apply( - WorkflowContext workflowContext, TaskContext parentContext, JsonNode input); + CompletableFuture apply( + WorkflowContext workflowContext, Optional parentContext, JsonNode input); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorBuilder.java new file mode 100644 index 00000000..2cbb8c16 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorBuilder.java @@ -0,0 +1,26 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.TaskBase; +import java.util.Map; + +public interface TaskExecutorBuilder { + + void connect(Map> connections); + + TaskExecutor build(); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java index 8c399cf6..b1be3429 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorFactory.java @@ -17,8 +17,16 @@ import io.serverlessworkflow.api.types.Task; import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.resources.ResourceLoader; public interface TaskExecutorFactory { - TaskExecutor getTaskExecutor(Task task, WorkflowDefinition definition); + TaskExecutorBuilder getTaskExecutor( + WorkflowPosition position, + Task task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java index e16cb085..ba6c33b2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TaskExecutorHelper.java @@ -16,50 +16,37 @@ package io.serverlessworkflow.impl.executors; import com.fasterxml.jackson.databind.JsonNode; -import io.serverlessworkflow.api.types.FlowDirective; -import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; -import java.util.ListIterator; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public class TaskExecutorHelper { private TaskExecutorHelper() {} - public static void processTaskList( - List tasks, WorkflowContext context, TaskContext parentTask) { - parentTask.position().addProperty("do"); - TaskContext currentContext = parentTask; - if (!tasks.isEmpty()) { - ListIterator iter = tasks.listIterator(); - TaskItem nextTask = iter.next(); - while (nextTask != null && isActive(context)) { - TaskItem task = nextTask; - parentTask.position().addIndex(iter.previousIndex()); - currentContext = executeTask(context, parentTask, task, currentContext.output()); - FlowDirective flowDirective = currentContext.flowDirective(); - if (flowDirective.getFlowDirectiveEnum() != null) { - switch (flowDirective.getFlowDirectiveEnum()) { - case CONTINUE: - nextTask = iter.hasNext() ? iter.next() : null; - break; - case END: - context.instance().status(WorkflowStatus.COMPLETED); - break; - case EXIT: - nextTask = null; - break; - } - } else { - nextTask = findTaskByName(iter, flowDirective.getString()); - } - parentTask.position().back(); - } - } - parentTask.position().back(); - parentTask.rawOutput(currentContext.output()); + public static CompletableFuture processTaskList( + TaskExecutor taskExecutor, + WorkflowContext context, + Optional parentTask, + JsonNode input) { + return taskExecutor + .apply(context, parentTask, input) + .thenApply( + t -> { + parentTask.ifPresent(p -> p.rawOutput(t.output())); + return t.output(); + }); } public static boolean isActive(WorkflowContext context) { @@ -70,42 +57,55 @@ public static boolean isActive(WorkflowStatus status) { return status == WorkflowStatus.RUNNING; } - public static TaskContext executeTask( - WorkflowContext context, TaskContext parentTask, TaskItem task, JsonNode input) { - parentTask.position().addProperty(task.getName()); - TaskContext result = - context - .definition() - .taskExecutors() - .computeIfAbsent( - parentTask.position().jsonPointer(), - k -> - context - .definition() - .taskFactory() - .getTaskExecutor(task.getTask(), context.definition())) - .apply(context, parentTask, input); - parentTask.position().back(); - return result; + public static TaskExecutor createExecutorList( + WorkflowPosition position, + List taskItems, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + Map> executors = + createExecutorBuilderList(position, taskItems, workflow, application, resourceLoader, "do"); + executors.values().forEach(t -> t.connect(executors)); + Iterator> iter = executors.values().iterator(); + TaskExecutor first = iter.next().build(); + while (iter.hasNext()) { + iter.next().build(); + } + return first; } - private static TaskItem findTaskByName(ListIterator iter, String taskName) { - int currentIndex = iter.nextIndex(); - while (iter.hasPrevious()) { - TaskItem item = iter.previous(); - if (item.getName().equals(taskName)) { - return item; - } - } - while (iter.nextIndex() < currentIndex) { - iter.next(); - } - while (iter.hasNext()) { - TaskItem item = iter.next(); - if (item.getName().equals(taskName)) { - return item; - } + public static Map> createBranchList( + WorkflowPosition position, + List taskItems, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + return createExecutorBuilderList( + position, taskItems, workflow, application, resourceLoader, "branch") + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); + } + + private static Map> createExecutorBuilderList( + WorkflowPosition position, + List taskItems, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader, + String containerName) { + TaskExecutorFactory taskFactory = application.taskFactory(); + Map> executors = new LinkedHashMap<>(); + position.addProperty(containerName); + int index = 0; + for (TaskItem item : taskItems) { + position.addIndex(index++).addProperty(item.getName()); + TaskExecutorBuilder taskExecutorBuilder = + taskFactory.getTaskExecutor( + position.copy(), item.getTask(), workflow, application, resourceLoader); + executors.put(item.getName(), taskExecutorBuilder); + position.back().back(); } - throw new IllegalArgumentException("Cannot find task with name " + taskName); + return executors; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TransitionInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TransitionInfo.java new file mode 100644 index 00000000..f330ac74 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TransitionInfo.java @@ -0,0 +1,27 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +public record TransitionInfo(TaskExecutor next, boolean isEndNode) { + private static final TransitionInfo END = new TransitionInfo(null, true); + private static final TransitionInfo EXIT = new TransitionInfo(null, false); + + static TransitionInfo build(TransitionInfoBuilder builder) { + if (builder == null || builder == TransitionInfoBuilder.exit()) return EXIT; + if (builder == TransitionInfoBuilder.end()) return END; + return new TransitionInfo(builder.next().build(), false); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TransitionInfoBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TransitionInfoBuilder.java new file mode 100644 index 00000000..7f62eaf7 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TransitionInfoBuilder.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +public record TransitionInfoBuilder(TaskExecutorBuilder next, boolean isEndNode) { + + private static final TransitionInfoBuilder END = new TransitionInfoBuilder(null, true); + private static final TransitionInfoBuilder EXIT = new TransitionInfoBuilder(null, false); + + static TransitionInfoBuilder of(TaskExecutorBuilder next) { + return next == null ? EXIT : new TransitionInfoBuilder(next, false); + } + + static TransitionInfoBuilder end() { + return END; + } + + static TransitionInfoBuilder exit() { + return EXIT; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java index eed2801b..a4442bf2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java @@ -15,41 +15,99 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.CatchErrors; import io.serverlessworkflow.api.types.ErrorFilter; +import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.TryTask; import io.serverlessworkflow.api.types.TryTaskCatch; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.WorkflowFilter; +import io.serverlessworkflow.impl.WorkflowPosition; import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.resources.ResourceLoader; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.Predicate; -public class TryExecutor extends AbstractTaskExecutor { +public class TryExecutor extends RegularTaskExecutor { private final Optional whenFilter; private final Optional exceptFilter; private final Optional> errorFilter; + private final TaskExecutor taskExecutor; + private final Optional> catchTaskExecutor; - protected TryExecutor(TryTask task, WorkflowDefinition definition) { - super(task, definition); - TryTaskCatch catchInfo = task.getCatch(); - this.errorFilter = buildErrorFilter(catchInfo.getErrors()); - this.whenFilter = - WorkflowUtils.optionalFilter(definition.expressionFactory(), catchInfo.getWhen()); - this.exceptFilter = - WorkflowUtils.optionalFilter(definition.expressionFactory(), catchInfo.getExceptWhen()); + public static class TryExecutorBuilder extends RegularTaskExecutorBuilder { + + private final Optional whenFilter; + private final Optional exceptFilter; + private final Optional> errorFilter; + private final TaskExecutor taskExecutor; + private final Optional> catchTaskExecutor; + + protected TryExecutorBuilder( + WorkflowPosition position, + TryTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + TryTaskCatch catchInfo = task.getCatch(); + this.errorFilter = buildErrorFilter(catchInfo.getErrors()); + this.whenFilter = + WorkflowUtils.optionalFilter(application.expressionFactory(), catchInfo.getWhen()); + this.exceptFilter = + WorkflowUtils.optionalFilter(application.expressionFactory(), catchInfo.getExceptWhen()); + this.taskExecutor = + TaskExecutorHelper.createExecutorList( + position, task.getTry(), workflow, application, resourceLoader); + List catchTask = task.getCatch().getDo(); + this.catchTaskExecutor = + catchTask != null && !catchTask.isEmpty() + ? Optional.of( + TaskExecutorHelper.createExecutorList( + position, task.getCatch().getDo(), workflow, application, resourceLoader)) + : Optional.empty(); + } + + @Override + public TaskExecutor buildInstance() { + return new TryExecutor(this); + } + } + + protected TryExecutor(TryExecutorBuilder builder) { + super(builder); + this.errorFilter = builder.errorFilter; + this.whenFilter = builder.whenFilter; + this.exceptFilter = builder.exceptFilter; + this.taskExecutor = builder.taskExecutor; + this.catchTaskExecutor = builder.catchTaskExecutor; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - try { - TaskExecutorHelper.processTaskList(task.getTry(), workflow, taskContext); - } catch (WorkflowException exception) { + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), taskContext.input()) + .exceptionallyCompose(e -> handleException(e, workflow, taskContext)); + } + + private CompletableFuture handleException( + Throwable e, WorkflowContext workflow, TaskContext taskContext) { + if (e instanceof CompletionException) { + return handleException(e.getCause(), workflow, taskContext); + } + if (e instanceof WorkflowException) { + WorkflowException exception = (WorkflowException) e; if (errorFilter.map(f -> f.test(exception.getWorflowError())).orElse(true) && whenFilter .map(w -> w.apply(workflow, taskContext, taskContext.input()).asBoolean()) @@ -57,11 +115,17 @@ protected void internalExecute(WorkflowContext workflow, TaskContext ta && exceptFilter .map(w -> !w.apply(workflow, taskContext, taskContext.input()).asBoolean()) .orElse(true)) { - if (task.getCatch().getDo() != null) { - TaskExecutorHelper.processTaskList(task.getCatch().getDo(), workflow, taskContext); + if (catchTaskExecutor.isPresent()) { + return TaskExecutorHelper.processTaskList( + catchTaskExecutor.get(), workflow, Optional.of(taskContext), taskContext.input()); } + } + return CompletableFuture.completedFuture(taskContext.rawOutput()); + } else { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; } else { - throw exception; + throw new RuntimeException(e); } } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java index a1fb31c5..85598317 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/WaitExecutor.java @@ -15,44 +15,70 @@ */ package io.serverlessworkflow.impl.executors; +import com.fasterxml.jackson.databind.JsonNode; import io.serverlessworkflow.api.types.DurationInline; import io.serverlessworkflow.api.types.WaitTask; +import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.executors.RegularTaskExecutor.RegularTaskExecutorBuilder; +import io.serverlessworkflow.impl.resources.ResourceLoader; import java.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; -public class WaitExecutor extends AbstractTaskExecutor { +public class WaitExecutor extends RegularTaskExecutor { - private static Logger logger = LoggerFactory.getLogger(WaitExecutor.class); private final Duration millisToWait; - protected WaitExecutor(WaitTask task, WorkflowDefinition definition) { - super(task, definition); - this.millisToWait = - task.getWait().getDurationInline() != null - ? toLong(task.getWait().getDurationInline()) - : Duration.parse(task.getWait().getDurationExpression()); + public static class WaitExecutorBuilder extends RegularTaskExecutorBuilder { + private final Duration millisToWait; + + protected WaitExecutorBuilder( + WorkflowPosition position, + WaitTask task, + Workflow workflow, + WorkflowApplication application, + ResourceLoader resourceLoader) { + super(position, task, workflow, application, resourceLoader); + this.millisToWait = + task.getWait().getDurationInline() != null + ? toLong(task.getWait().getDurationInline()) + : Duration.parse(task.getWait().getDurationExpression()); + } + + private Duration toLong(DurationInline durationInline) { + Duration duration = Duration.ofMillis(durationInline.getMilliseconds()); + duration.plus(Duration.ofSeconds(durationInline.getSeconds())); + duration.plus(Duration.ofMinutes(durationInline.getMinutes())); + duration.plus(Duration.ofHours(durationInline.getHours())); + duration.plus(Duration.ofDays(durationInline.getDays())); + return duration; + } + + @Override + public TaskExecutor buildInstance() { + return new WaitExecutor(this); + } } - private Duration toLong(DurationInline durationInline) { - Duration duration = Duration.ofMillis(durationInline.getMilliseconds()); - duration.plus(Duration.ofSeconds(durationInline.getSeconds())); - duration.plus(Duration.ofMinutes(durationInline.getMinutes())); - duration.plus(Duration.ofHours(durationInline.getHours())); - duration.plus(Duration.ofDays(durationInline.getDays())); - return duration; + protected WaitExecutor(WaitExecutorBuilder builder) { + super(builder); + this.millisToWait = builder.millisToWait; } @Override - protected void internalExecute(WorkflowContext workflow, TaskContext taskContext) { - try { - Thread.sleep(millisToWait.toMillis()); - } catch (InterruptedException e) { - logger.warn("Waiting thread was interrupted", e); - Thread.currentThread().interrupt(); - } + protected CompletableFuture internalExecute( + WorkflowContext workflow, TaskContext taskContext) { + return CompletableFuture.supplyAsync( + () -> { + try { + Thread.sleep(millisToWait.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return taskContext.input(); + }); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java index 42566c77..122fc6d8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/Expression.java @@ -20,5 +20,5 @@ import io.serverlessworkflow.impl.WorkflowContext; public interface Expression { - JsonNode eval(WorkflowContext workflowContext, TaskContext context, JsonNode node); + JsonNode eval(WorkflowContext workflowContext, TaskContext context, JsonNode node); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java index 7f776322..c91ef3a2 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/ExpressionUtils.java @@ -34,7 +34,7 @@ public static Map buildExpressionMap( } public static Map evaluateExpressionMap( - Map origMap, WorkflowContext workflow, TaskContext task, JsonNode n) { + Map origMap, WorkflowContext workflow, TaskContext task, JsonNode n) { return new ProxyMap( origMap, o -> @@ -50,7 +50,7 @@ public static Object buildExpressionObject(Object obj, ExpressionFactory factory } public static Object evaluateExpressionObject( - Object obj, WorkflowContext workflow, TaskContext task, JsonNode node) { + Object obj, WorkflowContext workflow, TaskContext task, JsonNode node) { return obj instanceof Map ? ExpressionUtils.evaluateExpressionMap((Map) obj, workflow, task, node) : obj; diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java index 0207d3b5..35ca13c7 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java @@ -41,7 +41,7 @@ public JQExpression(Supplier scope, String expr, Version version) } @Override - public JsonNode eval(WorkflowContext workflow, TaskContext task, JsonNode node) { + public JsonNode eval(WorkflowContext workflow, TaskContext task, JsonNode node) { JsonNodeOutput output = new JsonNodeOutput(); try { internalExpr.apply(createScope(workflow, task), node, output); @@ -75,17 +75,19 @@ public JsonNode getResult() { } } - private Scope createScope(WorkflowContext workflow, TaskContext task) { + private Scope createScope(WorkflowContext workflow, TaskContext task) { Scope childScope = Scope.newChildScope(scope.get()); - childScope.setValue("input", task.input()); - childScope.setValue("output", task.output()); + if (task != null) { + childScope.setValue("input", task.input()); + childScope.setValue("output", task.output()); + childScope.setValue("task", () -> JsonUtils.fromValue(TaskDescriptor.of(task))); + task.variables().forEach((k, v) -> childScope.setValue(k, JsonUtils.fromValue(v))); + } childScope.setValue("context", workflow.context()); childScope.setValue( "runtime", () -> JsonUtils.fromValue(workflow.definition().runtimeDescriptorFactory().get())); childScope.setValue("workflow", () -> JsonUtils.fromValue(WorkflowDescriptor.of(workflow))); - childScope.setValue("task", () -> JsonUtils.fromValue(TaskDescriptor.of(task))); - task.variables().forEach((k, v) -> childScope.setValue(k, JsonUtils.fromValue(v))); return childScope; } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java index a78bffa7..f1e04cba 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java @@ -19,17 +19,17 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.impl.TaskContext; -public record TaskDescriptor( +public record TaskDescriptor( String name, String reference, - T definition, + TaskBase definition, JsonNode rawInput, JsonNode rawOutput, DateTimeDescriptor startedAt) { - public static TaskDescriptor of(TaskContext context) { - return new TaskDescriptor( - context.position().last().toString(), + public static TaskDescriptor of(TaskContext context) { + return new TaskDescriptor( + context.taskName(), context.position().jsonPointer(), context.task(), context.rawInput(), diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DynamicResource.java b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DynamicResource.java index ee9432c0..accac01e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DynamicResource.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/resources/DynamicResource.java @@ -22,5 +22,5 @@ import java.util.Optional; public interface DynamicResource { - InputStream open(WorkflowContext workflow, Optional> task, JsonNode input); + InputStream open(WorkflowContext workflow, Optional task, JsonNode input); } diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index 6ac708b7..53fd162d 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -28,19 +28,17 @@ import java.time.Instant; import java.util.Arrays; import java.util.Map; +import java.util.concurrent.CompletionException; import java.util.function.Consumer; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class WorkflowDefinitionTest { private static WorkflowApplication appl; - private static Logger logger = LoggerFactory.getLogger(WorkflowDefinitionTest.class); private static Instant before; @BeforeAll @@ -61,27 +59,29 @@ private static Stream provideParameters() { args( "switch-then-string.yaml", Map.of("orderType", "electronic"), - o -> assertThat(o.output()).isEqualTo(Map.of("validate", true, "status", "fulfilled"))), + o -> + assertThat(o.output().join()) + .isEqualTo(Map.of("validate", true, "status", "fulfilled"))), args( "switch-then-string.yaml", Map.of("orderType", "physical"), o -> - assertThat(o.output()) + assertThat(o.output().join()) .isEqualTo(Map.of("inventory", "clear", "items", 1, "address", "Elmer St"))), args( "switch-then-string.yaml", Map.of("orderType", "unknown"), o -> - assertThat(o.output()) + assertThat(o.output().join()) .isEqualTo(Map.of("log", "warn", "message", "something's wrong"))), args( "for-sum.yaml", Map.of("input", Arrays.asList(1, 2, 3)), - o -> assertThat(o.output()).isEqualTo(6)), + o -> assertThat(o.output().join()).isEqualTo(6)), args( "for-collect.yaml", Map.of("input", Arrays.asList(1, 2, 3)), - o -> assertThat(o.output()).isEqualTo(Map.of("output", Arrays.asList(2, 4, 6)))), + o -> assertThat(o.output().join()).isEqualTo(Map.of("output", Arrays.asList(2, 4, 6)))), args( "simple-expression.yaml", Map.of("input", Arrays.asList(1, 2, 3)), @@ -98,7 +98,7 @@ private static Stream provideParameters() { "fork.yaml", Map.of(), o -> - assertThat(((ObjectNode) o.outputAsJsonNode()).get("patientId").asText()) + assertThat(((ObjectNode) o.outputAsJsonNode().join()).get("patientId").asText()) .isIn("John", "Smith")), args("fork-no-compete.yaml", Map.of(), WorkflowDefinitionTest::checkNotCompeteOuput)); } @@ -114,12 +114,23 @@ private static Arguments args( return Arguments.of( fileName, (Consumer) - d -> consumer.accept(catchThrowableOfType(clazz, () -> d.execute(Map.of())))); + d -> + checkWorkflowException( + catchThrowableOfType( + CompletionException.class, + () -> d.execute(Map.of()).outputAsJsonNode().join()), + consumer, + clazz)); + } + + private static void checkWorkflowException( + CompletionException ex, Consumer consumer, Class clazz) { + assertThat(ex.getCause()).isInstanceOf(clazz); + consumer.accept(clazz.cast(ex.getCause())); } private static void checkNotCompeteOuput(WorkflowInstance instance) { - JsonNode out = instance.outputAsJsonNode(); - logger.debug("Output is {}", out); + JsonNode out = instance.outputAsJsonNode().join(); assertThat(out).isInstanceOf(ArrayNode.class); assertThat(out).hasSize(2); ArrayNode array = (ArrayNode) out; @@ -146,7 +157,7 @@ private static void checkWorkflowException(WorkflowException ex) { } private static void checkSpecialKeywords(WorkflowInstance obj) { - Map result = (Map) obj.output(); + Map result = (Map) obj.output().join(); assertThat(Instant.ofEpochMilli((long) result.get("startedAt"))) .isAfterOrEqualTo(before) .isBeforeOrEqualTo(Instant.now()); diff --git a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java index 13e61d35..79a57156 100644 --- a/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java +++ b/impl/http/src/main/java/io/serverlessworkflow/impl/executors/HttpExecutor.java @@ -24,14 +24,15 @@ import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.api.types.UriTemplate; import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; -import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowError; import io.serverlessworkflow.impl.WorkflowException; import io.serverlessworkflow.impl.expressions.Expression; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.ExpressionUtils; import io.serverlessworkflow.impl.json.JsonUtils; +import io.serverlessworkflow.impl.resources.ResourceLoader; import jakarta.ws.rs.HttpMethod; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.client.Client; @@ -41,6 +42,7 @@ import jakarta.ws.rs.client.WebTarget; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; public class HttpExecutor implements CallableTask { @@ -53,33 +55,34 @@ public class HttpExecutor implements CallableTask { @FunctionalInterface private interface TargetSupplier { - WebTarget apply(WorkflowContext workflow, TaskContext task, JsonNode node); + WebTarget apply(WorkflowContext workflow, TaskContext task, JsonNode node); } @FunctionalInterface private interface RequestSupplier { - JsonNode apply(Builder request, WorkflowContext workflow, TaskContext task, JsonNode node); + JsonNode apply(Builder request, WorkflowContext workflow, TaskContext task, JsonNode node); } @Override - public void init(CallHTTP task, WorkflowDefinition definition) { + public void init(CallHTTP task, WorkflowApplication application, ResourceLoader resourceLoader) { HTTPArguments httpArgs = task.getWith(); - this.targetSupplier = getTargetSupplier(httpArgs.getEndpoint(), definition.expressionFactory()); + this.targetSupplier = + getTargetSupplier(httpArgs.getEndpoint(), application.expressionFactory()); this.headersMap = httpArgs.getHeaders() != null ? ExpressionUtils.buildExpressionMap( - httpArgs.getHeaders().getAdditionalProperties(), definition.expressionFactory()) + httpArgs.getHeaders().getAdditionalProperties(), application.expressionFactory()) : Map.of(); this.queryMap = httpArgs.getQuery() != null ? ExpressionUtils.buildExpressionMap( - httpArgs.getQuery().getAdditionalProperties(), definition.expressionFactory()) + httpArgs.getQuery().getAdditionalProperties(), application.expressionFactory()) : Map.of(); switch (httpArgs.getMethod().toUpperCase()) { case HttpMethod.POST: Object body = ExpressionUtils.buildExpressionObject( - httpArgs.getBody(), definition.expressionFactory()); + httpArgs.getBody(), application.expressionFactory()); this.requestFunction = (request, workflow, context, node) -> request.post( @@ -94,8 +97,8 @@ public void init(CallHTTP task, WorkflowDefinition definition) { } @Override - public JsonNode apply( - WorkflowContext workflow, TaskContext taskContext, JsonNode input) { + public CompletableFuture apply( + WorkflowContext workflow, TaskContext taskContext, JsonNode input) { WebTarget target = targetSupplier.apply(workflow, taskContext, input); for (Entry entry : ExpressionUtils.evaluateExpressionMap(queryMap, workflow, taskContext, input).entrySet()) { @@ -105,7 +108,8 @@ public JsonNode apply( ExpressionUtils.evaluateExpressionMap(headersMap, workflow, taskContext, input) .forEach(request::header); try { - return requestFunction.apply(request, workflow, taskContext, input); + return CompletableFuture.completedFuture( + requestFunction.apply(request, workflow, taskContext, input)); } catch (WebApplicationException exception) { throw new WorkflowException( WorkflowError.communication(exception.getResponse().getStatus(), taskContext, exception) @@ -158,7 +162,7 @@ public ExpressionURISupplier(Expression expr) { } @Override - public WebTarget apply(WorkflowContext workflow, TaskContext task, JsonNode node) { + public WebTarget apply(WorkflowContext workflow, TaskContext task, JsonNode node) { return client.target(expr.eval(workflow, task, node).asText()); } } diff --git a/impl/http/src/test/java/io/serverlessworkflow/impl/HTTPWorkflowDefinitionTest.java b/impl/http/src/test/java/io/serverlessworkflow/impl/HTTPWorkflowDefinitionTest.java index c6447141..1d11d4b9 100644 --- a/impl/http/src/test/java/io/serverlessworkflow/impl/HTTPWorkflowDefinitionTest.java +++ b/impl/http/src/test/java/io/serverlessworkflow/impl/HTTPWorkflowDefinitionTest.java @@ -42,7 +42,11 @@ static void init() { @MethodSource("provideParameters") void testWorkflowExecution(String fileName, Object input, Condition condition) throws IOException { - assertThat(appl.workflowDefinition(readWorkflowFromClasspath(fileName)).execute(input).output()) + assertThat( + appl.workflowDefinition(readWorkflowFromClasspath(fileName)) + .execute(input) + .output() + .join()) .is(condition); }