diff --git a/.changeset/eleven-jokes-tan.md b/.changeset/eleven-jokes-tan.md new file mode 100644 index 0000000000000..cfeac59b3478f --- /dev/null +++ b/.changeset/eleven-jokes-tan.md @@ -0,0 +1,7 @@ +--- +"@medusajs/orchestration": patch +"@medusajs/utils": patch +"@medusajs/workflows-sdk": patch +--- + +feat(workflows-sdk): Configurable retries upon step creation diff --git a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts index b913a6f42a16f..27fb772003df6 100644 --- a/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts +++ b/integration-tests/plugins/__tests__/workflows/utils/composer/compose.ts @@ -16,6 +16,40 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) + it("should compose a new workflow composed retryable steps", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + const attempt = context.metadata.attempt || 0 + if (attempt <= maxRetries) { + throw new Error("test error") + } + + return { inputs: [input], obj: "return from 1" } + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput) + + expect(workflowResult).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return { inputs: [input], obj: "return from 1" } @@ -928,6 +962,73 @@ describe("Workflow composer", function () { jest.clearAllMocks() }) + it("should compose a new workflow composed of retryable steps", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + const attempt = context.metadata.attempt || 0 + if (attempt <= maxRetries) { + throw new Error("test error") + } + + return new StepResponse({ inputs: [input], obj: "return from 1" }) + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { result: workflowResult } = await workflow().run({ + input: workflowInput, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(2) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + expect(mockStep1Fn.mock.calls[1][0]).toEqual(workflowInput) + + expect(workflowResult).toEqual({ + inputs: [{ test: "payload1" }], + obj: "return from 1", + }) + }) + + it("should compose a new workflow composed of retryable steps that should stop retries on permanent failure", async () => { + const maxRetries = 1 + + const mockStep1Fn = jest.fn().mockImplementation((input, context) => { + return StepResponse.permanentFailure("fail permanently") + }) + + const step1 = createStep({ name: "step1", maxRetries }, mockStep1Fn) + + const workflow = createWorkflow("workflow1", function (input) { + return step1(input) + }) + + const workflowInput = { test: "payload1" } + const { errors } = await workflow().run({ + input: workflowInput, + throwOnError: false, + }) + + expect(mockStep1Fn).toHaveBeenCalledTimes(1) + expect(mockStep1Fn.mock.calls[0]).toHaveLength(2) + expect(mockStep1Fn.mock.calls[0][0]).toEqual(workflowInput) + + expect(errors).toHaveLength(1) + expect(errors[0]).toEqual({ + action: "step1", + handlerType: "invoke", + error: expect.objectContaining({ + message: "fail permanently", + }), + }) + }) + it("should compose a new workflow and execute it", async () => { const mockStep1Fn = jest.fn().mockImplementation((input) => { return new StepResponse({ inputs: [input], obj: "return from 1" }) diff --git a/packages/orchestration/src/transaction/errors.ts b/packages/orchestration/src/transaction/errors.ts new file mode 100644 index 0000000000000..95a093c5b3a95 --- /dev/null +++ b/packages/orchestration/src/transaction/errors.ts @@ -0,0 +1,15 @@ +export class PermanentStepFailureError extends Error { + static isPermanentStepFailureError( + error: Error + ): error is PermanentStepFailureError { + return ( + error instanceof PermanentStepFailureError || + error.name === "PermanentStepFailure" + ) + } + + constructor(message?: string) { + super(message) + this.name = "PermanentStepFailure" + } +} diff --git a/packages/orchestration/src/transaction/index.ts b/packages/orchestration/src/transaction/index.ts index 4380d8bfa6ee3..6633e3b3c3acf 100644 --- a/packages/orchestration/src/transaction/index.ts +++ b/packages/orchestration/src/transaction/index.ts @@ -3,3 +3,4 @@ export * from "./transaction-orchestrator" export * from "./transaction-step" export * from "./distributed-transaction" export * from "./orchestrator-builder" +export * from "./errors" diff --git a/packages/orchestration/src/transaction/transaction-orchestrator.ts b/packages/orchestration/src/transaction/transaction-orchestrator.ts index 6dbc1ef68dd19..fd6b74cd23afe 100644 --- a/packages/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/orchestration/src/transaction/transaction-orchestrator.ts @@ -13,6 +13,7 @@ import { import { EventEmitter } from "events" import { promiseAll } from "@medusajs/utils" +import { PermanentStepFailureError } from "./errors" export type TransactionFlow = { modelId: string @@ -367,11 +368,23 @@ export class TransactionOrchestrator extends EventEmitter { transaction.getContext() ) + const setStepFailure = async ( + error: Error | any, + { endRetry }: { endRetry?: boolean } = {} + ) => { + return TransactionOrchestrator.setStepFailure( + transaction, + step, + error, + endRetry ? 0 : step.definition.maxRetries + ) + } + if (!step.definition.async) { execution.push( transaction .handler(step.definition.action + "", type, payload, transaction) - .then(async (response) => { + .then(async (response: any) => { await TransactionOrchestrator.setStepSuccess( transaction, step, @@ -379,12 +392,13 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { - await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - step.definition.maxRetries - ) + if ( + PermanentStepFailureError.isPermanentStepFailureError(error) + ) { + await setStepFailure(error, { endRetry: true }) + return + } + await setStepFailure(error) }) ) } else { @@ -393,12 +407,13 @@ export class TransactionOrchestrator extends EventEmitter { transaction .handler(step.definition.action + "", type, payload, transaction) .catch(async (error) => { - await TransactionOrchestrator.setStepFailure( - transaction, - step, - error, - step.definition.maxRetries - ) + if ( + PermanentStepFailureError.isPermanentStepFailureError(error) + ) { + await setStepFailure(error, { endRetry: true }) + return + } + await setStepFailure(error) }) ) ) diff --git a/packages/utils/src/bundles.ts b/packages/utils/src/bundles.ts index 231a15d1328d0..96d38b19bf5f5 100644 --- a/packages/utils/src/bundles.ts +++ b/packages/utils/src/bundles.ts @@ -6,3 +6,4 @@ export * as ModulesSdkUtils from "./modules-sdk" export * as ProductUtils from "./product" export * as SearchUtils from "./search" export * as ShippingProfileUtils from "./shipping" +export * as OrchestrationUtils from "./orchestration" diff --git a/packages/utils/src/index.ts b/packages/utils/src/index.ts index e0e0870d5d6ed..dd2be395ca62f 100644 --- a/packages/utils/src/index.ts +++ b/packages/utils/src/index.ts @@ -9,3 +9,4 @@ export * from "./pricing" export * from "./product" export * from "./search" export * from "./shipping" +export * from "./orchestration" diff --git a/packages/utils/src/orchestration/index.ts b/packages/utils/src/orchestration/index.ts new file mode 100644 index 0000000000000..e6355e4311ea6 --- /dev/null +++ b/packages/utils/src/orchestration/index.ts @@ -0,0 +1 @@ +export * from "./symbol" diff --git a/packages/workflows-sdk/src/utils/composer/helpers/symbol.ts b/packages/utils/src/orchestration/symbol.ts similarity index 100% rename from packages/workflows-sdk/src/utils/composer/helpers/symbol.ts rename to packages/utils/src/orchestration/symbol.ts diff --git a/packages/workflows-sdk/src/helper/workflow-export.ts b/packages/workflows-sdk/src/helper/workflow-export.ts index a0614c6a4ab0c..7b940609f2cb7 100644 --- a/packages/workflows-sdk/src/helper/workflow-export.ts +++ b/packages/workflows-sdk/src/helper/workflow-export.ts @@ -10,7 +10,7 @@ import { Context, LoadedModule, MedusaContainer } from "@medusajs/types" import { MedusaModule } from "@medusajs/modules-sdk" import { EOL } from "os" import { ulid } from "ulid" -import { SymbolWorkflowWorkflowData } from "../utils/composer" +import { OrchestrationUtils } from "@medusajs/utils" export type FlowRunOptions = { input?: TData @@ -99,11 +99,16 @@ export const exportWorkflow = ( if (Array.isArray(resultFrom)) { result = resultFrom.map((from) => { const res = transaction.getContext().invoke?.[from] - return res?.__type === SymbolWorkflowWorkflowData ? res.output : res + return res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res }) } else { const res = transaction.getContext().invoke?.[resultFrom] - result = res?.__type === SymbolWorkflowWorkflowData ? res.output : res + result = + res?.__type === OrchestrationUtils.SymbolWorkflowWorkflowData + ? res.output + : res } } diff --git a/packages/workflows-sdk/src/utils/composer/create-step.ts b/packages/workflows-sdk/src/utils/composer/create-step.ts index a63feb8e8cce1..78e47f6933aaa 100644 --- a/packages/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/workflows-sdk/src/utils/composer/create-step.ts @@ -1,12 +1,4 @@ -import { - resolveValue, - StepResponse, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowStep, - SymbolWorkflowStepBind, - SymbolWorkflowStepResponse, - SymbolWorkflowWorkflowData, -} from "./helpers" +import { resolveValue, StepResponse } from "./helpers" import { CreateWorkflowComposerContext, StepExecutionContext, @@ -15,6 +7,8 @@ import { WorkflowData, } from "./type" import { proxify } from "./helpers/proxy" +import { TransactionStepsDefinition } from "@medusajs/orchestration" +import { isString, OrchestrationUtils } from "@medusajs/utils" /** * The type of invocation function passed to a step. @@ -75,6 +69,7 @@ interface ApplyStepOptions< TInvokeResultCompensateInput > { stepName: string + stepConfig?: TransactionStepsDefinition input: TStepInputs invokeFn: InvokeFn< TInvokeInput, @@ -91,6 +86,7 @@ interface ApplyStepOptions< * This is where the inputs and context are passed to the underlying invoke and compensate function. * * @param stepName + * @param stepConfig * @param input * @param invokeFn * @param compensateFn @@ -104,6 +100,7 @@ function applyStep< TInvokeResultCompensateInput >({ stepName, + stepConfig = {}, input, invokeFn, compensateFn, @@ -135,12 +132,12 @@ function applyStep< ) const stepResponseJSON = - stepResponse?.__type === SymbolWorkflowStepResponse + stepResponse?.__type === OrchestrationUtils.SymbolWorkflowStepResponse ? stepResponse.toJSON() : stepResponse return { - __type: SymbolWorkflowWorkflowData, + __type: OrchestrationUtils.SymbolWorkflowWorkflowData, output: stepResponseJSON, } }, @@ -154,7 +151,8 @@ function applyStep< const stepOutput = transactionContext.invoke[stepName]?.output const invokeResult = - stepOutput?.__type === SymbolWorkflowStepResponse + stepOutput?.__type === + OrchestrationUtils.SymbolWorkflowStepResponse ? stepOutput.compensateInput && JSON.parse(JSON.stringify(stepOutput.compensateInput)) : stepOutput && JSON.parse(JSON.stringify(stepOutput)) @@ -168,14 +166,21 @@ function applyStep< : undefined, } - this.flow.addAction(stepName, { - noCompensation: !compensateFn, - }) + stepConfig!.noCompensation = !compensateFn + + this.flow.addAction(stepName, stepConfig) this.handlers.set(stepName, handler) const ret = { - __type: SymbolWorkflowStep, + __type: OrchestrationUtils.SymbolWorkflowStep, __step__: stepName, + config: (config: Pick) => { + this.flow.replaceAction(stepName, stepName, { + ...stepConfig, + ...config, + }) + return proxify(ret) + }, } return proxify(ret) @@ -236,9 +241,11 @@ export function createStep< TInvokeResultCompensateInput >( /** - * The name of the step. + * The name of the step or its configuration (currently support maxRetries). */ - name: string, + nameOrConfig: + | string + | ({ name: string } & Pick), /** * An invocation function that will be executed when the workflow is executed. The function must return an instance of {@link StepResponse}. The constructor of {@link StepResponse} * accepts the output of the step as a first argument, and optionally as a second argument the data to be passed to the compensation function as a parameter. @@ -256,12 +263,14 @@ export function createStep< */ compensateFn?: CompensateFn ): StepFunction { - const stepName = name ?? invokeFn.name + const stepName = + (isString(nameOrConfig) ? nameOrConfig : nameOrConfig.name) ?? invokeFn.name + const config = isString(nameOrConfig) ? {} : nameOrConfig const returnFn = function (input: { [K in keyof TInvokeInput]: WorkflowData }): WorkflowData { - if (!global[SymbolMedusaWorkflowComposerContext]) { + if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { throw new Error( "createStep must be used inside a createWorkflow definition" ) @@ -269,7 +278,7 @@ export function createStep< const stepBinder = ( global[ - SymbolMedusaWorkflowComposerContext + OrchestrationUtils.SymbolMedusaWorkflowComposerContext ] as CreateWorkflowComposerContext ).stepBinder @@ -281,6 +290,7 @@ export function createStep< TInvokeResultCompensateInput >({ stepName, + stepConfig: config, input, invokeFn, compensateFn, @@ -288,7 +298,7 @@ export function createStep< ) } - returnFn.__type = SymbolWorkflowStepBind + returnFn.__type = OrchestrationUtils.SymbolWorkflowStepBind returnFn.__step__ = stepName return returnFn as unknown as StepFunction diff --git a/packages/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/workflows-sdk/src/utils/composer/create-workflow.ts index d44a47f629e87..546515a65efc3 100644 --- a/packages/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/workflows-sdk/src/utils/composer/create-workflow.ts @@ -4,13 +4,9 @@ import { WorkflowManager, } from "@medusajs/orchestration" import { LoadedModule, MedusaContainer } from "@medusajs/types" -import { FlowRunOptions, WorkflowResult, exportWorkflow } from "../../helper" -import { - SymbolInputReference, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowStep, - resolveValue, -} from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" +import { exportWorkflow, FlowRunOptions, WorkflowResult } from "../../helper" +import { resolveValue } from "./helpers" import { proxify } from "./helpers/proxy" import { CreateWorkflowComposerContext, @@ -18,7 +14,7 @@ import { WorkflowDataProperties, } from "./type" -global[SymbolMedusaWorkflowComposerContext] = null +global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null /** * An exported workflow, which is the type of a workflow constructed by the {@link createWorkflow} function. The exported workflow can be invoked to create @@ -189,16 +185,16 @@ export function createWorkflow< }, } - global[SymbolMedusaWorkflowComposerContext] = context + global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = context const inputPlaceHolder = proxify({ - __type: SymbolInputReference, + __type: OrchestrationUtils.SymbolInputReference, __step__: "", }) const returnedStep = composer.apply(context, [inputPlaceHolder]) - delete global[SymbolMedusaWorkflowComposerContext] + delete global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] WorkflowManager.update(name, context.flow, handlers) @@ -221,7 +217,7 @@ export function createWorkflow< > => { args ??= {} args.resultFrom ??= - returnedStep?.__type === SymbolWorkflowStep + returnedStep?.__type === OrchestrationUtils.SymbolWorkflowStep ? returnedStep.__step__ : undefined diff --git a/packages/workflows-sdk/src/utils/composer/helpers/index.ts b/packages/workflows-sdk/src/utils/composer/helpers/index.ts index 2636ee016ffcc..4f369c7a5e79d 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/index.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/index.ts @@ -1,3 +1,2 @@ export * from "./step-response" -export * from "./symbol" -export * from "./resolve-value" \ No newline at end of file +export * from "./resolve-value" diff --git a/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts b/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts index 68ca47b2f3359..91971fa9eb5c3 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/proxy.ts @@ -1,6 +1,6 @@ import { transform } from "../transform" import { WorkflowData, WorkflowTransactionContext } from "../type" -import { SymbolInputReference, SymbolWorkflowStepTransformer } from "./symbol" +import { OrchestrationUtils } from "@medusajs/utils" import { resolveValue } from "./resolve-value" export function proxify(obj: WorkflowData): T { @@ -13,8 +13,8 @@ export function proxify(obj: WorkflowData): T { return transform(target[prop], async function (input, context) { const { invoke } = context as WorkflowTransactionContext let output = - target.__type === SymbolInputReference || - target.__type === SymbolWorkflowStepTransformer + target.__type === OrchestrationUtils.SymbolInputReference || + target.__type === OrchestrationUtils.SymbolWorkflowStepTransformer ? target : invoke?.[obj.__step__]?.output diff --git a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts index c5fb131771b81..8fb49aa423c1b 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/resolve-value.ts @@ -1,29 +1,26 @@ -import { promiseAll } from "@medusajs/utils" -import { - SymbolInputReference, - SymbolWorkflowHook, - SymbolWorkflowStep, - SymbolWorkflowStepResponse, - SymbolWorkflowStepTransformer, -} from "./symbol" +import { OrchestrationUtils, promiseAll } from "@medusajs/utils" async function resolveProperty(property, transactionContext) { const { invoke: invokeRes } = transactionContext - if (property?.__type === SymbolInputReference) { + if (property?.__type === OrchestrationUtils.SymbolInputReference) { return transactionContext.payload - } else if (property?.__type === SymbolWorkflowStepTransformer) { + } else if ( + property?.__type === OrchestrationUtils.SymbolWorkflowStepTransformer + ) { return await property.__resolver(transactionContext) - } else if (property?.__type === SymbolWorkflowHook) { + } else if (property?.__type === OrchestrationUtils.SymbolWorkflowHook) { return await property.__value(transactionContext) - } else if (property?.__type === SymbolWorkflowStep) { + } else if (property?.__type === OrchestrationUtils.SymbolWorkflowStep) { const output = invokeRes[property.__step__]?.output - if (output?.__type === SymbolWorkflowStepResponse) { + if (output?.__type === OrchestrationUtils.SymbolWorkflowStepResponse) { return output.output } return output - } else if (property?.__type === SymbolWorkflowStepResponse) { + } else if ( + property?.__type === OrchestrationUtils.SymbolWorkflowStepResponse + ) { return property.output } else { return property diff --git a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts index a0ccaff81e337..11ce065e66f2f 100644 --- a/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts +++ b/packages/workflows-sdk/src/utils/composer/helpers/step-response.ts @@ -1,4 +1,5 @@ -import { SymbolWorkflowStepResponse } from "./symbol" +import { OrchestrationUtils } from "@medusajs/utils" +import { PermanentStepFailureError } from "@medusajs/orchestration" /** * This class is used to create the response returned by a step. A step return its data by returning an instance of `StepResponse`. @@ -9,7 +10,7 @@ import { SymbolWorkflowStepResponse } from "./symbol" * as that of `TOutput`. */ export class StepResponse { - readonly #__type = SymbolWorkflowStepResponse + readonly #__type = OrchestrationUtils.SymbolWorkflowStepResponse readonly #output: TOutput readonly #compensateInput?: TCompensateInput @@ -35,6 +36,15 @@ export class StepResponse { this.#compensateInput = (compensateInput ?? output) as TCompensateInput } + /** + * Creates a StepResponse that indicates that the step has failed and the retry mechanism should not kick in anymore. + * + * @param message - An optional message to be logged. Default to `Permanent failure`. + */ + static permanentFailure(message = "Permanent failure"): never { + throw new PermanentStepFailureError(message) + } + /** * @internal */ diff --git a/packages/workflows-sdk/src/utils/composer/hook.ts b/packages/workflows-sdk/src/utils/composer/hook.ts index 8902d2011a6cb..1dd8324455d57 100644 --- a/packages/workflows-sdk/src/utils/composer/hook.ts +++ b/packages/workflows-sdk/src/utils/composer/hook.ts @@ -1,8 +1,5 @@ -import { - resolveValue, - SymbolMedusaWorkflowComposerContext, - SymbolWorkflowHook, -} from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" +import { resolveValue } from "./helpers" import { CreateWorkflowComposerContext, StepExecutionContext, @@ -104,7 +101,9 @@ export function hook( value: any ): WorkflowData { const hookBinder = ( - global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext ).hookBinder return hookBinder(name, function (context) { @@ -130,7 +129,7 @@ export function hook( } return finalResult }, - __type: SymbolWorkflowHook, + __type: OrchestrationUtils.SymbolWorkflowHook, } }) } diff --git a/packages/workflows-sdk/src/utils/composer/index.ts b/packages/workflows-sdk/src/utils/composer/index.ts index 485d8451cc6f4..d654c7e7f08a8 100644 --- a/packages/workflows-sdk/src/utils/composer/index.ts +++ b/packages/workflows-sdk/src/utils/composer/index.ts @@ -3,7 +3,6 @@ export * from "./create-workflow" export * from "./hook" export * from "./parallelize" export * from "./helpers/resolve-value" -export * from "./helpers/symbol" export * from "./helpers/step-response" export * from "./transform" export * from "./type" diff --git a/packages/workflows-sdk/src/utils/composer/parallelize.ts b/packages/workflows-sdk/src/utils/composer/parallelize.ts index 500b3c5da106e..f9ab8df14947e 100644 --- a/packages/workflows-sdk/src/utils/composer/parallelize.ts +++ b/packages/workflows-sdk/src/utils/composer/parallelize.ts @@ -1,5 +1,5 @@ import { CreateWorkflowComposerContext, WorkflowData } from "./type" -import { SymbolMedusaWorkflowComposerContext } from "./helpers" +import { OrchestrationUtils } from "@medusajs/utils" /** * This function is used to run multiple steps in parallel. The result of each step will be returned as part of the result array. @@ -43,14 +43,16 @@ import { SymbolMedusaWorkflowComposerContext } from "./helpers" export function parallelize( ...steps: TResult ): TResult { - if (!global[SymbolMedusaWorkflowComposerContext]) { + if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) { throw new Error( "parallelize must be used inside a createWorkflow definition" ) } const parallelizeBinder = ( - global[SymbolMedusaWorkflowComposerContext] as CreateWorkflowComposerContext + global[ + OrchestrationUtils.SymbolMedusaWorkflowComposerContext + ] as CreateWorkflowComposerContext ).parallelizeBinder const resultSteps = steps.map((step) => step) diff --git a/packages/workflows-sdk/src/utils/composer/transform.ts b/packages/workflows-sdk/src/utils/composer/transform.ts index 98f5d076cb734..0b7020e63091c 100644 --- a/packages/workflows-sdk/src/utils/composer/transform.ts +++ b/packages/workflows-sdk/src/utils/composer/transform.ts @@ -1,6 +1,7 @@ -import { resolveValue, SymbolWorkflowStepTransformer } from "./helpers" +import { resolveValue } from "./helpers" import { StepExecutionContext, WorkflowData } from "./type" import { proxify } from "./helpers/proxy" +import { OrchestrationUtils } from "@medusajs/utils" type Func1 = ( input: T extends WorkflowData @@ -163,7 +164,7 @@ export function transform( ...functions: Function[] ): unknown { const ret = { - __type: SymbolWorkflowStepTransformer, + __type: OrchestrationUtils.SymbolWorkflowStepTransformer, __resolver: undefined, } diff --git a/packages/workflows-sdk/src/utils/composer/type.ts b/packages/workflows-sdk/src/utils/composer/type.ts index d74edf307ec48..14c0d8396e45c 100644 --- a/packages/workflows-sdk/src/utils/composer/type.ts +++ b/packages/workflows-sdk/src/utils/composer/type.ts @@ -2,18 +2,15 @@ import { OrchestratorBuilder, TransactionContext as OriginalWorkflowTransactionContext, TransactionPayload, + TransactionStepsDefinition, WorkflowHandler, } from "@medusajs/orchestration" import { Context, MedusaContainer } from "@medusajs/types" export type StepFunctionResult = - (this: CreateWorkflowComposerContext) => TOutput extends [] - ? [ - ...WorkflowData<{ - [K in keyof TOutput]: TOutput[number][K] - }>[] - ] - : WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> + ( + this: CreateWorkflowComposerContext + ) => WorkflowData<{ [K in keyof TOutput]: TOutput[K] }> /** * A step function to be used in a workflow. @@ -24,7 +21,13 @@ export type StepFunctionResult = export type StepFunction = { (input: { [K in keyof TInput]: WorkflowData }): WorkflowData<{ [K in keyof TOutput]: TOutput[K] - }> + }> & { + config( + config: Pick + ): WorkflowData<{ + [K in keyof TOutput]: TOutput[K] + }> + } } & WorkflowDataProperties<{ [K in keyof TOutput]: TOutput[K] }>