From f6672b13caa4d7878c6972346d3fbf2458400d4d Mon Sep 17 00:00:00 2001 From: adrien2p Date: Mon, 27 Nov 2023 12:54:26 +0100 Subject: [PATCH 1/5] feat(workflows-sdk): Configurable retries upon step creation --- .../workflows/utils/composer/compose.ts | 68 +++++++++++++++++++ .../src/utils/composer/create-step.ts | 22 ++++-- 2 files changed, 84 insertions(+), 6 deletions(-) diff --git a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts index b913a6f42a16f..f4e8c0aaae2fb 100644 --- a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts +++ b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts @@ -16,6 +16,40 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) + it("should compose a new workflow composed retryable steps", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + const attempt = context.metadata.attempt || 0 + if (attempt <= maxRetries) { + throw new Error("test error") + } + + return { inputs: [input], obj: "return from 1" } + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput) + + expect(workflowResult).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return { inputs: [input], obj: "return from 1" } @@ -928,6 +962,40 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) + it("should compose a new workflow composed retryable steps", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + const attempt = context.metadata.attempt || 0 + if (attempt <= maxRetries) { + throw new Error("test error") + } + + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput) + + expect(workflowResult).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return new StepResponse({ inputs: [input], obj: "return from 1" }) diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index a63feb8e8cce1..ec225a386925b 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -15,6 +15,8 @@ import { WorkflowData, } from "./type" import { proxify } from "./helpers/proxy" +import { TransactionStepsDefinition } from "@medusajs/orchestration" +import { isString } from "@medusajs/utils" /** * The type of invocation function passed to a step. @@ -75,6 +77,7 @@ interface ApplyStepOptions< TInvokeResultCompensateInput > { stepName: string + stepConfig?: TransactionStepsDefinition input: TStepInputs invokeFn: InvokeFn< TInvokeInput, @@ -91,6 +94,7 @@ interface ApplyStepOptions< * This is where the inputs and context are passed to the underlying invoke and compensate function. * * @param stepName + * @param stepConfig * @param input * @param invokeFn * @param compensateFn @@ -104,6 +108,7 @@ function applyStep< TInvokeResultCompensateInput >({ stepName, + stepConfig = {}, input, invokeFn, compensateFn, @@ -168,9 +173,9 @@ function applyStep< : undefined, } - this.flow.addAction(stepName, { - noCompensation: !compensateFn, - }) + stepConfig!.noCompensation = !compensateFn + + this.flow.addAction(stepName, stepConfig) this.handlers.set(stepName, handler) const ret = { @@ -236,9 +241,11 @@ export function createStep< TInvokeResultCompensateInput >( /** - * The name of the step. + * The name of the step or its configuration (currently support maxRetries). */ - name: string, + nameOrConfig: + | string + | ({ name: string } & Pick), /** * An invocation function that will be executed when the workflow is executed. The function must return an instance of {@link StepResponse}. The constructor of {@link StepResponse} * accepts the output of the step as a first argument, and optionally as a second argument the data to be passed to the compensation function as a parameter. @@ -256,7 +263,9 @@ export function createStep< */ compensateFn?: CompensateFn ): StepFunction { - const stepName = name ?? invokeFn.name + const stepName = + (isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name) ?? invokeFn.name + const config = isString(nameOrConfig) ? {} : nameOrConfig const returnFn = function (input: { [K in keyof TInvokeInput]: WorkflowData @@ -281,6 +290,7 @@ export function createStep< TInvokeResultCompensateInput >({ stepName, + stepConfig: config, input, invokeFn, compensateFn, From bb415fe2c77d6a63143ee1c2a1b535839066bd41 Mon Sep 17 00:00:00 2001 From: adrien2p Date: Tue, 28 Nov 2023 10:28:36 +0100 Subject: [PATCH 2/5] add support for permanent failure response --- .../workflows/utils/composer/compose.ts | 35 ++++++++++++- .../transaction/transaction-orchestrator.ts | 49 +++++++++++++------ packages/utils/src/bundles.ts | 1 + packages/utils/src/index.ts | 1 + packages/utils/src/orchestration/index.ts | 1 + .../src/orchestration}/symbol.ts | 3 ++ .../src/helper/workflow-export.ts | 11 +++-- .../src/utils/composer/create-step.ts | 27 ++++------ .../src/utils/composer/create-workflow.ts | 20 +++----- .../src/utils/composer/helpers/index.ts | 3 +- .../src/utils/composer/helpers/proxy.ts | 6 +-- .../utils/composer/helpers/resolve-value.ts | 25 +++++----- .../utils/composer/helpers/step-response.ts | 33 ++++++++++++- .../workflows-sdk/src/utils/composer/hook.ts | 13 +++-- .../workflows-sdk/src/utils/composer/index.ts | 1 - .../src/utils/composer/parallelize.ts | 8 +-- .../src/utils/composer/transform.ts | 5 +- 17 files changed, 161 insertions(+), 81 deletions(-) create mode 100644 packages/utils/src/orchestration/index.ts rename packages/{workflows-sdk/src/utils/composer/helpers => utils/src/orchestration}/symbol.ts (84%) diff --git a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts index f4e8c0aaae2fb..69dda9b3c2f44 100644 --- a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts +++ b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts @@ -962,7 +962,7 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) - it("should compose a new workflow composed retryable steps", async () => { + it("should compose a new workflow composed of retryable steps", async () => { const maxRetries = 1 const mockStep1Fn = jest.fn().mockImplementation((input, context) => { @@ -996,6 +996,39 @@ describe("Workflow composer", function () { }) }) + it("should compose a new workflow composed of retryable steps that should stop retries on permanent failure", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return StepResponse.permanentFailure({ message: "fail permanently" }) + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { errors } = await workflow().run({ + input: workflowInput, + throwOnError: false, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(errors).toHaveLength(1) + expect(errors[0]).toEqual({ + action: "step1", + handlerType: "invoke", + error: expect.objectContaining({ + message: "fail permanently", + }), + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return new StepResponse({ inputs: [input], obj: "return from 1" }) diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 6dbc1ef68dd19..776e82f9f1870 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -12,7 +12,7 @@ import { } from "./types" import { EventEmitter } from "events" -import { promiseAll } from "@medusajs/utils" +import { OrchestrationUtils, promiseAll } from "@medusajs/utils" export type TransactionFlow = { modelId: string @@ -367,11 +367,32 @@ export class TransactionOrchestrator extends EventEmitter { transaction.getContext() ) + const setStepFailure = async ( + error: Error | any, + { endRetry }: { endRetry?: boolean } = {} + ) => { + return TransactionOrchestrator.setStepFailure( + transaction, + step, + error, + endRetry ? 0 : step.definition.maxRetries + ) + } + if (!step.definition.async) { execution.push( transaction .handler(step.definition.action + "", type, payload, transaction) - .then(async (response) => { + .then(async (response: any) => { + // Specific case for steps that return a permanentFailure response and should not be retried + if ( + response?.output?.output?.__type === + OrchestrationUtils.SymbolWorkflowStepPermanentFailureResponse + ) { + const error = response.output.output.error + return await setStepFailure(error, { endRetry: true }) + } + await TransactionOrchestrator.setStepSuccess( transaction, step, @@ -379,12 +400,7 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { - await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - step.definition.maxRetries - ) + await setStepFailure(error) }) ) } else { @@ -392,13 +408,18 @@ export class TransactionOrchestrator extends EventEmitter { transaction.saveCheckpoint().then(async () => transaction .handler(step.definition.action + "", type, payload, transaction) + .then(async (response: any) => { + // Specific case for steps that return a permanentFailure response and should not be retried + if ( + response?.output?.__type === + OrchestrationUtils.SymbolWorkflowStepPermanentFailureResponse + ) { + const error = response.output.error + return await setStepFailure(error, { endRetry: true }) + } + }) .catch(async (error) => { - await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - step.definition.maxRetries - ) + await setStepFailure(error) }) ) ) diff --git a/packages/utils/src/bundles.ts b/packages/utils/src/bundles.ts index 231a15d1328d0..96d38b19bf5f5 100644 --- a/packages/utils/src/bundles.ts +++ b/packages/utils/src/bundles.ts @@ -6,3 +6,4 @@ export * as ModulesSdkUtils from "./modules-sdk" export * as ProductUtils from "./product" export * as SearchUtils from "./search" export * as ShippingProfileUtils from "./shipping" +export * as OrchestrationUtils from "./orchestration" diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index e0e0870d5d6ed..dd2be395ca62f 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -9,3 +9,4 @@ export * from "./pricing" export * from "./product" export * from "./search" export * from "./shipping" +export * from "./orchestration" diff --git a/packages/utils/src/orchestration/index.ts b/packages/utils/src/orchestration/index.ts new file mode 100644 index 0000000000000..e6355e4311ea6 --- /dev/null +++ b/packages/utils/src/orchestration/index.ts @@ -0,0 +1 @@ +export * from "./symbol" diff --git a/packages/workflows-sdk/src/utils/composer/helpers/symbol.ts b/packages/utils/src/orchestration/symbol.ts similarity index 84% rename from packages/workflows-sdk/src/utils/composer/helpers/symbol.ts rename to packages/utils/src/orchestration/symbol.ts index 8ec8177d7f28d..1ba557248732a 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/symbol.ts +++ b/packages/utils/src/orchestration/symbol.ts @@ -1,3 +1,6 @@ +export const SymbolWorkflowStepPermanentFailureResponse = Symbol.for( + "WorkflowStepPermanentFailureResponse" +) export const SymbolMedusaWorkflowComposerContext = Symbol.for( "MedusaWorkflowComposerContext" ) diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index a0614c6a4ab0c..7b940609f2cb7 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -10,7 +10,7 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { MedusaModule } from "@medusajs/modules-sdk" import { EOL } from "os" import { ulid } from "ulid" -import { SymbolWorkflowWorkflowData } from "../utils/composer" +import { OrchestrationUtils } from "@medusajs/utils" export type FlowRunOptions = { input?: TData @@ -99,11 +99,16 @@ export const exportWorkflow = ( if (Array.isArray(resultFrom)) { result = resultFrom.map((from) => { const res = transaction.getContext().invoke?.[from] - return res?.__type === SymbolWorkflowWorkflowData ? res.output : res + return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res }) } else { const res = transaction.getContext().invoke?.[resultFrom] - result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res + result = + res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res } } diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index ec225a386925b..a6ab176faac95 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -1,12 +1,4 @@ -import { - resolveValue, - StepResponse, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowStep, - SymbolWorkflowStepBind, - SymbolWorkflowStepResponse, - SymbolWorkflowWorkflowData, -} from "./helpers" +import { resolveValue, StepResponse } from "./helpers" import { CreateWorkflowComposerContext, StepExecutionContext, @@ -16,7 +8,7 @@ import { } from "./type" import { proxify } from "./helpers/proxy" import { TransactionStepsDefinition } from "@medusajs/orchestration" -import { isString } from "@medusajs/utils" +import { isString, OrchestrationUtils } from "@medusajs/utils" /** * The type of invocation function passed to a step. @@ -140,12 +132,12 @@ function applyStep< ) const stepResponseJSON = - stepResponse?.__type === SymbolWorkflowStepResponse + stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse ? stepResponse.toJSON() : stepResponse return { - __type: SymbolWorkflowWorkflowData, + __type: OrchestrationUtils.SymbolWorkflowWorkflowData, output: stepResponseJSON, } }, @@ -159,7 +151,8 @@ function applyStep< const stepOutput = transactionContext.invoke[stepName]?.output const invokeResult = - stepOutput?.__type === SymbolWorkflowStepResponse + stepOutput?.__type === + OrchestrationUtils.SymbolWorkflowStepResponse ? stepOutput.compensateInput && JSON.parse(JSON.stringify(stepOutput.compensateInput)) : stepOutput && JSON.parse(JSON.stringify(stepOutput)) @@ -179,7 +172,7 @@ function applyStep< this.handlers.set(stepName, handler) const ret = { - __type: SymbolWorkflowStep, + __type: OrchestrationUtils.SymbolWorkflowStep, __step__: stepName, } @@ -270,7 +263,7 @@ export function createStep< const returnFn = function (input: { [K in keyof TInvokeInput]: WorkflowData }): WorkflowData { - if (!global[SymbolMedusaWorkflowComposerContext]) { + if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { throw new Error( "createStep must be used inside a createWorkflow definition" ) @@ -278,7 +271,7 @@ export function createStep< const stepBinder = ( global[ - SymbolMedusaWorkflowComposerContext + OrchestrationUtils.SymbolMedusaWorkflowComposerContext ] as CreateWorkflowComposerContext ).stepBinder @@ -298,7 +291,7 @@ export function createStep< ) } - returnFn.__type = SymbolWorkflowStepBind + returnFn.__type = OrchestrationUtils.SymbolWorkflowStepBind returnFn.__step__ = stepName return returnFn as unknown as StepFunction diff --git a/packages/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/workflows-sdk/src/utils/composer/create-workflow.ts index d44a47f629e87..546515a65efc3 100644 --- a/packages/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/workflows-sdk/src/utils/composer/create-workflow.ts @@ -4,13 +4,9 @@ import { WorkflowManager, } from "@medusajs/orchestration" import { LoadedModule, MedusaContainer } from "@medusajs/types" -import { FlowRunOptions, WorkflowResult, exportWorkflow } from "../../helper" -import { - SymbolInputReference, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowStep, - resolveValue, -} from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" +import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper" +import { resolveValue } from "./helpers" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -18,7 +14,7 @@ import { WorkflowDataProperties, } from "./type" -global[SymbolMedusaWorkflowComposerContext] = null +global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null /** * An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create @@ -189,16 +185,16 @@ export function createWorkflow< }, } - global[SymbolMedusaWorkflowComposerContext] = context + global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = context const inputPlaceHolder = proxify({ - __type: SymbolInputReference, + __type: OrchestrationUtils.SymbolInputReference, __step__: "", }) const returnedStep = composer.apply(context, [inputPlaceHolder]) - delete global[SymbolMedusaWorkflowComposerContext] + delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] WorkflowManager.update(name, context.flow, handlers) @@ -221,7 +217,7 @@ export function createWorkflow< > => { args ??= {} args.resultFrom ??= - returnedStep?.__type === SymbolWorkflowStep + returnedStep?.__type === OrchestrationUtils.SymbolWorkflowStep ? returnedStep.__step__ : undefined diff --git a/packages/workflows-sdk/src/utils/composer/helpers/index.ts b/packages/workflows-sdk/src/utils/composer/helpers/index.ts index 2636ee016ffcc..4f369c7a5e79d 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/index.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/index.ts @@ -1,3 +1,2 @@ export * from "./step-response" -export * from "./symbol" -export * from "./resolve-value" \ No newline at end of file +export * from "./resolve-value" diff --git a/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts b/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts index 68ca47b2f3359..91971fa9eb5c3 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts @@ -1,6 +1,6 @@ import { transform } from "../transform" import { WorkflowData, WorkflowTransactionContext } from "../type" -import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol" +import { OrchestrationUtils } from "@medusajs/utils" import { resolveValue } from "./resolve-value" export function proxify(obj: WorkflowData): T { @@ -13,8 +13,8 @@ export function proxify(obj: WorkflowData): T { return transform(target[prop], async function (input, context) { const { invoke } = context as WorkflowTransactionContext let output = - target.__type === SymbolInputReference || - target.__type === SymbolWorkflowStepTransformer + target.__type === OrchestrationUtils.SymbolInputReference || + target.__type === OrchestrationUtils.SymbolWorkflowStepTransformer ? target : invoke?.[obj.__step__]?.output diff --git a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index c5fb131771b81..8fb49aa423c1b 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -1,29 +1,26 @@ -import { promiseAll } from "@medusajs/utils" -import { - SymbolInputReference, - SymbolWorkflowHook, - SymbolWorkflowStep, - SymbolWorkflowStepResponse, - SymbolWorkflowStepTransformer, -} from "./symbol" +import { OrchestrationUtils, promiseAll } from "@medusajs/utils" async function resolveProperty(property, transactionContext) { const { invoke: invokeRes } = transactionContext - if (property?.__type === SymbolInputReference) { + if (property?.__type === OrchestrationUtils.SymbolInputReference) { return transactionContext.payload - } else if (property?.__type === SymbolWorkflowStepTransformer) { + } else if ( + property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer + ) { return await property.__resolver(transactionContext) - } else if (property?.__type === SymbolWorkflowHook) { + } else if (property?.__type === OrchestrationUtils.SymbolWorkflowHook) { return await property.__value(transactionContext) - } else if (property?.__type === SymbolWorkflowStep) { + } else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) { const output = invokeRes[property.__step__]?.output - if (output?.__type === SymbolWorkflowStepResponse) { + if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) { return output.output } return output - } else if (property?.__type === SymbolWorkflowStepResponse) { + } else if ( + property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse + ) { return property.output } else { return property diff --git a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts index a0ccaff81e337..ed70bb4922e20 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -1,4 +1,4 @@ -import { SymbolWorkflowStepResponse } from "./symbol" +import { OrchestrationUtils } from "@medusajs/utils" /** * This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`. @@ -9,7 +9,7 @@ import { SymbolWorkflowStepResponse } from "./symbol" * as that of `TOutput`. */ export class StepResponse { - readonly #__type = SymbolWorkflowStepResponse + readonly #__type = OrchestrationUtils.SymbolWorkflowStepResponse readonly #output: TOutput readonly #compensateInput?: TCompensateInput @@ -35,6 +35,35 @@ export class StepResponse { this.#compensateInput = (compensateInput ?? output) as TCompensateInput } + /** + * Creates a StepResponse that indicates that the step has failed and the retry mechanism should not kick in anymore. + * + * @param message - An optional message to be logged. Default to `Permanent failure`. + * @param endRetry - Whether the retry mechanism should be stopped or not. Defaults to `true`. + * @param compensateInput - The input to be passed as a parameter to the step's compensation function. If not provided, the `output` will be provided instead. + */ + static permanentFailure({ + message = "Permanent failure", + compensateInput, + }: { + message: string + compensateInput?: TCompensateInput + }) { + return new StepResponse< + { + error: Error + __type: Symbol + }, + TCompensateInput + >( + { + __type: OrchestrationUtils.SymbolWorkflowStepPermanentFailureResponse, + error: new Error(message), + }, + compensateInput as TCompensateInput + ) + } + /** * @internal */ diff --git a/packages/workflows-sdk/src/utils/composer/hook.ts b/packages/workflows-sdk/src/utils/composer/hook.ts index 8902d2011a6cb..1dd8324455d57 100644 --- a/packages/workflows-sdk/src/utils/composer/hook.ts +++ b/packages/workflows-sdk/src/utils/composer/hook.ts @@ -1,8 +1,5 @@ -import { - resolveValue, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowHook, -} from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" +import { resolveValue } from "./helpers" import { CreateWorkflowComposerContext, StepExecutionContext, @@ -104,7 +101,9 @@ export function hook( value: any ): WorkflowData { const hookBinder = ( - global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext ).hookBinder return hookBinder(name, function (context) { @@ -130,7 +129,7 @@ export function hook( } return finalResult }, - __type: SymbolWorkflowHook, + __type: OrchestrationUtils.SymbolWorkflowHook, } }) } diff --git a/packages/workflows-sdk/src/utils/composer/index.ts b/packages/workflows-sdk/src/utils/composer/index.ts index 485d8451cc6f4..d654c7e7f08a8 100644 --- a/packages/workflows-sdk/src/utils/composer/index.ts +++ b/packages/workflows-sdk/src/utils/composer/index.ts @@ -3,7 +3,6 @@ export * from "./create-workflow" export * from "./hook" export * from "./parallelize" export * from "./helpers/resolve-value" -export * from "./helpers/symbol" export * from "./helpers/step-response" export * from "./transform" export * from "./type" diff --git a/packages/workflows-sdk/src/utils/composer/parallelize.ts b/packages/workflows-sdk/src/utils/composer/parallelize.ts index dc864e0f1b170..178e1a0b56866 100644 --- a/packages/workflows-sdk/src/utils/composer/parallelize.ts +++ b/packages/workflows-sdk/src/utils/composer/parallelize.ts @@ -1,5 +1,5 @@ import { CreateWorkflowComposerContext, WorkflowData } from "./type" -import { SymbolMedusaWorkflowComposerContext } from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" /** * This function is used to run multiple steps in parallel. The result of each step will be returned as part of the result array. @@ -44,14 +44,16 @@ import { SymbolMedusaWorkflowComposerContext } from "./helpers" export function parallelize( ...steps: TResult ): TResult { - if (!global[SymbolMedusaWorkflowComposerContext]) { + if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { throw new Error( "parallelize must be used inside a createWorkflow definition" ) } const parallelizeBinder = ( - global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext ).parallelizeBinder const resultSteps = steps.map((step) => step) diff --git a/packages/workflows-sdk/src/utils/composer/transform.ts b/packages/workflows-sdk/src/utils/composer/transform.ts index 98f5d076cb734..0b7020e63091c 100644 --- a/packages/workflows-sdk/src/utils/composer/transform.ts +++ b/packages/workflows-sdk/src/utils/composer/transform.ts @@ -1,6 +1,7 @@ -import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers" +import { resolveValue } from "./helpers" import { StepExecutionContext, WorkflowData } from "./type" import { proxify } from "./helpers/proxy" +import { OrchestrationUtils } from "@medusajs/utils" type Func1 = ( input: T extends WorkflowData @@ -163,7 +164,7 @@ export function transform( ...functions: Function[] ): unknown { const ret = { - __type: SymbolWorkflowStepTransformer, + __type: OrchestrationUtils.SymbolWorkflowStepTransformer, __resolver: undefined, } From e0b1abd005bef500454581ee15cab7e4d00b26c5 Mon Sep 17 00:00:00 2001 From: adrien2p Date: Thu, 30 Nov 2023 14:47:24 +0100 Subject: [PATCH 3/5] make StepResponse.permanentFailure throw --- .../workflows/utils/composer/compose.ts | 2 +- .../orchestration/src/transaction/errors.ts | 15 ++++++++++ .../orchestration/src/transaction/index.ts | 1 + .../transaction/transaction-orchestrator.ts | 30 ++++++++----------- packages/utils/src/orchestration/symbol.ts | 3 -- .../utils/composer/helpers/step-response.ts | 25 ++-------------- 6 files changed, 32 insertions(+), 44 deletions(-) create mode 100644 packages/orchestration/src/transaction/errors.ts diff --git a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts index 69dda9b3c2f44..27fb772003df6 100644 --- a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts +++ b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts @@ -1000,7 +1000,7 @@ describe("Workflow composer", function () { const maxRetries = 1 const mockStep1Fn = jest.fn().mockImplementation((input, context) => { - return StepResponse.permanentFailure({ message: "fail permanently" }) + return StepResponse.permanentFailure("fail permanently") }) const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) diff --git a/packages/orchestration/src/transaction/errors.ts b/packages/orchestration/src/transaction/errors.ts new file mode 100644 index 0000000000000..95a093c5b3a95 --- /dev/null +++ b/packages/orchestration/src/transaction/errors.ts @@ -0,0 +1,15 @@ +export class PermanentStepFailureError extends Error { + static isPermanentStepFailureError( + error: Error + ): error is PermanentStepFailureError { + return ( + error instanceof PermanentStepFailureError || + error.name === "PermanentStepFailure" + ) + } + + constructor(message?: string) { + super(message) + this.name = "PermanentStepFailure" + } +} diff --git a/packages/orchestration/src/transaction/index.ts b/packages/orchestration/src/transaction/index.ts index 4380d8bfa6ee3..6633e3b3c3acf 100644 --- a/packages/orchestration/src/transaction/index.ts +++ b/packages/orchestration/src/transaction/index.ts @@ -3,3 +3,4 @@ export * from "./transaction-orchestrator" export * from "./transaction-step" export * from "./distributed-transaction" export * from "./orchestrator-builder" +export * from "./errors" diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 776e82f9f1870..fd6b74cd23afe 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -12,7 +12,8 @@ import { } from "./types" import { EventEmitter } from "events" -import { OrchestrationUtils, promiseAll } from "@medusajs/utils" +import { promiseAll } from "@medusajs/utils" +import { PermanentStepFailureError } from "./errors" export type TransactionFlow = { modelId: string @@ -384,15 +385,6 @@ export class TransactionOrchestrator extends EventEmitter { transaction .handler(step.definition.action + "", type, payload, transaction) .then(async (response: any) => { - // Specific case for steps that return a permanentFailure response and should not be retried - if ( - response?.output?.output?.__type === - OrchestrationUtils.SymbolWorkflowStepPermanentFailureResponse - ) { - const error = response.output.output.error - return await setStepFailure(error, { endRetry: true }) - } - await TransactionOrchestrator.setStepSuccess( transaction, step, @@ -400,6 +392,12 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + if ( + PermanentStepFailureError.isPermanentStepFailureError(error) + ) { + await setStepFailure(error, { endRetry: true }) + return + } await setStepFailure(error) }) ) @@ -408,17 +406,13 @@ export class TransactionOrchestrator extends EventEmitter { transaction.saveCheckpoint().then(async () => transaction .handler(step.definition.action + "", type, payload, transaction) - .then(async (response: any) => { - // Specific case for steps that return a permanentFailure response and should not be retried + .catch(async (error) => { if ( - response?.output?.__type === - OrchestrationUtils.SymbolWorkflowStepPermanentFailureResponse + PermanentStepFailureError.isPermanentStepFailureError(error) ) { - const error = response.output.error - return await setStepFailure(error, { endRetry: true }) + await setStepFailure(error, { endRetry: true }) + return } - }) - .catch(async (error) => { await setStepFailure(error) }) ) diff --git a/packages/utils/src/orchestration/symbol.ts b/packages/utils/src/orchestration/symbol.ts index 1ba557248732a..8ec8177d7f28d 100644 --- a/packages/utils/src/orchestration/symbol.ts +++ b/packages/utils/src/orchestration/symbol.ts @@ -1,6 +1,3 @@ -export const SymbolWorkflowStepPermanentFailureResponse = Symbol.for( - "WorkflowStepPermanentFailureResponse" -) export const SymbolMedusaWorkflowComposerContext = Symbol.for( "MedusaWorkflowComposerContext" ) diff --git a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts index ed70bb4922e20..11ce065e66f2f 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -1,4 +1,5 @@ import { OrchestrationUtils } from "@medusajs/utils" +import { PermanentStepFailureError } from "@medusajs/orchestration" /** * This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`. @@ -39,29 +40,9 @@ export class StepResponse { * Creates a StepResponse that indicates that the step has failed and the retry mechanism should not kick in anymore. * * @param message - An optional message to be logged. Default to `Permanent failure`. - * @param endRetry - Whether the retry mechanism should be stopped or not. Defaults to `true`. - * @param compensateInput - The input to be passed as a parameter to the step's compensation function. If not provided, the `output` will be provided instead. */ - static permanentFailure({ - message = "Permanent failure", - compensateInput, - }: { - message: string - compensateInput?: TCompensateInput - }) { - return new StepResponse< - { - error: Error - __type: Symbol - }, - TCompensateInput - >( - { - __type: OrchestrationUtils.SymbolWorkflowStepPermanentFailureResponse, - error: new Error(message), - }, - compensateInput as TCompensateInput - ) + static permanentFailure(message = "Permanent failure"): never { + throw new PermanentStepFailureError(message) } /** From d9dad4318c4f585a5a761f9ede07595c585e96ad Mon Sep 17 00:00:00 2001 From: adrien2p Date: Thu, 30 Nov 2023 16:34:41 +0100 Subject: [PATCH 4/5] Attemp to add an config API on the step inside createWorkflow --- .../src/utils/composer/create-step.ts | 7 +++++++ .../workflows-sdk/src/utils/composer/type.ts | 19 +++++++++++-------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index a6ab176faac95..78e47f6933aaa 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -174,6 +174,13 @@ function applyStep< const ret = { __type: OrchestrationUtils.SymbolWorkflowStep, __step__: stepName, + config: (config: Pick) => { + this.flow.replaceAction(stepName, stepName, { + ...stepConfig, + ...config, + }) + return proxify(ret) + }, } return proxify(ret) diff --git a/packages/workflows-sdk/src/utils/composer/type.ts b/packages/workflows-sdk/src/utils/composer/type.ts index f6f019d9e8151..5a4c8e2698c77 100644 --- a/packages/workflows-sdk/src/utils/composer/type.ts +++ b/packages/workflows-sdk/src/utils/composer/type.ts @@ -2,18 +2,15 @@ import { OrchestratorBuilder, TransactionContext as OriginalWorkflowTransactionContext, TransactionPayload, + TransactionStepsDefinition, WorkflowHandler, } from "@medusajs/orchestration" import { Context, MedusaContainer } from "@medusajs/types" export type StepFunctionResult = - (this: CreateWorkflowComposerContext) => TOutput extends [] - ? [ - ...WorkflowData<{ - [K in keyof TOutput]: TOutput[number][K] - }>[] - ] - : WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> + ( + this: CreateWorkflowComposerContext + ) => WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> /** * A step function to be used in a workflow. @@ -24,7 +21,13 @@ export type StepFunctionResult = export type StepFunction = { (input: { [K in keyof TInput]: WorkflowData }): WorkflowData<{ [K in keyof TOutput]: TOutput[K] - }> + }> & { + config( + config: Pick + ): WorkflowData<{ + [K in keyof TOutput]: TOutput[K] + }> + } } & WorkflowDataProperties<{ [K in keyof TOutput]: TOutput[K] }> From 091edf3ec9af55903ef34e03a41c4648905c0e17 Mon Sep 17 00:00:00 2001 From: Adrien de Peretti Date: Tue, 19 Dec 2023 10:04:34 +0100 Subject: [PATCH 5/5] Create eleven-jokes-tan.md --- .changeset/eleven-jokes-tan.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/eleven-jokes-tan.md diff --git a/.changeset/eleven-jokes-tan.md b/.changeset/eleven-jokes-tan.md new file mode 100644 index 0000000000000..cfeac59b3478f --- /dev/null +++ b/.changeset/eleven-jokes-tan.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/utils": patch +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): Configurable retries upon step creation