Skip to content

Commit

Permalink
feat(workflows-sdk): Configurable retries upon step creation (#5728)
Browse files Browse the repository at this point in the history
**What**
- Allow to create step that can be configured to have a max retry
- Step end retry mechanism on permanent failure

Also added an API to override a step configuration from within the createWorkflow
```ts
const step = createStep({ name: "step", maxRetries: 3 }, async (_, context) => {
  return new StepResponse({ output: "output" })
})

const workflow = createWorkflow("workflow", function () {
  const res = step().config({ maxRetries: 5 }) // This will override the original maxRetries of 3
})
```

**NOTE**
We can maybe find another name than config on the step workflow data to override the step config.
  • Loading branch information
adrien2p authored Dec 19, 2023
1 parent 1a2f513 commit 9cc787c
Show file tree
Hide file tree
Showing 21 changed files with 255 additions and 92 deletions.
7 changes: 7 additions & 0 deletions .changeset/eleven-jokes-tan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@medusajs/orchestration": patch
"@medusajs/utils": patch
"@medusajs/workflows-sdk": patch
---

feat(workflows-sdk): Configurable retries upon step creation
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,40 @@ describe("Workflow composer", function () {
jest.clearAllMocks()
})

it("should compose a new workflow composed retryable steps", async () => {
const maxRetries = 1

const mockStep1Fn = jest.fn().mockImplementation((input, context) => {
const attempt = context.metadata.attempt || 0
if (attempt <= maxRetries) {
throw new Error("test error")
}

return { inputs: [input], obj: "return from 1" }
})

const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn)

const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
})

const workflowInput = { test: "payload1" }
const { result: workflowResult } = await workflow().run({
input: workflowInput,
})

expect(mockStep1Fn).toHaveBeenCalledTimes(2)
expect(mockStep1Fn.mock.calls[0]).toHaveLength(2)
expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput)
expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput)

expect(workflowResult).toEqual({
inputs: [{ test: "payload1" }],
obj: "return from 1",
})
})

it("should compose a new workflow and execute it", async () => {
const mockStep1Fn = jest.fn().mockImplementation((input) => {
return { inputs: [input], obj: "return from 1" }
Expand Down Expand Up @@ -928,6 +962,73 @@ describe("Workflow composer", function () {
jest.clearAllMocks()
})

it("should compose a new workflow composed of retryable steps", async () => {
const maxRetries = 1

const mockStep1Fn = jest.fn().mockImplementation((input, context) => {
const attempt = context.metadata.attempt || 0
if (attempt <= maxRetries) {
throw new Error("test error")
}

return new StepResponse({ inputs: [input], obj: "return from 1" })
})

const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn)

const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
})

const workflowInput = { test: "payload1" }
const { result: workflowResult } = await workflow().run({
input: workflowInput,
})

expect(mockStep1Fn).toHaveBeenCalledTimes(2)
expect(mockStep1Fn.mock.calls[0]).toHaveLength(2)
expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput)
expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput)

expect(workflowResult).toEqual({
inputs: [{ test: "payload1" }],
obj: "return from 1",
})
})

it("should compose a new workflow composed of retryable steps that should stop retries on permanent failure", async () => {
const maxRetries = 1

const mockStep1Fn = jest.fn().mockImplementation((input, context) => {
return StepResponse.permanentFailure("fail permanently")
})

const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn)

const workflow = createWorkflow("workflow1", function (input) {
return step1(input)
})

const workflowInput = { test: "payload1" }
const { errors } = await workflow().run({
input: workflowInput,
throwOnError: false,
})

expect(mockStep1Fn).toHaveBeenCalledTimes(1)
expect(mockStep1Fn.mock.calls[0]).toHaveLength(2)
expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput)

expect(errors).toHaveLength(1)
expect(errors[0]).toEqual({
action: "step1",
handlerType: "invoke",
error: expect.objectContaining({
message: "fail permanently",
}),
})
})

