Skip to content

Commit 37bd454

Browse files
authored
[Fix #822] TaskExecutorFactory to accept WorkflowDefinition (#823)
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent 4989629 commit 37bd454

27 files changed

+122
-293
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaConsumerCallExecutor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,20 @@
1616
package io.serverlessworkflow.impl.executors.func;
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
19-
import io.serverlessworkflow.api.types.Workflow;
2019
import io.serverlessworkflow.api.types.func.CallJava;
2120
import io.serverlessworkflow.impl.TaskContext;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
2321
import io.serverlessworkflow.impl.WorkflowContext;
22+
import io.serverlessworkflow.impl.WorkflowDefinition;
2423
import io.serverlessworkflow.impl.WorkflowModel;
2524
import io.serverlessworkflow.impl.executors.CallableTask;
26-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2725
import java.util.concurrent.CompletableFuture;
2826
import java.util.function.Consumer;
2927

3028
public class JavaConsumerCallExecutor implements CallableTask<CallJava.CallJavaConsumer> {
3129

3230
private Consumer consumer;
3331

34-
public void init(
35-
CallJava.CallJavaConsumer task,
36-
Workflow workflow,
37-
WorkflowApplication application,
38-
ResourceLoader loader) {
32+
public void init(CallJava.CallJavaConsumer task, WorkflowDefinition definition) {
3933
consumer = task.consumer();
4034
}
4135

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaForExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,23 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.ForTask;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.ForTaskFunction;
2322
import io.serverlessworkflow.api.types.func.LoopPredicateIndex;
2423
import io.serverlessworkflow.api.types.func.TypedFunction;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
24+
import io.serverlessworkflow.impl.WorkflowDefinition;
2625
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2726
import io.serverlessworkflow.impl.WorkflowPredicate;
2827
import io.serverlessworkflow.impl.WorkflowValueResolver;
2928
import io.serverlessworkflow.impl.executors.ForExecutor.ForExecutorBuilder;
3029
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
31-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3230
import java.util.Collection;
3331
import java.util.Optional;
3432

3533
public class JavaForExecutorBuilder extends ForExecutorBuilder {
3634

3735
protected JavaForExecutorBuilder(
38-
WorkflowMutablePosition position,
39-
ForTask task,
40-
Workflow workflow,
41-
WorkflowApplication application,
42-
ResourceLoader resourceLoader) {
43-
super(position, task, workflow, application, resourceLoader);
36+
WorkflowMutablePosition position, ForTask task, WorkflowDefinition definition) {
37+
super(position, task, definition);
4438
}
4539

4640
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFunctionCallExecutor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616
package io.serverlessworkflow.impl.executors.func;
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
19-
import io.serverlessworkflow.api.types.Workflow;
2019
import io.serverlessworkflow.api.types.func.CallJava;
2120
import io.serverlessworkflow.impl.TaskContext;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
2321
import io.serverlessworkflow.impl.WorkflowContext;
22+
import io.serverlessworkflow.impl.WorkflowDefinition;
2423
import io.serverlessworkflow.impl.WorkflowModel;
2524
import io.serverlessworkflow.impl.WorkflowModelFactory;
2625
import io.serverlessworkflow.impl.executors.CallableTask;
27-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2826
import java.util.Optional;
2927
import java.util.concurrent.CompletableFuture;
3028
import java.util.function.Function;
@@ -39,11 +37,7 @@ static String fromInt(Integer integer) {
3937
return Integer.toString(integer);
4038
}
4139

42-
public void init(
43-
CallJava.CallJavaFunction<T, V> task,
44-
Workflow workflow,
45-
WorkflowApplication application,
46-
ResourceLoader loader) {
40+
public void init(CallJava.CallJavaFunction<T, V> task, WorkflowDefinition definition) {
4741
function = task.function();
4842
inputClass = task.inputClass();
4943
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaListenExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,18 @@
1919

2020
import io.serverlessworkflow.api.types.ListenTask;
2121
import io.serverlessworkflow.api.types.Until;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.UntilPredicate;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028

3129
public class JavaListenExecutorBuilder extends ListenExecutorBuilder {
3230

3331
protected JavaListenExecutorBuilder(
34-
WorkflowMutablePosition position,
35-
ListenTask task,
36-
Workflow workflow,
37-
WorkflowApplication application,
38-
ResourceLoader resourceLoader) {
39-
super(position, task, workflow, application, resourceLoader);
32+
WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) {
33+
super(position, task, definition);
4034
}
4135

4236
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionCallExecutor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,22 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.TaskBase;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.CallJava;
2322
import io.serverlessworkflow.api.types.func.LoopFunction;
2423
import io.serverlessworkflow.impl.TaskContext;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
2624
import io.serverlessworkflow.impl.WorkflowContext;
25+
import io.serverlessworkflow.impl.WorkflowDefinition;
2726
import io.serverlessworkflow.impl.WorkflowModel;
2827
import io.serverlessworkflow.impl.WorkflowModelFactory;
2928
import io.serverlessworkflow.impl.executors.CallableTask;
30-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3129
import java.util.concurrent.CompletableFuture;
3230

3331
public class JavaLoopFunctionCallExecutor implements CallableTask<CallJava.CallJavaLoopFunction> {
3432

3533
private LoopFunction function;
3634
private String varName;
3735

38-
public void init(
39-
CallJava.CallJavaLoopFunction task,
40-
Workflow workflow,
41-
WorkflowApplication application,
42-
ResourceLoader loader) {
36+
public void init(CallJava.CallJavaLoopFunction task, WorkflowDefinition definition) {
4337
function = task.function();
4438
varName = task.varName();
4539
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaLoopFunctionIndexCallExecutor.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@
1818
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.safeObject;
1919

2020
import io.serverlessworkflow.api.types.TaskBase;
21-
import io.serverlessworkflow.api.types.Workflow;
2221
import io.serverlessworkflow.api.types.func.CallJava;
2322
import io.serverlessworkflow.api.types.func.LoopFunctionIndex;
2423
import io.serverlessworkflow.impl.TaskContext;
25-
import io.serverlessworkflow.impl.WorkflowApplication;
2624
import io.serverlessworkflow.impl.WorkflowContext;
25+
import io.serverlessworkflow.impl.WorkflowDefinition;
2726
import io.serverlessworkflow.impl.WorkflowModel;
2827
import io.serverlessworkflow.impl.WorkflowModelFactory;
2928
import io.serverlessworkflow.impl.executors.CallableTask;
30-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3129
import java.util.concurrent.CompletableFuture;
3230

3331
public class JavaLoopFunctionIndexCallExecutor
@@ -37,11 +35,7 @@ public class JavaLoopFunctionIndexCallExecutor
3735
private String varName;
3836
private String indexName;
3937

40-
public void init(
41-
CallJava.CallJavaLoopFunctionIndex task,
42-
Workflow workflow,
43-
WorkflowApplication application,
44-
ResourceLoader loader) {
38+
public void init(CallJava.CallJavaLoopFunctionIndex task, WorkflowDefinition definition) {
4539
function = task.function();
4640
varName = task.varName();
4741
indexName = task.indexName();

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,19 @@
1919

2020
import io.serverlessworkflow.api.types.SwitchCase;
2121
import io.serverlessworkflow.api.types.SwitchTask;
22-
import io.serverlessworkflow.api.types.Workflow;
2322
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
24-
import io.serverlessworkflow.impl.WorkflowApplication;
23+
import io.serverlessworkflow.impl.WorkflowDefinition;
2524
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2625
import io.serverlessworkflow.impl.WorkflowPredicate;
2726
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
2827
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
29-
import io.serverlessworkflow.impl.resources.ResourceLoader;
3028
import java.util.Optional;
3129

3230
public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {
3331

3432
protected JavaSwitchExecutorBuilder(
35-
WorkflowMutablePosition position,
36-
SwitchTask task,
37-
Workflow workflow,
38-
WorkflowApplication application,
39-
ResourceLoader resourceLoader) {
40-
super(position, task, workflow, application, resourceLoader);
33+
WorkflowMutablePosition position, SwitchTask task, WorkflowDefinition definition) {
34+
super(position, task, definition);
4135
}
4236

4337
@Override

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,23 @@
1717

1818
import io.serverlessworkflow.api.types.Task;
1919
import io.serverlessworkflow.api.types.TaskBase;
20-
import io.serverlessworkflow.api.types.Workflow;
21-
import io.serverlessworkflow.impl.WorkflowApplication;
20+
import io.serverlessworkflow.impl.WorkflowDefinition;
2221
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2322
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
2423
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
25-
import io.serverlessworkflow.impl.resources.ResourceLoader;
2624

2725
public class JavaTaskExecutorFactory extends DefaultTaskExecutorFactory {
2826

2927
public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
30-
WorkflowMutablePosition position,
31-
Task task,
32-
Workflow workflow,
33-
WorkflowApplication application,
34-
ResourceLoader resourceLoader) {
28+
WorkflowMutablePosition position, Task task, WorkflowDefinition definition) {
3529
if (task.getForTask() != null) {
36-
return new JavaForExecutorBuilder(
37-
position, task.getForTask(), workflow, application, resourceLoader);
30+
return new JavaForExecutorBuilder(position, task.getForTask(), definition);
3831
} else if (task.getSwitchTask() != null) {
39-
return new JavaSwitchExecutorBuilder(
40-
position, task.getSwitchTask(), workflow, application, resourceLoader);
32+
return new JavaSwitchExecutorBuilder(position, task.getSwitchTask(), definition);
4133
} else if (task.getListenTask() != null) {
42-
return new JavaListenExecutorBuilder(
43-
position, task.getListenTask(), workflow, application, resourceLoader);
34+
return new JavaListenExecutorBuilder(position, task.getListenTask(), definition);
4435
} else {
45-
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
36+
return super.getTaskExecutor(position, task, definition);
4637
}
4738
}
4839
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ public class WorkflowDefinition implements AutoCloseable, WorkflowDefinitionData
3636
private Optional<WorkflowFilter> outputFilter = Optional.empty();
3737
private final WorkflowApplication application;
3838
private final TaskExecutor<?> taskExecutor;
39+
private final ResourceLoader resourceLoader;
3940

4041
private WorkflowDefinition(
4142
WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) {
4243
this.workflow = workflow;
4344
this.application = application;
45+
this.resourceLoader = resourceLoader;
4446
if (workflow.getInput() != null) {
4547
Input input = workflow.getInput();
4648
this.inputSchemaValidator =
@@ -55,11 +57,7 @@ private WorkflowDefinition(
5557
}
5658
this.taskExecutor =
5759
TaskExecutorHelper.createExecutorList(
58-
application.positionFactory().get(),
59-
workflow.getDo(),
60-
workflow,
61-
application,
62-
resourceLoader);
60+
application.positionFactory().get(), workflow.getDo(), this);
6361
}
6462

6563
static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) {
@@ -105,6 +103,10 @@ public WorkflowApplication application() {
105103
return application;
106104
}
107105

106+
public ResourceLoader resourceLoader() {
107+
return resourceLoader;
108+
}
109+
108110
@Override
109111
public void close() {
110112
// TODO close resourcers hold for uncompleted process instances, if any

impl/core/src/main/java/io/serverlessworkflow/impl/executors/AbstractTaskExecutor.java

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.serverlessworkflow.impl.TaskContext;
2929
import io.serverlessworkflow.impl.WorkflowApplication;
3030
import io.serverlessworkflow.impl.WorkflowContext;
31+
import io.serverlessworkflow.impl.WorkflowDefinition;
3132
import io.serverlessworkflow.impl.WorkflowFilter;
3233
import io.serverlessworkflow.impl.WorkflowModel;
3334
import io.serverlessworkflow.impl.WorkflowMutablePosition;
@@ -81,17 +82,13 @@ public abstract static class AbstractTaskExecutorBuilder<
8182
private V instance;
8283

8384
protected AbstractTaskExecutorBuilder(
84-
WorkflowMutablePosition position,
85-
T task,
86-
Workflow workflow,
87-
WorkflowApplication application,
88-
ResourceLoader resourceLoader) {
89-
this.workflow = workflow;
85+
WorkflowMutablePosition position, T task, WorkflowDefinition definition) {
86+
this.workflow = definition.workflow();
9087
this.taskName = position.last().toString();
9188
this.position = position;
9289
this.task = task;
93-
this.application = application;
94-
this.resourceLoader = resourceLoader;
90+
this.application = definition.application();
91+
this.resourceLoader = definition.resourceLoader();
9592
if (task.getInput() != null) {
9693
Input input = task.getInput();
9794
this.inputProcessor = buildWorkflowFilter(application, input.getFrom());
@@ -174,16 +171,18 @@ protected AbstractTaskExecutor(AbstractTaskExecutorBuilder<T, ?> builder) {
174171

175172
protected final CompletableFuture<TaskContext> executeNext(
176173
CompletableFuture<TaskContext> future, WorkflowContext workflow) {
177-
return future.thenCompose(
178-
t -> {
179-
TransitionInfo transition = t.transition();
180-
if (transition.isEndNode()) {
181-
workflow.instance().status(WorkflowStatus.COMPLETED);
182-
} else if (transition.next() != null) {
183-
return transition.next().apply(workflow, t.parent(), t.output());
184-
}
185-
return CompletableFuture.completedFuture(t);
186-
});
174+
return future.thenCompose(t -> executeNext(workflow, t));
175+
}
176+
177+
private CompletableFuture<TaskContext> executeNext(
178+
WorkflowContext workflow, TaskContext taskContext) {
179+
TransitionInfo transition = taskContext.transition();
180+
if (transition.isEndNode()) {
181+
workflow.instance().status(WorkflowStatus.COMPLETED);
182+
} else if (transition.next() != null) {
183+
return transition.next().apply(workflow, taskContext.parent(), taskContext.output());
184+
}
185+
return CompletableFuture.completedFuture(taskContext);
187186
}
188187

189188
@Override

0 commit comments

Comments
 (0)