From 4c14815d6a0db1592e6eae83745ccd8a9b0eb74d Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Tue, 6 Feb 2024 15:45:00 -0300 Subject: [PATCH 1/6] feat(medusa): workflow engine api --- integration-tests/api/medusa-config.js | 1 + integration-tests/api/package.json | 1 + .../plugins/__tests__/workflow-engine/api.ts | 260 ++++++++++++++++++ integration-tests/plugins/medusa-config.js | 1 + integration-tests/plugins/package.json | 1 + .../[step_id]/failure/route.ts | 42 +++ .../[step_id]/subscribe/route.ts | 65 +++++ .../[step_id]/success/route.ts | 42 +++ .../workflows/[id]/[transaction_id]/route.ts | 26 ++ .../api-v2/admin/workflows/[id]/run/route.ts | 34 +++ .../admin/workflows/[id]/subscribe/route.ts | 60 ++++ .../api-v2/admin/workflows/execution/route.ts | 43 +++ .../src/api-v2/admin/workflows/middlewares.ts | 47 ++++ .../api-v2/admin/workflows/query-config.ts | 36 +++ .../src/api-v2/admin/workflows/validators.ts | 48 ++++ packages/medusa/src/api-v2/middlewares.ts | 6 +- .../integration-tests/__tests__/index.spec.ts | 7 +- .../src/initialize/index.ts | 26 +- .../src/services/workflows-module.ts | 42 ++- .../utils/workflow-orchestrator-storage.ts | 25 +- .../integration-tests/__tests__/index.spec.ts | 7 +- .../src/initialize/index.ts | 27 +- .../src/services/workflows-module.ts | 46 +++- .../utils/workflow-orchestrator-storage.ts | 31 +-- packages/workflows-sdk/src/index.ts | 1 + packages/workflows-sdk/src/types/service.ts | 13 +- yarn.lock | 4 +- 27 files changed, 871 insertions(+), 71 deletions(-) create mode 100644 integration-tests/plugins/__tests__/workflow-engine/api.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/execution/route.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/middlewares.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/query-config.ts create mode 100644 packages/medusa/src/api-v2/admin/workflows/validators.ts diff --git a/integration-tests/api/medusa-config.js b/integration-tests/api/medusa-config.js index 3438fc82ccb11..e3ff70f29f99f 100644 --- a/integration-tests/api/medusa-config.js +++ b/integration-tests/api/medusa-config.js @@ -25,5 +25,6 @@ module.exports = { resolve: "@medusajs/cache-inmemory", options: { ttl: cacheTTL }, }, + workflows: true, }, } diff --git a/integration-tests/api/package.json b/integration-tests/api/package.json index fa7cbc9d27fe0..c2c76e1ebc778 100644 --- a/integration-tests/api/package.json +++ b/integration-tests/api/package.json @@ -12,6 +12,7 @@ "@medusajs/cache-inmemory": "workspace:*", "@medusajs/event-bus-local": "workspace:*", "@medusajs/medusa": "workspace:*", + "@medusajs/workflow-engine-inmemory": "workspace:*", "faker": "^5.5.3", "medusa-interfaces": "workspace:*", "pg": "^8.11.0", diff --git a/integration-tests/plugins/__tests__/workflow-engine/api.ts b/integration-tests/plugins/__tests__/workflow-engine/api.ts new file mode 100644 index 0000000000000..eafd68bfa4864 --- /dev/null +++ b/integration-tests/plugins/__tests__/workflow-engine/api.ts @@ -0,0 +1,260 @@ +import { useApi } from "../../../environment-helpers/use-api" +import { initDb, useDb } from "../../../environment-helpers/use-db" + +import { + StepResponse, + WorkflowData, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { AxiosInstance } from "axios" +import path from "path" +import { startBootstrapApp } from "../../../environment-helpers/bootstrap-app" +import { getContainer } from "../../../environment-helpers/use-container" +import adminSeeder from "../../../helpers/admin-seeder" + +jest.setTimeout(5000000) + +const adminHeaders = { + headers: { + "x-medusa-access-token": "test_token", + }, +} + +const env = { + MEDUSA_FF_MEDUSA_V2: true, +} + +describe("Workflow Engine API", () => { + let medusaContainer + let dbConnection + let shutdownServer + + beforeAll(async () => { + const cwd = path.resolve(path.join(__dirname, "..", "..")) + dbConnection = await initDb({ cwd, env } as any) + shutdownServer = await startBootstrapApp({ cwd, env }) + medusaContainer = getContainer() + + await adminSeeder(dbConnection) + }) + + afterAll(async () => { + const db = useDb() + await db.shutdown() + await shutdownServer() + }) + + describe("running workflows", () => { + beforeAll(async () => { + const step1 = createStep( + { + name: "my-step", + }, + async (input: { initial: string }) => { + return new StepResponse({ + result: input.initial, + }) + } + ) + const step2 = createStep( + { + name: "my-step-async", + async: true, + }, + async () => {} + ) + + createWorkflow( + { + name: "my-workflow-name", + retentionTime: 1000, + }, + function (input: WorkflowData<{ initial: string }>) { + step1(input) + return step2() + } + ) + }) + + it("Should list all workflows in execution or completed", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows/execution`, + adminHeaders + ) + + expect(executions.data.count).toEqual(3) + expect(executions.data.workflow_executions.length).toEqual(3) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.any(String), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + }) + + it("Should list all workflows matching the filters", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "transaction_" + (i + 1), + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows/execution?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + adminHeaders + ) + + expect(executions.data.count).toEqual(2) + expect(executions.data.workflow_executions.length).toEqual(2) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + expect(executions.data.workflow_executions[1]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + }) + + it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { + const api = useApi()! as AxiosInstance + + const wf = await api.post( + `/admin/workflows/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "trx_123", + }, + adminHeaders + ) + + expect(wf.data).toEqual({ + acknowledgement: { + transactionId: "trx_123", + workflowId: "my-workflow-name", + }, + }) + + const execution = await api.get( + `/admin/workflows/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(execution.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + id: expect.any(String), + state: "invoking", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: false, + hasRevertedSteps: false, + }), + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: { + "my-step": { + __type: "WorkflowWorkflowData", + output: { + __type: "WorkflowStepResponse", + output: { + result: "abc", + }, + compensateInput: { + result: "abc", + }, + }, + }, + }, + payload: { + initial: "abc", + }, + }), + }), + }), + }) + + const respondAsync = await api.post( + `/admin/workflows/my-workflow-name/trx_123/my-step-async/success`, + { + response: { + all: "good", + }, + }, + adminHeaders + ) + + expect(respondAsync.data.success).toEqual(true) + + const completed = await api.get( + `/admin/workflows/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(completed.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + state: "done", + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: expect.objectContaining({ + "my-step-async": { + __type: "WorkflowStepResponse", + output: { + all: "good", + }, + compensateInput: { + all: "good", + }, + }, + }), + }), + }), + }), + }) + }) + }) +}) diff --git a/integration-tests/plugins/medusa-config.js b/integration-tests/plugins/medusa-config.js index fdfad225dff1c..d4d3dfe3aad89 100644 --- a/integration-tests/plugins/medusa-config.js +++ b/integration-tests/plugins/medusa-config.js @@ -91,5 +91,6 @@ module.exports = { resources: "shared", resolve: "@medusajs/cart", }, + [Modules.WORKFLOW_ENGINE]: true, }, } diff --git a/integration-tests/plugins/package.json b/integration-tests/plugins/package.json index 17c355023ecc4..f15ee71e38045 100644 --- a/integration-tests/plugins/package.json +++ b/integration-tests/plugins/package.json @@ -20,6 +20,7 @@ "@medusajs/product": "workspace:^", "@medusajs/promotion": "workspace:^", "@medusajs/utils": "workspace:^", + "@medusajs/workflow-engine-inmemory": "workspace:*", "faker": "^5.5.3", "medusa-fulfillment-webshipper": "workspace:*", "medusa-interfaces": "workspace:*", diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts new file mode 100644 index 0000000000000..917fc490304f2 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts @@ -0,0 +1,42 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { + MedusaRequest, + MedusaResponse, +} from "../../../../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "../../../../validators" + +export const POST = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id, step_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepFailure({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts new file mode 100644 index 0000000000000..653a29a84fa9c --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts @@ -0,0 +1,65 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { + MedusaRequest, + MedusaResponse, +} from "../../../../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id } = req.query as any + + const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }) + + req.on("close", () => { + res.end() + + void workflowEngineService.unsubscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriberOrId: subscriberId, + }) + }) + + req.on("error", (err: any) => { + if (err.code === "ECONNRESET") { + res.end() + } + }) + + void workflowEngineService.subscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriber: async (args) => { + const { + eventType, + workflowId, + transactionId, + step, + response, + result, + errors, + } = args + + const data = { + event_type: eventType, + workflow_id: workflowId, + transaction_id: transactionId, + step, + response, + result, + errors, + } + res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`) + }, + subscriberId, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts new file mode 100644 index 0000000000000..fc2201b83f13d --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts @@ -0,0 +1,42 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { + MedusaRequest, + MedusaResponse, +} from "../../../../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "../../../../validators" + +export const POST = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id, step_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepSuccess({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts new file mode 100644 index 0000000000000..516035a597967 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts @@ -0,0 +1,26 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id } = req.params + + const execution = await workflowEngineService.retrieveWorkflowExecution( + { + workflow_id, + transaction_id, + }, + { + select: req.retrieveConfig.select, + relations: req.retrieveConfig.relations, + } + ) + + res.status(200).json({ + workflow_execution: execution, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts b/packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts new file mode 100644 index 0000000000000..8248a986a93c7 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts @@ -0,0 +1,34 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { + IWorkflowEngineService, + WorkflowOrchestratorTypes, +} from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../types/routing" +import { AdminPostWorkflowsRunReq } from "../../validators" + +export const POST = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id } = req.params + + const { transaction_id, input } = + req.validatedBody as AdminPostWorkflowsRunReq + + const options = { + transactionId: transaction_id, + input, + context: { + requestId: req.requestId, + }, + throwOnError: false, + } as WorkflowOrchestratorTypes.WorkflowOrchestratorRunDTO + + const { acknowledgement } = await workflowEngineService.run( + workflow_id, + options + ) + + return res.status(200).json({ acknowledgement }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts b/packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts new file mode 100644 index 0000000000000..aeb30187537e2 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts @@ -0,0 +1,60 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id } = req.query as any + + const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }) + + req.on("close", () => { + res.end() + + void workflowEngineService.unsubscribe({ + workflowId: workflow_id, + subscriberOrId: subscriberId, + }) + }) + + req.on("error", (err: any) => { + if (err.code === "ECONNRESET") { + res.end() + } + }) + + void workflowEngineService.subscribe({ + workflowId: workflow_id, + subscriber: async (args) => { + const { + eventType, + workflowId, + transactionId, + step, + response, + result, + errors, + } = args + + const data = { + event_type: eventType, + workflow_id: workflowId, + transaction_id: transactionId, + step, + response, + result, + errors, + } + res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`) + }, + subscriberId, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/execution/route.ts b/packages/medusa/src/api-v2/admin/workflows/execution/route.ts new file mode 100644 index 0000000000000..d5d87190b6631 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/execution/route.ts @@ -0,0 +1,43 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const listConfig = req.listConfig + + const { transaction_id, workflow_id } = req.filterableFields + + const transactionIds = Array.isArray(transaction_id) + ? transaction_id + : [transaction_id] + const workflowIds = Array.isArray(workflow_id) ? workflow_id : [workflow_id] + + const filters = {} + + if (transaction_id) { + filters["transaction_id"] = transactionIds + } + + if (workflow_id) { + filters["workflow_id"] = workflowIds + } + + const [workflow_executions, count] = + await workflowEngineService.listAndCountWorkflowExecution(filters, { + select: req.listConfig.select, + relations: req.listConfig.relations, + skip: listConfig.skip, + take: listConfig.take, + }) + + res.json({ + workflow_executions, + count, + offset: listConfig.skip, + limit: listConfig.take, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/middlewares.ts b/packages/medusa/src/api-v2/admin/workflows/middlewares.ts new file mode 100644 index 0000000000000..f24c1fdb45ed5 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/middlewares.ts @@ -0,0 +1,47 @@ +import { transformBody, transformQuery } from "../../../api/middlewares" +import { MiddlewareRoute } from "../../../loaders/helpers/routing/types" +import * as QueryConfig from "./query-config" +import { + AdminGetWorkflowExecutionDetailsParams, + AdminGetWorkflowExecutionsParams, + AdminPostWorkflowsAsyncResponseReq, + AdminPostWorkflowsRunReq, +} from "./validators" + +export const adminWorkflowsMiddlewares: MiddlewareRoute[] = [ + { + method: ["GET"], + matcher: "/admin/workflows/execution", + middlewares: [ + transformQuery( + AdminGetWorkflowExecutionsParams, + QueryConfig.listTransformQueryConfig + ), + ], + }, + { + method: ["GET"], + matcher: "/admin/workflows/:id/:transaction_id", + middlewares: [ + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + QueryConfig.retrieveTransformQueryConfig + ), + ], + }, + { + method: ["POST"], + matcher: "/admin/workflows/:id/run", + middlewares: [transformBody(AdminPostWorkflowsRunReq)], + }, + { + method: ["POST"], + matcher: "/admin/workflows/:id/:transaction_id/:step_id/success", + middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], + }, + { + method: ["POST"], + matcher: "/admin/workflows/:id/:transaction_id/:step_id/failure", + middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], + }, +] diff --git a/packages/medusa/src/api-v2/admin/workflows/query-config.ts b/packages/medusa/src/api-v2/admin/workflows/query-config.ts new file mode 100644 index 0000000000000..3fd0fb7a18914 --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/query-config.ts @@ -0,0 +1,36 @@ +export const defaultAdminWorkflowExecutionsRelations = [] +export const allowedAdminWorkflowExecutionsRelations = [] +export const defaultAdminWorkflowExecutionsFields = [ + "id", + "workflow_id", + "transaction_id", + "state", + "created_at", + "updated_at", + "deleted_at", +] + +export const defaultAdminWorkflowExecutionDetailFields = [ + "id", + "workflow_id", + "transaction_id", + "context", + "execution", + "state", + "created_at", + "updated_at", + "deleted_at", +] + +export const retrieveTransformQueryConfig = { + defaultFields: defaultAdminWorkflowExecutionDetailFields, + defaultRelations: defaultAdminWorkflowExecutionsRelations, + allowedRelations: allowedAdminWorkflowExecutionsRelations, + isList: false, +} + +export const listTransformQueryConfig = { + ...retrieveTransformQueryConfig, + defaultFields: defaultAdminWorkflowExecutionsFields, + isList: true, +} diff --git a/packages/medusa/src/api-v2/admin/workflows/validators.ts b/packages/medusa/src/api-v2/admin/workflows/validators.ts new file mode 100644 index 0000000000000..8e898a627f0bf --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows/validators.ts @@ -0,0 +1,48 @@ +import { TransactionHandlerType } from "@medusajs/utils" +import { Transform } from "class-transformer" +import { IsEnum, IsOptional, IsString } from "class-validator" +import { FindParams, extendedFindParamsMixin } from "../../../types/common" +import { IsType } from "../../../utils" + +export class AdminGetWorkflowExecutionDetailsParams extends FindParams {} + +export class AdminGetWorkflowExecutionsParams extends extendedFindParamsMixin({ + limit: 100, + offset: 0, +}) { + /** + * transaction id(s) to filter workflow executions by transaction_id. + */ + @IsOptional() + @IsType([String, [String]]) + transaction_id?: string | string[] + + /** + * workflow id(s) to filter workflow executions by workflow_id + */ + @IsOptional() + @IsType([String, [String]]) + workflow_id?: string | string[] +} + +export class AdminPostWorkflowsRunReq { + @IsOptional() + input?: unknown + + @IsOptional() + @IsString() + transaction_id?: string +} + +export class AdminPostWorkflowsAsyncResponseReq { + @IsOptional() + response?: unknown + + @IsOptional() + compensate_input?: unknown + + @IsOptional() + @Transform(({ value }) => (value + "").toLowerCase()) + @IsEnum(TransactionHandlerType) + action?: TransactionHandlerType +} diff --git a/packages/medusa/src/api-v2/middlewares.ts b/packages/medusa/src/api-v2/middlewares.ts index e7e440d096351..30f660cb1aaca 100644 --- a/packages/medusa/src/api-v2/middlewares.ts +++ b/packages/medusa/src/api-v2/middlewares.ts @@ -1,11 +1,12 @@ import { MiddlewaresConfig } from "../loaders/helpers/routing/types" import { adminCampaignRoutesMiddlewares } from "./admin/campaigns/middlewares" import { adminCustomerGroupRoutesMiddlewares } from "./admin/customer-groups/middlewares" -import { storeCustomerRoutesMiddlewares } from "./store/customers/middlewares" import { adminCustomerRoutesMiddlewares } from "./admin/customers/middlewares" import { adminPromotionRoutesMiddlewares } from "./admin/promotions/middlewares" -import { storeCartRoutesMiddlewares } from "./store/carts/middlewares" +import { adminWorkflowsMiddlewares } from "./admin/workflows/middlewares" import { authRoutesMiddlewares } from "./auth/middlewares" +import { storeCartRoutesMiddlewares } from "./store/carts/middlewares" +import { storeCustomerRoutesMiddlewares } from "./store/customers/middlewares" export const config: MiddlewaresConfig = { routes: [ @@ -16,5 +17,6 @@ export const config: MiddlewaresConfig = { ...storeCustomerRoutesMiddlewares, ...storeCartRoutesMiddlewares, ...authRoutesMiddlewares, + ...adminWorkflowsMiddlewares, ], } diff --git a/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 11b92ab0cb53c..c4e6c0a209587 100644 --- a/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,7 +1,7 @@ import { MedusaApp } from "@medusajs/modules-sdk" import { RemoteJoinerQuery } from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" -import { IWorkflowsModuleService } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" @@ -22,7 +22,7 @@ const afterEach_ = async () => { describe("Workflow Orchestrator module", function () { describe("Testing basic workflow", function () { - let workflowOrcModule: IWorkflowsModuleService + let workflowOrcModule: IWorkflowEngineService let query: ( query: string | RemoteJoinerQuery | object, variables?: Record @@ -52,8 +52,7 @@ describe("Workflow Orchestrator module", function () { await runMigrations() - workflowOrcModule = - modules.workflows as unknown as IWorkflowsModuleService + workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService }) afterEach(afterEach_) diff --git a/packages/workflow-engine-inmemory/src/initialize/index.ts b/packages/workflow-engine-inmemory/src/initialize/index.ts index 20f4f49231b99..dafe1ddc8f328 100644 --- a/packages/workflow-engine-inmemory/src/initialize/index.ts +++ b/packages/workflow-engine-inmemory/src/initialize/index.ts @@ -1,12 +1,12 @@ import { ExternalModuleDeclaration, InternalModuleDeclaration, - MedusaModule, MODULE_PACKAGE_NAMES, + MedusaModule, Modules, } from "@medusajs/modules-sdk" import { ModulesSdkTypes } from "@medusajs/types" -import { WorkflowOrchestratorTypes } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { moduleDefinition } from "../module-definition" import { InitializeModuleInjectableDependencies } from "../types" @@ -17,20 +17,18 @@ export const initialize = async ( | ExternalModuleDeclaration | InternalModuleDeclaration, injectedDependencies?: InitializeModuleInjectableDependencies -): Promise => { +): Promise => { const loaded = // eslint-disable-next-line max-len - await MedusaModule.bootstrap( - { - moduleKey: Modules.WORKFLOW_ENGINE, - defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], - declaration: options as - | InternalModuleDeclaration - | ExternalModuleDeclaration, - injectedDependencies, - moduleExports: moduleDefinition, - } - ) + await MedusaModule.bootstrap({ + moduleKey: Modules.WORKFLOW_ENGINE, + defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], + declaration: options as + | InternalModuleDeclaration + | ExternalModuleDeclaration, + injectedDependencies, + moduleExports: moduleDefinition, + }) return loaded[Modules.WORKFLOW_ENGINE] } diff --git a/packages/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/workflow-engine-inmemory/src/services/workflows-module.ts index 789fa5760092b..d6b055fe3422c 100644 --- a/packages/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/workflow-engine-inmemory/src/services/workflows-module.ts @@ -10,8 +10,10 @@ import { InjectManager, InjectSharedContext, MedusaContext, + MedusaError, } from "@medusajs/utils" import type { + IWorkflowEngineService, ReturnWorkflow, UnwrapWorkflowInputDataType, WorkflowOrchestratorTypes, @@ -25,9 +27,7 @@ type InjectedDependencies = { workflowOrchestratorService: WorkflowOrchestratorService } -export class WorkflowsModuleService - implements WorkflowOrchestratorTypes.IWorkflowsModuleService -{ +export class WorkflowsModuleService implements IWorkflowEngineService { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService protected workflowOrchestratorService_: WorkflowOrchestratorService @@ -49,6 +49,42 @@ export class WorkflowsModuleService return joinerConfig } + @InjectManager("baseRepository_") + async retrieveWorkflowExecution( + idOrObject: { + workflow_id: string + transaction_id: string + }, + config: FindConfig = {}, + @MedusaContext() sharedContext: Context = {} + ): Promise { + const wfExecution = await this.workflowExecutionService_.list( + { + workflow_id: idOrObject.workflow_id, + transaction_id: idOrObject.transaction_id, + }, + config, + sharedContext + ) + + if (wfExecution.length === 0) { + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `WorkflowExecution with workflow_id, transaction_id: ${Object.values( + idOrObject + ).join(", ")} was not found` + ) + } + + // eslint-disable-next-line max-len + return await this.baseRepository_.serialize( + wfExecution[0], + { + populate: true, + } + ) + } + @InjectManager("baseRepository_") async listWorkflowExecution( filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {}, diff --git a/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 60c9771def6b7..e138768aef805 100644 --- a/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -4,9 +4,9 @@ import { TransactionCheckpoint, TransactionStep, } from "@medusajs/orchestration" +import { ModulesSdkTypes } from "@medusajs/types" import { TransactionState } from "@medusajs/utils" import { WorkflowOrchestratorService } from "@services" -import { ModulesSdkTypes } from "@medusajs/types" // eslint-disable-next-line max-len export class InMemoryDistributedTransactionStorage extends DistributedTransactionStorage { @@ -55,6 +55,22 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio ]) } + private stringifyWithSymbol(key, value) { + if (key === "__type" && typeof value === "symbol") { + return Symbol.keyFor(value) + } + + return value + } + + private jsonWithSymbol(key, value) { + if (key === "__type" && typeof value === "string") { + return Symbol.for(value) + } + + return value + } + async get(key: string): Promise { return this.storage.get(key) } @@ -89,10 +105,13 @@ export class InMemoryDistributedTransactionStorage extends DistributedTransactio }) } + const stringifiedData = JSON.stringify(data, this.stringifyWithSymbol) + const parsedData = JSON.parse(stringifiedData) + if (hasFinished && !retentionTime) { - await this.deleteFromDb(data) + await this.deleteFromDb(parsedData) } else { - await this.saveToDb(data) + await this.saveToDb(parsedData) } if (hasFinished) { diff --git a/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 802fff34187d1..9d72daafb1bf8 100644 --- a/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -5,7 +5,7 @@ import { } from "@medusajs/orchestration" import { RemoteJoinerQuery } from "@medusajs/types" import { TransactionHandlerType } from "@medusajs/utils" -import { IWorkflowsModuleService } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { knex } from "knex" import { setTimeout } from "timers/promises" import "../__fixtures__" @@ -26,7 +26,7 @@ const afterEach_ = async () => { describe("Workflow Orchestrator module", function () { describe("Testing basic workflow", function () { - let workflowOrcModule: IWorkflowsModuleService + let workflowOrcModule: IWorkflowEngineService let query: ( query: string | RemoteJoinerQuery | object, variables?: Record @@ -61,8 +61,7 @@ describe("Workflow Orchestrator module", function () { await runMigrations() - workflowOrcModule = - modules.workflows as unknown as IWorkflowsModuleService + workflowOrcModule = modules.workflows as unknown as IWorkflowEngineService }) afterEach(afterEach_) diff --git a/packages/workflow-engine-redis/src/initialize/index.ts b/packages/workflow-engine-redis/src/initialize/index.ts index 20f4f49231b99..b548387c5755f 100644 --- a/packages/workflow-engine-redis/src/initialize/index.ts +++ b/packages/workflow-engine-redis/src/initialize/index.ts @@ -1,12 +1,13 @@ import { ExternalModuleDeclaration, InternalModuleDeclaration, - MedusaModule, MODULE_PACKAGE_NAMES, + MedusaModule, Modules, } from "@medusajs/modules-sdk" + import { ModulesSdkTypes } from "@medusajs/types" -import { WorkflowOrchestratorTypes } from "@medusajs/workflows-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" import { moduleDefinition } from "../module-definition" import { InitializeModuleInjectableDependencies } from "../types" @@ -17,20 +18,18 @@ export const initialize = async ( | ExternalModuleDeclaration | InternalModuleDeclaration, injectedDependencies?: InitializeModuleInjectableDependencies -): Promise => { +): Promise => { const loaded = // eslint-disable-next-line max-len - await MedusaModule.bootstrap( - { - moduleKey: Modules.WORKFLOW_ENGINE, - defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], - declaration: options as - | InternalModuleDeclaration - | ExternalModuleDeclaration, - injectedDependencies, - moduleExports: moduleDefinition, - } - ) + await MedusaModule.bootstrap({ + moduleKey: Modules.WORKFLOW_ENGINE, + defaultPath: MODULE_PACKAGE_NAMES[Modules.WORKFLOW_ENGINE], + declaration: options as + | InternalModuleDeclaration + | ExternalModuleDeclaration, + injectedDependencies, + moduleExports: moduleDefinition, + }) return loaded[Modules.WORKFLOW_ENGINE] } diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index 6667236626790..d6b055fe3422c 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -10,14 +10,16 @@ import { InjectManager, InjectSharedContext, MedusaContext, + MedusaError, } from "@medusajs/utils" import type { + IWorkflowEngineService, ReturnWorkflow, UnwrapWorkflowInputDataType, WorkflowOrchestratorTypes, } from "@medusajs/workflows-sdk" -import {WorkflowOrchestratorService} from "@services" -import {joinerConfig} from "../joiner-config" +import { WorkflowOrchestratorService } from "@services" +import { joinerConfig } from "../joiner-config" type InjectedDependencies = { baseRepository: DAL.RepositoryService @@ -25,9 +27,7 @@ type InjectedDependencies = { workflowOrchestratorService: WorkflowOrchestratorService } -export class WorkflowsModuleService - implements WorkflowOrchestratorTypes.IWorkflowsModuleService -{ +export class WorkflowsModuleService implements IWorkflowEngineService { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.InternalModuleService protected workflowOrchestratorService_: WorkflowOrchestratorService @@ -49,6 +49,42 @@ export class WorkflowsModuleService return joinerConfig } + @InjectManager("baseRepository_") + async retrieveWorkflowExecution( + idOrObject: { + workflow_id: string + transaction_id: string + }, + config: FindConfig = {}, + @MedusaContext() sharedContext: Context = {} + ): Promise { + const wfExecution = await this.workflowExecutionService_.list( + { + workflow_id: idOrObject.workflow_id, + transaction_id: idOrObject.transaction_id, + }, + config, + sharedContext + ) + + if (wfExecution.length === 0) { + throw new MedusaError( + MedusaError.Types.NOT_FOUND, + `WorkflowExecution with workflow_id, transaction_id: ${Object.values( + idOrObject + ).join(", ")} was not found` + ) + } + + // eslint-disable-next-line max-len + return await this.baseRepository_.serialize( + wfExecution[0], + { + populate: true, + } + ) + } + @InjectManager("baseRepository_") async listWorkflowExecution( filters: WorkflowOrchestratorTypes.FilterableWorkflowExecutionProps = {}, diff --git a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 512c8e7cf6760..4b33f8e37e807 100644 --- a/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -4,11 +4,9 @@ import { TransactionCheckpoint, TransactionStep, } from "@medusajs/orchestration" -import { TransactionState } from "@medusajs/utils" import { ModulesSdkTypes } from "@medusajs/types" -import { - WorkflowOrchestratorService, -} from "@services" +import { TransactionState } from "@medusajs/utils" +import { WorkflowOrchestratorService } from "@services" import { Queue, Worker } from "bullmq" import Redis from "ioredis" @@ -21,7 +19,7 @@ enum JobType { // eslint-disable-next-line max-len export class RedisDistributedTransactionStorage extends DistributedTransactionStorage { private static TTL_AFTER_COMPLETED = 60 * 15 // 15 minutes - private workflowExecutionService_: ModulesSdkTypes.InternalModuleService + private workflowExecutionService_: ModulesSdkTypes.InternalModuleService private workflowOrchestratorService_: WorkflowOrchestratorService private redisClient: Redis @@ -34,7 +32,7 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt redisWorkerConnection, redisQueueName, }: { - workflowExecutionService: ModulesSdkTypes.InternalModuleService, + workflowExecutionService: ModulesSdkTypes.InternalModuleService redisConnection: Redis redisWorkerConnection: Redis redisQueueName: string @@ -161,33 +159,28 @@ export class RedisDistributedTransactionStorage extends DistributedTransactionSt }) } + const stringifiedData = JSON.stringify(data, this.stringifyWithSymbol) + const parsedData = JSON.parse(stringifiedData) + if (!hasFinished) { if (ttl) { - await this.redisClient.set( - key, - JSON.stringify(data, this.stringifyWithSymbol), - "EX", - ttl - ) + await this.redisClient.set(key, stringifiedData, "EX", ttl) } else { - await this.redisClient.set( - key, - JSON.stringify(data, this.stringifyWithSymbol) - ) + await this.redisClient.set(key, stringifiedData) } } if (hasFinished && !retentionTime) { - await this.deleteFromDb(data) + await this.deleteFromDb(parsedData) } else { - await this.saveToDb(data) + await this.saveToDb(parsedData) } if (hasFinished) { // await this.redisClient.del(key) await this.redisClient.set( key, - JSON.stringify(data, this.stringifyWithSymbol), + stringifiedData, "EX", RedisDistributedTransactionStorage.TTL_AFTER_COMPLETED ) diff --git a/packages/workflows-sdk/src/index.ts b/packages/workflows-sdk/src/index.ts index cec02f1e03298..f9fd0e20c7d9a 100644 --- a/packages/workflows-sdk/src/index.ts +++ b/packages/workflows-sdk/src/index.ts @@ -1,5 +1,6 @@ export * from "./helper" export * from "./medusa-workflow" export * as WorkflowOrchestratorTypes from "./types" +export { IWorkflowEngineService } from "./types/service" export * from "./utils/composer" export * as Composer from "./utils/composer" diff --git a/packages/workflows-sdk/src/types/service.ts b/packages/workflows-sdk/src/types/service.ts index ed055e39e672f..4619aa89720d0 100644 --- a/packages/workflows-sdk/src/types/service.ts +++ b/packages/workflows-sdk/src/types/service.ts @@ -31,7 +31,16 @@ export type IdempotencyKeyParts = { action: "invoke" | "compensate" } -export interface IWorkflowsModuleService extends IModuleService { +export interface IWorkflowEngineService extends IModuleService { + retrieveWorkflowExecution( + idOrObject: { + workflow_id: string + transaction_id: string + }, + config?: FindConfig, + sharedContext?: Context + ) + listWorkflowExecution( filters?: FilterableWorkflowExecutionProps, config?: FindConfig, @@ -88,7 +97,7 @@ export interface IWorkflowsModuleService extends IModuleService { stepResponse, options, }: { - idempotencyKey: string | object + idempotencyKey: string | IdempotencyKeyParts stepResponse: unknown options?: Record }, diff --git a/yarn.lock b/yarn.lock index 73d5c6e5d5f2d..dde746ecd35b5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8874,7 +8874,7 @@ __metadata: languageName: unknown linkType: soft -"@medusajs/workflow-engine-inmemory@workspace:packages/workflow-engine-inmemory": +"@medusajs/workflow-engine-inmemory@workspace:*, @medusajs/workflow-engine-inmemory@workspace:packages/workflow-engine-inmemory": version: 0.0.0-use.local resolution: "@medusajs/workflow-engine-inmemory@workspace:packages/workflow-engine-inmemory" dependencies: @@ -31537,6 +31537,7 @@ __metadata: "@medusajs/cache-inmemory": "workspace:*" "@medusajs/event-bus-local": "workspace:*" "@medusajs/medusa": "workspace:*" + "@medusajs/workflow-engine-inmemory": "workspace:*" babel-preset-medusa-package: "*" faker: ^5.5.3 jest: ^26.6.3 @@ -31566,6 +31567,7 @@ __metadata: "@medusajs/promotion": "workspace:^" "@medusajs/types": "workspace:^" "@medusajs/utils": "workspace:^" + "@medusajs/workflow-engine-inmemory": "workspace:*" babel-preset-medusa-package: "*" faker: ^5.5.3 jest: ^26.6.3 From 4f7303dda778ff28a940f746e06bdab52b1de97e Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Wed, 7 Feb 2024 11:31:10 -0300 Subject: [PATCH 2/6] api v1 --- .../environment-helpers/use-db.js | 12 +- .../plugins/__tests__/workflow-engine/api.ts | 8 +- .../__tests__/workflow-engine/api_v2.ts | 260 ++++++++++++++++++ packages/medusa/src/api/index.js | 7 +- packages/medusa/src/api/routes/admin/index.js | 2 + .../routes/admin/workflows/get-execution.ts | 26 ++ .../src/api/routes/admin/workflows/index.ts | 74 +++++ .../routes/admin/workflows/list-execution.ts | 43 +++ .../routes/admin/workflows/query-config.ts | 23 ++ .../routes/admin/workflows/run-workflow.ts | 35 +++ .../admin/workflows/set-step-failure.ts | 39 +++ .../admin/workflows/set-step-success.ts | 39 +++ .../api/routes/admin/workflows/subscribe.ts | 62 +++++ .../api/routes/admin/workflows/validators.ts | 48 ++++ 14 files changed, 671 insertions(+), 7 deletions(-) create mode 100644 integration-tests/plugins/__tests__/workflow-engine/api_v2.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/get-execution.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/index.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/list-execution.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/query-config.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/run-workflow.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/set-step-failure.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/set-step-success.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/subscribe.ts create mode 100644 packages/medusa/src/api/routes/admin/workflows/validators.ts diff --git a/integration-tests/environment-helpers/use-db.js b/integration-tests/environment-helpers/use-db.js index 261ccfac95521..d7faecd639dc5 100644 --- a/integration-tests/environment-helpers/use-db.js +++ b/integration-tests/environment-helpers/use-db.js @@ -87,7 +87,12 @@ const DbTestUtil = { const instance = DbTestUtil module.exports = { - initDb: async function ({ cwd, database_extra, env }) { + initDb: async function ({ + cwd, + database_extra, + env, + force_modules_migration, + }) { if (isObject(env)) { Object.entries(env).forEach(([k, v]) => (process.env[k] = v)) } @@ -148,7 +153,10 @@ module.exports = { instance.setDb(dbDataSource) - if (featureFlagRouter.isFeatureEnabled(MedusaV2Flag.key)) { + if ( + force_modules_migration || + featureFlagRouter.isFeatureEnabled(MedusaV2Flag.key) + ) { const pgConnectionLoader = require("@medusajs/medusa/dist/loaders/pg-connection").default diff --git a/integration-tests/plugins/__tests__/workflow-engine/api.ts b/integration-tests/plugins/__tests__/workflow-engine/api.ts index eafd68bfa4864..7da3ee6b4b882 100644 --- a/integration-tests/plugins/__tests__/workflow-engine/api.ts +++ b/integration-tests/plugins/__tests__/workflow-engine/api.ts @@ -22,7 +22,7 @@ const adminHeaders = { } const env = { - MEDUSA_FF_MEDUSA_V2: true, + MEDUSA_FF_MEDUSA_V2: false, } describe("Workflow Engine API", () => { @@ -32,7 +32,11 @@ describe("Workflow Engine API", () => { beforeAll(async () => { const cwd = path.resolve(path.join(__dirname, "..", "..")) - dbConnection = await initDb({ cwd, env } as any) + dbConnection = await initDb({ + cwd, + env, + force_modules_migration: true, + } as any) shutdownServer = await startBootstrapApp({ cwd, env }) medusaContainer = getContainer() diff --git a/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts new file mode 100644 index 0000000000000..eafd68bfa4864 --- /dev/null +++ b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts @@ -0,0 +1,260 @@ +import { useApi } from "../../../environment-helpers/use-api" +import { initDb, useDb } from "../../../environment-helpers/use-db" + +import { + StepResponse, + WorkflowData, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { AxiosInstance } from "axios" +import path from "path" +import { startBootstrapApp } from "../../../environment-helpers/bootstrap-app" +import { getContainer } from "../../../environment-helpers/use-container" +import adminSeeder from "../../../helpers/admin-seeder" + +jest.setTimeout(5000000) + +const adminHeaders = { + headers: { + "x-medusa-access-token": "test_token", + }, +} + +const env = { + MEDUSA_FF_MEDUSA_V2: true, +} + +describe("Workflow Engine API", () => { + let medusaContainer + let dbConnection + let shutdownServer + + beforeAll(async () => { + const cwd = path.resolve(path.join(__dirname, "..", "..")) + dbConnection = await initDb({ cwd, env } as any) + shutdownServer = await startBootstrapApp({ cwd, env }) + medusaContainer = getContainer() + + await adminSeeder(dbConnection) + }) + + afterAll(async () => { + const db = useDb() + await db.shutdown() + await shutdownServer() + }) + + describe("running workflows", () => { + beforeAll(async () => { + const step1 = createStep( + { + name: "my-step", + }, + async (input: { initial: string }) => { + return new StepResponse({ + result: input.initial, + }) + } + ) + const step2 = createStep( + { + name: "my-step-async", + async: true, + }, + async () => {} + ) + + createWorkflow( + { + name: "my-workflow-name", + retentionTime: 1000, + }, + function (input: WorkflowData<{ initial: string }>) { + step1(input) + return step2() + } + ) + }) + + it("Should list all workflows in execution or completed", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows/execution`, + adminHeaders + ) + + expect(executions.data.count).toEqual(3) + expect(executions.data.workflow_executions.length).toEqual(3) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.any(String), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + }) + + it("Should list all workflows matching the filters", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "transaction_" + (i + 1), + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows/execution?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + adminHeaders + ) + + expect(executions.data.count).toEqual(2) + expect(executions.data.workflow_executions.length).toEqual(2) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + expect(executions.data.workflow_executions[1]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + }) + + it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { + const api = useApi()! as AxiosInstance + + const wf = await api.post( + `/admin/workflows/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "trx_123", + }, + adminHeaders + ) + + expect(wf.data).toEqual({ + acknowledgement: { + transactionId: "trx_123", + workflowId: "my-workflow-name", + }, + }) + + const execution = await api.get( + `/admin/workflows/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(execution.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + id: expect.any(String), + state: "invoking", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: false, + hasRevertedSteps: false, + }), + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: { + "my-step": { + __type: "WorkflowWorkflowData", + output: { + __type: "WorkflowStepResponse", + output: { + result: "abc", + }, + compensateInput: { + result: "abc", + }, + }, + }, + }, + payload: { + initial: "abc", + }, + }), + }), + }), + }) + + const respondAsync = await api.post( + `/admin/workflows/my-workflow-name/trx_123/my-step-async/success`, + { + response: { + all: "good", + }, + }, + adminHeaders + ) + + expect(respondAsync.data.success).toEqual(true) + + const completed = await api.get( + `/admin/workflows/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(completed.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + state: "done", + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: expect.objectContaining({ + "my-step-async": { + __type: "WorkflowStepResponse", + output: { + all: "good", + }, + compensateInput: { + all: "good", + }, + }, + }), + }), + }), + }), + }) + }) + }) +}) diff --git a/packages/medusa/src/api/index.js b/packages/medusa/src/api/index.js index d9b60fbb71ce7..dcd5b29aed7ae 100644 --- a/packages/medusa/src/api/index.js +++ b/packages/medusa/src/api/index.js @@ -1,9 +1,9 @@ +import compression from "compression" import { Router } from "express" +import { compressionOptions, shouldCompressResponse } from "../utils/api" import errorHandler from "./middlewares/error-handler" -import compression from "compression" import admin from "./routes/admin" import store from "./routes/store" -import { shouldCompressResponse, compressionOptions } from "../utils/api" // guaranteed to get dependencies export default (container, config) => { @@ -53,9 +53,9 @@ export * from "./routes/admin/product-types" export * from "./routes/admin/products" export * from "./routes/admin/publishable-api-keys" export * from "./routes/admin/regions" +export * from "./routes/admin/reservations" export * from "./routes/admin/return-reasons" export * from "./routes/admin/returns" -export * from "./routes/admin/reservations" export * from "./routes/admin/sales-channels" export * from "./routes/admin/shipping-options" export * from "./routes/admin/shipping-profiles" @@ -66,6 +66,7 @@ export * from "./routes/admin/tax-rates" export * from "./routes/admin/uploads" export * from "./routes/admin/users" export * from "./routes/admin/variants" +export * from "./routes/admin/workflows" // Store export * from "./routes/store/auth" export * from "./routes/store/carts" diff --git a/packages/medusa/src/api/routes/admin/index.js b/packages/medusa/src/api/routes/admin/index.js index da882ef408d05..7fb970ffd2e08 100644 --- a/packages/medusa/src/api/routes/admin/index.js +++ b/packages/medusa/src/api/routes/admin/index.js @@ -41,6 +41,7 @@ import taxRateRoutes from "./tax-rates" import uploadRoutes from "./uploads" import userRoutes, { unauthenticatedUserRoutes } from "./users" import variantRoutes from "./variants" +import workflowRoutes from "./workflows" const route = Router() @@ -115,6 +116,7 @@ export default (app, container, config) => { paymentCollectionRoutes(route) paymentRoutes(route) productCategoryRoutes(route) + workflowRoutes(route) return app } diff --git a/packages/medusa/src/api/routes/admin/workflows/get-execution.ts b/packages/medusa/src/api/routes/admin/workflows/get-execution.ts new file mode 100644 index 0000000000000..3c1330c1f6892 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/get-execution.ts @@ -0,0 +1,26 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id } = req.params + + const execution = await workflowEngineService.retrieveWorkflowExecution( + { + workflow_id, + transaction_id, + }, + { + select: req.retrieveConfig.select, + relations: req.retrieveConfig.relations, + } + ) + + res.status(200).json({ + workflow_execution: execution, + }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows/index.ts b/packages/medusa/src/api/routes/admin/workflows/index.ts new file mode 100644 index 0000000000000..9abe54560f4bd --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/index.ts @@ -0,0 +1,74 @@ +import middlewares, { + transformBody, + transformQuery, +} from "../../../middlewares" + +import { Router } from "express" +import { + allowedAdminWorkflowExecutionsRelations, + defaultAdminWorkflowExecutionDetailFields, + defaultAdminWorkflowExecutionsFields, + defaultAdminWorkflowExecutionsRelations, +} from "./query-config" +import { + AdminGetWorkflowExecutionDetailsParams, + AdminGetWorkflowExecutionsParams, + AdminPostWorkflowsAsyncResponseReq, + AdminPostWorkflowsRunReq, +} from "./validators" + +const route = Router() + +const retrieveTransformQueryConfig = { + defaultFields: defaultAdminWorkflowExecutionDetailFields, + defaultRelations: defaultAdminWorkflowExecutionsRelations, + allowedRelations: allowedAdminWorkflowExecutionsRelations, + isList: false, +} + +const listTransformQueryConfig = { + ...retrieveTransformQueryConfig, + defaultFields: defaultAdminWorkflowExecutionsFields, + isList: true, +} + +export default (app) => { + app.use("/workflows", route) + + route.get( + "/execution", + transformQuery(AdminGetWorkflowExecutionsParams, listTransformQueryConfig), + middlewares.wrap(require("./list-execution").default) + ) + + route.get( + "/:id/:transaction_id", + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + retrieveTransformQueryConfig + ), + middlewares.wrap(require("./get-execution").default) + ) + + route.post( + "/:id/run", + transformBody(AdminPostWorkflowsRunReq), + middlewares.wrap(require("./run-workflow").default) + ) + + route.post( + "/:id/:transaction_id/:step_id/success", + transformBody(AdminPostWorkflowsAsyncResponseReq), + middlewares.wrap(require("./set-step-success").default) + ) + + route.post( + "/:id/:transaction_id/:step_id/failure", + transformBody(AdminPostWorkflowsAsyncResponseReq), + middlewares.wrap(require("./set-step-failure").default) + ) + + return app +} + +export * from "./query-config" diff --git a/packages/medusa/src/api/routes/admin/workflows/list-execution.ts b/packages/medusa/src/api/routes/admin/workflows/list-execution.ts new file mode 100644 index 0000000000000..fc6b6cccc2153 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/list-execution.ts @@ -0,0 +1,43 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const listConfig = req.listConfig + + const { transaction_id, workflow_id } = req.filterableFields + + const transactionIds = Array.isArray(transaction_id) + ? transaction_id + : [transaction_id] + const workflowIds = Array.isArray(workflow_id) ? workflow_id : [workflow_id] + + const filters = {} + + if (transaction_id) { + filters["transaction_id"] = transactionIds + } + + if (workflow_id) { + filters["workflow_id"] = workflowIds + } + + const [workflow_executions, count] = + await workflowEngineService.listAndCountWorkflowExecution(filters, { + select: req.listConfig.select, + relations: req.listConfig.relations, + skip: listConfig.skip, + take: listConfig.take, + }) + + res.json({ + workflow_executions, + count, + offset: listConfig.skip, + limit: listConfig.take, + }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows/query-config.ts b/packages/medusa/src/api/routes/admin/workflows/query-config.ts new file mode 100644 index 0000000000000..8fe26626be099 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/query-config.ts @@ -0,0 +1,23 @@ +export const defaultAdminWorkflowExecutionsRelations = [] +export const allowedAdminWorkflowExecutionsRelations = [] +export const defaultAdminWorkflowExecutionsFields = [ + "id", + "workflow_id", + "transaction_id", + "state", + "created_at", + "updated_at", + "deleted_at", +] + +export const defaultAdminWorkflowExecutionDetailFields = [ + "id", + "workflow_id", + "transaction_id", + "context", + "execution", + "state", + "created_at", + "updated_at", + "deleted_at", +] diff --git a/packages/medusa/src/api/routes/admin/workflows/run-workflow.ts b/packages/medusa/src/api/routes/admin/workflows/run-workflow.ts new file mode 100644 index 0000000000000..95768397b0a8e --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/run-workflow.ts @@ -0,0 +1,35 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { + IWorkflowEngineService, + WorkflowOrchestratorTypes, +} from "@medusajs/workflows-sdk" + +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { AdminPostWorkflowsRunReq } from "./validators" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id } = req.params + + const { transaction_id, input } = + req.validatedBody as AdminPostWorkflowsRunReq + + const options = { + transactionId: transaction_id, + input, + context: { + requestId: req.requestId, + }, + throwOnError: false, + } as WorkflowOrchestratorTypes.WorkflowOrchestratorRunDTO + + const { acknowledgement } = await workflowEngineService.run( + workflow_id, + options + ) + + return res.status(200).json({ acknowledgement }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows/set-step-failure.ts b/packages/medusa/src/api/routes/admin/workflows/set-step-failure.ts new file mode 100644 index 0000000000000..9c3c5c574b842 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/set-step-failure.ts @@ -0,0 +1,39 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "./validators" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id, step_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepFailure({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows/set-step-success.ts b/packages/medusa/src/api/routes/admin/workflows/set-step-success.ts new file mode 100644 index 0000000000000..a3cc17f918670 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/set-step-success.ts @@ -0,0 +1,39 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { TransactionHandlerType, isDefined } from "@medusajs/utils" +import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "./validators" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id, step_id } = req.params + + const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + + const compensateInput = body.compensate_input + const stepResponse = isDefined(body.response) + ? new StepResponse(body.response, compensateInput) + : undefined + const stepAction = body.action || TransactionHandlerType.INVOKE + + await workflowEngineService.setStepSuccess({ + idempotencyKey: { + action: stepAction, + transactionId: transaction_id, + stepId: step_id, + workflowId: workflow_id, + }, + stepResponse, + options: { + container: req.scope, + context: { + requestId: req.requestId, + }, + }, + }) + + return res.status(200).json({ success: true }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows/subscribe.ts b/packages/medusa/src/api/routes/admin/workflows/subscribe.ts new file mode 100644 index 0000000000000..8c9131af6ce61 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/subscribe.ts @@ -0,0 +1,62 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export default async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id: workflow_id, transaction_id } = req.query as any + + const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }) + + req.on("close", () => { + res.end() + + void workflowEngineService.unsubscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriberOrId: subscriberId, + }) + }) + + req.on("error", (err: any) => { + if (err.code === "ECONNRESET") { + res.end() + } + }) + + void workflowEngineService.subscribe({ + workflowId: workflow_id, + transactionId: transaction_id, + subscriber: async (args) => { + const { + eventType, + workflowId, + transactionId, + step, + response, + result, + errors, + } = args + + const data = { + event_type: eventType, + workflow_id: workflowId, + transaction_id: transactionId, + step, + response, + result, + errors, + } + res.write(`event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`) + }, + subscriberId, + }) +} diff --git a/packages/medusa/src/api/routes/admin/workflows/validators.ts b/packages/medusa/src/api/routes/admin/workflows/validators.ts new file mode 100644 index 0000000000000..1c61bc72bac64 --- /dev/null +++ b/packages/medusa/src/api/routes/admin/workflows/validators.ts @@ -0,0 +1,48 @@ +import { TransactionHandlerType } from "@medusajs/utils" +import { Transform } from "class-transformer" +import { IsEnum, IsOptional, IsString } from "class-validator" +import { FindParams, extendedFindParamsMixin } from "../../../../types/common" +import { IsType } from "../../../../utils" + +export class AdminGetWorkflowExecutionDetailsParams extends FindParams {} + +export class AdminGetWorkflowExecutionsParams extends extendedFindParamsMixin({ + limit: 100, + offset: 0, +}) { + /** + * transaction id(s) to filter workflow executions by transaction_id. + */ + @IsOptional() + @IsType([String, [String]]) + transaction_id?: string | string[] + + /** + * workflow id(s) to filter workflow executions by workflow_id + */ + @IsOptional() + @IsType([String, [String]]) + workflow_id?: string | string[] +} + +export class AdminPostWorkflowsRunReq { + @IsOptional() + input?: unknown + + @IsOptional() + @IsString() + transaction_id?: string +} + +export class AdminPostWorkflowsAsyncResponseReq { + @IsOptional() + response?: unknown + + @IsOptional() + compensate_input?: unknown + + @IsOptional() + @Transform(({ value }) => (value + "").toLowerCase()) + @IsEnum(TransactionHandlerType) + action?: TransactionHandlerType +} From c0c32e55ea90df485f1616e53213def6f48b9654 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Wed, 7 Feb 2024 11:58:44 -0300 Subject: [PATCH 3/6] changeset --- .changeset/fresh-olives-exercise.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/fresh-olives-exercise.md diff --git a/.changeset/fresh-olives-exercise.md b/.changeset/fresh-olives-exercise.md new file mode 100644 index 0000000000000..e4e1b71861b43 --- /dev/null +++ b/.changeset/fresh-olives-exercise.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/workflows-sdk": patch +"@medusajs/medusa": patch +--- + +Workflow engine API From 84dfab0bd4e87fb9edda6b3c2f2b2932ffa9c80c Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 12 Feb 2024 16:44:45 -0300 Subject: [PATCH 4/6] rename workflows execution and retrieve by execution id --- .../plugins/__tests__/workflow-engine/api.ts | 27 +++++++++++------ .../__tests__/workflow-engine/api_v2.ts | 27 +++++++++++------ .../admin/workflows-executions/[id]/route.ts | 20 +++++++++++++ .../[step_id]/failure/route.ts | 2 +- .../[step_id]/subscribe/route.ts | 2 +- .../[step_id]/success/route.ts | 2 +- .../[workflow_id]}/[transaction_id]/route.ts | 2 +- .../[workflow_id]}/run/route.ts | 2 +- .../[workflow_id]}/subscribe/route.ts | 2 +- .../middlewares.ts | 22 ++++++++++---- .../query-config.ts | 0 .../route.ts | 2 +- .../validators.ts | 0 packages/medusa/src/api-v2/middlewares.ts | 4 +-- packages/medusa/src/api/index.js | 2 +- packages/medusa/src/api/routes/admin/index.js | 2 +- .../get-execution.ts | 4 +-- .../index.ts | 23 ++++++++++----- .../list-execution.ts | 0 .../query-config.ts | 0 .../run-workflow.ts | 0 .../set-step-failure.ts | 0 .../set-step-success.ts | 0 .../subscribe.ts | 0 .../validators.ts | 0 .../src/services/workflows-module.ts | 29 ++++++++++++------- .../src/services/workflows-module.ts | 29 ++++++++++++------- packages/workflows-sdk/src/types/service.ts | 10 ++++--- 28 files changed, 143 insertions(+), 70 deletions(-) create mode 100644 packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts rename packages/medusa/src/api-v2/admin/{workflows/[id] => workflows-executions/[workflow_id]}/[transaction_id]/[step_id]/failure/route.ts (95%) rename packages/medusa/src/api-v2/admin/{workflows/[id] => workflows-executions/[workflow_id]}/[transaction_id]/[step_id]/subscribe/route.ts (96%) rename packages/medusa/src/api-v2/admin/{workflows/[id] => workflows-executions/[workflow_id]}/[transaction_id]/[step_id]/success/route.ts (95%) rename packages/medusa/src/api-v2/admin/{workflows/[id] => workflows-executions/[workflow_id]}/[transaction_id]/route.ts (92%) rename packages/medusa/src/api-v2/admin/{workflows/[id] => workflows-executions/[workflow_id]}/run/route.ts (95%) rename packages/medusa/src/api-v2/admin/{workflows/[id] => workflows-executions/[workflow_id]}/subscribe/route.ts (96%) rename packages/medusa/src/api-v2/admin/{workflows => workflows-executions}/middlewares.ts (61%) rename packages/medusa/src/api-v2/admin/{workflows => workflows-executions}/query-config.ts (100%) rename packages/medusa/src/api-v2/admin/{workflows/execution => workflows-executions}/route.ts (93%) rename packages/medusa/src/api-v2/admin/{workflows => workflows-executions}/validators.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/get-execution.ts (90%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/index.ts (86%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/list-execution.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/query-config.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/run-workflow.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/set-step-failure.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/set-step-success.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/subscribe.ts (100%) rename packages/medusa/src/api/routes/admin/{workflows => workflows-executions}/validators.ts (100%) diff --git a/integration-tests/plugins/__tests__/workflow-engine/api.ts b/integration-tests/plugins/__tests__/workflow-engine/api.ts index 7da3ee6b4b882..38654f106fdee 100644 --- a/integration-tests/plugins/__tests__/workflow-engine/api.ts +++ b/integration-tests/plugins/__tests__/workflow-engine/api.ts @@ -81,12 +81,12 @@ describe("Workflow Engine API", () => { ) }) - it("Should list all workflows in execution or completed", async () => { + it("Should list all workflows in execution or completed and retrieve them by id", async () => { const api = useApi()! as AxiosInstance for (let i = 3; i--; ) { await api.post( - `/admin/workflows/my-workflow-name/run`, + `/admin/workflows-executions/my-workflow-name/run`, { input: { initial: "abc", @@ -97,7 +97,7 @@ describe("Workflow Engine API", () => { } const executions = await api.get( - `/admin/workflows/execution`, + `/admin/workflows-executions`, adminHeaders ) @@ -112,6 +112,15 @@ describe("Workflow Engine API", () => { updated_at: expect.any(String), deleted_at: null, }) + + const retrivedById = await api.get( + `/admin/workflows-executions/` + + executions.data.workflow_executions[0].id, + adminHeaders + ) + expect(retrivedById.data.workflow_execution).toEqual( + expect.objectContaining(executions.data.workflow_executions[0]) + ) }) it("Should list all workflows matching the filters", async () => { @@ -119,7 +128,7 @@ describe("Workflow Engine API", () => { for (let i = 3; i--; ) { await api.post( - `/admin/workflows/my-workflow-name/run`, + `/admin/workflows-executions/my-workflow-name/run`, { input: { initial: "abc", @@ -131,7 +140,7 @@ describe("Workflow Engine API", () => { } const executions = await api.get( - `/admin/workflows/execution?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, adminHeaders ) @@ -161,7 +170,7 @@ describe("Workflow Engine API", () => { const api = useApi()! as AxiosInstance const wf = await api.post( - `/admin/workflows/my-workflow-name/run`, + `/admin/workflows-executions/my-workflow-name/run`, { input: { initial: "abc", @@ -179,7 +188,7 @@ describe("Workflow Engine API", () => { }) const execution = await api.get( - `/admin/workflows/my-workflow-name/trx_123`, + `/admin/workflows-executions/my-workflow-name/trx_123`, adminHeaders ) @@ -221,7 +230,7 @@ describe("Workflow Engine API", () => { }) const respondAsync = await api.post( - `/admin/workflows/my-workflow-name/trx_123/my-step-async/success`, + `/admin/workflows-executions/my-workflow-name/trx_123/my-step-async/success`, { response: { all: "good", @@ -233,7 +242,7 @@ describe("Workflow Engine API", () => { expect(respondAsync.data.success).toEqual(true) const completed = await api.get( - `/admin/workflows/my-workflow-name/trx_123`, + `/admin/workflows-executions/my-workflow-name/trx_123`, adminHeaders ) diff --git a/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts index eafd68bfa4864..282a262305c2d 100644 --- a/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts +++ b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts @@ -77,12 +77,12 @@ describe("Workflow Engine API", () => { ) }) - it("Should list all workflows in execution or completed", async () => { + it("Should list all workflows in execution or completed and retrieve them by id", async () => { const api = useApi()! as AxiosInstance for (let i = 3; i--; ) { await api.post( - `/admin/workflows/my-workflow-name/run`, + `/admin/workflows-executions/my-workflow-name/run`, { input: { initial: "abc", @@ -93,7 +93,7 @@ describe("Workflow Engine API", () => { } const executions = await api.get( - `/admin/workflows/execution`, + `/admin/workflows-executions`, adminHeaders ) @@ -108,6 +108,15 @@ describe("Workflow Engine API", () => { updated_at: expect.any(String), deleted_at: null, }) + + const retrivedById = await api.get( + `/admin/workflows-executions/` + + executions.data.workflow_executions[0].id, + adminHeaders + ) + expect(retrivedById.data.workflow_execution).toEqual( + expect.objectContaining(executions.data.workflow_executions[0]) + ) }) it("Should list all workflows matching the filters", async () => { @@ -115,7 +124,7 @@ describe("Workflow Engine API", () => { for (let i = 3; i--; ) { await api.post( - `/admin/workflows/my-workflow-name/run`, + `/admin/workflows-executions/my-workflow-name/run`, { input: { initial: "abc", @@ -127,7 +136,7 @@ describe("Workflow Engine API", () => { } const executions = await api.get( - `/admin/workflows/execution?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, adminHeaders ) @@ -157,7 +166,7 @@ describe("Workflow Engine API", () => { const api = useApi()! as AxiosInstance const wf = await api.post( - `/admin/workflows/my-workflow-name/run`, + `/admin/workflows-executions/my-workflow-name/run`, { input: { initial: "abc", @@ -175,7 +184,7 @@ describe("Workflow Engine API", () => { }) const execution = await api.get( - `/admin/workflows/my-workflow-name/trx_123`, + `/admin/workflows-executions/my-workflow-name/trx_123`, adminHeaders ) @@ -217,7 +226,7 @@ describe("Workflow Engine API", () => { }) const respondAsync = await api.post( - `/admin/workflows/my-workflow-name/trx_123/my-step-async/success`, + `/admin/workflows-executions/my-workflow-name/trx_123/my-step-async/success`, { response: { all: "good", @@ -229,7 +238,7 @@ describe("Workflow Engine API", () => { expect(respondAsync.data.success).toEqual(true) const completed = await api.get( - `/admin/workflows/my-workflow-name/trx_123`, + `/admin/workflows-executions/my-workflow-name/trx_123`, adminHeaders ) diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts new file mode 100644 index 0000000000000..f411fe397c8cb --- /dev/null +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[id]/route.ts @@ -0,0 +1,20 @@ +import { ModuleRegistrationName } from "@medusajs/modules-sdk" +import { IWorkflowEngineService } from "@medusajs/workflows-sdk" +import { MedusaRequest, MedusaResponse } from "../../../../types/routing" + +export const GET = async (req: MedusaRequest, res: MedusaResponse) => { + const workflowEngineService: IWorkflowEngineService = req.scope.resolve( + ModuleRegistrationName.WORKFLOW_ENGINE + ) + + const { id } = req.params + + const execution = await workflowEngineService.retrieveWorkflowExecution(id, { + select: req.retrieveConfig.select, + relations: req.retrieveConfig.relations, + }) + + res.status(200).json({ + workflow_execution: execution, + }) +} diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/failure/route.ts similarity index 95% rename from packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/failure/route.ts index 917fc490304f2..f7fd222388c42 100644 --- a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/failure/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/failure/route.ts @@ -12,7 +12,7 @@ export const POST = async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id, step_id } = req.params + const { workflow_id, transaction_id, step_id } = req.params const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/subscribe/route.ts similarity index 96% rename from packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/subscribe/route.ts index 653a29a84fa9c..0e75c536e07f0 100644 --- a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/subscribe/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/subscribe/route.ts @@ -10,7 +10,7 @@ export const GET = async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id } = req.query as any + const { workflow_id, transaction_id } = req.query as any const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) res.writeHead(200, { diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/success/route.ts similarity index 95% rename from packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/success/route.ts index fc2201b83f13d..ad158d9654011 100644 --- a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/[step_id]/success/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/success/route.ts @@ -12,7 +12,7 @@ export const POST = async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id, step_id } = req.params + const { workflow_id, transaction_id, step_id } = req.params const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/route.ts similarity index 92% rename from packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/route.ts index 516035a597967..177ee8a93456b 100644 --- a/packages/medusa/src/api-v2/admin/workflows/[id]/[transaction_id]/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/route.ts @@ -7,7 +7,7 @@ export const GET = async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id } = req.params + const { workflow_id, transaction_id } = req.params const execution = await workflowEngineService.retrieveWorkflowExecution( { diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/run/route.ts similarity index 95% rename from packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/run/route.ts index 8248a986a93c7..270313cab717e 100644 --- a/packages/medusa/src/api-v2/admin/workflows/[id]/run/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/run/route.ts @@ -11,7 +11,7 @@ export const POST = async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id } = req.params + const { workflow_id } = req.params const { transaction_id, input } = req.validatedBody as AdminPostWorkflowsRunReq diff --git a/packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/subscribe/route.ts similarity index 96% rename from packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/subscribe/route.ts index aeb30187537e2..6bd3af50c4e0d 100644 --- a/packages/medusa/src/api-v2/admin/workflows/[id]/subscribe/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/subscribe/route.ts @@ -7,7 +7,7 @@ export const GET = async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id } = req.query as any + const { workflow_id } = req.query as any const subscriberId = "__sub__" + Math.random().toString(36).substring(2, 9) res.writeHead(200, { diff --git a/packages/medusa/src/api-v2/admin/workflows/middlewares.ts b/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts similarity index 61% rename from packages/medusa/src/api-v2/admin/workflows/middlewares.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts index f24c1fdb45ed5..100cb1738fe09 100644 --- a/packages/medusa/src/api-v2/admin/workflows/middlewares.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts @@ -8,10 +8,10 @@ import { AdminPostWorkflowsRunReq, } from "./validators" -export const adminWorkflowsMiddlewares: MiddlewareRoute[] = [ +export const adminWorkflowsExecutionsMiddlewares: MiddlewareRoute[] = [ { method: ["GET"], - matcher: "/admin/workflows/execution", + matcher: "/admin/workflows-executions", middlewares: [ transformQuery( AdminGetWorkflowExecutionsParams, @@ -21,7 +21,17 @@ export const adminWorkflowsMiddlewares: MiddlewareRoute[] = [ }, { method: ["GET"], - matcher: "/admin/workflows/:id/:transaction_id", + matcher: "/admin/workflows-executions/:id", + middlewares: [ + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + QueryConfig.retrieveTransformQueryConfig + ), + ], + }, + { + method: ["GET"], + matcher: "/admin/workflows-executions/:workflow_id/:transaction_id", middlewares: [ transformQuery( AdminGetWorkflowExecutionDetailsParams, @@ -31,17 +41,17 @@ export const adminWorkflowsMiddlewares: MiddlewareRoute[] = [ }, { method: ["POST"], - matcher: "/admin/workflows/:id/run", + matcher: "/admin/workflows-executions/:id/run", middlewares: [transformBody(AdminPostWorkflowsRunReq)], }, { method: ["POST"], - matcher: "/admin/workflows/:id/:transaction_id/:step_id/success", + matcher: "/admin/workflows-executions/:id/:transaction_id/:step_id/success", middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], }, { method: ["POST"], - matcher: "/admin/workflows/:id/:transaction_id/:step_id/failure", + matcher: "/admin/workflows-executions/:id/:transaction_id/:step_id/failure", middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], }, ] diff --git a/packages/medusa/src/api-v2/admin/workflows/query-config.ts b/packages/medusa/src/api-v2/admin/workflows-executions/query-config.ts similarity index 100% rename from packages/medusa/src/api-v2/admin/workflows/query-config.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/query-config.ts diff --git a/packages/medusa/src/api-v2/admin/workflows/execution/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/route.ts similarity index 93% rename from packages/medusa/src/api-v2/admin/workflows/execution/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/route.ts index d5d87190b6631..2903e80fc5554 100644 --- a/packages/medusa/src/api-v2/admin/workflows/execution/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/route.ts @@ -1,6 +1,6 @@ import { ModuleRegistrationName } from "@medusajs/modules-sdk" import { IWorkflowEngineService } from "@medusajs/workflows-sdk" -import { MedusaRequest, MedusaResponse } from "../../../../types/routing" +import { MedusaRequest, MedusaResponse } from "../../../types/routing" export const GET = async (req: MedusaRequest, res: MedusaResponse) => { const workflowEngineService: IWorkflowEngineService = req.scope.resolve( diff --git a/packages/medusa/src/api-v2/admin/workflows/validators.ts b/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts similarity index 100% rename from packages/medusa/src/api-v2/admin/workflows/validators.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/validators.ts diff --git a/packages/medusa/src/api-v2/middlewares.ts b/packages/medusa/src/api-v2/middlewares.ts index 89b846885ddee..22d7698e4a133 100644 --- a/packages/medusa/src/api-v2/middlewares.ts +++ b/packages/medusa/src/api-v2/middlewares.ts @@ -4,7 +4,7 @@ import { adminCustomerGroupRoutesMiddlewares } from "./admin/customer-groups/mid import { adminCustomerRoutesMiddlewares } from "./admin/customers/middlewares" import { adminPromotionRoutesMiddlewares } from "./admin/promotions/middlewares" import { adminRegionRoutesMiddlewares } from "./admin/regions/middlewares" -import { adminWorkflowsMiddlewares } from "./admin/workflows/middlewares" +import { adminWorkflowsExecutionsMiddlewares } from "./admin/workflows-executions/middlewares" import { authRoutesMiddlewares } from "./auth/middlewares" import { storeCartRoutesMiddlewares } from "./store/carts/middlewares" import { storeCustomerRoutesMiddlewares } from "./store/customers/middlewares" @@ -18,7 +18,7 @@ export const config: MiddlewaresConfig = { ...storeCustomerRoutesMiddlewares, ...storeCartRoutesMiddlewares, ...authRoutesMiddlewares, - ...adminWorkflowsMiddlewares, + ...adminWorkflowsExecutionsMiddlewares, ...adminRegionRoutesMiddlewares, ], } diff --git a/packages/medusa/src/api/index.js b/packages/medusa/src/api/index.js index dcd5b29aed7ae..fd08f26953135 100644 --- a/packages/medusa/src/api/index.js +++ b/packages/medusa/src/api/index.js @@ -66,7 +66,7 @@ export * from "./routes/admin/tax-rates" export * from "./routes/admin/uploads" export * from "./routes/admin/users" export * from "./routes/admin/variants" -export * from "./routes/admin/workflows" +export * from "./routes/admin/workflows-executions" // Store export * from "./routes/store/auth" export * from "./routes/store/carts" diff --git a/packages/medusa/src/api/routes/admin/index.js b/packages/medusa/src/api/routes/admin/index.js index 7fb970ffd2e08..dc4ed7dfc5c0e 100644 --- a/packages/medusa/src/api/routes/admin/index.js +++ b/packages/medusa/src/api/routes/admin/index.js @@ -41,7 +41,7 @@ import taxRateRoutes from "./tax-rates" import uploadRoutes from "./uploads" import userRoutes, { unauthenticatedUserRoutes } from "./users" import variantRoutes from "./variants" -import workflowRoutes from "./workflows" +import workflowRoutes from "./workflows-executions" const route = Router() diff --git a/packages/medusa/src/api/routes/admin/workflows/get-execution.ts b/packages/medusa/src/api/routes/admin/workflows-executions/get-execution.ts similarity index 90% rename from packages/medusa/src/api/routes/admin/workflows/get-execution.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/get-execution.ts index 3c1330c1f6892..41315fba68566 100644 --- a/packages/medusa/src/api/routes/admin/workflows/get-execution.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/get-execution.ts @@ -7,10 +7,10 @@ export default async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id } = req.params + const { id, workflow_id, transaction_id } = req.params const execution = await workflowEngineService.retrieveWorkflowExecution( - { + id ?? { workflow_id, transaction_id, }, diff --git a/packages/medusa/src/api/routes/admin/workflows/index.ts b/packages/medusa/src/api/routes/admin/workflows-executions/index.ts similarity index 86% rename from packages/medusa/src/api/routes/admin/workflows/index.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/index.ts index 9abe54560f4bd..6ea156a771b0a 100644 --- a/packages/medusa/src/api/routes/admin/workflows/index.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/index.ts @@ -33,16 +33,16 @@ const listTransformQueryConfig = { } export default (app) => { - app.use("/workflows", route) + app.use("/workflows-executions", route) route.get( - "/execution", + "/", transformQuery(AdminGetWorkflowExecutionsParams, listTransformQueryConfig), middlewares.wrap(require("./list-execution").default) ) route.get( - "/:id/:transaction_id", + "/:id", transformQuery( AdminGetWorkflowExecutionDetailsParams, retrieveTransformQueryConfig @@ -50,10 +50,13 @@ export default (app) => { middlewares.wrap(require("./get-execution").default) ) - route.post( - "/:id/run", - transformBody(AdminPostWorkflowsRunReq), - middlewares.wrap(require("./run-workflow").default) + route.get( + "/:workflow_id/:transaction_id", + transformQuery( + AdminGetWorkflowExecutionDetailsParams, + retrieveTransformQueryConfig + ), + middlewares.wrap(require("./get-execution").default) ) route.post( @@ -68,6 +71,12 @@ export default (app) => { middlewares.wrap(require("./set-step-failure").default) ) + route.post( + "/:id/run", + transformBody(AdminPostWorkflowsRunReq), + middlewares.wrap(require("./run-workflow").default) + ) + return app } diff --git a/packages/medusa/src/api/routes/admin/workflows/list-execution.ts b/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/list-execution.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts diff --git a/packages/medusa/src/api/routes/admin/workflows/query-config.ts b/packages/medusa/src/api/routes/admin/workflows-executions/query-config.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/query-config.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/query-config.ts diff --git a/packages/medusa/src/api/routes/admin/workflows/run-workflow.ts b/packages/medusa/src/api/routes/admin/workflows-executions/run-workflow.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/run-workflow.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/run-workflow.ts diff --git a/packages/medusa/src/api/routes/admin/workflows/set-step-failure.ts b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/set-step-failure.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts diff --git a/packages/medusa/src/api/routes/admin/workflows/set-step-success.ts b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/set-step-success.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts diff --git a/packages/medusa/src/api/routes/admin/workflows/subscribe.ts b/packages/medusa/src/api/routes/admin/workflows-executions/subscribe.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/subscribe.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/subscribe.ts diff --git a/packages/medusa/src/api/routes/admin/workflows/validators.ts b/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts similarity index 100% rename from packages/medusa/src/api/routes/admin/workflows/validators.ts rename to packages/medusa/src/api/routes/admin/workflows-executions/validators.ts diff --git a/packages/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/workflow-engine-inmemory/src/services/workflows-module.ts index d6b055fe3422c..d384ebedf38a1 100644 --- a/packages/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/workflow-engine-inmemory/src/services/workflows-module.ts @@ -11,6 +11,7 @@ import { InjectSharedContext, MedusaContext, MedusaError, + isString, } from "@medusajs/utils" import type { IWorkflowEngineService, @@ -51,18 +52,24 @@ export class WorkflowsModuleService implements IWorkflowEngineService { @InjectManager("baseRepository_") async retrieveWorkflowExecution( - idOrObject: { - workflow_id: string - transaction_id: string - }, + idOrObject: + | string + | { + workflow_id: string + transaction_id: string + }, config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise { + const objValue = isString(idOrObject) + ? { id: idOrObject } + : { + workflow_id: idOrObject.workflow_id, + transaction_id: idOrObject.transaction_id, + } + const wfExecution = await this.workflowExecutionService_.list( - { - workflow_id: idOrObject.workflow_id, - transaction_id: idOrObject.transaction_id, - }, + objValue, config, sharedContext ) @@ -70,9 +77,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService { if (wfExecution.length === 0) { throw new MedusaError( MedusaError.Types.NOT_FOUND, - `WorkflowExecution with workflow_id, transaction_id: ${Object.values( - idOrObject - ).join(", ")} was not found` + `WorkflowExecution with ${Object.keys(objValue).join( + ", " + )}: ${Object.values(objValue).join(", ")} was not found` ) } diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index d6b055fe3422c..d384ebedf38a1 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -11,6 +11,7 @@ import { InjectSharedContext, MedusaContext, MedusaError, + isString, } from "@medusajs/utils" import type { IWorkflowEngineService, @@ -51,18 +52,24 @@ export class WorkflowsModuleService implements IWorkflowEngineService { @InjectManager("baseRepository_") async retrieveWorkflowExecution( - idOrObject: { - workflow_id: string - transaction_id: string - }, + idOrObject: + | string + | { + workflow_id: string + transaction_id: string + }, config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise { + const objValue = isString(idOrObject) + ? { id: idOrObject } + : { + workflow_id: idOrObject.workflow_id, + transaction_id: idOrObject.transaction_id, + } + const wfExecution = await this.workflowExecutionService_.list( - { - workflow_id: idOrObject.workflow_id, - transaction_id: idOrObject.transaction_id, - }, + objValue, config, sharedContext ) @@ -70,9 +77,9 @@ export class WorkflowsModuleService implements IWorkflowEngineService { if (wfExecution.length === 0) { throw new MedusaError( MedusaError.Types.NOT_FOUND, - `WorkflowExecution with workflow_id, transaction_id: ${Object.values( - idOrObject - ).join(", ")} was not found` + `WorkflowExecution with ${Object.keys(objValue).join( + ", " + )}: ${Object.values(objValue).join(", ")} was not found` ) } diff --git a/packages/workflows-sdk/src/types/service.ts b/packages/workflows-sdk/src/types/service.ts index 4619aa89720d0..3deb044f839fc 100644 --- a/packages/workflows-sdk/src/types/service.ts +++ b/packages/workflows-sdk/src/types/service.ts @@ -33,10 +33,12 @@ export type IdempotencyKeyParts = { export interface IWorkflowEngineService extends IModuleService { retrieveWorkflowExecution( - idOrObject: { - workflow_id: string - transaction_id: string - }, + idOrObject: + | string + | { + workflow_id: string + transaction_id: string + }, config?: FindConfig, sharedContext?: Context ) From 53bde29f4e19724fcb189167f0429722e68aba6e Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 12 Feb 2024 16:45:50 -0300 Subject: [PATCH 5/6] type --- packages/workflows-sdk/src/types/service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/workflows-sdk/src/types/service.ts b/packages/workflows-sdk/src/types/service.ts index 3deb044f839fc..a2cfa9aa48686 100644 --- a/packages/workflows-sdk/src/types/service.ts +++ b/packages/workflows-sdk/src/types/service.ts @@ -41,7 +41,7 @@ export interface IWorkflowEngineService extends IModuleService { }, config?: FindConfig, sharedContext?: Context - ) + ): Promise listWorkflowExecution( filters?: FilterableWorkflowExecutionProps, From 58dad39545860885363eb2a6746f633c4f6bda42 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Tue, 13 Feb 2024 08:20:28 -0300 Subject: [PATCH 6/6] change endpoints --- .../plugins/__tests__/workflow-engine/api.ts | 268 +---------------- .../__tests__/workflow-engine/api_v2.ts | 264 +---------------- .../__tests__/workflow-engine/tests.ts | 273 ++++++++++++++++++ .../[step_id] => steps}/failure/route.ts | 11 +- .../[step_id] => steps}/success/route.ts | 11 +- .../admin/workflows-executions/middlewares.ts | 5 +- .../admin/workflows-executions/route.ts | 32 +- .../admin/workflows-executions/validators.ts | 6 + .../admin/workflows-executions/index.ts | 4 +- .../workflows-executions/list-execution.ts | 32 +- .../workflows-executions/set-step-failure.ts | 4 +- .../workflows-executions/set-step-success.ts | 4 +- .../admin/workflows-executions/validators.ts | 6 + .../src/services/workflows-module.ts | 32 ++ .../src/services/workflows-module.ts | 32 ++ packages/workflows-sdk/src/types/common.ts | 10 +- 16 files changed, 397 insertions(+), 597 deletions(-) create mode 100644 integration-tests/plugins/__tests__/workflow-engine/tests.ts rename packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/{[transaction_id]/[step_id] => steps}/failure/route.ts (82%) rename packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/{[transaction_id]/[step_id] => steps}/success/route.ts (82%) diff --git a/integration-tests/plugins/__tests__/workflow-engine/api.ts b/integration-tests/plugins/__tests__/workflow-engine/api.ts index 38654f106fdee..7498e32e3069f 100644 --- a/integration-tests/plugins/__tests__/workflow-engine/api.ts +++ b/integration-tests/plugins/__tests__/workflow-engine/api.ts @@ -1,273 +1,9 @@ -import { useApi } from "../../../environment-helpers/use-api" -import { initDb, useDb } from "../../../environment-helpers/use-db" - -import { - StepResponse, - WorkflowData, - createStep, - createWorkflow, -} from "@medusajs/workflows-sdk" -import { AxiosInstance } from "axios" -import path from "path" -import { startBootstrapApp } from "../../../environment-helpers/bootstrap-app" -import { getContainer } from "../../../environment-helpers/use-container" -import adminSeeder from "../../../helpers/admin-seeder" +import { workflowEngineTestSuite } from "./tests" jest.setTimeout(5000000) -const adminHeaders = { - headers: { - "x-medusa-access-token": "test_token", - }, -} - const env = { MEDUSA_FF_MEDUSA_V2: false, } -describe("Workflow Engine API", () => { - let medusaContainer - let dbConnection - let shutdownServer - - beforeAll(async () => { - const cwd = path.resolve(path.join(__dirname, "..", "..")) - dbConnection = await initDb({ - cwd, - env, - force_modules_migration: true, - } as any) - shutdownServer = await startBootstrapApp({ cwd, env }) - medusaContainer = getContainer() - - await adminSeeder(dbConnection) - }) - - afterAll(async () => { - const db = useDb() - await db.shutdown() - await shutdownServer() - }) - - describe("running workflows", () => { - beforeAll(async () => { - const step1 = createStep( - { - name: "my-step", - }, - async (input: { initial: string }) => { - return new StepResponse({ - result: input.initial, - }) - } - ) - const step2 = createStep( - { - name: "my-step-async", - async: true, - }, - async () => {} - ) - - createWorkflow( - { - name: "my-workflow-name", - retentionTime: 1000, - }, - function (input: WorkflowData<{ initial: string }>) { - step1(input) - return step2() - } - ) - }) - - it("Should list all workflows in execution or completed and retrieve them by id", async () => { - const api = useApi()! as AxiosInstance - - for (let i = 3; i--; ) { - await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - }, - adminHeaders - ) - } - - const executions = await api.get( - `/admin/workflows-executions`, - adminHeaders - ) - - expect(executions.data.count).toEqual(3) - expect(executions.data.workflow_executions.length).toEqual(3) - expect(executions.data.workflow_executions[0]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.any(String), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - - const retrivedById = await api.get( - `/admin/workflows-executions/` + - executions.data.workflow_executions[0].id, - adminHeaders - ) - expect(retrivedById.data.workflow_execution).toEqual( - expect.objectContaining(executions.data.workflow_executions[0]) - ) - }) - - it("Should list all workflows matching the filters", async () => { - const api = useApi()! as AxiosInstance - - for (let i = 3; i--; ) { - await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - transaction_id: "transaction_" + (i + 1), - }, - adminHeaders - ) - } - - const executions = await api.get( - `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, - adminHeaders - ) - - expect(executions.data.count).toEqual(2) - expect(executions.data.workflow_executions.length).toEqual(2) - expect(executions.data.workflow_executions[0]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.stringMatching(/transaction_1|transaction_2/), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - expect(executions.data.workflow_executions[1]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.stringMatching(/transaction_1|transaction_2/), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - }) - - it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { - const api = useApi()! as AxiosInstance - - const wf = await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - transaction_id: "trx_123", - }, - adminHeaders - ) - - expect(wf.data).toEqual({ - acknowledgement: { - transactionId: "trx_123", - workflowId: "my-workflow-name", - }, - }) - - const execution = await api.get( - `/admin/workflows-executions/my-workflow-name/trx_123`, - adminHeaders - ) - - expect(execution.data).toEqual({ - workflow_execution: expect.objectContaining({ - workflow_id: "my-workflow-name", - transaction_id: "trx_123", - id: expect.any(String), - state: "invoking", - execution: expect.objectContaining({ - hasAsyncSteps: true, - hasFailedSteps: false, - hasSkippedSteps: false, - hasWaitingSteps: false, - hasRevertedSteps: false, - }), - context: expect.objectContaining({ - data: expect.objectContaining({ - invoke: { - "my-step": { - __type: "WorkflowWorkflowData", - output: { - __type: "WorkflowStepResponse", - output: { - result: "abc", - }, - compensateInput: { - result: "abc", - }, - }, - }, - }, - payload: { - initial: "abc", - }, - }), - }), - }), - }) - - const respondAsync = await api.post( - `/admin/workflows-executions/my-workflow-name/trx_123/my-step-async/success`, - { - response: { - all: "good", - }, - }, - adminHeaders - ) - - expect(respondAsync.data.success).toEqual(true) - - const completed = await api.get( - `/admin/workflows-executions/my-workflow-name/trx_123`, - adminHeaders - ) - - expect(completed.data).toEqual({ - workflow_execution: expect.objectContaining({ - workflow_id: "my-workflow-name", - transaction_id: "trx_123", - state: "done", - context: expect.objectContaining({ - data: expect.objectContaining({ - invoke: expect.objectContaining({ - "my-step-async": { - __type: "WorkflowStepResponse", - output: { - all: "good", - }, - compensateInput: { - all: "good", - }, - }, - }), - }), - }), - }), - }) - }) - }) -}) +workflowEngineTestSuite(env, { force_modules_migration: true }) diff --git a/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts index 282a262305c2d..0422b215e28df 100644 --- a/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts +++ b/integration-tests/plugins/__tests__/workflow-engine/api_v2.ts @@ -1,269 +1,9 @@ -import { useApi } from "../../../environment-helpers/use-api" -import { initDb, useDb } from "../../../environment-helpers/use-db" - -import { - StepResponse, - WorkflowData, - createStep, - createWorkflow, -} from "@medusajs/workflows-sdk" -import { AxiosInstance } from "axios" -import path from "path" -import { startBootstrapApp } from "../../../environment-helpers/bootstrap-app" -import { getContainer } from "../../../environment-helpers/use-container" -import adminSeeder from "../../../helpers/admin-seeder" +import { workflowEngineTestSuite } from "./tests" jest.setTimeout(5000000) -const adminHeaders = { - headers: { - "x-medusa-access-token": "test_token", - }, -} - const env = { MEDUSA_FF_MEDUSA_V2: true, } -describe("Workflow Engine API", () => { - let medusaContainer - let dbConnection - let shutdownServer - - beforeAll(async () => { - const cwd = path.resolve(path.join(__dirname, "..", "..")) - dbConnection = await initDb({ cwd, env } as any) - shutdownServer = await startBootstrapApp({ cwd, env }) - medusaContainer = getContainer() - - await adminSeeder(dbConnection) - }) - - afterAll(async () => { - const db = useDb() - await db.shutdown() - await shutdownServer() - }) - - describe("running workflows", () => { - beforeAll(async () => { - const step1 = createStep( - { - name: "my-step", - }, - async (input: { initial: string }) => { - return new StepResponse({ - result: input.initial, - }) - } - ) - const step2 = createStep( - { - name: "my-step-async", - async: true, - }, - async () => {} - ) - - createWorkflow( - { - name: "my-workflow-name", - retentionTime: 1000, - }, - function (input: WorkflowData<{ initial: string }>) { - step1(input) - return step2() - } - ) - }) - - it("Should list all workflows in execution or completed and retrieve them by id", async () => { - const api = useApi()! as AxiosInstance - - for (let i = 3; i--; ) { - await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - }, - adminHeaders - ) - } - - const executions = await api.get( - `/admin/workflows-executions`, - adminHeaders - ) - - expect(executions.data.count).toEqual(3) - expect(executions.data.workflow_executions.length).toEqual(3) - expect(executions.data.workflow_executions[0]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.any(String), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - - const retrivedById = await api.get( - `/admin/workflows-executions/` + - executions.data.workflow_executions[0].id, - adminHeaders - ) - expect(retrivedById.data.workflow_execution).toEqual( - expect.objectContaining(executions.data.workflow_executions[0]) - ) - }) - - it("Should list all workflows matching the filters", async () => { - const api = useApi()! as AxiosInstance - - for (let i = 3; i--; ) { - await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - transaction_id: "transaction_" + (i + 1), - }, - adminHeaders - ) - } - - const executions = await api.get( - `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, - adminHeaders - ) - - expect(executions.data.count).toEqual(2) - expect(executions.data.workflow_executions.length).toEqual(2) - expect(executions.data.workflow_executions[0]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.stringMatching(/transaction_1|transaction_2/), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - expect(executions.data.workflow_executions[1]).toEqual({ - workflow_id: "my-workflow-name", - transaction_id: expect.stringMatching(/transaction_1|transaction_2/), - id: expect.any(String), - state: "invoking", - created_at: expect.any(String), - updated_at: expect.any(String), - deleted_at: null, - }) - }) - - it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { - const api = useApi()! as AxiosInstance - - const wf = await api.post( - `/admin/workflows-executions/my-workflow-name/run`, - { - input: { - initial: "abc", - }, - transaction_id: "trx_123", - }, - adminHeaders - ) - - expect(wf.data).toEqual({ - acknowledgement: { - transactionId: "trx_123", - workflowId: "my-workflow-name", - }, - }) - - const execution = await api.get( - `/admin/workflows-executions/my-workflow-name/trx_123`, - adminHeaders - ) - - expect(execution.data).toEqual({ - workflow_execution: expect.objectContaining({ - workflow_id: "my-workflow-name", - transaction_id: "trx_123", - id: expect.any(String), - state: "invoking", - execution: expect.objectContaining({ - hasAsyncSteps: true, - hasFailedSteps: false, - hasSkippedSteps: false, - hasWaitingSteps: false, - hasRevertedSteps: false, - }), - context: expect.objectContaining({ - data: expect.objectContaining({ - invoke: { - "my-step": { - __type: "WorkflowWorkflowData", - output: { - __type: "WorkflowStepResponse", - output: { - result: "abc", - }, - compensateInput: { - result: "abc", - }, - }, - }, - }, - payload: { - initial: "abc", - }, - }), - }), - }), - }) - - const respondAsync = await api.post( - `/admin/workflows-executions/my-workflow-name/trx_123/my-step-async/success`, - { - response: { - all: "good", - }, - }, - adminHeaders - ) - - expect(respondAsync.data.success).toEqual(true) - - const completed = await api.get( - `/admin/workflows-executions/my-workflow-name/trx_123`, - adminHeaders - ) - - expect(completed.data).toEqual({ - workflow_execution: expect.objectContaining({ - workflow_id: "my-workflow-name", - transaction_id: "trx_123", - state: "done", - context: expect.objectContaining({ - data: expect.objectContaining({ - invoke: expect.objectContaining({ - "my-step-async": { - __type: "WorkflowStepResponse", - output: { - all: "good", - }, - compensateInput: { - all: "good", - }, - }, - }), - }), - }), - }), - }) - }) - }) -}) +workflowEngineTestSuite(env) diff --git a/integration-tests/plugins/__tests__/workflow-engine/tests.ts b/integration-tests/plugins/__tests__/workflow-engine/tests.ts new file mode 100644 index 0000000000000..01a9dbbd8015f --- /dev/null +++ b/integration-tests/plugins/__tests__/workflow-engine/tests.ts @@ -0,0 +1,273 @@ +import { useApi } from "../../../environment-helpers/use-api" +import { initDb, useDb } from "../../../environment-helpers/use-db" + +import { + StepResponse, + WorkflowData, + createStep, + createWorkflow, +} from "@medusajs/workflows-sdk" +import { AxiosInstance } from "axios" +import path from "path" +import { startBootstrapApp } from "../../../environment-helpers/bootstrap-app" +import { getContainer } from "../../../environment-helpers/use-container" +import adminSeeder from "../../../helpers/admin-seeder" + +export const workflowEngineTestSuite = (env, extraParams = {}) => { + const adminHeaders = { + headers: { + "x-medusa-access-token": "test_token", + }, + } + + describe("Workflow Engine API", () => { + let medusaContainer + let dbConnection + let shutdownServer + + beforeAll(async () => { + const cwd = path.resolve(path.join(__dirname, "..", "..")) + dbConnection = await initDb({ cwd, env, ...extraParams } as any) + shutdownServer = await startBootstrapApp({ cwd, env }) + medusaContainer = getContainer() + + await adminSeeder(dbConnection) + }) + + afterAll(async () => { + const db = useDb() + await db.shutdown() + await shutdownServer() + }) + + describe("running workflows", () => { + beforeAll(async () => { + const step1 = createStep( + { + name: "my-step", + }, + async (input: { initial: string }) => { + return new StepResponse({ + result: input.initial, + }) + } + ) + const step2 = createStep( + { + name: "my-step-async", + async: true, + }, + async () => {} + ) + + createWorkflow( + { + name: "my-workflow-name", + retentionTime: 1000, + }, + function (input: WorkflowData<{ initial: string }>) { + step1(input) + return step2() + } + ) + }) + + it("Should list all workflows in execution or completed and retrieve them by id", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows-executions`, + adminHeaders + ) + + expect(executions.data.count).toEqual(3) + expect(executions.data.workflow_executions.length).toEqual(3) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.any(String), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + + const retrivedById = await api.get( + `/admin/workflows-executions/` + + executions.data.workflow_executions[0].id, + adminHeaders + ) + expect(retrivedById.data.workflow_execution).toEqual( + expect.objectContaining(executions.data.workflow_executions[0]) + ) + }) + + it("Should list all workflows matching the filters", async () => { + const api = useApi()! as AxiosInstance + + for (let i = 3; i--; ) { + await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "transaction_" + (i + 1), + }, + adminHeaders + ) + } + + const executions = await api.get( + `/admin/workflows-executions?transaction_id[]=transaction_1&transaction_id[]=transaction_2`, + adminHeaders + ) + + expect(executions.data.count).toEqual(2) + expect(executions.data.workflow_executions.length).toEqual(2) + expect(executions.data.workflow_executions[0]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + expect(executions.data.workflow_executions[1]).toEqual({ + workflow_id: "my-workflow-name", + transaction_id: expect.stringMatching(/transaction_1|transaction_2/), + id: expect.any(String), + state: "invoking", + created_at: expect.any(String), + updated_at: expect.any(String), + deleted_at: null, + }) + }) + + it("Should execute a workflow and retrieve its execution while running and when it is completed", async () => { + const api = useApi()! as AxiosInstance + + const wf = await api.post( + `/admin/workflows-executions/my-workflow-name/run`, + { + input: { + initial: "abc", + }, + transaction_id: "trx_123", + }, + adminHeaders + ) + + expect(wf.data).toEqual({ + acknowledgement: { + transactionId: "trx_123", + workflowId: "my-workflow-name", + }, + }) + + const execution = await api.get( + `/admin/workflows-executions/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(execution.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + id: expect.any(String), + state: "invoking", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: false, + hasRevertedSteps: false, + }), + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: { + "my-step": { + __type: "WorkflowWorkflowData", + output: { + __type: "WorkflowStepResponse", + output: { + result: "abc", + }, + compensateInput: { + result: "abc", + }, + }, + }, + }, + payload: { + initial: "abc", + }, + }), + }), + }), + }) + + const respondAsync = await api.post( + `/admin/workflows-executions/my-workflow-name/steps/success`, + { + transaction_id: "trx_123", + step_id: "my-step-async", + response: { + all: "good", + }, + }, + adminHeaders + ) + + expect(respondAsync.data.success).toEqual(true) + + const completed = await api.get( + `/admin/workflows-executions/my-workflow-name/trx_123`, + adminHeaders + ) + + expect(completed.data).toEqual({ + workflow_execution: expect.objectContaining({ + workflow_id: "my-workflow-name", + transaction_id: "trx_123", + state: "done", + context: expect.objectContaining({ + data: expect.objectContaining({ + invoke: expect.objectContaining({ + "my-step-async": { + __type: "WorkflowStepResponse", + output: { + all: "good", + }, + compensateInput: { + all: "good", + }, + }, + }), + }), + }), + }), + }) + }) + }) + }) +} + +describe("Noop test", () => { + it("noop check", async () => { + expect(true).toBe(true) + }) +}) diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/failure/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/failure/route.ts similarity index 82% rename from packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/failure/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/failure/route.ts index f7fd222388c42..29e3e67a650d1 100644 --- a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/failure/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/failure/route.ts @@ -1,21 +1,20 @@ import { ModuleRegistrationName } from "@medusajs/modules-sdk" import { TransactionHandlerType, isDefined } from "@medusajs/utils" import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" -import { - MedusaRequest, - MedusaResponse, -} from "../../../../../../../types/routing" -import { AdminPostWorkflowsAsyncResponseReq } from "../../../../validators" +import { MedusaRequest, MedusaResponse } from "../../../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "../../../validators" export const POST = async (req: MedusaRequest, res: MedusaResponse) => { const workflowEngineService: IWorkflowEngineService = req.scope.resolve( ModuleRegistrationName.WORKFLOW_ENGINE ) - const { workflow_id, transaction_id, step_id } = req.params + const { workflow_id } = req.params const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + const { transaction_id, step_id } = body + const compensateInput = body.compensate_input const stepResponse = isDefined(body.response) ? new StepResponse(body.response, compensateInput) diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/success/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/success/route.ts similarity index 82% rename from packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/success/route.ts rename to packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/success/route.ts index ad158d9654011..ac5e5d7658746 100644 --- a/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/[transaction_id]/[step_id]/success/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/[workflow_id]/steps/success/route.ts @@ -1,21 +1,20 @@ import { ModuleRegistrationName } from "@medusajs/modules-sdk" import { TransactionHandlerType, isDefined } from "@medusajs/utils" import { IWorkflowEngineService, StepResponse } from "@medusajs/workflows-sdk" -import { - MedusaRequest, - MedusaResponse, -} from "../../../../../../../types/routing" -import { AdminPostWorkflowsAsyncResponseReq } from "../../../../validators" +import { MedusaRequest, MedusaResponse } from "../../../../../../types/routing" +import { AdminPostWorkflowsAsyncResponseReq } from "../../../validators" export const POST = async (req: MedusaRequest, res: MedusaResponse) => { const workflowEngineService: IWorkflowEngineService = req.scope.resolve( ModuleRegistrationName.WORKFLOW_ENGINE ) - const { workflow_id, transaction_id, step_id } = req.params + const { workflow_id } = req.params const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + const { transaction_id, step_id } = body + const compensateInput = body.compensate_input const stepResponse = isDefined(body.response) ? new StepResponse(body.response, compensateInput) diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts b/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts index 100cb1738fe09..47b00944951bd 100644 --- a/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/middlewares.ts @@ -46,12 +46,13 @@ export const adminWorkflowsExecutionsMiddlewares: MiddlewareRoute[] = [ }, { method: ["POST"], - matcher: "/admin/workflows-executions/:id/:transaction_id/:step_id/success", + + matcher: "/admin/workflows-executions/:id/steps/success", middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], }, { method: ["POST"], - matcher: "/admin/workflows-executions/:id/:transaction_id/:step_id/failure", + matcher: "/admin/workflows-executions/:id/steps/failure", middlewares: [transformBody(AdminPostWorkflowsAsyncResponseReq)], }, ] diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/route.ts b/packages/medusa/src/api-v2/admin/workflows-executions/route.ts index 2903e80fc5554..ab8127411e148 100644 --- a/packages/medusa/src/api-v2/admin/workflows-executions/route.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/route.ts @@ -9,30 +9,16 @@ export const GET = async (req: MedusaRequest, res: MedusaResponse) => { const listConfig = req.listConfig - const { transaction_id, workflow_id } = req.filterableFields - - const transactionIds = Array.isArray(transaction_id) - ? transaction_id - : [transaction_id] - const workflowIds = Array.isArray(workflow_id) ? workflow_id : [workflow_id] - - const filters = {} - - if (transaction_id) { - filters["transaction_id"] = transactionIds - } - - if (workflow_id) { - filters["workflow_id"] = workflowIds - } - const [workflow_executions, count] = - await workflowEngineService.listAndCountWorkflowExecution(filters, { - select: req.listConfig.select, - relations: req.listConfig.relations, - skip: listConfig.skip, - take: listConfig.take, - }) + await workflowEngineService.listAndCountWorkflowExecution( + req.filterableFields, + { + select: req.listConfig.select, + relations: req.listConfig.relations, + skip: listConfig.skip, + take: listConfig.take, + } + ) res.json({ workflow_executions, diff --git a/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts b/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts index 8e898a627f0bf..27da3231fa2cd 100644 --- a/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts +++ b/packages/medusa/src/api-v2/admin/workflows-executions/validators.ts @@ -35,6 +35,12 @@ export class AdminPostWorkflowsRunReq { } export class AdminPostWorkflowsAsyncResponseReq { + @IsString() + transaction_id: string + + @IsString() + step_id: string + @IsOptional() response?: unknown diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/index.ts b/packages/medusa/src/api/routes/admin/workflows-executions/index.ts index 6ea156a771b0a..092568bf04572 100644 --- a/packages/medusa/src/api/routes/admin/workflows-executions/index.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/index.ts @@ -60,13 +60,13 @@ export default (app) => { ) route.post( - "/:id/:transaction_id/:step_id/success", + "/:id/steps/success", transformBody(AdminPostWorkflowsAsyncResponseReq), middlewares.wrap(require("./set-step-success").default) ) route.post( - "/:id/:transaction_id/:step_id/failure", + "/:id/steps/failure", transformBody(AdminPostWorkflowsAsyncResponseReq), middlewares.wrap(require("./set-step-failure").default) ) diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts b/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts index fc6b6cccc2153..eb9e5adebc686 100644 --- a/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/list-execution.ts @@ -9,30 +9,16 @@ export default async (req: MedusaRequest, res: MedusaResponse) => { const listConfig = req.listConfig - const { transaction_id, workflow_id } = req.filterableFields - - const transactionIds = Array.isArray(transaction_id) - ? transaction_id - : [transaction_id] - const workflowIds = Array.isArray(workflow_id) ? workflow_id : [workflow_id] - - const filters = {} - - if (transaction_id) { - filters["transaction_id"] = transactionIds - } - - if (workflow_id) { - filters["workflow_id"] = workflowIds - } - const [workflow_executions, count] = - await workflowEngineService.listAndCountWorkflowExecution(filters, { - select: req.listConfig.select, - relations: req.listConfig.relations, - skip: listConfig.skip, - take: listConfig.take, - }) + await workflowEngineService.listAndCountWorkflowExecution( + req.filterableFields, + { + select: req.listConfig.select, + relations: req.listConfig.relations, + skip: listConfig.skip, + take: listConfig.take, + } + ) res.json({ workflow_executions, diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts index 9c3c5c574b842..5ff902b81a971 100644 --- a/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-failure.ts @@ -9,10 +9,12 @@ export default async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id, step_id } = req.params + const { id: workflow_id } = req.params const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + const { transaction_id, step_id } = body + const compensateInput = body.compensate_input const stepResponse = isDefined(body.response) ? new StepResponse(body.response, compensateInput) diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts index a3cc17f918670..e2b854358a8c0 100644 --- a/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/set-step-success.ts @@ -9,10 +9,12 @@ export default async (req: MedusaRequest, res: MedusaResponse) => { ModuleRegistrationName.WORKFLOW_ENGINE ) - const { id: workflow_id, transaction_id, step_id } = req.params + const { id: workflow_id } = req.params const body = req.validatedBody as AdminPostWorkflowsAsyncResponseReq + const { transaction_id, step_id } = body + const compensateInput = body.compensate_input const stepResponse = isDefined(body.response) ? new StepResponse(body.response, compensateInput) diff --git a/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts b/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts index 1c61bc72bac64..50b8cb565a03c 100644 --- a/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts +++ b/packages/medusa/src/api/routes/admin/workflows-executions/validators.ts @@ -35,6 +35,12 @@ export class AdminPostWorkflowsRunReq { } export class AdminPostWorkflowsAsyncResponseReq { + @IsString() + transaction_id: string + + @IsString() + step_id: string + @IsOptional() response?: unknown diff --git a/packages/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/workflow-engine-inmemory/src/services/workflows-module.ts index d384ebedf38a1..8701e6d26d219 100644 --- a/packages/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/workflow-engine-inmemory/src/services/workflows-module.ts @@ -98,6 +98,22 @@ export class WorkflowsModuleService implements IWorkflowEngineService { config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const wfExecutions = await this.workflowExecutionService_.list( filters, config, @@ -117,6 +133,22 @@ export class WorkflowsModuleService implements IWorkflowEngineService { config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise<[WorkflowOrchestratorTypes.WorkflowExecutionDTO[], number]> { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const [wfExecutions, count] = await this.workflowExecutionService_.listAndCount( filters, diff --git a/packages/workflow-engine-redis/src/services/workflows-module.ts b/packages/workflow-engine-redis/src/services/workflows-module.ts index d384ebedf38a1..8701e6d26d219 100644 --- a/packages/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/workflow-engine-redis/src/services/workflows-module.ts @@ -98,6 +98,22 @@ export class WorkflowsModuleService implements IWorkflowEngineService { config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const wfExecutions = await this.workflowExecutionService_.list( filters, config, @@ -117,6 +133,22 @@ export class WorkflowsModuleService implements IWorkflowEngineService { config: FindConfig = {}, @MedusaContext() sharedContext: Context = {} ): Promise<[WorkflowOrchestratorTypes.WorkflowExecutionDTO[], number]> { + if (filters.transaction_id) { + if (Array.isArray(filters.transaction_id)) { + filters.transaction_id = { + $in: filters.transaction_id, + } + } + } + + if (filters.workflow_id) { + if (Array.isArray(filters.workflow_id)) { + filters.workflow_id = { + $in: filters.workflow_id, + } + } + } + const [wfExecutions, count] = await this.workflowExecutionService_.listAndCount( filters, diff --git a/packages/workflows-sdk/src/types/common.ts b/packages/workflows-sdk/src/types/common.ts index f3a81e72713fa..c8859970a3d2d 100644 --- a/packages/workflows-sdk/src/types/common.ts +++ b/packages/workflows-sdk/src/types/common.ts @@ -1,4 +1,4 @@ -import { BaseFilterable } from "@medusajs/types" +import { BaseFilterable, OperatorMap } from "@medusajs/types" export interface WorkflowExecutionDTO { id: string @@ -14,8 +14,8 @@ export interface WorkflowExecutionDTO { export interface FilterableWorkflowExecutionProps extends BaseFilterable { - id?: string[] - workflow_id?: string[] - transaction_id?: string[] - state?: any[] + id?: string | string[] | OperatorMap + workflow_id?: string | string[] | OperatorMap + transaction_id?: string | string[] | OperatorMap + state?: string | string[] | OperatorMap }