From 997f4236fee1b2c4e506d77d639212fb2ef5f458 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Wed, 27 Nov 2024 17:07:39 +0100 Subject: [PATCH] [Fix #463] Supporting runtime expressions Signed-off-by: Francisco Javier Tirado Sarti --- impl/core/pom.xml | 6 ++ .../impl/QueueWorkflowPosition.java | 70 ++++++++++++++++ ...ory.java => RuntimeDescriptorFactory.java} | 18 +--- ...java => StringBufferWorkflowPosition.java} | 22 +++-- .../serverlessworkflow/impl/TaskContext.java | 6 ++ .../impl/WorkflowApplication.java | 33 +++++++- .../impl/WorkflowContext.java | 21 ++--- .../impl/WorkflowDefinition.java | 57 +++++-------- .../impl/WorkflowIdFactory.java | 21 +++++ .../impl/WorkflowInstance.java | 42 ++++++++-- .../impl/WorkflowPosition.java | 2 + .../impl/WorkflowPositionFactory.java | 6 +- .../impl/expressions/DateTimeDescriptor.java | 48 +++++++++++ .../impl/expressions/JQExpression.java | 45 +++++----- .../impl/expressions/RuntimeDescriptor.java | 21 +++++ .../impl/expressions/TaskDescriptor.java | 39 +++++++++ .../impl/expressions/WorkflowDescriptor.java | 33 ++++++++ .../impl/WorkflowDefinitionTest.java | 83 ++++++++++++------- .../src/test/resources/simple-expression.yaml | 11 +++ 19 files changed, 452 insertions(+), 132 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java rename impl/core/src/main/java/io/serverlessworkflow/impl/{DefaultWorkflowPositionFactory.java => RuntimeDescriptorFactory.java} (63%) rename impl/core/src/main/java/io/serverlessworkflow/impl/{DefaultWorkflowPosition.java => StringBufferWorkflowPosition.java} (69%) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowIdFactory.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/expressions/DateTimeDescriptor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/expressions/RuntimeDescriptor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/expressions/WorkflowDescriptor.java create mode 100644 impl/core/src/test/resources/simple-expression.yaml diff --git a/impl/core/pom.xml b/impl/core/pom.xml index 597b3758..940f79fa 100644 --- a/impl/core/pom.xml +++ b/impl/core/pom.xml @@ -8,6 +8,7 @@ serverlessworkflow-impl-core 1.1.0 + 5.2.3 @@ -15,6 +16,11 @@ serverlessworkflow-api 7.0.0-SNAPSHOT + + com.github.f4b6a3 + ulid-creator + ${version.com.github.f4b6a3} + com.networknt json-schema-validator diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java new file mode 100644 index 00000000..c6d3f141 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/QueueWorkflowPosition.java @@ -0,0 +1,70 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.stream.Collectors; + +public class QueueWorkflowPosition implements WorkflowPosition { + + private Deque queue; + + QueueWorkflowPosition() { + this(new ArrayDeque<>()); + } + + private QueueWorkflowPosition(Deque list) { + this.queue = list; + } + + public QueueWorkflowPosition copy() { + return new QueueWorkflowPosition(new ArrayDeque<>(this.queue)); + } + + @Override + public WorkflowPosition addIndex(int index) { + queue.add(index); + return this; + } + + @Override + public WorkflowPosition addProperty(String prop) { + queue.add(prop); + return this; + } + + @Override + public String jsonPointer() { + return queue.stream().map(Object::toString).collect(Collectors.joining("/")); + } + + @Override + public String toString() { + return "ListWorkflowPosition [list=" + queue + "]"; + } + + @Override + public WorkflowPosition back() { + queue.removeLast(); + return this; + } + + @Override + public Object last() { + return queue.pollLast(); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/RuntimeDescriptorFactory.java similarity index 63% rename from impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/RuntimeDescriptorFactory.java index 00b0085c..2d0601fb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPositionFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/RuntimeDescriptorFactory.java @@ -15,18 +15,8 @@ */ package io.serverlessworkflow.impl; -class DefaultWorkflowPositionFactory implements WorkflowPositionFactory { +import io.serverlessworkflow.impl.expressions.RuntimeDescriptor; +import java.util.function.Supplier; - private static WorkflowPositionFactory instance = new DefaultWorkflowPositionFactory(); - - public static WorkflowPositionFactory get() { - return instance; - } - - private DefaultWorkflowPositionFactory() {} - - @Override - public WorkflowPosition buildPosition() { - return new DefaultWorkflowPosition(); - } -} +@FunctionalInterface +public interface RuntimeDescriptorFactory extends Supplier {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java similarity index 69% rename from impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java index 54f993b1..18aaf8e4 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/DefaultWorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/StringBufferWorkflowPosition.java @@ -15,20 +15,20 @@ */ package io.serverlessworkflow.impl; -public class DefaultWorkflowPosition implements WorkflowPosition { +public class StringBufferWorkflowPosition implements WorkflowPosition { private StringBuilder sb; - DefaultWorkflowPosition() { - this.sb = new StringBuilder(""); + StringBufferWorkflowPosition() { + this(""); } - private DefaultWorkflowPosition(WorkflowPosition position) { - this.sb = new StringBuilder(position.toString()); + private StringBufferWorkflowPosition(String str) { + this.sb = new StringBuilder(str); } - public DefaultWorkflowPosition copy() { - return new DefaultWorkflowPosition(this); + public StringBufferWorkflowPosition copy() { + return new StringBufferWorkflowPosition(this.jsonPointer()); } @Override @@ -50,7 +50,7 @@ public String jsonPointer() { @Override public String toString() { - return "DefaultWorkflowPosition [sb=" + sb + "]"; + return "StringBufferWorkflowPosition [sb=" + sb + "]"; } @Override @@ -61,4 +61,10 @@ public WorkflowPosition back() { } return this; } + + @Override + public Object last() { + int indexOf = sb.lastIndexOf("/"); + return indexOf != -1 ? jsonPointer().substring(indexOf + 1) : ""; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java index 138a4aed..cadde89c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java @@ -19,6 +19,7 @@ import io.serverlessworkflow.api.types.FlowDirective; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.api.types.TaskBase; +import java.time.Instant; import java.util.HashMap; import java.util.Map; @@ -27,6 +28,7 @@ public class TaskContext { private final JsonNode rawInput; private final T task; private final WorkflowPosition position; + private final Instant startedAt = Instant.now(); private JsonNode input; private JsonNode output; @@ -109,4 +111,8 @@ public Map variables() { public WorkflowPosition position() { return position; } + + public Instant startedAt() { + return startedAt; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index d9da16b9..4f35de41 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -15,12 +15,14 @@ */ package io.serverlessworkflow.impl; +import com.github.f4b6a3.ulid.UlidCreator; import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory; import io.serverlessworkflow.impl.executors.TaskExecutorFactory; import io.serverlessworkflow.impl.expressions.ExpressionFactory; import io.serverlessworkflow.impl.expressions.JQExpressionFactory; +import io.serverlessworkflow.impl.expressions.RuntimeDescriptor; import io.serverlessworkflow.impl.jsonschema.DefaultSchemaValidatorFactory; import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory; import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; @@ -37,9 +39,11 @@ public class WorkflowApplication implements AutoCloseable { private final ExpressionFactory exprFactory; private final ResourceLoaderFactory resourceLoaderFactory; private final SchemaValidatorFactory schemaValidatorFactory; + private final WorkflowIdFactory idFactory; private final Collection listeners; private final Map definitions; private final WorkflowPositionFactory positionFactory; + private final RuntimeDescriptorFactory runtimeDescriptorFactory; public WorkflowApplication( TaskExecutorFactory taskFactory, @@ -47,12 +51,16 @@ public WorkflowApplication( ResourceLoaderFactory resourceLoaderFactory, SchemaValidatorFactory schemaValidatorFactory, WorkflowPositionFactory positionFactory, + WorkflowIdFactory idFactory, + RuntimeDescriptorFactory runtimeDescriptorFactory, Collection listeners) { this.taskFactory = taskFactory; this.exprFactory = exprFactory; this.resourceLoaderFactory = resourceLoaderFactory; this.schemaValidatorFactory = schemaValidatorFactory; this.positionFactory = positionFactory; + this.idFactory = idFactory; + this.runtimeDescriptorFactory = runtimeDescriptorFactory; this.listeners = listeners; this.definitions = new ConcurrentHashMap<>(); } @@ -81,13 +89,20 @@ public Collection listeners() { return listeners; } + public WorkflowIdFactory idFactory() { + return idFactory; + } + public static class Builder { private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get(); private ExpressionFactory exprFactory = JQExpressionFactory.get(); private Collection listeners; private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get(); private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get(); - private WorkflowPositionFactory positionFactory = DefaultWorkflowPositionFactory.get(); + private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); + private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString(); + private RuntimeDescriptorFactory descriptorFactory = + () -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap()); private Builder() {} @@ -124,6 +139,16 @@ public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) { return this; } + public Builder withIdFactory(WorkflowIdFactory factory) { + this.idFactory = factory; + return this; + } + + public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) { + this.descriptorFactory = factory; + return this; + } + public WorkflowApplication build() { return new WorkflowApplication( taskFactory, @@ -131,6 +156,8 @@ public WorkflowApplication build() { resourceLoaderFactory, schemaValidatorFactory, positionFactory, + idFactory, + descriptorFactory, listeners == null ? Collections.emptySet() : Collections.unmodifiableCollection(listeners)); @@ -159,4 +186,8 @@ public void close() throws Exception { public WorkflowPositionFactory positionFactory() { return positionFactory; } + + public RuntimeDescriptorFactory runtimeDescriptorFactory() { + return runtimeDescriptorFactory; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java index d5c1f428..f45f1b84 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowContext.java @@ -16,29 +16,26 @@ package io.serverlessworkflow.impl; import com.fasterxml.jackson.databind.JsonNode; -import io.serverlessworkflow.impl.json.JsonUtils; public class WorkflowContext { private final WorkflowDefinition definition; - private final JsonNode input; - private JsonNode context; + private final WorkflowInstance instance; - WorkflowContext(WorkflowDefinition definition, JsonNode input) { + WorkflowContext(WorkflowDefinition definition, WorkflowInstance instance) { this.definition = definition; - this.input = input; - this.context = JsonUtils.mapper().createObjectNode(); + this.instance = instance; } - public JsonNode context() { - return context; + public WorkflowInstance instance() { + return instance; } - public void context(JsonNode context) { - this.context = context; + public JsonNode context() { + return instance.context(); } - public JsonNode rawInput() { - return input; + public void context(JsonNode context) { + this.instance.context(context); } public WorkflowDefinition definition() { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index 3a76ff1f..db8b5e4d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -37,47 +37,34 @@ public class WorkflowDefinition implements AutoCloseable { private final Workflow workflow; - private final Collection listeners; private Optional inputSchemaValidator = Optional.empty(); private Optional outputSchemaValidator = Optional.empty(); private Optional inputFilter = Optional.empty(); private Optional outputFilter = Optional.empty(); - private final TaskExecutorFactory taskFactory; - private final ExpressionFactory exprFactory; - private final ResourceLoader resourceLoader; - private final SchemaValidatorFactory schemaValidatorFactory; - private final WorkflowPositionFactory positionFactory; private final Map> taskExecutors = new ConcurrentHashMap<>(); + private final ResourceLoader resourceLoader; + private final WorkflowApplication application; private WorkflowDefinition( - Workflow workflow, - Collection listeners, - TaskExecutorFactory taskFactory, - ResourceLoader resourceLoader, - ExpressionFactory exprFactory, - SchemaValidatorFactory schemaValidatorFactory, - WorkflowPositionFactory positionFactory) { + WorkflowApplication application, Workflow workflow, ResourceLoader resourceLoader) { + this.workflow = workflow; - this.listeners = listeners; - this.taskFactory = taskFactory; - this.exprFactory = exprFactory; - this.schemaValidatorFactory = schemaValidatorFactory; - this.positionFactory = positionFactory; + this.application = application; this.resourceLoader = resourceLoader; if (workflow.getInput() != null) { Input input = workflow.getInput(); this.inputSchemaValidator = getSchemaValidator( - schemaValidatorFactory, schemaToNode(resourceLoader, input.getSchema())); - this.inputFilter = buildWorkflowFilter(exprFactory, input.getFrom()); + application.validatorFactory(), schemaToNode(resourceLoader, input.getSchema())); + this.inputFilter = buildWorkflowFilter(application.expressionFactory(), input.getFrom()); } if (workflow.getOutput() != null) { Output output = workflow.getOutput(); this.outputSchemaValidator = getSchemaValidator( - schemaValidatorFactory, schemaToNode(resourceLoader, output.getSchema())); - this.outputFilter = buildWorkflowFilter(exprFactory, output.getAs()); + application.validatorFactory(), schemaToNode(resourceLoader, output.getSchema())); + this.outputFilter = buildWorkflowFilter(application.expressionFactory(), output.getAs()); } } @@ -87,13 +74,7 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) { return new WorkflowDefinition( - workflow, - application.listeners(), - application.taskFactory(), - application.resourceLoaderFactory().getResourceLoader(path), - application.expressionFactory(), - application.validatorFactory(), - application.positionFactory()); + application, workflow, application.resourceLoaderFactory().getResourceLoader(path)); } public WorkflowInstance execute(Object input) { @@ -113,7 +94,7 @@ public Workflow workflow() { } public Collection listeners() { - return listeners; + return application.listeners(); } public Map> taskExecutors() { @@ -121,23 +102,27 @@ public Map> taskExecutors() { } public TaskExecutorFactory taskFactory() { - return taskFactory; + return application.taskFactory(); } public Optional outputFilter() { return outputFilter; } + public WorkflowIdFactory idFactory() { + return application.idFactory(); + } + public Optional outputSchemaValidator() { return outputSchemaValidator; } public ExpressionFactory expressionFactory() { - return exprFactory; + return application.expressionFactory(); } public SchemaValidatorFactory validatorFactory() { - return schemaValidatorFactory; + return application.validatorFactory(); } public ResourceLoader resourceLoader() { @@ -146,7 +131,11 @@ public ResourceLoader resourceLoader() { } public WorkflowPositionFactory positionFactory() { - return positionFactory; + return application.positionFactory(); + } + + public RuntimeDescriptorFactory runtimeDescriptorFactory() { + return application.runtimeDescriptorFactory(); } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowIdFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowIdFactory.java new file mode 100644 index 00000000..12b0f7c6 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowIdFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import java.util.function.Supplier; + +@FunctionalInterface +public interface WorkflowIdFactory extends Supplier {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java index 1361c43f..444d3fd5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowInstance.java @@ -18,28 +18,54 @@ import static io.serverlessworkflow.impl.json.JsonUtils.toJavaValue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import java.time.Instant; public class WorkflowInstance { private WorkflowState state; - private WorkflowContext context; private TaskContext taskContext; + private final String id; + private final JsonNode input; + private final Instant startedAt; + private JsonNode context = NullNode.getInstance(); WorkflowInstance(WorkflowDefinition definition, JsonNode input) { + this.id = definition.idFactory().get(); + this.input = input; definition.inputSchemaValidator().ifPresent(v -> v.validate(input)); - context = new WorkflowContext(definition, input); - taskContext = new TaskContext<>(input, definition.positionFactory().buildPosition()); + this.startedAt = Instant.now(); + WorkflowContext workflowContext = new WorkflowContext(definition, this); + taskContext = new TaskContext<>(input, definition.positionFactory().get()); definition .inputFilter() - .ifPresent(f -> taskContext.input(f.apply(context, taskContext, input))); + .ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input))); state = WorkflowState.STARTED; taskContext.rawOutput( - WorkflowUtils.processTaskList(definition.workflow().getDo(), context, taskContext)); + WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext)); definition .outputFilter() - .ifPresent(f -> taskContext.output(f.apply(context, taskContext, taskContext.rawOutput()))); + .ifPresent( + f -> + taskContext.output(f.apply(workflowContext, taskContext, taskContext.rawOutput()))); definition.outputSchemaValidator().ifPresent(v -> v.validate(taskContext.output())); } + public String id() { + return id; + } + + public Instant startedAt() { + return startedAt; + } + + public JsonNode input() { + return input; + } + + public JsonNode context() { + return context; + } + public WorkflowState state() { return state; } @@ -51,4 +77,8 @@ public Object output() { public Object outputAsJsonNode() { return taskContext.output(); } + + void context(JsonNode context) { + this.context = context; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java index cf63844a..1c416100 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPosition.java @@ -26,4 +26,6 @@ public interface WorkflowPosition { WorkflowPosition back(); WorkflowPosition copy(); + + Object last(); } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java index e93a4c33..c2a3df7e 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowPositionFactory.java @@ -15,6 +15,6 @@ */ package io.serverlessworkflow.impl; -public interface WorkflowPositionFactory { - WorkflowPosition buildPosition(); -} +import java.util.function.Supplier; + +public interface WorkflowPositionFactory extends Supplier {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/DateTimeDescriptor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/DateTimeDescriptor.java new file mode 100644 index 00000000..7936763f --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/DateTimeDescriptor.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.expressions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Instant; + +public class DateTimeDescriptor { + + private final Instant instant; + + public static DateTimeDescriptor from(Instant instant) { + return new DateTimeDescriptor(instant); + } + + private DateTimeDescriptor(Instant instant) { + this.instant = instant; + } + + @JsonProperty("iso8601") + public String iso8601() { + return instant.toString(); + } + + @JsonProperty("epoch") + public Epoch epoch() { + return Epoch.of(instant); + } + + public static record Epoch(long seconds, long milliseconds) { + public static Epoch of(Instant instant) { + return new Epoch(instant.getEpochSecond(), instant.toEpochMilli()); + } + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java index 9da21dbe..0207d3b5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/JQExpression.java @@ -31,26 +31,28 @@ public class JQExpression implements Expression { private final Supplier scope; private final String expr; - - private net.thisptr.jackson.jq.Expression internalExpr; + private final net.thisptr.jackson.jq.Expression internalExpr; public JQExpression(Supplier scope, String expr, Version version) throws JsonQueryException { this.expr = expr; this.scope = scope; - this.internalExpr = compile(version); - } - - private net.thisptr.jackson.jq.Expression compile(Version version) throws JsonQueryException { - return ExpressionParser.compile(expr, version); + this.internalExpr = ExpressionParser.compile(expr, version); } - private interface TypedOutput extends Output { - T getResult(); + @Override + public JsonNode eval(WorkflowContext workflow, TaskContext task, JsonNode node) { + JsonNodeOutput output = new JsonNodeOutput(); + try { + internalExpr.apply(createScope(workflow, task), node, output); + return output.getResult(); + } catch (JsonQueryException e) { + throw new IllegalArgumentException( + "Unable to evaluate content " + node + " using expr " + expr, e); + } } - private static class JsonNodeOutput implements TypedOutput { - + private static class JsonNodeOutput implements Output { private JsonNode result; private boolean arrayCreated; @@ -68,26 +70,21 @@ public void emit(JsonNode out) throws JsonQueryException { } } - @Override public JsonNode getResult() { return result; } } - @Override - public JsonNode eval(WorkflowContext workflow, TaskContext task, JsonNode node) { - TypedOutput output = new JsonNodeOutput(); - try { - internalExpr.apply(createScope(workflow, task), node, output); - return output.getResult(); - } catch (JsonQueryException e) { - throw new IllegalArgumentException( - "Unable to evaluate content " + node + " using expr " + expr, e); - } - } - private Scope createScope(WorkflowContext workflow, TaskContext task) { Scope childScope = Scope.newChildScope(scope.get()); + childScope.setValue("input", task.input()); + childScope.setValue("output", task.output()); + childScope.setValue("context", workflow.context()); + childScope.setValue( + "runtime", + () -> JsonUtils.fromValue(workflow.definition().runtimeDescriptorFactory().get())); + childScope.setValue("workflow", () -> JsonUtils.fromValue(WorkflowDescriptor.of(workflow))); + childScope.setValue("task", () -> JsonUtils.fromValue(TaskDescriptor.of(task))); task.variables().forEach((k, v) -> childScope.setValue(k, JsonUtils.fromValue(v))); return childScope; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/RuntimeDescriptor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/RuntimeDescriptor.java new file mode 100644 index 00000000..66286632 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/RuntimeDescriptor.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.serverlessworkflow.impl.expressions; + +import java.util.Map; + +public record RuntimeDescriptor(String name, String version, Map metadata) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java new file mode 100644 index 00000000..a78bffa7 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/TaskDescriptor.java @@ -0,0 +1,39 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.expressions; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.impl.TaskContext; + +public record TaskDescriptor( + String name, + String reference, + T definition, + JsonNode rawInput, + JsonNode rawOutput, + DateTimeDescriptor startedAt) { + + public static TaskDescriptor of(TaskContext context) { + return new TaskDescriptor( + context.position().last().toString(), + context.position().jsonPointer(), + context.task(), + context.rawInput(), + context.rawOutput(), + DateTimeDescriptor.from(context.startedAt())); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/WorkflowDescriptor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/WorkflowDescriptor.java new file mode 100644 index 00000000..f6b906fb --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/expressions/WorkflowDescriptor.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.expressions; + +import com.fasterxml.jackson.databind.JsonNode; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowContext; + +public record WorkflowDescriptor( + String id, Workflow definition, JsonNode input, DateTimeDescriptor startedAt) { + + public static WorkflowDescriptor of(WorkflowContext context) { + return new WorkflowDescriptor( + context.instance().id(), + context.definition().workflow(), + context.instance().input(), + DateTimeDescriptor.from(context.instance().startedAt())); + } +} diff --git a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java index 39adcf9d..27662797 100644 --- a/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java +++ b/impl/core/src/test/java/io/serverlessworkflow/impl/WorkflowDefinitionTest.java @@ -19,10 +19,11 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.IOException; +import java.time.Instant; import java.util.Arrays; import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Stream; -import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -31,36 +32,38 @@ public class WorkflowDefinitionTest { private static WorkflowApplication appl; + private static Instant before; @BeforeAll static void init() { appl = WorkflowApplication.builder().build(); + before = Instant.now(); } @ParameterizedTest @MethodSource("provideParameters") - void testWorkflowExecution(String fileName, Object input, Condition condition) + void testWorkflowExecution(String fileName, Object input, Consumer assertions) throws IOException { - assertThat(appl.workflowDefinition(readWorkflowFromClasspath(fileName)).execute(input).output()) - .is(condition); + assertions.accept( + appl.workflowDefinition(readWorkflowFromClasspath(fileName)).execute(input).output()); } private static Stream provideParameters() { return Stream.of( - Arguments.of( + args( "switch-then-string.yaml", Map.of("orderType", "electronic"), - new Condition( - o -> - o.equals( - Map.of("orderType", "electronic", "validate", true, "status", "fulfilled")), - "switch-electronic")), - Arguments.of( + o -> + assertThat(o) + .isEqualTo( + Map.of( + "orderType", "electronic", "validate", true, "status", "fulfilled"))), + args( "switch-then-string.yaml", Map.of("orderType", "physical"), - new Condition( - o -> - o.equals( + o -> + assertThat(o) + .isEqualTo( Map.of( "orderType", "physical", @@ -69,28 +72,48 @@ private static Stream provideParameters() { "items", 1, "address", - "Elmer St")), - "switch-physical")), - Arguments.of( + "Elmer St"))), + args( "switch-then-string.yaml", Map.of("orderType", "unknown"), - new Condition( - o -> - o.equals( + o -> + assertThat(o) + .isEqualTo( Map.of( - "orderType", "unknown", "log", "warn", "message", "something's wrong")), - "switch-unknown")), - Arguments.of( + "orderType", + "unknown", + "log", + "warn", + "message", + "something's wrong"))), + args( "for-sum.yaml", Map.of("input", Arrays.asList(1, 2, 3)), - new Condition(o -> o.equals(6), "for-sum")), - Arguments.of( + o -> assertThat(o).isEqualTo(6)), + args( "for-collect.yaml", Map.of("input", Arrays.asList(1, 2, 3)), - new Condition( - o -> - o.equals( - Map.of("input", Arrays.asList(1, 2, 3), "output", Arrays.asList(2, 4, 6))), - "for-collect"))); + o -> + assertThat(o) + .isEqualTo( + Map.of("input", Arrays.asList(1, 2, 3), "output", Arrays.asList(2, 4, 6)))), + args( + "simple-expression.yaml", + Map.of("input", Arrays.asList(1, 2, 3)), + WorkflowDefinitionTest::checkSpecialKeywords)); + } + + private static Arguments args( + String fileName, Map input, Consumer object) { + return Arguments.of(fileName, input, object); + } + + private static void checkSpecialKeywords(Object obj) { + Map result = (Map) obj; + assertThat(Instant.ofEpochMilli((long) result.get("startedAt"))) + .isAfterOrEqualTo(before) + .isBeforeOrEqualTo(Instant.now()); + assertThat(result.get("id").toString()).hasSize(26); + assertThat(result.get("version").toString()).contains("alpha"); } } diff --git a/impl/core/src/test/resources/simple-expression.yaml b/impl/core/src/test/resources/simple-expression.yaml new file mode 100644 index 00000000..4e240d6b --- /dev/null +++ b/impl/core/src/test/resources/simple-expression.yaml @@ -0,0 +1,11 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: simple-expression + version: '0.1.0' +do: + - useExpression: + set: + startedAt: ${$task.startedAt.epoch.milliseconds} + id : ${$workflow.id} + version: ${$runtime.version} \ No newline at end of file