Skip to content

Commit

Permalink
[Fix #484] Execute Fork task
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Dec 3, 2024
1 parent 91bbe5c commit 976729f
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

@FunctionalInterface
public interface ExecutorServiceFactory extends Supplier<ExecutorService> {}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String jsonPointer() {

@Override
public String toString() {
return "ListWorkflowPosition [list=" + queue + "]";
return "QueueWorkflowPosition [queue=" + queue + "]";
}

@Override
Expand All @@ -65,6 +65,6 @@ public WorkflowPosition back() {

@Override
public Object last() {
return queue.pollLast();
return queue.getLast();
}
}
71 changes: 54 additions & 17 deletions impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,64 @@ public class TaskContext<T extends TaskBase> {
private final JsonNode rawInput;
private final T task;
private final WorkflowPosition position;
private final Instant startedAt = Instant.now();
private final Instant startedAt;

private JsonNode input;
private JsonNode output;
private JsonNode rawOutput;
private FlowDirective flowDirective;
private Map<String, Object> contextVariables;
private Instant completedAt;

public TaskContext(JsonNode input, WorkflowPosition position) {
this.rawInput = input;
this.position = position;
this.task = null;
this.contextVariables = new HashMap<>();
init();
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
}

public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
this.rawInput = input;
this.position = taskContext.position.copy();
this.task = task;
this.flowDirective = task.getThen();
this.contextVariables = new HashMap<>(taskContext.variables());
init();
public TaskContext<T> copy() {
return new TaskContext<T>(
rawInput,
task,
position.copy(),
startedAt,
input,
output,
rawOutput,
flowDirective,
new HashMap<>(contextVariables));
}

private void init() {
this.input = rawInput;
this.rawOutput = rawInput;
this.output = rawInput;
public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
this(
input,
task,
taskContext.position,
Instant.now(),
input,
input,
input,
task.getThen(),
new HashMap<>(taskContext.variables()));
}

private TaskContext(
JsonNode rawInput,
T task,
WorkflowPosition position,
Instant startedAt,
JsonNode input,
JsonNode output,
JsonNode rawOutput,
FlowDirective flowDirective,
Map<String, Object> contextVariables) {
this.rawInput = rawInput;
this.task = task;
this.position = position;
this.startedAt = startedAt;
this.input = input;
this.output = output;
this.rawOutput = rawOutput;
this.flowDirective = flowDirective;
this.contextVariables = contextVariables;
}

public void input(JsonNode input) {
Expand Down Expand Up @@ -115,4 +144,12 @@ public WorkflowPosition position() {
public Instant startedAt() {
return startedAt;
}

public void completedAt(Instant instant) {
this.completedAt = instant;
}

public Instant completedAt() {
return completedAt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkflowApplication implements AutoCloseable {

Expand All @@ -43,8 +45,11 @@ public class WorkflowApplication implements AutoCloseable {
private final Collection<WorkflowExecutionListener> listeners;
private final Map<WorkflowId, WorkflowDefinition> definitions;
private final WorkflowPositionFactory positionFactory;
private final ExecutorServiceFactory executorFactory;
private final RuntimeDescriptorFactory runtimeDescriptorFactory;

private ExecutorService executorService;

public WorkflowApplication(
TaskExecutorFactory taskFactory,
ExpressionFactory exprFactory,
Expand All @@ -53,6 +58,7 @@ public WorkflowApplication(
WorkflowPositionFactory positionFactory,
WorkflowIdFactory idFactory,
RuntimeDescriptorFactory runtimeDescriptorFactory,
ExecutorServiceFactory executorFactory,
Collection<WorkflowExecutionListener> listeners) {
this.taskFactory = taskFactory;
this.exprFactory = exprFactory;
Expand All @@ -61,6 +67,7 @@ public WorkflowApplication(
this.positionFactory = positionFactory;
this.idFactory = idFactory;
this.runtimeDescriptorFactory = runtimeDescriptorFactory;
this.executorFactory = executorFactory;
this.listeners = listeners;
this.definitions = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -101,6 +108,7 @@ public static class Builder {
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
private ExecutorServiceFactory executorFactory = () -> Executors.newCachedThreadPool();
private RuntimeDescriptorFactory descriptorFactory =
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());

Expand Down Expand Up @@ -129,6 +137,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
return this;
}

public Builder withExecutorFactory(ExecutorServiceFactory executorFactory) {
this.executorFactory = executorFactory;
return this;
}

public Builder withPositionFactory(WorkflowPositionFactory positionFactory) {
this.positionFactory = positionFactory;
return this;
Expand Down Expand Up @@ -158,6 +171,7 @@ public WorkflowApplication build() {
positionFactory,
idFactory,
descriptorFactory,
executorFactory,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners));
Expand Down Expand Up @@ -190,4 +204,13 @@ public WorkflowPositionFactory positionFactory() {
public RuntimeDescriptorFactory runtimeDescriptorFactory() {
return runtimeDescriptorFactory;
}

public ExecutorService executorService() {
synchronized (executorFactory) {
if (executorService == null) {
executorService = executorFactory.get();
}
}
return executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public class WorkflowDefinition implements AutoCloseable {

Expand Down Expand Up @@ -134,6 +135,10 @@ public WorkflowPositionFactory positionFactory() {
return application.positionFactory();
}

public ExecutorService executorService() {
return application.executorService();
}

public RuntimeDescriptorFactory runtimeDescriptorFactory() {
return application.runtimeDescriptorFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package io.serverlessworkflow.impl;

import io.serverlessworkflow.api.types.Task;
import io.serverlessworkflow.api.types.TaskBase;

public interface WorkflowExecutionListener {

void onTaskStarted(WorkflowPosition currentPos, Task task);
void onTaskStarted(WorkflowPosition currentPos, TaskBase task);

void onTaskEnded(WorkflowPosition currentPos, Task task);
void onTaskEnded(WorkflowPosition currentPos, TaskBase task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

public class WorkflowInstance {
private WorkflowState state;
private TaskContext<?> taskContext;
private final AtomicReference<WorkflowState> state;
private final TaskContext<?> taskContext;
private final String id;
private final JsonNode input;
private final Instant startedAt;
private JsonNode context = NullNode.getInstance();
private final AtomicReference<JsonNode> context;

WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
this.id = definition.idFactory().get();
Expand All @@ -39,7 +40,8 @@ public class WorkflowInstance {
definition
.inputFilter()
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
state = WorkflowState.STARTED;
state = new AtomicReference<>(WorkflowState.STARTED);
context = new AtomicReference<>(NullNode.getInstance());
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
definition
.outputFilter()
Expand All @@ -62,22 +64,26 @@ public JsonNode input() {
}

public JsonNode context() {
return context;
return context.get();
}

public WorkflowState state() {
return state;
return state.get();
}

public void state(WorkflowState state) {
this.state.set(state);
}

public Object output() {
return toJavaValue(taskContext.output());
}

public Object outputAsJsonNode() {
public JsonNode outputAsJsonNode() {
return taskContext.output();
}

void context(JsonNode context) {
this.context = context;
this.context.set(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,48 +153,49 @@ public static void processTaskList(
TaskItem nextTask = iter.next();
while (nextTask != null) {
TaskItem task = nextTask;
parentTask.position().addIndex(iter.previousIndex()).addProperty(task.getName());
context
.definition()
.listeners()
.forEach(l -> l.onTaskStarted(parentTask.position(), task.getTask()));
currentContext =
context
.definition()
.taskExecutors()
.computeIfAbsent(
parentTask.position().jsonPointer(),
k ->
context
.definition()
.taskFactory()
.getTaskExecutor(task.getTask(), context.definition()))
.apply(context, parentTask, currentContext.output());
parentTask.position().addIndex(iter.previousIndex());
currentContext = executeTask(context, parentTask, task, currentContext.output());
FlowDirective flowDirective = currentContext.flowDirective();
if (flowDirective.getFlowDirectiveEnum() != null) {
switch (flowDirective.getFlowDirectiveEnum()) {
case CONTINUE:
nextTask = iter.hasNext() ? iter.next() : null;
break;
case END:
context.instance().state(WorkflowState.COMPLETED);
case EXIT:
nextTask = null;
break;
}
} else {
nextTask = WorkflowUtils.findTaskByName(iter, flowDirective.getString());
}
context
.definition()
.listeners()
.forEach(l -> l.onTaskEnded(parentTask.position(), task.getTask()));
parentTask.position().back();
}
}
parentTask.position().back();
parentTask.rawOutput(currentContext.output());
}

public static TaskContext<?> executeTask(
WorkflowContext context, TaskContext<?> parentTask, TaskItem task, JsonNode input) {
parentTask.position().addProperty(task.getName());
TaskContext<?> result =
context
.definition()
.taskExecutors()
.computeIfAbsent(
parentTask.position().jsonPointer(),
k ->
context
.definition()
.taskFactory()
.getTaskExecutor(task.getTask(), context.definition()))
.apply(context, parentTask, input);
parentTask.position().back();
return result;
}

public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {
assert str != null;
Expression expression = exprFactory.getExpression(str);
Expand Down
Loading

0 comments on commit 976729f

Please sign in to comment.