it("should compose a new workflow and execute it", async () => {
const mockStep1Fn = jest.fn().mockImplementation((input) => {
return new StepResponse({ inputs: [input], obj: "return from 1" })
Expand Down
15 changes: 15 additions & 0 deletions packages/orchestration/src/transaction/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export class PermanentStepFailureError extends Error {
static isPermanentStepFailureError(
error: Error
): error is PermanentStepFailureError {
return (
error instanceof PermanentStepFailureError ||
error.name === "PermanentStepFailure"
)
}

constructor(message?: string) {
super(message)
this.name = "PermanentStepFailure"
}
}
1 change: 1 addition & 0 deletions packages/orchestration/src/transaction/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from "./transaction-orchestrator"
export * from "./transaction-step"
export * from "./distributed-transaction"
export * from "./orchestrator-builder"
export * from "./errors"
41 changes: 28 additions & 13 deletions packages/orchestration/src/transaction/transaction-orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {

import { EventEmitter } from "events"
import { promiseAll } from "@medusajs/utils"
import { PermanentStepFailureError } from "./errors"

export type TransactionFlow = {
modelId: string
Expand Down Expand Up @@ -367,24 +368,37 @@ export class TransactionOrchestrator extends EventEmitter {
transaction.getContext()
)

const setStepFailure = async (
error: Error | any,
{ endRetry }: { endRetry?: boolean } = {}
) => {
return TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
endRetry ? 0 : step.definition.maxRetries
)
}

if (!step.definition.async) {
execution.push(
transaction
.handler(step.definition.action + "", type, payload, transaction)
.then(async (response) => {
.then(async (response: any) => {
await TransactionOrchestrator.setStepSuccess(
transaction,
step,
response
)
})
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
step.definition.maxRetries
)
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
return
}
await setStepFailure(error)
})
)
} else {
Expand All @@ -393,12 +407,13 @@ export class TransactionOrchestrator extends EventEmitter {
transaction
.handler(step.definition.action + "", type, payload, transaction)
.catch(async (error) => {
await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
step.definition.maxRetries
)
if (
PermanentStepFailureError.isPermanentStepFailureError(error)
) {
await setStepFailure(error, { endRetry: true })
return
}
await setStepFailure(error)
})
)
)
Expand Down
1 change: 1 addition & 0 deletions packages/utils/src/bundles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * as ModulesSdkUtils from "./modules-sdk"
export * as ProductUtils from "./product"
export * as SearchUtils from "./search"
export * as ShippingProfileUtils from "./shipping"
export * as OrchestrationUtils from "./orchestration"
1 change: 1 addition & 0 deletions packages/utils/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from "./pricing"
export * from "./product"
export * from "./search"
export * from "./shipping"
export * from "./orchestration"
1 change: 1 addition & 0 deletions packages/utils/src/orchestration/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./symbol"
File renamed without changes.
11 changes: 8 additions & 3 deletions packages/workflows-sdk/src/helper/workflow-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types"
import { MedusaModule } from "@medusajs/modules-sdk"
import { EOL } from "os"
import { ulid } from "ulid"
import { SymbolWorkflowWorkflowData } from "../utils/composer"
import { OrchestrationUtils } from "@medusajs/utils"

export type FlowRunOptions<TData = unknown> = {
input?: TData
Expand Down Expand Up @@ -99,11 +99,16 @@ export const exportWorkflow = <TData = unknown, TResult = unknown>(
if (Array.isArray(resultFrom)) {
result = resultFrom.map((from) => {
const res = transaction.getContext().invoke?.[from]
return res?.__type === SymbolWorkflowWorkflowData ? res.output : res
return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? res.output
: res
})
} else {
const res = transaction.getContext().invoke?.[resultFrom]
result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res
result =
res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData
? res.output
: res
}
}

Expand Down
Loading

0 comments on commit 9cc787c

Please sign in to comment.