diff --git a/.changeset/fresh-olives-exercise.md b/.changeset/fresh-olives-exercise.md new file mode 100644 index 0000000000000..e4e1b71861b43 --- /dev/null +++ b/.changeset/fresh-olives-exercise.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/workflows-sdk": patch +"@medusajs/medusa": patch +--- + +Workflow engine API diff --git a/integration-tests/api/medusa-config.js b/integration-tests/api/medusa-config.js index 3438fc82ccb11..e3ff70f29f99f 100644 --- a/integration-tests/api/medusa-config.js +++ b/integration-tests/api/medusa-config.js @@ -25,5 +25,6 @@ module.exports = { resolve: "@medusajs/cache-inmemory", options: { ttl: cacheTTL }, }, + workflows: true, }, } diff --git a/integration-tests/api/package.json b/integration-tests/api/package.json index fa7cbc9d27fe0..c2c76e1ebc778 100644 --- a/integration-tests/api/package.json +++ b/integration-tests/api/package.json @@ -12,6 +12,7 @@ "@medusajs/cache-inmemory": "workspace:*", "@medusajs/event-bus-local": "workspace:*", "@medusajs/medusa": "workspace:*", + "@medusajs/workflow-engine-inmemory": "workspace:*", "faker": "^5.5.3", "medusa-interfaces": "workspace:*", "pg": "^8.11.0", diff --git a/integration-tests/environment-helpers/use-db.js b/integration-tests/environment-helpers/use-db.js index 6c88804cc61fb..3581933d905a9 100644 --- a/integration-tests/environment-helpers/use-db.js +++ b/integration-tests/environment-helpers/use-db.js @@ -88,7 +88,12 @@ const DbTestUtil = { const instance = DbTestUtil module.exports = { - initDb: async function ({ cwd, database_extra, env }) { + initDb: async function ({ + cwd, + database_extra, + env, + force_modules_migration, + }) { if (isObject(env)) { Object.entries(env).forEach(([k, v]) => (process.env[k] = v)) } @@ -149,7 +154,10 @@ module.exports = { instance.setDb(dbDataSource) - if (featureFlagRouter.isFeatureEnabled(MedusaV2Flag.key)) { + if ( + force_modules_migration || + featureFlagRouter.isFeatureEnabled(MedusaV2Flag.key) + ) { const pgConnectionLoader = require("@medusajs/medusa/dist/loaders/pg-connection").default diff --git a/integration-tests/plugins/__tests__/workflow-engine/api.ts b/integration-tests/plugins/__tests__/workflow-engine/api.ts new file mode 100644 index 0000000000000..7498e32e3069f --- /dev/null +++ b/integration-tests/plugins/__tests__/workflow-engine/api.ts @@ -0,0 +1,9 @@ +import { workflowEngineTestSuite } from "./tests" + +jest.setTimeout(5000000) + +const env = { + MEDUSA_FF_MEDUSA_V2: false, +} + +workflowEngineTestSuite(env, { force_modules_migration: true }) diff --git a/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts new file mode 100644 index 0000000000000..0422b215e28df --- /dev/null +++ b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts @@ -0,0 +1,9 @@ +import { workflowEngineTestSuite } from "./tests" + +jest.setTimeout(5000000) + +const env = { + MEDUSA_FF_MEDUSA_V2: true, +} + +workflowEngineTestSuite(env) diff --git a/integration-tests/plugins/__tests__/workflow-engine/tests.ts b/integration-tests/plugins/__tests__/workflow-engine/tests.ts new file mode 100644 index 0000000000000..01a9dbbd8015f --- /dev/null +++ b/integration-tests/plugins/__tests__/workflow-engine/tests.ts @@ -0,0 +1,273 @@ +import { useApi } from "../../../environment-helpers/use-api" +import { initDb, useDb } from "../../../environment-helpers/use-db" + +import { + StepResponse, + WorkflowData, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { AxiosInstance } from "axios" +import path from "path" +import { startBootstrapApp } from "../../../environment-helpers/bootstrap-app" +import { getContainer } from "../../../environment-helpers/use-container" +import adminSeeder from "../../../helpers/admin-seeder" + +export const workflowEngineTestSuite = (env, extraParams = {}) => { + const adminHeaders = { + headers: { + "x-medusa-access-token": "test_token", + }, + } + + describe("Workflow Engine API", () => { + let medusaContainer + let dbConnection + let shutdownServer + + beforeAll(async () => { + const cwd = path.resolve(path.join(__dirname, "..", "..")) + dbConnection = await initDb({ cwd, env, ...extraParams } as any) + shutdownServer = await startBootstrapApp({ cwd, env }) + medusaContainer = getContainer() + + await adminSeeder(dbConnection) + }) + + afterAll(async () => { + const db = useDb() + await db.shutdown() + await shutdownServer() + }) + + describe("running workflows", () => { + beforeAll(async () => { + const step1 = createStep( + { + name: "my-step", + }, + async (input: { initial: string }) => { + return new StepResponse({ + result: input.initial, + }) + } + ) + const step2 = createStep( + { + name: "my-step-async", + async: true, + }, + async () => {} + ) + + createWorkflow( + { + name: "my-workflow-name", + retentionTime: 1000, + }, + function (input: WorkflowData<{ initial: string }>) { + step1(input) + return step2() + } + ) + }) + + it("Should list all workflows in execution or completed and retrieve them by id", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows-executions`, + adminHeaders + ) + + expect(executions.data.count).toEqual(3) + expect(executions.data.workflow_executions.length).toEqual(3) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.any(String), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + + const retrivedById = await api.get( + `/admin/workflows-executions/` + + executions.data.workflow_executions[0].id, + adminHeaders + ) + expect(retrivedById.data.workflow_execution).toEqual( + expect.objectContaining(executions.data.workflow_executions[0]) + ) + }) + + it("Should list all workflows matching the filters", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "transaction_" + (i + 1), + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + adminHeaders + ) + + expect(executions.data.count).toEqual(2) + expect(executions.data.workflow_executions.length).toEqual(2) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + expect(executions.data.workflow_executions[1]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + }) + + it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { + const api = useApi()! as AxiosInstance + + const wf = await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "trx_123", + }, + adminHeaders + ) + + expect(wf.data).toEqual({ + acknowledgement: { + transactionId: "trx_123", + workflowId: "my-workflow-name", + }, + }) + + const execution = await api.get( + `/admin/workflows-executions/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(execution.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + id: expect.any(String), + state: "invoking", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: false, + hasRevertedSteps: false, + }), + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: { + "my-step": { + __type: "WorkflowWorkflowData", + output: { + __type: "WorkflowStepResponse", + output: { + result: "abc", + }, + compensateInput: { + result: "abc", + }, + }, + }, + }, + payload: { + initial: "abc", + }, + }), + }), + }), + }) + + const respondAsync = await api.post( + `/admin/workflows-executions/my-workflow-name/steps/success`, + { + transaction_id: "trx_123", + step_id: "my-step-async", + response: { + all: "good", + }, + }, + adminHeaders + ) + + expect(respondAsync.data.success).toEqual(true) + + const completed = await api.get( + `/admin/workflows-executions/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(completed.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + state: "done", + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: expect.objectContaining({ + "my-step-async": { + __type: "WorkflowStepResponse", + output: { + all: "good", + }, + compensateInput: { + all: "good", + }, + }, + }), + }), + }), + }), + }) + }) + }) + }) +} + +describe("Noop test", () => { + it("noop check", async () => { + expect(true).toBe(true) + }) +}) diff --git a/integration-tests/plugins/medusa-config.js b/integration-tests/plugins/medusa-config.js index 4eba0580ad9a5..0a7a04fac5073 100644 --- a/integration-tests/plugins/medusa-config.js +++ b/integration-tests/plugins/medusa-config.js @@ -91,6 +91,7 @@ module.exports = { resources: "shared", resolve: "@medusajs/cart", }, + [Modules.WORKFLOW_ENGINE]: true, [Modules.REGION]: { scope: "internal", resources: "shared", diff --git a/integration-tests/plugins/package.json b/integration-tests/plugins/package.json index 50db4008443ad..4579c99c036e9 100644 --- a/integration-tests/plugins/package.json +++ b/integration-tests/plugins/package.json @@ -21,6 +21,7 @@ "@medusajs/promotion": "workspace:^", "@medusajs/region": "workspace:^", "@medusajs/utils": "workspace:^", + "@medusajs/workflow-engine-inmemory": "workspace:*", "faker": "^5.5.3", "medusa-fulfillment-webshipper": "workspace:*", "medusa-interfaces": "workspace:*", diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts new file mode 100644 index 0000000000000..f411fe397c8cb --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts @@ -0,0 +1,20 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id } = req.params + + const execution = await workflowEngineService.retrieveWorkflowExecution(id, { + select: req.retrieveConfig.select, + relations: req.retrieveConfig.relations, + }) + + res.status(200).json({ + workflow_execution: execution, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/subscribe/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/subscribe/route.ts new file mode 100644 index 0000000000000..0e75c536e07f0 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/subscribe/route.ts @@ -0,0 +1,65 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { + MedusaRequest, + MedusaResponse, +} from "../../../../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { workflow_id, transaction_id } = req.query as any + + const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }) + + req.on("close", () => { + res.end() + + void workflowEngineService.unsubscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriberOrId: subscriberId, + }) + }) + + req.on("error", (err: any) => { + if (err.code === "ECONNRESET") { + res.end() + } + }) + + void workflowEngineService.subscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriber: async (args) => { + const { + eventType, + workflowId, + transactionId, + step, + response, + result, + errors, + } = args + + const data = { + event_type: eventType, + workflow_id: workflowId, + transaction_id: transactionId, + step, + response, + result, + errors, + } + res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`) + }, + subscriberId, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/route.ts new file mode 100644 index 0000000000000..177ee8a93456b --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/route.ts @@ -0,0 +1,26 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { workflow_id, transaction_id } = req.params + + const execution = await workflowEngineService.retrieveWorkflowExecution( + { + workflow_id, + transaction_id, + }, + { + select: req.retrieveConfig.select, + relations: req.retrieveConfig.relations, + } + ) + + res.status(200).json({ + workflow_execution: execution, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/run/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/run/route.ts new file mode 100644 index 0000000000000..270313cab717e --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/run/route.ts @@ -0,0 +1,34 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { + IWorkflowEngineService, + WorkflowOrchestratorTypes, +} from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../types/routing" +import { AdminPostWorkflowsRunReq } from "../../validators" + +export const POST = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { workflow_id } = req.params + + const { transaction_id, input } = + req.validatedBody as AdminPostWorkflowsRunReq + + const options = { + transactionId: transaction_id, + input, + context: { + requestId: req.requestId, + }, + throwOnError: false, + } as WorkflowOrchestratorTypes.WorkflowOrchestratorRunDTO + + const { acknowledgement } = await workflowEngineService.run( + workflow_id, + options + ) + + return res.status(200).json({ acknowledgement }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/failure/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/failure/route.ts new file mode 100644 index 0000000000000..29e3e67a650d1 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/failure/route.ts @@ -0,0 +1,41 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "../../../validators" + +export const POST = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { workflow_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const { transaction_id, step_id } = body + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepFailure({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/success/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/success/route.ts new file mode 100644 index 0000000000000..ac5e5d7658746 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/success/route.ts @@ -0,0 +1,41 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "../../../validators" + +export const POST = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { workflow_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const { transaction_id, step_id } = body + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepSuccess({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/subscribe/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/subscribe/route.ts new file mode 100644 index 0000000000000..6bd3af50c4e0d --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/subscribe/route.ts @@ -0,0 +1,60 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { workflow_id } = req.query as any + + const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }) + + req.on("close", () => { + res.end() + + void workflowEngineService.unsubscribe({ + workflowId: workflow_id, + subscriberOrId: subscriberId, + }) + }) + + req.on("error", (err: any) => { + if (err.code === "ECONNRESET") { + res.end() + } + }) + + void workflowEngineService.subscribe({ + workflowId: workflow_id, + subscriber: async (args) => { + const { + eventType, + workflowId, + transactionId, + step, + response, + result, + errors, + } = args + + const data = { + event_type: eventType, + workflow_id: workflowId, + transaction_id: transactionId, + step, + response, + result, + errors, + } + res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`) + }, + subscriberId, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts b/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts new file mode 100644 index 0000000000000..47b00944951bd --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts @@ -0,0 +1,58 @@ +import { transformBody, transformQuery } from "../../../api/middlewares" +import { MiddlewareRoute } from "../../../loaders/helpers/routing/types" +import * as QueryConfig from "./query-config" +import { + AdminGetWorkflowExecutionDetailsParams, + AdminGetWorkflowExecutionsParams, + AdminPostWorkflowsAsyncResponseReq, + AdminPostWorkflowsRunReq, +} from "./validators" + +export const adminWorkflowsExecutionsMiddlewares: MiddlewareRoute[] = [ + { + method: ["GET"], + matcher: "/admin/workflows-executions", + middlewares: [ + transformQuery( + AdminGetWorkflowExecutionsParams, + QueryConfig.listTransformQueryConfig + ), + ], + }, + { + method: ["GET"], + matcher: "/admin/workflows-executions/:id", + middlewares: [ + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + QueryConfig.retrieveTransformQueryConfig + ), + ], + }, + { + method: ["GET"], + matcher: "/admin/workflows-executions/:workflow_id/:transaction_id", + middlewares: [ + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + QueryConfig.retrieveTransformQueryConfig + ), + ], + }, + { + method: ["POST"], + matcher: "/admin/workflows-executions/:id/run", + middlewares: [transformBody(AdminPostWorkflowsRunReq)], + }, + { + method: ["POST"], + + matcher: "/admin/workflows-executions/:id/steps/success", + middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], + }, + { + method: ["POST"], + matcher: "/admin/workflows-executions/:id/steps/failure", + middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], + }, +] diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/query-config.ts b/packages/medusa/src/api-v2/admin/workflows-executions/query-config.ts new file mode 100644 index 0000000000000..3fd0fb7a18914 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/query-config.ts @@ -0,0 +1,36 @@ +export const defaultAdminWorkflowExecutionsRelations = [] +export const allowedAdminWorkflowExecutionsRelations = [] +export const defaultAdminWorkflowExecutionsFields = [ + "id", + "workflow_id", + "transaction_id", + "state", + "created_at", + "updated_at", + "deleted_at", +] + +export const defaultAdminWorkflowExecutionDetailFields = [ + "id", + "workflow_id", + "transaction_id", + "context", + "execution", + "state", + "created_at", + "updated_at", + "deleted_at", +] + +export const retrieveTransformQueryConfig = { + defaultFields: defaultAdminWorkflowExecutionDetailFields, + defaultRelations: defaultAdminWorkflowExecutionsRelations, + allowedRelations: allowedAdminWorkflowExecutionsRelations, + isList: false, +} + +export const listTransformQueryConfig = { + ...retrieveTransformQueryConfig, + defaultFields: defaultAdminWorkflowExecutionsFields, + isList: true, +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/route.ts new file mode 100644 index 0000000000000..ab8127411e148 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/route.ts @@ -0,0 +1,29 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const listConfig = req.listConfig + + const [workflow_executions, count] = + await workflowEngineService.listAndCountWorkflowExecution( + req.filterableFields, + { + select: req.listConfig.select, + relations: req.listConfig.relations, + skip: listConfig.skip, + take: listConfig.take, + } + ) + + res.json({ + workflow_executions, + count, + offset: listConfig.skip, + limit: listConfig.take, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts b/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts new file mode 100644 index 0000000000000..27da3231fa2cd --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts @@ -0,0 +1,54 @@ +import { TransactionHandlerType } from "@medusajs/utils" +import { Transform } from "class-transformer" +import { IsEnum, IsOptional, IsString } from "class-validator" +import { FindParams, extendedFindParamsMixin } from "../../../types/common" +import { IsType } from "../../../utils" + +export class AdminGetWorkflowExecutionDetailsParams extends FindParams {} + +export class AdminGetWorkflowExecutionsParams extends extendedFindParamsMixin({ + limit: 100, + offset: 0, +}) { + /** + * transaction id(s) to filter workflow executions by transaction_id. + */ + @IsOptional() + @IsType([String, [String]]) + transaction_id?: string | string[] + + /** + * workflow id(s) to filter workflow executions by workflow_id + */ + @IsOptional() + @IsType([String, [String]]) + workflow_id?: string | string[] +} + +export class AdminPostWorkflowsRunReq { + @IsOptional() + input?: unknown + + @IsOptional() + @IsString() + transaction_id?: string +} + +export class AdminPostWorkflowsAsyncResponseReq { + @IsString() + transaction_id: string + + @IsString() + step_id: string + + @IsOptional() + response?: unknown + + @IsOptional() + compensate_input?: unknown + + @IsOptional() + @Transform(({ value }) => (value + "").toLowerCase()) + @IsEnum(TransactionHandlerType) + action?: TransactionHandlerType +} diff --git a/packages/medusa/src/api-v2/middlewares.ts b/packages/medusa/src/api-v2/middlewares.ts index 2496d72287aa2..aea1658b5ecf9 100644 --- a/packages/medusa/src/api-v2/middlewares.ts +++ b/packages/medusa/src/api-v2/middlewares.ts @@ -4,6 +4,7 @@ import { adminCustomerGroupRoutesMiddlewares } from "./admin/customer-groups/mid import { adminCustomerRoutesMiddlewares } from "./admin/customers/middlewares" import { adminPromotionRoutesMiddlewares } from "./admin/promotions/middlewares" import { adminRegionRoutesMiddlewares } from "./admin/regions/middlewares" +import { adminWorkflowsExecutionsMiddlewares } from "./admin/workflows-executions/middlewares" import { authRoutesMiddlewares } from "./auth/middlewares" import { storeCartRoutesMiddlewares } from "./store/carts/middlewares" import { storeCustomerRoutesMiddlewares } from "./store/customers/middlewares" @@ -18,6 +19,7 @@ export const config: MiddlewaresConfig = { ...storeCustomerRoutesMiddlewares, ...storeCartRoutesMiddlewares, ...authRoutesMiddlewares, + ...adminWorkflowsExecutionsMiddlewares, ...storeRegionRoutesMiddlewares, ...adminRegionRoutesMiddlewares, ], diff --git a/packages/medusa/src/api/index.js b/packages/medusa/src/api/index.js index d9b60fbb71ce7..fd08f26953135 100644 --- a/packages/medusa/src/api/index.js +++ b/packages/medusa/src/api/index.js @@ -1,9 +1,9 @@ +import compression from "compression" import { Router } from "express" +import { compressionOptions, shouldCompressResponse } from "../utils/api" import errorHandler from "./middlewares/error-handler" -import compression from "compression" import admin from "./routes/admin" import store from "./routes/store" -import { shouldCompressResponse, compressionOptions } from "../utils/api" // guaranteed to get dependencies export default (container, config) => { @@ -53,9 +53,9 @@ export * from "./routes/admin/product-types" export * from "./routes/admin/products" export * from "./routes/admin/publishable-api-keys" export * from "./routes/admin/regions" +export * from "./routes/admin/reservations" export * from "./routes/admin/return-reasons" export * from "./routes/admin/returns" -export * from "./routes/admin/reservations" export * from "./routes/admin/sales-channels" export * from "./routes/admin/shipping-options" export * from "./routes/admin/shipping-profiles" @@ -66,6 +66,7 @@ export * from "./routes/admin/tax-rates" export * from "./routes/admin/uploads" export * from "./routes/admin/users" export * from "./routes/admin/variants" +export * from "./routes/admin/workflows-executions" // Store export * from "./routes/store/auth" export * from "./routes/store/carts" diff --git a/packages/medusa/src/api/routes/admin/index.js b/packages/medusa/src/api/routes/admin/index.js index da882ef408d05..dc4ed7dfc5c0e 100644 --- a/packages/medusa/src/api/routes/admin/index.js +++ b/packages/medusa/src/api/routes/admin/index.js @@ -41,6 +41,7 @@ import taxRateRoutes from "./tax-rates" import uploadRoutes from "./uploads" import userRoutes, { unauthenticatedUserRoutes } from "./users" import variantRoutes from "./variants" +import workflowRoutes from "./workflows-executions" const route = Router() @@ -115,6 +116,7 @@ export default (app, container, config) => { paymentCollectionRoutes(route) paymentRoutes(route) productCategoryRoutes(route) + workflowRoutes(route) return app } diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/get-execution.ts b/packages/medusa/src/api/routes/admin/workflows-executions/get-execution.ts new file mode 100644 index 0000000000000..41315fba68566 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/get-execution.ts @@ -0,0 +1,26 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id, workflow_id, transaction_id } = req.params + + const execution = await workflowEngineService.retrieveWorkflowExecution( + id ?? { + workflow_id, + transaction_id, + }, + { + select: req.retrieveConfig.select, + relations: req.retrieveConfig.relations, + } + ) + + res.status(200).json({ + workflow_execution: execution, + }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/index.ts b/packages/medusa/src/api/routes/admin/workflows-executions/index.ts new file mode 100644 index 0000000000000..092568bf04572 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/index.ts @@ -0,0 +1,83 @@ +import middlewares, { + transformBody, + transformQuery, +} from "../../../middlewares" + +import { Router } from "express" +import { + allowedAdminWorkflowExecutionsRelations, + defaultAdminWorkflowExecutionDetailFields, + defaultAdminWorkflowExecutionsFields, + defaultAdminWorkflowExecutionsRelations, +} from "./query-config" +import { + AdminGetWorkflowExecutionDetailsParams, + AdminGetWorkflowExecutionsParams, + AdminPostWorkflowsAsyncResponseReq, + AdminPostWorkflowsRunReq, +} from "./validators" + +const route = Router() + +const retrieveTransformQueryConfig = { + defaultFields: defaultAdminWorkflowExecutionDetailFields, + defaultRelations: defaultAdminWorkflowExecutionsRelations, + allowedRelations: allowedAdminWorkflowExecutionsRelations, + isList: false, +} + +const listTransformQueryConfig = { + ...retrieveTransformQueryConfig, + defaultFields: defaultAdminWorkflowExecutionsFields, + isList: true, +} + +export default (app) => { + app.use("/workflows-executions", route) + + route.get( + "/", + transformQuery(AdminGetWorkflowExecutionsParams, listTransformQueryConfig), + middlewares.wrap(require("./list-execution").default) + ) + + route.get( + "/:id", + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + retrieveTransformQueryConfig + ), + middlewares.wrap(require("./get-execution").default) + ) + + route.get( + "/:workflow_id/:transaction_id", + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + retrieveTransformQueryConfig + ), + middlewares.wrap(require("./get-execution").default) + ) + + route.post( + "/:id/steps/success", + transformBody(AdminPostWorkflowsAsyncResponseReq), + middlewares.wrap(require("./set-step-success").default) + ) + + route.post( + "/:id/steps/failure", + transformBody(AdminPostWorkflowsAsyncResponseReq), + middlewares.wrap(require("./set-step-failure").default) + ) + + route.post( + "/:id/run", + transformBody(AdminPostWorkflowsRunReq), + middlewares.wrap(require("./run-workflow").default) + ) + + return app +} + +export * from "./query-config" diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts b/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts new file mode 100644 index 0000000000000..eb9e5adebc686 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts @@ -0,0 +1,29 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const listConfig = req.listConfig + + const [workflow_executions, count] = + await workflowEngineService.listAndCountWorkflowExecution( + req.filterableFields, + { + select: req.listConfig.select, + relations: req.listConfig.relations, + skip: listConfig.skip, + take: listConfig.take, + } + ) + + res.json({ + workflow_executions, + count, + offset: listConfig.skip, + limit: listConfig.take, + }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/query-config.ts b/packages/medusa/src/api/routes/admin/workflows-executions/query-config.ts new file mode 100644 index 0000000000000..8fe26626be099 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/query-config.ts @@ -0,0 +1,23 @@ +export const defaultAdminWorkflowExecutionsRelations = [] +export const allowedAdminWorkflowExecutionsRelations = [] +export const defaultAdminWorkflowExecutionsFields = [ + "id", + "workflow_id", + "transaction_id", + "state", + "created_at", + "updated_at", + "deleted_at", +] + +export const defaultAdminWorkflowExecutionDetailFields = [ + "id", + "workflow_id", + "transaction_id", + "context", + "execution", + "state", + "created_at", + "updated_at", + "deleted_at", +] diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/run-workflow.ts b/packages/medusa/src/api/routes/admin/workflows-executions/run-workflow.ts new file mode 100644 index 0000000000000..95768397b0a8e --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/run-workflow.ts @@ -0,0 +1,35 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { + IWorkflowEngineService, + WorkflowOrchestratorTypes, +} from "@medusajs/workflows-sdk" + +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { AdminPostWorkflowsRunReq } from "./validators" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id } = req.params + + const { transaction_id, input } = + req.validatedBody as AdminPostWorkflowsRunReq + + const options = { + transactionId: transaction_id, + input, + context: { + requestId: req.requestId, + }, + throwOnError: false, + } as WorkflowOrchestratorTypes.WorkflowOrchestratorRunDTO + + const { acknowledgement } = await workflowEngineService.run( + workflow_id, + options + ) + + return res.status(200).json({ acknowledgement }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts new file mode 100644 index 0000000000000..5ff902b81a971 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts @@ -0,0 +1,41 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "./validators" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const { transaction_id, step_id } = body + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepFailure({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts new file mode 100644 index 0000000000000..e2b854358a8c0 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts @@ -0,0 +1,41 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "./validators" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const { transaction_id, step_id } = body + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepSuccess({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/subscribe.ts b/packages/medusa/src/api/routes/admin/workflows-executions/subscribe.ts new file mode 100644 index 0000000000000..8c9131af6ce61 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/subscribe.ts @@ -0,0 +1,62 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id } = req.query as any + + const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }) + + req.on("close", () => { + res.end() + + void workflowEngineService.unsubscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriberOrId: subscriberId, + }) + }) + + req.on("error", (err: any) => { + if (err.code === "ECONNRESET") { + res.end() + } + }) + + void workflowEngineService.subscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriber: async (args) => { + const { + eventType, + workflowId, + transactionId, + step, + response, + result, + errors, + } = args + + const data = { + event_type: eventType, + workflow_id: workflowId, + transaction_id: transactionId, + step, + response, + result, + errors, + } + res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`) + }, + subscriberId, + }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts b/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts new file mode 100644 index 0000000000000..50b8cb565a03c --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts @@ -0,0 +1,54 @@ +import { TransactionHandlerType } from "@medusajs/utils" +import { Transform } from "class-transformer" +import { IsEnum, IsOptional, IsString } from "class-validator" +import { FindParams, extendedFindParamsMixin } from "../../../../types/common" +import { IsType } from "../../../../utils" + +export class AdminGetWorkflowExecutionDetailsParams extends FindParams {} + +export class AdminGetWorkflowExecutionsParams extends extendedFindParamsMixin({ + limit: 100, + offset: 0, +}) { + /** + * transaction id(s) to filter workflow executions by transaction_id. + */ + @IsOptional() + @IsType([String, [String]]) + transaction_id?: string | string[] + + /** + * workflow id(s) to filter workflow executions by workflow_id + */ + @IsOptional() + @IsType([String, [String]]) + workflow_id?: string | string[] +} + +export class AdminPostWorkflowsRunReq { + @IsOptional() + input?: unknown + + @IsOptional() + @IsString() + transaction_id?: string +} + +export class AdminPostWorkflowsAsyncResponseReq { + @IsString() + transaction_id: string + + @IsString() + step_id: string + + @IsOptional() + response?: unknown + + @IsOptional() + compensate_input?: unknown + + @IsOptional() + @Transform(({ value }) => (value + "").toLowerCase()) + @IsEnum(TransactionHandlerType) + action?: TransactionHandlerType +} diff --git a/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 11b92ab0cb53c..c4e6c0a209587 100644 --- a/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,7 +1,7 @@ import { MedusaApp } from "@medusajs/modules-sdk" import { RemoteJoinerQuery } from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" -import { IWorkflowsModuleService } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" @@ -22,7 +22,7 @@ const afterEach_ = async () => { describe("Workflow Orchestrator module", function () { describe("Testing basic workflow", function () { - let workflowOrcModule: IWorkflowsModuleService + let workflowOrcModule: IWorkflowEngineService let query: ( query: string | RemoteJoinerQuery | object, variables?: Record @@ -52,8 +52,7 @@ describe("Workflow Orchestrator module", function () { await runMigrations() - workflowOrcModule = - modules.workflows as unknown as IWorkflowsModuleService + workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService }) afterEach(afterEach_) diff --git a/packages/workflow-engine-inmemory/src/initialize/index.ts b/packages/workflow-engine-inmemory/src/initialize/index.ts index 20f4f49231b99..dafe1ddc8f328 100644 --- a/packages/workflow-engine-inmemory/src/initialize/index.ts +++ b/packages/workflow-engine-inmemory/src/initialize/index.ts @@ -1,12 +1,12 @@ import { ExternalModuleDeclaration, InternalModuleDeclaration, - MedusaModule, MODULE_PACKAGE_NAMES, + MedusaModule, Modules, } from "@medusajs/modules-sdk" import { ModulesSdkTypes } from "@medusajs/types" -import { WorkflowOrchestratorTypes } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { moduleDefinition } from "../module-definition" import { InitializeModuleInjectableDependencies } from "../types" @@ -17,20 +17,18 @@ export const initialize = async ( | ExternalModuleDeclaration | InternalModuleDeclaration, injectedDependencies?: InitializeModuleInjectableDependencies -): Promise => { +): Promise => { const loaded = // eslint-disable-next-line max-len - await MedusaModule.bootstrap( - { - moduleKey: Modules.WORKFLOW_ENGINE, - defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], - declaration: options as - | InternalModuleDeclaration - | ExternalModuleDeclaration, - injectedDependencies, - moduleExports: moduleDefinition, - } - ) + await MedusaModule.bootstrap({ + moduleKey: Modules.WORKFLOW_ENGINE, + defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], + declaration: options as + | InternalModuleDeclaration + | ExternalModuleDeclaration, + injectedDependencies, + moduleExports: moduleDefinition, + }) return loaded[Modules.WORKFLOW_ENGINE] } diff --git a/packages/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/workflow-engine-inmemory/src/services/workflows-module.ts index 789fa5760092b..8701e6d26d219 100644 --- a/packages/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/workflow-engine-inmemory/src/services/workflows-module.ts @@ -10,8 +10,11 @@ import { InjectManager, InjectSharedContext, MedusaContext, + MedusaError, + isString, } from "@medusajs/utils" import type { + IWorkflowEngineService, ReturnWorkflow, UnwrapWorkflowInputDataType, WorkflowOrchestratorTypes, @@ -25,9 +28,7 @@ type InjectedDependencies = { workflowOrchestratorService: WorkflowOrchestratorService } -export class WorkflowsModuleService - implements WorkflowOrchestratorTypes.IWorkflowsModuleService -{ +export class WorkflowsModuleService implements IWorkflowEngineService { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService protected workflowOrchestratorService_: WorkflowOrchestratorService @@ -49,12 +50,70 @@ export class WorkflowsModuleService return joinerConfig } + @InjectManager("baseRepository_") + async retrieveWorkflowExecution( + idOrObject: + | string + | { + workflow_id: string + transaction_id: string + }, + config: FindConfig = {}, + @MedusaContext() sharedContext: Context = {} + ): Promise { + const objValue = isString(idOrObject) + ? { id: idOrObject } + : { + workflow_id: idOrObject.workflow_id, + transaction_id: idOrObject.transaction_id, + } + + const wfExecution = await this.workflowExecutionService_.list( + objValue, + config, + sharedContext + ) + + if (wfExecution.length === 0) { + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `WorkflowExecution with ${Object.keys(objValue).join( + ", " + )}: ${Object.values(objValue).join(", ")} was not found` + ) + } + + // eslint-disable-next-line max-len + return await this.baseRepository_.serialize( + wfExecution[0], + { + populate: true, + } + ) + } + @InjectManager("baseRepository_") async listWorkflowExecution( filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {}, config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const wfExecutions = await this.workflowExecutionService_.list( filters, config, @@ -74,6 +133,22 @@ export class WorkflowsModuleService config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise<[WorkflowOrchestratorTypes.WorkflowExecutionDTO[], number]> { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const [wfExecutions, count] = await this.workflowExecutionService_.listAndCount( filters, diff --git a/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 60c9771def6b7..e138768aef805 100644 --- a/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -4,9 +4,9 @@ import { TransactionCheckpoint, TransactionStep, } from "@medusajs/orchestration" +import { ModulesSdkTypes } from "@medusajs/types" import { TransactionState } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" -import { ModulesSdkTypes } from "@medusajs/types" // eslint-disable-next-line max-len export class InMemoryDistributedTransactionStorage extends DistributedTransactionStorage { @@ -55,6 +55,22 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio ]) } + private stringifyWithSymbol(key, value) { + if (key === "__type" && typeof value === "symbol") { + return Symbol.keyFor(value) + } + + return value + } + + private jsonWithSymbol(key, value) { + if (key === "__type" && typeof value === "string") { + return Symbol.for(value) + } + + return value + } + async get(key: string): Promise { return this.storage.get(key) } @@ -89,10 +105,13 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio }) } + const stringifiedData = JSON.stringify(data, this.stringifyWithSymbol) + const parsedData = JSON.parse(stringifiedData) + if (hasFinished && !retentionTime) { - await this.deleteFromDb(data) + await this.deleteFromDb(parsedData) } else { - await this.saveToDb(data) + await this.saveToDb(parsedData) } if (hasFinished) { diff --git a/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 802fff34187d1..9d72daafb1bf8 100644 --- a/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -5,7 +5,7 @@ import { } from "@medusajs/orchestration" import { RemoteJoinerQuery } from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" -import { IWorkflowsModuleService } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" @@ -26,7 +26,7 @@ const afterEach_ = async () => { describe("Workflow Orchestrator module", function () { describe("Testing basic workflow", function () { - let workflowOrcModule: IWorkflowsModuleService + let workflowOrcModule: IWorkflowEngineService let query: ( query: string | RemoteJoinerQuery | object, variables?: Record @@ -61,8 +61,7 @@ describe("Workflow Orchestrator module", function () { await runMigrations() - workflowOrcModule = - modules.workflows as unknown as IWorkflowsModuleService + workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService }) afterEach(afterEach_) diff --git a/packages/workflow-engine-redis/src/initialize/index.ts b/packages/workflow-engine-redis/src/initialize/index.ts index 20f4f49231b99..b548387c5755f 100644 --- a/packages/workflow-engine-redis/src/initialize/index.ts +++ b/packages/workflow-engine-redis/src/initialize/index.ts @@ -1,12 +1,13 @@ import { ExternalModuleDeclaration, InternalModuleDeclaration, - MedusaModule, MODULE_PACKAGE_NAMES, + MedusaModule, Modules, } from "@medusajs/modules-sdk" + import { ModulesSdkTypes } from "@medusajs/types" -import { WorkflowOrchestratorTypes } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { moduleDefinition } from "../module-definition" import { InitializeModuleInjectableDependencies } from "../types" @@ -17,20 +18,18 @@ export const initialize = async ( | ExternalModuleDeclaration | InternalModuleDeclaration, injectedDependencies?: InitializeModuleInjectableDependencies -): Promise => { +): Promise => { const loaded = // eslint-disable-next-line max-len - await MedusaModule.bootstrap( - { - moduleKey: Modules.WORKFLOW_ENGINE, - defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], - declaration: options as - | InternalModuleDeclaration - | ExternalModuleDeclaration, - injectedDependencies, - moduleExports: moduleDefinition, - } - ) + await MedusaModule.bootstrap({ + moduleKey: Modules.WORKFLOW_ENGINE, + defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], + declaration: options as + | InternalModuleDeclaration + | ExternalModuleDeclaration, + injectedDependencies, + moduleExports: moduleDefinition, + }) return loaded[Modules.WORKFLOW_ENGINE] } diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index 6667236626790..8701e6d26d219 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -10,14 +10,17 @@ import { InjectManager, InjectSharedContext, MedusaContext, + MedusaError, + isString, } from "@medusajs/utils" import type { + IWorkflowEngineService, ReturnWorkflow, UnwrapWorkflowInputDataType, WorkflowOrchestratorTypes, } from "@medusajs/workflows-sdk" -import {WorkflowOrchestratorService} from "@services" -import {joinerConfig} from "../joiner-config" +import { WorkflowOrchestratorService } from "@services" +import { joinerConfig } from "../joiner-config" type InjectedDependencies = { baseRepository: DAL.RepositoryService @@ -25,9 +28,7 @@ type InjectedDependencies = { workflowOrchestratorService: WorkflowOrchestratorService } -export class WorkflowsModuleService - implements WorkflowOrchestratorTypes.IWorkflowsModuleService -{ +export class WorkflowsModuleService implements IWorkflowEngineService { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService protected workflowOrchestratorService_: WorkflowOrchestratorService @@ -49,12 +50,70 @@ export class WorkflowsModuleService return joinerConfig } + @InjectManager("baseRepository_") + async retrieveWorkflowExecution( + idOrObject: + | string + | { + workflow_id: string + transaction_id: string + }, + config: FindConfig = {}, + @MedusaContext() sharedContext: Context = {} + ): Promise { + const objValue = isString(idOrObject) + ? { id: idOrObject } + : { + workflow_id: idOrObject.workflow_id, + transaction_id: idOrObject.transaction_id, + } + + const wfExecution = await this.workflowExecutionService_.list( + objValue, + config, + sharedContext + ) + + if (wfExecution.length === 0) { + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `WorkflowExecution with ${Object.keys(objValue).join( + ", " + )}: ${Object.values(objValue).join(", ")} was not found` + ) + } + + // eslint-disable-next-line max-len + return await this.baseRepository_.serialize( + wfExecution[0], + { + populate: true, + } + ) + } + @InjectManager("baseRepository_") async listWorkflowExecution( filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {}, config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const wfExecutions = await this.workflowExecutionService_.list( filters, config, @@ -74,6 +133,22 @@ export class WorkflowsModuleService config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise<[WorkflowOrchestratorTypes.WorkflowExecutionDTO[], number]> { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const [wfExecutions, count] = await this.workflowExecutionService_.listAndCount( filters, diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 512c8e7cf6760..4b33f8e37e807 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -4,11 +4,9 @@ import { TransactionCheckpoint, TransactionStep, } from "@medusajs/orchestration" -import { TransactionState } from "@medusajs/utils" import { ModulesSdkTypes } from "@medusajs/types" -import { - WorkflowOrchestratorService, -} from "@services" +import { TransactionState } from "@medusajs/utils" +import { WorkflowOrchestratorService } from "@services" import { Queue, Worker } from "bullmq" import Redis from "ioredis" @@ -21,7 +19,7 @@ enum JobType { // eslint-disable-next-line max-len export class RedisDistributedTransactionStorage extends DistributedTransactionStorage { private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes - private workflowExecutionService_: ModulesSdkTypes.InternalModuleService + private workflowExecutionService_: ModulesSdkTypes.InternalModuleService private workflowOrchestratorService_: WorkflowOrchestratorService private redisClient: Redis @@ -34,7 +32,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt redisWorkerConnection, redisQueueName, }: { - workflowExecutionService: ModulesSdkTypes.InternalModuleService, + workflowExecutionService: ModulesSdkTypes.InternalModuleService redisConnection: Redis redisWorkerConnection: Redis redisQueueName: string @@ -161,33 +159,28 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt }) } + const stringifiedData = JSON.stringify(data, this.stringifyWithSymbol) + const parsedData = JSON.parse(stringifiedData) + if (!hasFinished) { if (ttl) { - await this.redisClient.set( - key, - JSON.stringify(data, this.stringifyWithSymbol), - "EX", - ttl - ) + await this.redisClient.set(key, stringifiedData, "EX", ttl) } else { - await this.redisClient.set( - key, - JSON.stringify(data, this.stringifyWithSymbol) - ) + await this.redisClient.set(key, stringifiedData) } } if (hasFinished && !retentionTime) { - await this.deleteFromDb(data) + await this.deleteFromDb(parsedData) } else { - await this.saveToDb(data) + await this.saveToDb(parsedData) } if (hasFinished) { // await this.redisClient.del(key) await this.redisClient.set( key, - JSON.stringify(data, this.stringifyWithSymbol), + stringifiedData, "EX", RedisDistributedTransactionStorage.TTL_AFTER_COMPLETED ) diff --git a/packages/workflows-sdk/src/index.ts b/packages/workflows-sdk/src/index.ts index cec02f1e03298..f9fd0e20c7d9a 100644 --- a/packages/workflows-sdk/src/index.ts +++ b/packages/workflows-sdk/src/index.ts @@ -1,5 +1,6 @@ export * from "./helper" export * from "./medusa-workflow" export * as WorkflowOrchestratorTypes from "./types" +export { IWorkflowEngineService } from "./types/service" export * from "./utils/composer" export * as Composer from "./utils/composer" diff --git a/packages/workflows-sdk/src/types/common.ts b/packages/workflows-sdk/src/types/common.ts index f3a81e72713fa..c8859970a3d2d 100644 --- a/packages/workflows-sdk/src/types/common.ts +++ b/packages/workflows-sdk/src/types/common.ts @@ -1,4 +1,4 @@ -import { BaseFilterable } from "@medusajs/types" +import { BaseFilterable, OperatorMap } from "@medusajs/types" export interface WorkflowExecutionDTO { id: string @@ -14,8 +14,8 @@ export interface WorkflowExecutionDTO { export interface FilterableWorkflowExecutionProps extends BaseFilterable { - id?: string[] - workflow_id?: string[] - transaction_id?: string[] - state?: any[] + id?: string | string[] | OperatorMap + workflow_id?: string | string[] | OperatorMap + transaction_id?: string | string[] | OperatorMap + state?: string | string[] | OperatorMap } diff --git a/packages/workflows-sdk/src/types/service.ts b/packages/workflows-sdk/src/types/service.ts index ed055e39e672f..a2cfa9aa48686 100644 --- a/packages/workflows-sdk/src/types/service.ts +++ b/packages/workflows-sdk/src/types/service.ts @@ -31,7 +31,18 @@ export type IdempotencyKeyParts = { action: "invoke" | "compensate" } -export interface IWorkflowsModuleService extends IModuleService { +export interface IWorkflowEngineService extends IModuleService { + retrieveWorkflowExecution( + idOrObject: + | string + | { + workflow_id: string + transaction_id: string + }, + config?: FindConfig, + sharedContext?: Context + ): Promise + listWorkflowExecution( filters?: FilterableWorkflowExecutionProps, config?: FindConfig, @@ -88,7 +99,7 @@ export interface IWorkflowsModuleService extends IModuleService { stepResponse, options, }: { - idempotencyKey: string | object + idempotencyKey: string | IdempotencyKeyParts stepResponse: unknown options?: Record }, diff --git a/yarn.lock b/yarn.lock index 73d0800a2f9eb..2048341339f69 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8876,7 +8876,7 @@ __metadata: languageName: unknown linkType: soft -"@medusajs/workflow-engine-inmemory@workspace:packages/workflow-engine-inmemory": +"@medusajs/workflow-engine-inmemory@workspace:*, @medusajs/workflow-engine-inmemory@workspace:packages/workflow-engine-inmemory": version: 0.0.0-use.local resolution: "@medusajs/workflow-engine-inmemory@workspace:packages/workflow-engine-inmemory" dependencies: @@ -31546,6 +31546,7 @@ __metadata: "@medusajs/cache-inmemory": "workspace:*" "@medusajs/event-bus-local": "workspace:*" "@medusajs/medusa": "workspace:*" + "@medusajs/workflow-engine-inmemory": "workspace:*" babel-preset-medusa-package: "*" faker: ^5.5.3 jest: ^26.6.3 @@ -31576,6 +31577,7 @@ __metadata: "@medusajs/region": "workspace:^" "@medusajs/types": "workspace:^" "@medusajs/utils": "workspace:^" + "@medusajs/workflow-engine-inmemory": "workspace:*" babel-preset-medusa-package: "*" faker: ^5.5.3 jest: ^26.6.3