Skip to content

Commit 4626cfd

Browse files
committed
[Fix #727] Implementing retries
1 parent ca14e07 commit 4626cfd

File tree

14 files changed

+590
-20
lines changed

14 files changed

+590
-20
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/TaskContext.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class TaskContext implements TaskContextData {
3737
private WorkflowModel rawOutput;
3838
private Instant completedAt;
3939
private TransitionInfo transition;
40-
private boolean completed;
40+
private short retryAttempt;
4141

4242
public TaskContext(
4343
WorkflowModel input,
@@ -67,6 +67,7 @@ private TaskContext(
6767
this.input = input;
6868
this.output = output;
6969
this.rawOutput = rawOutput;
70+
this.retryAttempt = parentContext.map(TaskContext::retryAttempt).orElse((short) 0);
7071
this.contextVariables =
7172
parentContext.map(p -> new HashMap<>(p.contextVariables)).orElseGet(HashMap::new);
7273
}
@@ -110,7 +111,6 @@ public WorkflowModel rawOutput() {
110111

111112
public TaskContext output(WorkflowModel output) {
112113
this.output = output;
113-
this.completed = true;
114114
return this;
115115
}
116116

@@ -162,7 +162,19 @@ public TaskContext transition(TransitionInfo transition) {
162162
}
163163

164164
public boolean isCompleted() {
165-
return completed;
165+
return completedAt != null;
166+
}
167+
168+
public short retryAttempt() {
169+
return retryAttempt;
170+
}
171+
172+
public void retryAttempt(short retryAttempt) {
173+
this.retryAttempt = retryAttempt;
174+
}
175+
176+
public boolean isRetrying() {
177+
return retryAttempt > 0;
166178
}
167179

168180
@Override
@@ -175,6 +187,8 @@ public String toString() {
175187
+ taskName
176188
+ ", completedAt="
177189
+ completedAt
190+
+ ", retryAttempt="
191+
+ retryAttempt
178192
+ "]";
179193
}
180194
}

impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowUtils.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,20 @@
1515
*/
1616
package io.serverlessworkflow.impl;
1717

18+
import io.serverlessworkflow.api.types.DurationInline;
1819
import io.serverlessworkflow.api.types.ExportAs;
1920
import io.serverlessworkflow.api.types.InputFrom;
2021
import io.serverlessworkflow.api.types.OutputAs;
2122
import io.serverlessworkflow.api.types.SchemaUnion;
23+
import io.serverlessworkflow.api.types.TimeoutAfter;
2224
import io.serverlessworkflow.api.types.UriTemplate;
2325
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
2426
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
2527
import io.serverlessworkflow.impl.resources.ResourceLoader;
2628
import io.serverlessworkflow.impl.schema.SchemaValidator;
2729
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
2830
import java.net.URI;
31+
import java.time.Duration;
2932
import java.util.Map;
3033
import java.util.Optional;
3134
import org.slf4j.Logger;
@@ -153,4 +156,37 @@ public static void safeClose(AutoCloseable closeable) {
153156
}
154157
}
155158
}
159+
160+
public static boolean whenExceptTest(
161+
Optional<WorkflowPredicate> whenFilter,
162+
Optional<WorkflowPredicate> exceptFilter,
163+
WorkflowContext workflow,
164+
TaskContext taskContext,
165+
WorkflowModel model) {
166+
return whenFilter.map(w -> w.test(workflow, taskContext, model)).orElse(true)
167+
&& exceptFilter.map(w -> !w.test(workflow, taskContext, model)).orElse(true);
168+
}
169+
170+
public static WorkflowValueResolver<Duration> fromTimeoutAfter(
171+
WorkflowApplication application, TimeoutAfter timeout) {
172+
if (timeout.getDurationExpression() != null) {
173+
return (w, f, t) ->
174+
Duration.parse(
175+
application
176+
.expressionFactory()
177+
.resolveString(ExpressionDescriptor.from(timeout.getDurationExpression()))
178+
.apply(w, f, t));
179+
} else if (timeout.getDurationInline() != null) {
180+
DurationInline inlineDuration = timeout.getDurationInline();
181+
return (w, t, f) ->
182+
Duration.ofDays(inlineDuration.getDays())
183+
.plus(
184+
Duration.ofHours(inlineDuration.getHours())
185+
.plus(Duration.ofMinutes(inlineDuration.getMinutes()))
186+
.plus(Duration.ofSeconds(inlineDuration.getSeconds()))
187+
.plus(Duration.ofMillis(inlineDuration.getMilliseconds())));
188+
} else {
189+
return (w, t, f) -> Duration.ZERO;
190+
}
191+
}
156192
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/TryExecutor.java

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import io.serverlessworkflow.api.types.CatchErrors;
1919
import io.serverlessworkflow.api.types.ErrorFilter;
20+
import io.serverlessworkflow.api.types.Retry;
21+
import io.serverlessworkflow.api.types.RetryBackoff;
22+
import io.serverlessworkflow.api.types.RetryPolicy;
2023
import io.serverlessworkflow.api.types.TaskItem;
2124
import io.serverlessworkflow.api.types.TryTask;
2225
import io.serverlessworkflow.api.types.TryTaskCatch;
@@ -29,6 +32,12 @@
2932
import io.serverlessworkflow.impl.WorkflowMutablePosition;
3033
import io.serverlessworkflow.impl.WorkflowPredicate;
3134
import io.serverlessworkflow.impl.WorkflowUtils;
35+
import io.serverlessworkflow.impl.executors.retry.ConstantRetryIntervalFunction;
36+
import io.serverlessworkflow.impl.executors.retry.DefaultRetryExecutor;
37+
import io.serverlessworkflow.impl.executors.retry.ExponentialRetryIntervalFunction;
38+
import io.serverlessworkflow.impl.executors.retry.LinearRetryIntervalFunction;
39+
import io.serverlessworkflow.impl.executors.retry.RetryExecutor;
40+
import io.serverlessworkflow.impl.executors.retry.RetryIntervalFunction;
3241
import java.util.List;
3342
import java.util.Optional;
3443
import java.util.concurrent.CompletableFuture;
@@ -42,6 +51,7 @@ public class TryExecutor extends RegularTaskExecutor<TryTask> {
4251
private final Optional<Predicate<WorkflowError>> errorFilter;
4352
private final TaskExecutor<?> taskExecutor;
4453
private final Optional<TaskExecutor<?>> catchTaskExecutor;
54+
private final Optional<RetryExecutor> retryIntervalExecutor;
4555

4656
public static class TryExecutorBuilder extends RegularTaskExecutorBuilder<TryTask> {
4757

@@ -50,6 +60,7 @@ public static class TryExecutorBuilder extends RegularTaskExecutorBuilder<TryTas
5060
private final Optional<Predicate<WorkflowError>> errorFilter;
5161
private final TaskExecutor<?> taskExecutor;
5262
private final Optional<TaskExecutor<?>> catchTaskExecutor;
63+
private final Optional<RetryExecutor> retryIntervalExecutor;
5364

5465
protected TryExecutorBuilder(
5566
WorkflowMutablePosition position, TryTask task, WorkflowDefinition definition) {
@@ -60,13 +71,63 @@ protected TryExecutorBuilder(
6071
this.exceptFilter = WorkflowUtils.optionalPredicate(application, catchInfo.getExceptWhen());
6172
this.taskExecutor =
6273
TaskExecutorHelper.createExecutorList(position, task.getTry(), definition);
63-
List<TaskItem> catchTask = task.getCatch().getDo();
64-
this.catchTaskExecutor =
65-
catchTask != null && !catchTask.isEmpty()
66-
? Optional.of(
67-
TaskExecutorHelper.createExecutorList(
68-
position, task.getCatch().getDo(), definition))
69-
: Optional.empty();
74+
TryTaskCatch catchTask = task.getCatch();
75+
if (catchTask != null) {
76+
List<TaskItem> catchTaskDo = catchTask.getDo();
77+
78+
this.catchTaskExecutor =
79+
catchTaskDo != null && !catchTaskDo.isEmpty()
80+
? Optional.of(
81+
TaskExecutorHelper.createExecutorList(position, catchTaskDo, definition))
82+
: Optional.empty();
83+
84+
Retry retry = catchTask.getRetry();
85+
this.retryIntervalExecutor = retry != null ? buildRetryInterval(retry) : Optional.empty();
86+
} else {
87+
this.catchTaskExecutor = Optional.empty();
88+
this.retryIntervalExecutor = Optional.empty();
89+
}
90+
}
91+
92+
private Optional<RetryExecutor> buildRetryInterval(Retry retry) {
93+
RetryPolicy retryPolicy = null;
94+
if (retry.getRetryPolicyDefinition() != null) {
95+
retryPolicy = retry.getRetryPolicyDefinition();
96+
} else if (retry.getRetryPolicyReference() != null) {
97+
retryPolicy =
98+
workflow
99+
.getUse()
100+
.getRetries()
101+
.getAdditionalProperties()
102+
.get(retry.getRetryPolicyReference());
103+
if (retryPolicy == null) {
104+
throw new IllegalStateException("Retry policy " + retryPolicy + " was not found");
105+
}
106+
}
107+
return retryPolicy != null ? Optional.of(buildRetryExecutor(retryPolicy)) : Optional.empty();
108+
}
109+
110+
protected RetryExecutor buildRetryExecutor(RetryPolicy retryPolicy) {
111+
return new DefaultRetryExecutor(
112+
retryPolicy.getLimit().getAttempt().getCount(),
113+
buildIntervalFunction(retryPolicy),
114+
WorkflowUtils.optionalPredicate(application, retryPolicy.getWhen()),
115+
WorkflowUtils.optionalPredicate(application, retryPolicy.getExceptWhen()));
116+
}
117+
118+
private RetryIntervalFunction buildIntervalFunction(RetryPolicy retryPolicy) {
119+
RetryBackoff backoff = retryPolicy.getBackoff();
120+
if (backoff.getConstantBackoff() != null) {
121+
return new ConstantRetryIntervalFunction(
122+
application, retryPolicy.getDelay(), retryPolicy.getJitter());
123+
} else if (backoff.getLinearBackoff() != null) {
124+
return new LinearRetryIntervalFunction(
125+
application, retryPolicy.getDelay(), retryPolicy.getJitter());
126+
} else if (backoff.getExponentialBackOff() != null) {
127+
return new ExponentialRetryIntervalFunction(
128+
application, retryPolicy.getDelay(), retryPolicy.getJitter());
129+
}
130+
throw new IllegalStateException("A backoff strategy should be set");
70131
}
71132

72133
@Override
@@ -82,13 +143,19 @@ protected TryExecutor(TryExecutorBuilder builder) {
82143
this.exceptFilter = builder.exceptFilter;
83144
this.taskExecutor = builder.taskExecutor;
84145
this.catchTaskExecutor = builder.catchTaskExecutor;
146+
this.retryIntervalExecutor = builder.retryIntervalExecutor;
85147
}
86148

87149
@Override
88150
protected CompletableFuture<WorkflowModel> internalExecute(
89151
WorkflowContext workflow, TaskContext taskContext) {
152+
return doIt(workflow, taskContext, taskContext.input());
153+
}
154+
155+
private CompletableFuture<WorkflowModel> doIt(
156+
WorkflowContext workflow, TaskContext taskContext, WorkflowModel model) {
90157
return TaskExecutorHelper.processTaskList(
91-
taskExecutor, workflow, Optional.of(taskContext), taskContext.input())
158+
taskExecutor, workflow, Optional.of(taskContext), model)
92159
.exceptionallyCompose(e -> handleException(e, workflow, taskContext));
93160
}
94161

@@ -99,17 +166,27 @@ private CompletableFuture<WorkflowModel> handleException(
99166
}
100167
if (e instanceof WorkflowException) {
101168
WorkflowException exception = (WorkflowException) e;
169+
CompletableFuture<WorkflowModel> completable =
170+
CompletableFuture.completedFuture(taskContext.rawOutput());
102171
if (errorFilter.map(f -> f.test(exception.getWorkflowError())).orElse(true)
103-
&& whenFilter.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)
104-
&& exceptFilter
105-
.map(w -> !w.test(workflow, taskContext, taskContext.input()))
106-
.orElse(true)) {
172+
&& WorkflowUtils.whenExceptTest(
173+
whenFilter, exceptFilter, workflow, taskContext, taskContext.rawOutput())) {
107174
if (catchTaskExecutor.isPresent()) {
108-
return TaskExecutorHelper.processTaskList(
109-
catchTaskExecutor.get(), workflow, Optional.of(taskContext), taskContext.input());
175+
completable =
176+
completable.thenCompose(
177+
model ->
178+
TaskExecutorHelper.processTaskList(
179+
catchTaskExecutor.get(), workflow, Optional.of(taskContext), model));
180+
}
181+
if (retryIntervalExecutor.isPresent()) {
182+
completable =
183+
completable
184+
.thenCompose(
185+
model -> retryIntervalExecutor.get().retry(workflow, taskContext, model))
186+
.thenCompose(model -> doIt(workflow, taskContext, model));
110187
}
111188
}
112-
return CompletableFuture.completedFuture(taskContext.rawOutput());
189+
return completable;
113190
} else {
114191
if (e instanceof RuntimeException) {
115192
throw (RuntimeException) e;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.retry;
17+
18+
import io.serverlessworkflow.api.types.RetryPolicyJitter;
19+
import io.serverlessworkflow.api.types.TimeoutAfter;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowApplication;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
import io.serverlessworkflow.impl.WorkflowModel;
24+
import io.serverlessworkflow.impl.WorkflowUtils;
25+
import io.serverlessworkflow.impl.WorkflowValueResolver;
26+
import java.time.Duration;
27+
import java.util.Optional;
28+
29+
public abstract class AbstractRetryIntervalFunction implements RetryIntervalFunction {
30+
31+
private final Optional<WorkflowValueResolver<Duration>> minJitteringResolver;
32+
private final Optional<WorkflowValueResolver<Duration>> maxJitteringResolver;
33+
private final WorkflowValueResolver<Duration> delayResolver;
34+
35+
public AbstractRetryIntervalFunction(
36+
WorkflowApplication appl, TimeoutAfter delay, RetryPolicyJitter jitter) {
37+
if (jitter != null) {
38+
minJitteringResolver = Optional.of(WorkflowUtils.fromTimeoutAfter(appl, jitter.getFrom()));
39+
maxJitteringResolver = Optional.of(WorkflowUtils.fromTimeoutAfter(appl, jitter.getTo()));
40+
} else {
41+
minJitteringResolver = Optional.empty();
42+
maxJitteringResolver = Optional.empty();
43+
}
44+
delayResolver = WorkflowUtils.fromTimeoutAfter(appl, delay);
45+
}
46+
47+
@Override
48+
public Duration apply(
49+
WorkflowContext workflowContext,
50+
TaskContext taskContext,
51+
WorkflowModel model,
52+
short numAttempts) {
53+
Duration delay = delayResolver.apply(workflowContext, taskContext, model);
54+
Duration minJittering =
55+
minJitteringResolver
56+
.map(min -> min.apply(workflowContext, taskContext, model))
57+
.orElse(Duration.ZERO);
58+
Duration maxJittering =
59+
maxJitteringResolver
60+
.map(max -> max.apply(workflowContext, taskContext, model))
61+
.orElse(Duration.ZERO);
62+
return calcDelay(delay, numAttempts)
63+
.plus(
64+
Duration.ofMillis(
65+
(long) (minJittering.toMillis() + Math.random() * maxJittering.toMillis())));
66+
}
67+
68+
protected abstract Duration calcDelay(Duration delay, short numAttempts);
69+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.retry;
17+
18+
import io.serverlessworkflow.api.types.RetryPolicyJitter;
19+
import io.serverlessworkflow.api.types.TimeoutAfter;
20+
import io.serverlessworkflow.impl.WorkflowApplication;
21+
import java.time.Duration;
22+
23+
public class ConstantRetryIntervalFunction extends AbstractRetryIntervalFunction {
24+
25+
public ConstantRetryIntervalFunction(
26+
WorkflowApplication application, TimeoutAfter delay, RetryPolicyJitter jitter) {
27+
super(application, delay, jitter);
28+
}
29+
30+
@Override
31+
protected Duration calcDelay(Duration delay, short numAttempts) {
32+
return delay;
33+
}
34+
}

0 commit comments

Comments
 (0)