Skip to content

Commit

Permalink
Merge pull request serverlessworkflow#483 from fjtirado/Fix_#474
Browse files Browse the repository at this point in the history
[Fix_#474] For task implementation
  • Loading branch information
fjtirado authored Nov 26, 2024
2 parents 9087cb9 + 399ad9d commit 0cc396a
Show file tree
Hide file tree
Showing 26 changed files with 332 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.Map;

public interface ContextAware {
Map<String, Object> variables();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@

public class DefaultWorkflowPosition implements WorkflowPosition {

private StringBuilder sb = new StringBuilder("");
private StringBuilder sb;

DefaultWorkflowPosition() {
this.sb = new StringBuilder("");
}

private DefaultWorkflowPosition(WorkflowPosition position) {
this.sb = new StringBuilder(position.toString());
}

public DefaultWorkflowPosition copy() {
return new DefaultWorkflowPosition(this);
}

@Override
public WorkflowPosition addIndex(int index) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

class DefaultWorkflowPositionFactory implements WorkflowPositionFactory {

private static WorkflowPositionFactory instance = new DefaultWorkflowPositionFactory();

public static WorkflowPositionFactory get() {
return instance;
}

private DefaultWorkflowPositionFactory() {}

@Override
public WorkflowPosition buildPosition() {
return new DefaultWorkflowPosition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,48 @@
import io.serverlessworkflow.api.types.FlowDirective;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.TaskBase;
import java.util.HashMap;
import java.util.Map;

public class TaskContext<T extends TaskBase> {
public class TaskContext<T extends TaskBase> implements ContextAware {

private final JsonNode rawInput;
private final T task;
private final WorkflowPosition position;

private JsonNode input;
private JsonNode output;
private JsonNode rawOutput;
private FlowDirective flowDirective;
private Map<String, Object> contextVariables;

public TaskContext(JsonNode rawInput, T task) {
this.rawInput = rawInput;
public TaskContext(JsonNode input, WorkflowPosition position) {
this.rawInput = input;
this.position = position;
this.task = null;
this.contextVariables = new HashMap<>();
init();
}

public TaskContext(JsonNode input, TaskContext<?> taskContext, T task) {
this.rawInput = input;
this.position = taskContext.position.copy();
this.task = task;
this.flowDirective = task.getThen();
this.contextVariables = new HashMap<>(taskContext.variables());
init();
}

private void init() {
this.input = rawInput;
this.rawOutput = rawInput;
this.output = rawInput;
this.task = task;
this.flowDirective = task.getThen();
}

public void input(JsonNode input) {
this.input = input;
this.rawOutput = input;
this.output = input;
}

public JsonNode input() {
Expand Down Expand Up @@ -81,4 +101,12 @@ public FlowDirective flowDirective() {
? new FlowDirective().withFlowDirectiveEnum(FlowDirectiveEnum.CONTINUE)
: flowDirective;
}

public Map<String, Object> variables() {
return contextVariables;
}

public WorkflowPosition position() {
return position;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -40,17 +39,20 @@ public class WorkflowApplication implements AutoCloseable {
private final SchemaValidatorFactory schemaValidatorFactory;
private final Collection<WorkflowExecutionListener> listeners;
private final Map<WorkflowId, WorkflowDefinition> definitions;
private final WorkflowPositionFactory positionFactory;

public WorkflowApplication(
TaskExecutorFactory taskFactory,
ExpressionFactory exprFactory,
ResourceLoaderFactory resourceLoaderFactory,
SchemaValidatorFactory schemaValidatorFactory,
WorkflowPositionFactory positionFactory,
Collection<WorkflowExecutionListener> listeners) {
this.taskFactory = taskFactory;
this.exprFactory = exprFactory;
this.resourceLoaderFactory = resourceLoaderFactory;
this.schemaValidatorFactory = schemaValidatorFactory;
this.positionFactory = positionFactory;
this.listeners = listeners;
this.definitions = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -85,6 +87,7 @@ public static class Builder {
private Collection<WorkflowExecutionListener> listeners;
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();
private WorkflowPositionFactory positionFactory = DefaultWorkflowPositionFactory.get();

private Builder() {}

Expand All @@ -111,6 +114,11 @@ public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
return this;
}

public Builder withPositionFactory(WorkflowPositionFactory positionFactory) {
this.positionFactory = positionFactory;
return this;
}

public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) {
this.schemaValidatorFactory = factory;
return this;
Expand All @@ -122,6 +130,7 @@ public WorkflowApplication build() {
exprFactory,
resourceLoaderFactory,
schemaValidatorFactory,
positionFactory,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners));
Expand All @@ -146,4 +155,8 @@ public void close() throws Exception {
}
definitions.clear();
}

public WorkflowPositionFactory positionFactory() {
return positionFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,16 @@
import io.serverlessworkflow.impl.json.JsonUtils;

public class WorkflowContext {
private final WorkflowPosition position;
private final WorkflowDefinition definition;
private final JsonNode input;
private JsonNode current;
private JsonNode context;

private WorkflowContext(
WorkflowPosition position, WorkflowDefinition definition, JsonNode input) {
this.position = position;
WorkflowContext(WorkflowDefinition definition, JsonNode input) {
this.definition = definition;
this.input = input;
this.current = input.deepCopy();
this.context = JsonUtils.mapper().createObjectNode();
}

public static Builder builder(WorkflowDefinition definition, JsonNode input) {
return new Builder(definition, input);
}

public static class Builder {
private WorkflowPosition position = new DefaultWorkflowPosition();
private WorkflowDefinition definition;
private JsonNode input;

private Builder(WorkflowDefinition definition, JsonNode input) {
this.definition = definition;
this.input = input;
}

public Builder position(WorkflowPosition position) {
this.position = position;
return this;
}

public WorkflowContext build() {
return new WorkflowContext(position, definition, input);
}
}

public WorkflowPosition position() {
return position;
}

public JsonNode context() {
return context;
}
Expand All @@ -74,14 +41,6 @@ public JsonNode rawInput() {
return input;
}

public void current(JsonNode output) {
this.current = output;
}

public JsonNode current() {
return current;
}

public WorkflowDefinition definition() {
return definition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.serverlessworkflow.impl.jsonschema.SchemaValidator;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.impl.resources.ResourceLoader;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Map;
Expand All @@ -47,6 +46,7 @@ public class WorkflowDefinition implements AutoCloseable {
private final ExpressionFactory exprFactory;
private final ResourceLoader resourceLoader;
private final SchemaValidatorFactory schemaValidatorFactory;
private final WorkflowPositionFactory positionFactory;
private final Map<String, TaskExecutor<? extends TaskBase>> taskExecutors =
new ConcurrentHashMap<>();

Expand All @@ -56,12 +56,14 @@ private WorkflowDefinition(
TaskExecutorFactory taskFactory,
ResourceLoader resourceLoader,
ExpressionFactory exprFactory,
SchemaValidatorFactory schemaValidatorFactory) {
SchemaValidatorFactory schemaValidatorFactory,
WorkflowPositionFactory positionFactory) {
this.workflow = workflow;
this.listeners = listeners;
this.taskFactory = taskFactory;
this.exprFactory = exprFactory;
this.schemaValidatorFactory = schemaValidatorFactory;
this.positionFactory = positionFactory;
this.resourceLoader = resourceLoader;
if (workflow.getInput() != null) {
Input input = workflow.getInput();
Expand Down Expand Up @@ -90,7 +92,8 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
application.taskFactory(),
application.resourceLoaderFactory().getResourceLoader(path),
application.expressionFactory(),
application.validatorFactory());
application.validatorFactory(),
application.positionFactory());
}

public WorkflowInstance execute(Object input) {
Expand Down Expand Up @@ -142,6 +145,10 @@ public ResourceLoader resourceLoader() {
return resourceLoader;
}

public WorkflowPositionFactory positionFactory() {
return positionFactory;
}

@Override
public void close() {
// TODO close resourcers hold for uncompleted process instances, if any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;

@FunctionalInterface
public interface WorkflowFilter {
JsonNode apply(WorkflowContext workflow, Optional<TaskContext<?>> task, JsonNode node);
JsonNode apply(WorkflowContext workflow, TaskContext<?> task, JsonNode node);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,37 @@
import static io.serverlessworkflow.impl.json.JsonUtils.toJavaValue;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Optional;

public class WorkflowInstance {
private WorkflowState state;
private WorkflowContext context;
private TaskContext<?> taskContext;

WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
context = WorkflowContext.builder(definition, input).build();
context = new WorkflowContext(definition, input);
taskContext = new TaskContext<>(input, definition.positionFactory().buildPosition());
definition
.inputFilter()
.ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current())));
.ifPresent(f -> taskContext.input(f.apply(context, taskContext, input)));
state = WorkflowState.STARTED;
WorkflowUtils.processTaskList(definition.workflow().getDo(), context);
taskContext.rawOutput(
WorkflowUtils.processTaskList(definition.workflow().getDo(), context, taskContext));
definition
.outputFilter()
.ifPresent(f -> context.current(f.apply(context, Optional.empty(), context.current())));
definition.outputSchemaValidator().ifPresent(v -> v.validate(context.current()));
.ifPresent(f -> taskContext.output(f.apply(context, taskContext, taskContext.rawOutput())));
definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output()));
}

public WorkflowState state() {
return state;
}

public Object output() {
return toJavaValue(context.current());
return toJavaValue(taskContext.output());
}

public Object outputAsJsonNode() {
return context.current();
return taskContext.output();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ public interface WorkflowPosition {
WorkflowPosition addIndex(int index);

WorkflowPosition back();

WorkflowPosition copy();
}
Loading

0 comments on commit 0cc396a

Please sign in to comment.