Skip to content

Commit

Permalink
Merge pull request serverlessworkflow#511 from fjtirado/Fix_#505
Browse files Browse the repository at this point in the history
[Fix serverlessworkflow#505] Switching to CompletableFuture
  • Loading branch information
fjtirado authored Jan 9, 2025
2 parents a2ca9f7 + 8101d81 commit 96000ad
Show file tree
Hide file tree
Showing 35 changed files with 1,154 additions and 559 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
import java.util.function.BiFunction;

@FunctionalInterface
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext<?>, Long> {}
public interface LongFilter extends BiFunction<WorkflowContext, TaskContext, Long> {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
import java.util.function.BiFunction;

@FunctionalInterface
public interface StringFilter extends BiFunction<WorkflowContext, TaskContext<?>, String> {}
public interface StringFilter extends BiFunction<WorkflowContext, TaskContext, String> {}
106 changes: 52 additions & 54 deletions impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,64 @@
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.api.types.FlowDirective;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.impl.executors.TransitionInfo;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TaskContext<T extends TaskBase> {
public class TaskContext {

private final JsonNode rawInput;
private final T task;
private final TaskBase task;
private final WorkflowPosition position;
private final Instant startedAt;
private final String taskName;
private final Map<String, Object> contextVariables;
private final Optional<TaskContext> parentContext;

private JsonNode input;
private JsonNode output;
private JsonNode rawOutput;
private FlowDirective flowDirective;
private Map<String, Object> contextVariables;
private Instant completedAt;
private TransitionInfo transition;

public TaskContext(JsonNode input, WorkflowPosition position) {
this(input, null, position, Instant.now(), input, input, input, null, new HashMap<>());
}

public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
this(
input,
task,
taskContext.position,
Instant.now(),
input,
input,
input,
task.getThen(),
new HashMap<>(taskContext.variables()));
public TaskContext(
JsonNode input,
WorkflowPosition position,
Optional<TaskContext> parentContext,
String taskName,
TaskBase task) {
this(input, parentContext, taskName, task, position, Instant.now(), input, input, input);
}

private TaskContext(
JsonNode rawInput,
T task,
Optional<TaskContext> parentContext,
String taskName,
TaskBase task,
WorkflowPosition position,
Instant startedAt,
JsonNode input,
JsonNode output,
JsonNode rawOutput,
FlowDirective flowDirective,
Map<String, Object> contextVariables) {
JsonNode rawOutput) {
this.rawInput = rawInput;
this.parentContext = parentContext;
this.taskName = taskName;
this.task = task;
this.position = position;
this.startedAt = startedAt;
this.input = input;
this.output = output;
this.rawOutput = rawOutput;
this.flowDirective = flowDirective;
this.contextVariables = contextVariables;
this.contextVariables =
parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new);
}

public TaskContext<T> copy() {
return new TaskContext<T>(
rawInput,
task,
position.copy(),
startedAt,
input,
output,
rawOutput,
flowDirective,
new HashMap<>(contextVariables));
public TaskContext copy() {
return new TaskContext(
rawInput, parentContext, taskName, task, position, startedAt, input, output, rawOutput);
}

public void input(JsonNode input) {
Expand All @@ -102,54 +90,64 @@ public JsonNode rawInput() {
return rawInput;
}

public T task() {
public TaskBase task() {
return task;
}

public void rawOutput(JsonNode output) {
public TaskContext rawOutput(JsonNode output) {
this.rawOutput = output;
this.output = output;
return this;
}

public JsonNode rawOutput() {
return rawOutput;
}

public void output(JsonNode output) {
public TaskContext output(JsonNode output) {
this.output = output;
return this;
}

public JsonNode output() {
return output;
}

public void flowDirective(FlowDirective flowDirective) {
this.flowDirective = flowDirective;
}

public FlowDirective flowDirective() {
return flowDirective == null
? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE)
: flowDirective;
public WorkflowPosition position() {
return position;
}

public Map<String, Object> variables() {
return contextVariables;
}

public WorkflowPosition position() {
return position;
}

public Instant startedAt() {
return startedAt;
}

public void completedAt(Instant instant) {
public Optional<TaskContext> parent() {
return parentContext;
}

public String taskName() {
return taskName;
}

public TaskContext completedAt(Instant instant) {
this.completedAt = instant;
return this;
}

public Instant completedAt() {
return completedAt;
}

public TransitionInfo transition() {
return transition;
}

public TaskContext transition(TransitionInfo transition) {
this.transition = transition;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public class WorkflowContext {
private final WorkflowDefinition definition;
private final WorkflowInstance instance;
private JsonNode context;

WorkflowContext(WorkflowDefinition definition, WorkflowInstance instance) {
this.definition = definition;
Expand All @@ -31,11 +32,11 @@ public WorkflowInstance instance() {
}

public JsonNode context() {
return instance.context();
return context;
}

public void context(JsonNode context) {
this.instance.context(context);
this.context = context;
}

public WorkflowDefinition definition() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@

import io.serverlessworkflow.api.types.Input;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.executors.TaskExecutor;
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.json.JsonUtils;
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

public class WorkflowDefinition implements AutoCloseable {

Expand All @@ -42,16 +36,13 @@ public class WorkflowDefinition implements AutoCloseable {
private Optional<SchemaValidator> outputSchemaValidator = Optional.empty();
private Optional<WorkflowFilter> inputFilter = Optional.empty();
private Optional<WorkflowFilter> outputFilter = Optional.empty();
private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
new ConcurrentHashMap<>();
private final ResourceLoader resourceLoader;
private final WorkflowApplication application;
private final TaskExecutor<?> taskExecutor;

private WorkflowDefinition(
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
this.workflow = workflow;
this.application = application;
this.resourceLoader = resourceLoader;
if (workflow.getInput() != null) {
Input input = workflow.getInput();
this.inputSchemaValidator =
Expand All @@ -64,6 +55,13 @@ private WorkflowDefinition(
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs());
}
this.taskExecutor =
TaskExecutorHelper.createExecutorList(
application.positionFactory().get(),
workflow.getDo(),
workflow,
application,
resourceLoader);
}

static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
Expand All @@ -83,6 +81,10 @@ public Optional<SchemaValidator> inputSchemaValidator() {
return inputSchemaValidator;
}

public TaskExecutor<?> startTask() {
return taskExecutor;
}

public Optional<WorkflowFilter> inputFilter() {
return inputFilter;
}
Expand All @@ -95,14 +97,6 @@ public Collection<WorkflowExecutionListener> listeners() {
return application.listeners();
}

public Map<String, TaskExecutor<? extends TaskBase>> taskExecutors() {
return taskExecutors;
}

public TaskExecutorFactory taskFactory() {
return application.taskFactory();
}

public Optional<WorkflowFilter> outputFilter() {
return outputFilter;
}
Expand All @@ -115,26 +109,6 @@ public Optional<SchemaValidator> outputSchemaValidator() {
return outputSchemaValidator;
}

public ExpressionFactory expressionFactory() {
return application.expressionFactory();
}

public SchemaValidatorFactory validatorFactory() {
return application.validatorFactory();
}

public ResourceLoader resourceLoader() {
return resourceLoader;
}

public WorkflowPositionFactory positionFactory() {
return application.positionFactory();
}

public ExecutorService executorService() {
return application.executorService();
}

public RuntimeDescriptorFactory runtimeDescriptorFactory() {
return application.runtimeDescriptorFactory();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ public static Builder error(String type, int status) {
return new Builder(type, status);
}

public static Builder communication(int status, TaskContext<?> context, Exception ex) {
public static Builder communication(int status, TaskContext context, Exception ex) {
return new Builder(COMM_TYPE, status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
}

public static Builder runtime(int status, TaskContext<?> context, Exception ex) {
public static Builder runtime(int status, TaskContext context, Exception ex) {
return new Builder(RUNTIME_TYPE, status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@

@FunctionalInterface
public interface WorkflowFilter {
JsonNode apply(WorkflowContext workflow, TaskContext<?> task, JsonNode node);
JsonNode apply(WorkflowContext workflow, TaskContext task, JsonNode node);
}
Loading

0 comments on commit 96000ad

Please sign in to comment.