Skip to content

Commit ebcf898

Browse files
Add run retry feature (#448)
* Add run retry feature * Expose the other fields of the retry policy * Make sure we don't use the retry policy when no option is set, retaining the same behavior as before.
1 parent 677d892 commit ebcf898

File tree

4 files changed

+120
-28
lines changed

4 files changed

+120
-28
lines changed
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1 @@
1-
exclusions:
2-
"alwaysSuspending":
3-
- "dev.restate.sdktesting.tests.RunRetry"
4-
"default":
5-
- "dev.restate.sdktesting.tests.RunRetry"
6-
"singleThreadSinglePartition":
7-
- "dev.restate.sdktesting.tests.RunRetry"
1+
exclusions: {}

packages/restate-e2e-services/src/failing.ts

Lines changed: 55 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
import * as restate from "@restatedev/restate-sdk";
1111
import { REGISTRY } from "./services.js";
12+
import { TerminalError } from "@restatedev/restate-sdk";
1213

1314
let eventualSuccessCalls = 0;
1415
let eventualSuccessSideEffectCalls = 0;
16+
let eventualFailureSideEffectCalls = 0;
1517

1618
const service = restate.object({
1719
name: "Failing",
@@ -46,24 +48,6 @@ const service = restate.object({
4648
}
4749
},
4850

49-
failingSideEffectWithEventualSuccess: async (
50-
context: restate.ObjectContext
51-
) => {
52-
const successAttempt = await context.run(() => {
53-
eventualSuccessSideEffectCalls += 1;
54-
const currentAttempt = eventualSuccessSideEffectCalls;
55-
56-
if (currentAttempt >= 4) {
57-
eventualSuccessSideEffectCalls = 0;
58-
return currentAttempt;
59-
} else {
60-
throw new Error("Failed at attempt: " + currentAttempt);
61-
}
62-
});
63-
64-
return successAttempt;
65-
},
66-
6751
terminallyFailingSideEffect: async (
6852
ctx: restate.ObjectContext,
6953
errorMessage: string
@@ -74,6 +58,59 @@ const service = restate.object({
7458

7559
throw new Error("Should not be reached.");
7660
},
61+
62+
sideEffectSucceedsAfterGivenAttempts: async (
63+
context: restate.ObjectContext,
64+
minimumAttempts: number
65+
) => {
66+
return await context.run(
67+
"failing-side-effect",
68+
() => {
69+
eventualSuccessSideEffectCalls += 1;
70+
const currentAttempt = eventualSuccessSideEffectCalls;
71+
72+
if (currentAttempt >= minimumAttempts) {
73+
eventualSuccessSideEffectCalls = 0;
74+
return currentAttempt;
75+
} else {
76+
throw new Error("Failed at attempt: " + currentAttempt);
77+
}
78+
},
79+
{ retryIntervalFactor: 1, initialRetryIntervalMillis: 10 }
80+
);
81+
},
82+
83+
sideEffectFailsAfterGivenAttempts: async (
84+
context: restate.ObjectContext,
85+
retryPolicyMaxRetryCount: number
86+
) => {
87+
try {
88+
await context.run(
89+
"failing-side-effect",
90+
() => {
91+
eventualFailureSideEffectCalls += 1;
92+
throw new Error(
93+
"Failed at attempt: " + eventualFailureSideEffectCalls
94+
);
95+
},
96+
{
97+
maxRetryAttempts: retryPolicyMaxRetryCount,
98+
retryIntervalFactor: 1,
99+
initialRetryIntervalMillis: 10,
100+
}
101+
);
102+
} catch (e) {
103+
if (e instanceof TerminalError) {
104+
context.console.log(
105+
`run failed as expected with ${JSON.stringify(e)}`
106+
);
107+
return eventualFailureSideEffectCalls;
108+
}
109+
// This is not a TerminalError!
110+
throw e;
111+
}
112+
throw new TerminalError("Side effect was supposed to fail!");
113+
},
77114
},
78115
});
79116

packages/restate-sdk/src/context.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,48 @@ export type RunAction<T> = (() => Promise<T>) | (() => T);
179179

180180
export type RunOptions<T> = {
181181
serde?: Serde<T>;
182+
183+
/**
184+
* Max number of retry attempts, before giving up.
185+
*
186+
* When giving up, `ctx.run` will throw a `TerminalError` wrapping the original error message.
187+
*/
188+
maxRetryAttempts?: number;
189+
190+
/**
191+
* Max duration of retries, before giving up.
192+
*
193+
* When giving up, `ctx.run` will throw a `TerminalError` wrapping the original error message.
194+
*/
195+
maxRetryDurationMillis?: number;
196+
197+
/**
198+
* Initial interval for the first retry attempt.
199+
* Retry interval will grow by a factor specified in `retryIntervalFactor`.
200+
*
201+
* The default is 50 milliseconds.
202+
*/
203+
initialRetryIntervalMillis?: number;
204+
205+
/**
206+
* Max interval between retries.
207+
* Retry interval will grow by a factor specified in `retryIntervalFactor`.
208+
*
209+
* The default is 10 seconds.
210+
*/
211+
maxRetryIntervalMillis?: number;
212+
213+
/**
214+
* Exponentiation factor to use when computing the next retry delay.
215+
*
216+
* The default value is `2`, meaning retry interval will double at each attempt.
217+
*/
218+
retryIntervalFactor?: number;
182219
};
183220

184221
/**
185222
* Call a handler directly avoiding restate's type safety checks.
186-
* This is a generic machnisim to invoke handlers directly by only knowing
223+
* This is a generic mechanism to invoke handlers directly by only knowing
187224
* the service and handler name, (or key in the case of objects or workflows)
188225
*/
189226
export type GenericCall<REQ, RES> = {
@@ -275,6 +312,7 @@ export interface Context extends RestateContext {
275312
* throw new TerminalError("Payment failed");
276313
* } else if (result.paymentGatewayBusy) {
277314
* // restate will retry automatically
315+
* // to bound retries, use RunOptions
278316
* throw new Exception("Payment gateway busy");
279317
* } else {
280318
* // success!

packages/restate-sdk/src/context_impl.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,13 +347,15 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
347347

348348
// We wrap the rest of the execution in this closure to create a future
349349
const doRun = async () => {
350+
const startTime = Date.now();
350351
let res: T;
351352
let err;
352353
try {
353354
res = await action();
354355
} catch (e) {
355356
err = ensureError(e);
356357
}
358+
const attemptDuration = Date.now() - startTime;
357359

358360
// Record the result/failure, get back the handle for the ack.
359361
let handle;
@@ -366,8 +368,29 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
366368
message: err.message,
367369
});
368370
} else {
369-
// TODO plug retries here!
370-
throw err;
371+
if (
372+
options?.retryIntervalFactor === undefined &&
373+
options?.initialRetryIntervalMillis === undefined &&
374+
options?.maxRetryAttempts === undefined &&
375+
options?.maxRetryDurationMillis === undefined &&
376+
options?.maxRetryIntervalMillis === undefined
377+
) {
378+
// If no retry option was set, simply throw the error.
379+
// This will lead to the invoker applying its retry, without the SDK overriding it.
380+
throw err;
381+
}
382+
handle = this.coreVm.sys_run_exit_failure_transient(
383+
err.message,
384+
err.cause?.toString(),
385+
BigInt(attemptDuration),
386+
{
387+
factor: options?.retryIntervalFactor || 2.0,
388+
initial_interval: options?.initialRetryIntervalMillis || 50,
389+
max_attempts: options?.maxRetryAttempts,
390+
max_duration: options?.maxRetryDurationMillis,
391+
max_interval: options?.maxRetryIntervalMillis || 10 * 1000,
392+
}
393+
);
371394
}
372395
} else {
373396
// eslint-disable-next-line @typescript-eslint/ban-ts-comment

0 commit comments

Comments
 (0)