Skip to content

Commit

Permalink
Merge pull request serverlessworkflow#486 from fjtirado/Fix_#463
Browse files Browse the repository at this point in the history
[Fix serverlessworkflow#463] Supporting runtime expressions
  • Loading branch information
fjtirado authored Nov 28, 2024
2 parents 4bfa0d9 + 997f423 commit d201c5f
Show file tree
Hide file tree
Showing 19 changed files with 452 additions and 132 deletions.
6 changes: 6 additions & 0 deletions impl/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
<artifactId>serverlessworkflow-impl-core</artifactId>
<properties>
<version.net.thisptr>1.1.0</version.net.thisptr>
<version.com.github.f4b6a3>5.2.3</version.com.github.f4b6a3>
</properties>
<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-api</artifactId>
<version>7.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.github.f4b6a3</groupId>
<artifactId>ulid-creator</artifactId>
<version>${version.com.github.f4b6a3}</version>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.stream.Collectors;

public class QueueWorkflowPosition implements WorkflowPosition {

private Deque<Object> queue;

QueueWorkflowPosition() {
this(new ArrayDeque<>());
}

private QueueWorkflowPosition(Deque<Object> list) {
this.queue = list;
}

public QueueWorkflowPosition copy() {
return new QueueWorkflowPosition(new ArrayDeque<>(this.queue));
}

@Override
public WorkflowPosition addIndex(int index) {
queue.add(index);
return this;
}

@Override
public WorkflowPosition addProperty(String prop) {
queue.add(prop);
return this;
}

@Override
public String jsonPointer() {
return queue.stream().map(Object::toString).collect(Collectors.joining("/"));
}

@Override
public String toString() {
return "ListWorkflowPosition [list=" + queue + "]";
}

@Override
public WorkflowPosition back() {
queue.removeLast();
return this;
}

@Override
public Object last() {
return queue.pollLast();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,8 @@
*/
package io.serverlessworkflow.impl;

class DefaultWorkflowPositionFactory implements WorkflowPositionFactory {
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
import java.util.function.Supplier;

private static WorkflowPositionFactory instance = new DefaultWorkflowPositionFactory();

public static WorkflowPositionFactory get() {
return instance;
}

private DefaultWorkflowPositionFactory() {}

@Override
public WorkflowPosition buildPosition() {
return new DefaultWorkflowPosition();
}
}
@FunctionalInterface
public interface RuntimeDescriptorFactory extends Supplier<RuntimeDescriptor> {}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
*/
package io.serverlessworkflow.impl;

public class DefaultWorkflowPosition implements WorkflowPosition {
public class StringBufferWorkflowPosition implements WorkflowPosition {

private StringBuilder sb;

DefaultWorkflowPosition() {
this.sb = new StringBuilder("");
StringBufferWorkflowPosition() {
this("");
}

private DefaultWorkflowPosition(WorkflowPosition position) {
this.sb = new StringBuilder(position.toString());
private StringBufferWorkflowPosition(String str) {
this.sb = new StringBuilder(str);
}

public DefaultWorkflowPosition copy() {
return new DefaultWorkflowPosition(this);
public StringBufferWorkflowPosition copy() {
return new StringBufferWorkflowPosition(this.jsonPointer());
}

@Override
Expand All @@ -50,7 +50,7 @@ public String jsonPointer() {

@Override
public String toString() {
return "DefaultWorkflowPosition [sb=" + sb + "]";
return "StringBufferWorkflowPosition [sb=" + sb + "]";
}

@Override
Expand All @@ -61,4 +61,10 @@ public WorkflowPosition back() {
}
return this;
}

@Override
public Object last() {
int indexOf = sb.lastIndexOf("/");
return indexOf != -1 ? jsonPointer().substring(indexOf + 1) : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.serverlessworkflow.api.types.FlowDirective;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
import io.serverlessworkflow.api.types.TaskBase;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -27,6 +28,7 @@ public class TaskContext<T extends TaskBase> {
private final JsonNode rawInput;
private final T task;
private final WorkflowPosition position;
private final Instant startedAt = Instant.now();

private JsonNode input;
private JsonNode output;
Expand Down Expand Up @@ -109,4 +111,8 @@ public Map<String, Object> variables() {
public WorkflowPosition position() {
return position;
}

public Instant startedAt() {
return startedAt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package io.serverlessworkflow.impl;

import com.github.f4b6a3.ulid.UlidCreator;
import io.serverlessworkflow.api.types.Document;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory;
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import io.serverlessworkflow.impl.expressions.JQExpressionFactory;
import io.serverlessworkflow.impl.expressions.RuntimeDescriptor;
import io.serverlessworkflow.impl.jsonschema.DefaultSchemaValidatorFactory;
import io.serverlessworkflow.impl.jsonschema.SchemaValidatorFactory;
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
Expand All @@ -37,22 +39,28 @@ public class WorkflowApplication implements AutoCloseable {
private final ExpressionFactory exprFactory;
private final ResourceLoaderFactory resourceLoaderFactory;
private final SchemaValidatorFactory schemaValidatorFactory;
private final WorkflowIdFactory idFactory;
private final Collection<WorkflowExecutionListener> listeners;
private final Map<WorkflowId, WorkflowDefinition> definitions;
private final WorkflowPositionFactory positionFactory;
private final RuntimeDescriptorFactory runtimeDescriptorFactory;

public WorkflowApplication(
TaskExecutorFactory taskFactory,
ExpressionFactory exprFactory,
ResourceLoaderFactory resourceLoaderFactory,
SchemaValidatorFactory schemaValidatorFactory,
WorkflowPositionFactory positionFactory,
WorkflowIdFactory idFactory,
RuntimeDescriptorFactory runtimeDescriptorFactory,
Collection<WorkflowExecutionListener> listeners) {
this.taskFactory = taskFactory;
this.exprFactory = exprFactory;
this.resourceLoaderFactory = resourceLoaderFactory;
this.schemaValidatorFactory = schemaValidatorFactory;
this.positionFactory = positionFactory;
this.idFactory = idFactory;
this.runtimeDescriptorFactory = runtimeDescriptorFactory;
this.listeners = listeners;
this.definitions = new ConcurrentHashMap<>();
}
Expand Down Expand Up @@ -81,13 +89,20 @@ public Collection<WorkflowExecutionListener> listeners() {
return listeners;
}

public WorkflowIdFactory idFactory() {
return idFactory;
}

public static class Builder {
private TaskExecutorFactory taskFactory = DefaultTaskExecutorFactory.get();
private ExpressionFactory exprFactory = JQExpressionFactory.get();
private Collection<WorkflowExecutionListener> listeners;
private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get();
private SchemaValidatorFactory schemaValidatorFactory = DefaultSchemaValidatorFactory.get();
private WorkflowPositionFactory positionFactory = DefaultWorkflowPositionFactory.get();
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
private WorkflowIdFactory idFactory = () -> UlidCreator.getMonotonicUlid().toString();
private RuntimeDescriptorFactory descriptorFactory =
() -> new RuntimeDescriptor("reference impl", "1.0.0_alpha", Collections.emptyMap());

private Builder() {}

Expand Down Expand Up @@ -124,13 +139,25 @@ public Builder withSchemaValidatorFactory(SchemaValidatorFactory factory) {
return this;
}

public Builder withIdFactory(WorkflowIdFactory factory) {
this.idFactory = factory;
return this;
}

public Builder withDescriptorFactory(RuntimeDescriptorFactory factory) {
this.descriptorFactory = factory;
return this;
}

public WorkflowApplication build() {
return new WorkflowApplication(
taskFactory,
exprFactory,
resourceLoaderFactory,
schemaValidatorFactory,
positionFactory,
idFactory,
descriptorFactory,
listeners == null
? Collections.emptySet()
: Collections.unmodifiableCollection(listeners));
Expand Down Expand Up @@ -159,4 +186,8 @@ public void close() throws Exception {
public WorkflowPositionFactory positionFactory() {
return positionFactory;
}

public RuntimeDescriptorFactory runtimeDescriptorFactory() {
return runtimeDescriptorFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,26 @@
package io.serverlessworkflow.impl;

import com.fasterxml.jackson.databind.JsonNode;
import io.serverlessworkflow.impl.json.JsonUtils;

public class WorkflowContext {
private final WorkflowDefinition definition;
private final JsonNode input;
private JsonNode context;
private final WorkflowInstance instance;

WorkflowContext(WorkflowDefinition definition, JsonNode input) {
WorkflowContext(WorkflowDefinition definition, WorkflowInstance instance) {
this.definition = definition;
this.input = input;
this.context = JsonUtils.mapper().createObjectNode();
this.instance = instance;
}

public JsonNode context() {
return context;
public WorkflowInstance instance() {
return instance;
}

public void context(JsonNode context) {
this.context = context;
public JsonNode context() {
return instance.context();
}

public JsonNode rawInput() {
return input;
public void context(JsonNode context) {
this.instance.context(context);
}

public WorkflowDefinition definition() {
Expand Down
Loading

0 comments on commit d201c5f

Please sign in to comment.