Skip to content

Commit

Permalink
[Fix serverlessworkflow#468] Try/raise implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco Javier Tirado Sarti <ftirados@redhat.com>
  • Loading branch information
fjtirado committed Dec 2, 2024
1 parent d201c5f commit 29831cf
Show file tree
Hide file tree
Showing 18 changed files with 474 additions and 32 deletions.
30 changes: 27 additions & 3 deletions api/src/main/resources/schema/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -777,19 +777,22 @@ $defs:
errors:
type: object
title: CatchErrors
description: The configuration of a concept used to catch errors.
properties:
with:
$ref: '#/$defs/errorFilter'
description: static error filter
as:
type: string
title: CatchAs
description: The name of the runtime expression variable to save the error as. Defaults to 'error'.
when:
type: string
title: CatchWhen
description: A runtime expression used to determine whether or not to catch the filtered error.
description: A runtime expression used to determine whether to catch the filtered error.
exceptWhen:
type: string
title: CatchExceptWhen
description: A runtime expression used to determine whether or not to catch the filtered error.
description: A runtime expression used to determine whether not to catch the filtered error.
retry:
oneOf:
- $ref: '#/$defs/retryPolicy'
Expand Down Expand Up @@ -1152,6 +1155,27 @@ $defs:
title: ErrorDetails
description: A human-readable explanation specific to this occurrence of the error.
required: [ type, status ]
errorFilter:
type: object
title: ErrorFilter
description: Error filtering base on static values. For error filtering on dynamic values, use catch.when property
minProperties: 1
properties:
type:
type: string
description: if present, means this value should be used for filtering
status:
type: integer
description: if present, means this value should be used for filtering
instance:
type: string
description: if present, means this value should be used for filtering
title:
type: string
description: if present, means this value should be used for filtering
details:
type: string
description: if present, means this value should be used for filtering
uriTemplate:
title: UriTemplate
anyOf:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import java.util.function.BiFunction;

@FunctionalInterface
public interface StringFilter extends BiFunction<WorkflowContext, TaskContext<?>, String> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public record WorkflowError(
String type, int status, String instance, String title, String details) {

private static final String ERROR_FORMAT = "https://serverlessworkflow.io/spec/1.0.0/errors/%s";
public static final String RUNTIME_TYPE = String.format(ERROR_FORMAT, "runtime");
public static final String COMM_TYPE = String.format(ERROR_FORMAT, "communication");

public static Builder error(String type, int status) {
return new Builder(type, status);
}

public static Builder communication(int status, TaskContext<?> context, Exception ex) {
return new Builder(COMM_TYPE, status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
}

public static Builder runtime(int status, TaskContext<?> context, Exception ex) {
return new Builder(RUNTIME_TYPE, status)
.instance(context.position().jsonPointer())
.title(ex.getMessage());
}

public static class Builder {

private final String type;
private int status;
private String instance;
private String title;
private String details;

private Builder(String type, int status) {
this.type = type;
this.status = status;
}

public Builder instance(String instance) {
this.instance = instance;
return this;
}

public Builder title(String title) {
this.title = title;
return this;
}

public Builder details(String details) {
this.details = details;
return this;
}

public WorkflowError build() {
return new WorkflowError(type, status, instance, title, details);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

public class WorkflowException extends RuntimeException {

private static final long serialVersionUID = 1L;

private final WorkflowError worflowError;

public WorkflowException(WorkflowError error) {
this(error, null);
}

public WorkflowException(WorkflowError error, Throwable cause) {
super(error.toString(), cause);
this.worflowError = error;
}

public WorkflowError getWorflowError() {
return worflowError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public class WorkflowInstance {
.inputFilter()
.ifPresent(f -> taskContext.input(f.apply(workflowContext, taskContext, input)));
state = WorkflowState.STARTED;
taskContext.rawOutput(
WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext));

WorkflowUtils.processTaskList(definition.workflow().getDo(), workflowContext, taskContext);
definition
.outputFilter()
.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

import java.util.function.Supplier;

@FunctionalInterface
public interface WorkflowPositionFactory extends Supplier<WorkflowPosition> {}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ public static Optional<WorkflowFilter> buildWorkflowFilter(
: Optional.empty();
}

public static StringFilter buildStringFilter(
ExpressionFactory exprFactory, String expression, String literal) {
return expression != null ? from(buildWorkflowFilter(exprFactory, expression)) : from(literal);
}

public static StringFilter buildStringFilter(ExpressionFactory exprFactory, String str) {
return ExpressionUtils.isExpr(str) ? from(buildWorkflowFilter(exprFactory, str)) : from(str);
}

public static StringFilter from(WorkflowFilter filter) {
return (w, t) -> filter.apply(w, t, t.input()).asText();
}

private static StringFilter from(String literal) {
return (w, t) -> literal;
}

private static WorkflowFilter buildWorkflowFilter(
ExpressionFactory exprFactory, String str, Object object) {
if (str != null) {
Expand Down Expand Up @@ -127,7 +144,7 @@ private static TaskItem findTaskByName(ListIterator<TaskItem> iter, String taskN
throw new IllegalArgumentException("Cannot find task with name " + taskName);
}

public static JsonNode processTaskList(
public static void processTaskList(
List<TaskItem> tasks, WorkflowContext context, TaskContext<?> parentTask) {
parentTask.position().addProperty("do");
TaskContext<? extends TaskBase> currentContext = parentTask;
Expand All @@ -136,7 +153,7 @@ public static JsonNode processTaskList(
TaskItem nextTask = iter.next();
while (nextTask != null) {
TaskItem task = nextTask;
parentTask.position().addIndex(iter.nextIndex()).addProperty(task.getName());
parentTask.position().addIndex(iter.previousIndex()).addProperty(task.getName());
context
.definition()
.listeners()
Expand Down Expand Up @@ -175,7 +192,7 @@ public static JsonNode processTaskList(
}
}
parentTask.position().back();
return currentContext.output();
parentTask.rawOutput(currentContext.output());
}

public static WorkflowFilter buildWorkflowFilter(ExpressionFactory exprFactory, String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public TaskExecutor<? extends TaskBase> getTaskExecutor(
return new SetExecutor(task.getSetTask(), definition);
} else if (task.getForTask() != null) {
return new ForExecutor(task.getForTask(), definition);
} else if (task.getRaiseTask() != null) {
return new RaiseExecutor(task.getRaiseTask(), definition);
} else if (task.getTryTask() != null) {
return new TryExecutor(task.getTryTask(), definition);
}
throw new UnsupportedOperationException(task.get().getClass().getName() + " not supported yet");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ protected DoExecutor(DoTask task, WorkflowDefinition definition) {

@Override
protected void internalExecute(WorkflowContext workflow, TaskContext<DoTask> taskContext) {
taskContext.rawOutput(WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext));
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected void internalExecute(WorkflowContext workflow, TaskContext<ForTask> ta
JsonNode item = iter.next();
taskContext.variables().put(task.getFor().getEach(), item);
taskContext.variables().put(task.getFor().getAt(), i++);
taskContext.rawOutput(WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext));
WorkflowUtils.processTaskList(task.getDo(), workflow, taskContext);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors;

import io.serverlessworkflow.api.types.Error;
import io.serverlessworkflow.api.types.ErrorInstance;
import io.serverlessworkflow.api.types.ErrorType;
import io.serverlessworkflow.api.types.RaiseTask;
import io.serverlessworkflow.api.types.RaiseTaskError;
import io.serverlessworkflow.impl.StringFilter;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowError;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.expressions.ExpressionFactory;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

public class RaiseExecutor extends AbstractTaskExecutor<RaiseTask> {

private final BiFunction<WorkflowContext, TaskContext<RaiseTask>, WorkflowError> errorBuilder;

private final StringFilter typeFilter;
private final Optional<StringFilter> instanceFilter;
private final StringFilter titleFilter;
private final StringFilter detailFilter;

protected RaiseExecutor(RaiseTask task, WorkflowDefinition definition) {
super(task, definition);
RaiseTaskError raiseError = task.getRaise().getError();
Error error =
raiseError.getRaiseErrorDefinition() != null
? raiseError.getRaiseErrorDefinition()
: findError(definition, raiseError.getRaiseErrorReference());
this.typeFilter = getTypeFunction(definition.expressionFactory(), error.getType());
this.instanceFilter = getInstanceFunction(definition.expressionFactory(), error.getInstance());
this.titleFilter =
WorkflowUtils.buildStringFilter(definition.expressionFactory(), error.getTitle());
this.detailFilter =
WorkflowUtils.buildStringFilter(definition.expressionFactory(), error.getDetail());
this.errorBuilder = (w, t) -> buildError(error, w, t);
}

private static Error findError(WorkflowDefinition definition, String raiseErrorReference) {
Map<String, Error> errorsMap =
definition.workflow().getUse().getErrors().getAdditionalProperties();
Error error = errorsMap.get(raiseErrorReference);
if (error == null) {
throw new IllegalArgumentException("Error " + error + "is not defined in " + errorsMap);
}
return error;
}

private WorkflowError buildError(
Error error, WorkflowContext context, TaskContext<RaiseTask> taskContext) {
return WorkflowError.error(typeFilter.apply(context, taskContext), error.getStatus())
.instance(
instanceFilter
.map(f -> f.apply(context, taskContext))
.orElseGet(() -> taskContext.position().jsonPointer()))
.title(titleFilter.apply(context, taskContext))
.details(detailFilter.apply(context, taskContext))
.build();
}

private Optional<StringFilter> getInstanceFunction(
ExpressionFactory expressionFactory, ErrorInstance errorInstance) {
return errorInstance != null
? Optional.of(
WorkflowUtils.buildStringFilter(
expressionFactory,
errorInstance.getExpressionErrorInstance(),
errorInstance.getLiteralErrorInstance()))
: Optional.empty();
}

private StringFilter getTypeFunction(ExpressionFactory expressionFactory, ErrorType type) {
return WorkflowUtils.buildStringFilter(
expressionFactory,
type.getExpressionErrorType(),
type.getLiteralErrorType().get().toString());
}

@Override
protected void internalExecute(WorkflowContext workflow, TaskContext<RaiseTask> taskContext) {
throw new WorkflowException(errorBuilder.apply(workflow, taskContext));
}
}
Loading

0 comments on commit 29831cf

Please sign in to comment.