diff --git a/.changeset/clean-paws-build.md b/.changeset/clean-paws-build.md new file mode 100644 index 0000000000000..95b3498134ba7 --- /dev/null +++ b/.changeset/clean-paws-build.md @@ -0,0 +1,7 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/utils": patch +--- + +chore(workflow-engine): Migrate to DML diff --git a/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts b/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts index e7e93dfc5d64a..dcdfe6c00efb7 100644 --- a/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts +++ b/packages/core/utils/src/modules-sdk/__tests__/joiner-config-builder.spec.ts @@ -668,8 +668,8 @@ describe("joiner-config-builder", () => { serviceName: "myService", field: "car", entity: "Car", - linkable: "car_number_plate", - primaryKey: "number_plate", + linkable: "car_id", + primaryKey: "id", }) expect(linkConfig.user.toJSON()).toEqual({ serviceName: "myService", diff --git a/packages/core/utils/src/modules-sdk/joiner-config-builder.ts b/packages/core/utils/src/modules-sdk/joiner-config-builder.ts index 8c5809b6fbbba..b31278a88b5ae 100644 --- a/packages/core/utils/src/modules-sdk/joiner-config-builder.ts +++ b/packages/core/utils/src/modules-sdk/joiner-config-builder.ts @@ -17,7 +17,7 @@ import { toCamelCase, upperCaseFirst, } from "../common" -import { DmlEntity } from "../dml" +import { DmlEntity, IdProperty } from "../dml" import { toGraphQLSchema } from "../dml/helpers/create-graphql" import { PrimaryKeyModifier } from "../dml/properties/primary-key" import { BaseRelationship } from "../dml/relations/base" @@ -396,7 +396,10 @@ export function buildLinkConfigFromModelObjects< } const parsedProperty = (value as PropertyType).parse(property) - if (PrimaryKeyModifier.isPrimaryKeyModifier(value)) { + if ( + PrimaryKeyModifier.isPrimaryKeyModifier(value) || + IdProperty.isIdProperty(value) + ) { const linkableKeyName = parsedProperty.dataType.options?.linkable ?? `${camelToSnakeCase(model.name).toLowerCase()}_${property}` diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index e499e9381b37f..765d64facacbe 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -1,4 +1,7 @@ -import { WorkflowManager } from "@medusajs/framework/orchestration" +import { + DistributedTransactionType, + WorkflowManager, +} from "@medusajs/framework/orchestration" import { Context, IWorkflowEngineService, @@ -60,6 +63,20 @@ moduleIntegrationTestRunner({ serviceName: "workflows", field: "workflowExecution", }, + transaction_id: { + linkable: "workflow_execution_transaction_id", + entity: "WorkflowExecution", + primaryKey: "transaction_id", + serviceName: "workflows", + field: "workflowExecution", + }, + workflow_id: { + linkable: "workflow_execution_workflow_id", + entity: "WorkflowExecution", + primaryKey: "workflow_id", + serviceName: "workflows", + field: "workflowExecution", + }, }, }) }) @@ -87,12 +104,12 @@ moduleIntegrationTestRunner({ }) // Validate context event group id - expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( - expect.objectContaining({ eventGroupId }) - ) - expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( - expect.objectContaining({ eventGroupId }) - ) + expect( + (workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1] + ).toEqual(expect.objectContaining({ eventGroupId })) + expect( + (workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1] + ).toEqual(expect.objectContaining({ eventGroupId })) }) it("should execute an async workflow keeping track of the event group id that has been auto generated", async () => { @@ -114,14 +131,19 @@ moduleIntegrationTestRunner({ stepResponse: { hey: "oh" }, }) - const generatedEventGroupId = (workflowEventGroupIdStep1Mock.mock - .calls[0][1] as unknown as Context)!.eventGroupId + const generatedEventGroupId = (( + workflowEventGroupIdStep1Mock.mock.calls[0] as any[] + )[1] as unknown as Context)!.eventGroupId // Validate context event group id - expect(workflowEventGroupIdStep1Mock.mock.calls[0][1]).toEqual( + expect( + (workflowEventGroupIdStep1Mock.mock.calls[0] as any[])[1] + ).toEqual( expect.objectContaining({ eventGroupId: generatedEventGroupId }) ) - expect(workflowEventGroupIdStep2Mock.mock.calls[0][1]).toEqual( + expect( + (workflowEventGroupIdStep2Mock.mock.calls[0] as any[])[1] + ).toEqual( expect.objectContaining({ eventGroupId: generatedEventGroupId }) ) }) @@ -139,10 +161,9 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - let executionsList = await query({ - workflow_executions: { - fields: ["workflow_id", "transaction_id", "state"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["workflow_id", "transaction_id", "state"], }) expect(executionsList).toHaveLength(1) @@ -157,11 +178,10 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" }, }) - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(0) expect(result).toEqual({ @@ -180,10 +200,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -208,40 +227,38 @@ moduleIntegrationTestRunner({ expect(workflow2Step3Invoke.mock.calls[0][0]).toEqual({ uhuuuu: "yeaah!", }) - - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(1) }) it("should revert the entire transaction when a step timeout expires", async () => { - const { transaction } = await workflowOrcModule.run( + const { transaction } = (await workflowOrcModule.run( "workflow_step_timeout", { input: {}, throwOnError: false, } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") }) it("should revert the entire transaction when the transaction timeout expires", async () => { - const { transaction } = await workflowOrcModule.run( + const { transaction } = (await workflowOrcModule.run( "workflow_transaction_timeout", { input: {}, throwOnError: false, } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> await setTimeoutPromise(200) - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") }) it.skip("should subscribe to a async workflow and receive the response when it finishes", (done) => { @@ -393,7 +410,7 @@ moduleIntegrationTestRunner({ }) it("should fetch an idempotent workflow after its completion", async () => { - const { transaction: firstRun } = await workflowOrcModule.run( + const { transaction: firstRun } = (await workflowOrcModule.run( "workflow_idempotent", { input: { @@ -402,15 +419,14 @@ moduleIntegrationTestRunner({ throwOnError: true, transactionId: "transaction_1", } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) - const { transaction: secondRun } = await workflowOrcModule.run( + const { transaction: secondRun } = (await workflowOrcModule.run( "workflow_idempotent", { input: { @@ -419,15 +435,16 @@ moduleIntegrationTestRunner({ throwOnError: true, transactionId: "transaction_1", } - ) + )) as Awaited<{ transaction: DistributedTransactionType }> - const executionsListAfter = await query({ - workflow_executions: { - fields: ["id"], - }, + const { data: executionsListAfter } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) - expect(secondRun.flow.startedAt).toEqual(firstRun.flow.startedAt) + expect(secondRun.getFlow().startedAt).toEqual( + firstRun.getFlow().startedAt + ) expect(executionsList).toHaveLength(1) expect(executionsListAfter).toHaveLength(1) }) diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json new file mode 100644 index 0000000000000..0c8c584c91589 --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/.snapshot-medusa-workflows.json @@ -0,0 +1,171 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "transaction_id": { + "name": "transaction_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "execution": { + "name": "execution", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "context": { + "name": "context", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "state": { + "name": "state", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "enumItems": [ + "not_started", + "invoking", + "waiting_to_compensate", + "compensating", + "done", + "reverted", + "failed" + ], + "mappedType": "enum" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "workflow_execution", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_workflow_execution_deleted_at", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id_transaction_id_unique", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE UNIQUE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id_transaction_id_unique\" ON \"workflow_execution\" (workflow_id, transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_transaction_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" + }, + { + "keyName": "workflow_execution_pkey", + "columnNames": [ + "workflow_id", + "transaction_id" + ], + "composite": true, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts new file mode 100644 index 0000000000000..be8b1cbac182a --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/src/migrations/Migration20241206101446.ts @@ -0,0 +1,27 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20241206101446 extends Migration { + async up(): Promise { + this.addSql( + `DROP INDEX IF EXISTS "IDX_workflow_execution_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_workflow_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_transaction_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_state";` + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_deleted_at" ON "workflow_execution" (deleted_at) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" (id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" (workflow_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" (transaction_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" (state) WHERE deleted_at IS NULL;' + ) + } +} diff --git a/packages/modules/workflow-engine-inmemory/src/models/index.ts b/packages/modules/workflow-engine-inmemory/src/models/index.ts index 78fcbfa9214f9..fa5b8a3dd019e 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/index.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/index.ts @@ -1 +1 @@ -export { default as WorkflowExecution } from "./workflow-execution" +export { WorkflowExecution } from "./workflow-execution" diff --git a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts index 22e693d4283eb..c41bc8936ed1b 100644 --- a/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-inmemory/src/models/workflow-execution.ts @@ -1,76 +1,30 @@ import { TransactionState } from "@medusajs/framework/orchestration" -import { DALUtils, generateEntityId } from "@medusajs/framework/utils" -import { - BeforeCreate, - Entity, - Enum, - Filter, - Index, - OnInit, - OptionalProps, - PrimaryKey, - Property, - Unique, -} from "@mikro-orm/core" - -type OptionalFields = "deleted_at" - -@Entity() -@Unique({ - name: "IDX_workflow_execution_workflow_id_transaction_id_unique", - properties: ["workflow_id", "transaction_id"], -}) -@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions) -export default class WorkflowExecution { - [OptionalProps]?: OptionalFields - - @Property({ columnType: "text", nullable: false }) - @Index({ name: "IDX_workflow_execution_id" }) - id!: string - - @Index({ name: "IDX_workflow_execution_workflow_id" }) - @PrimaryKey({ columnType: "text" }) - workflow_id: string - - @Index({ name: "IDX_workflow_execution_transaction_id" }) - @PrimaryKey({ columnType: "text" }) - transaction_id: string - - @Property({ columnType: "jsonb", nullable: true }) - execution: Record | null = null - - @Property({ columnType: "jsonb", nullable: true }) - context: Record | null = null - - @Index({ name: "IDX_workflow_execution_state" }) - @Enum(() => TransactionState) - state: TransactionState - - @Property({ - onCreate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", - }) - created_at: Date - - @Property({ - onCreate: () => new Date(), - onUpdate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", +import { model } from "@medusajs/framework/utils" + +export const WorkflowExecution = model + .define("workflow_execution", { + id: model.id({ prefix: "wf_exec" }), + workflow_id: model.text().primaryKey(), + transaction_id: model.text().primaryKey(), + execution: model.json().nullable(), + context: model.json().nullable(), + state: model.enum(TransactionState), }) - updated_at: Date - - @Property({ columnType: "timestamptz", nullable: true }) - deleted_at: Date | null = null - - @BeforeCreate() - onCreate() { - this.id = generateEntityId(this.id, "wf_exec") - } - - @OnInit() - onInit() { - this.id = generateEntityId(this.id, "wf_exec") - } -} + .indexes([ + { + on: ["id"], + where: "deleted_at IS NULL", + }, + { + on: ["workflow_id"], + where: "deleted_at IS NULL", + }, + { + on: ["transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state"], + where: "deleted_at IS NULL", + }, + ]) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 9771f48784aac..3b67e36d2a5c8 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -1,6 +1,7 @@ import { Context, DAL, + InferEntityType, InternalModuleDeclaration, MedusaContainer, ModulesSdkTypes, @@ -25,9 +26,11 @@ type InjectedDependencies = { } export class WorkflowsModuleService< - TWorkflowExecution extends WorkflowExecution = WorkflowExecution + TWorkflowExecution extends InferEntityType< + typeof WorkflowExecution + > = InferEntityType > extends ModulesSdkUtils.MedusaService<{ - WorkflowExecution: { dto: WorkflowExecution } + WorkflowExecution: { dto: InferEntityType } }>({ WorkflowExecution }) { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 1e30157a24216..00f60a894afb9 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -1,10 +1,12 @@ import { + DistributedTransactionType, TransactionStepTimeoutError, TransactionTimeoutError, WorkflowManager, } from "@medusajs/framework/orchestration" import { IWorkflowEngineService, + Logger, MedusaContainer, RemoteQueryFunction, } from "@medusajs/framework/types" @@ -99,6 +101,20 @@ moduleIntegrationTestRunner({ serviceName: "workflows", field: "workflowExecution", }, + transaction_id: { + entity: "WorkflowExecution", + field: "workflowExecution", + linkable: "workflow_execution_transaction_id", + primaryKey: "transaction_id", + serviceName: "workflows", + }, + workflow_id: { + entity: "WorkflowExecution", + field: "workflowExecution", + linkable: "workflow_execution_workflow_id", + primaryKey: "workflow_id", + serviceName: "workflows", + }, }, }) }) @@ -112,10 +128,9 @@ moduleIntegrationTestRunner({ throwOnError: true, }) - let executionsList = await query({ - workflow_executions: { - fields: ["workflow_id", "transaction_id", "state"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["workflow_id", "transaction_id", "state"], }) expect(executionsList).toHaveLength(1) @@ -130,11 +145,10 @@ moduleIntegrationTestRunner({ stepResponse: { uhuuuu: "yeaah!" }, }) - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(0) expect(result).toEqual({ @@ -153,10 +167,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -170,12 +183,10 @@ moduleIntegrationTestRunner({ }, stepResponse: { uhuuuu: "yeaah!" }, }) - - executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], + })) expect(executionsList).toHaveLength(1) }) @@ -188,10 +199,9 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -205,12 +215,10 @@ moduleIntegrationTestRunner({ }, stepResponse: { uhuuuu: "yeaah!" }, }) - - executionsList = await query({ - workflow_executions: { - fields: ["id", "state"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id", "state"], + })) expect(executionsList).toHaveLength(1) expect(executionsList[0].state).toEqual("reverted") @@ -237,10 +245,9 @@ moduleIntegrationTestRunner({ }, }) - let executionsList = await query({ - workflow_executions: { - fields: ["id"], - }, + let { data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id"], }) expect(executionsList).toHaveLength(1) @@ -260,12 +267,10 @@ moduleIntegrationTestRunner({ }) expect(setStepError).toEqual({ uhuuuu: "yeaah!" }) - - executionsList = await query({ - workflow_executions: { - fields: ["id", "state", "context"], - }, - }) + ;({ data: executionsList } = await query.graph({ + entity: "workflow_executions", + fields: ["id", "state", "context"], + })) expect(executionsList).toHaveLength(1) expect(executionsList[0].state).toEqual("failed") @@ -273,7 +278,7 @@ moduleIntegrationTestRunner({ }) it("should revert the entire transaction when a step timeout expires", async () => { - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_step_timeout", { input: { @@ -282,9 +287,13 @@ moduleIntegrationTestRunner({ throwOnError: false, logOnError: true, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual({ myInput: "123", }) @@ -294,16 +303,20 @@ moduleIntegrationTestRunner({ }) it("should revert the entire transaction when the transaction timeout expires", async () => { - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_transaction_timeout", { input: {}, transactionId: "trx", throwOnError: false, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual({ executed: true }) expect(errors).toHaveLength(1) expect(errors[0].action).toEqual("step_1") @@ -323,7 +336,7 @@ moduleIntegrationTestRunner({ await setTimeout(200) - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_step_timeout_async", { input: { @@ -332,9 +345,13 @@ moduleIntegrationTestRunner({ transactionId: "transaction_1", throwOnError: false, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual(undefined) expect(errors).toHaveLength(1) expect(errors[0].action).toEqual("step_1_async") @@ -354,16 +371,20 @@ moduleIntegrationTestRunner({ await setTimeout(200) - const { transaction, result, errors } = await workflowOrcModule.run( + const { transaction, result, errors } = (await workflowOrcModule.run( "workflow_transaction_timeout_async", { input: {}, transactionId: "transaction_1", throwOnError: false, } - ) + )) as Awaited<{ + transaction: DistributedTransactionType + result: any + errors: any + }> - expect(transaction.flow.state).toEqual("reverted") + expect(transaction.getFlow().state).toEqual("reverted") expect(result).toEqual(undefined) expect(errors).toHaveLength(1) expect(errors[0].action).toEqual("step_1") diff --git a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts index 3fe2da068123a..7b027f5995841 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/utils/database.ts @@ -15,11 +15,11 @@ const redisUrl = process.env.REDIS_URL || "redis://localhost:6379" const redis = new Redis(redisUrl) interface TestDatabase { - clearTables(knex): Promise + clearTables(): Promise } export const TestDatabase: TestDatabase = { - clearTables: async (knex) => { + clearTables: async () => { await cleanRedis() }, } diff --git a/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json new file mode 100644 index 0000000000000..66a9a97c216f0 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/.snapshot-medusa-workflows.json @@ -0,0 +1,163 @@ +{ + "namespaces": [ + "public" + ], + "name": "public", + "tables": [ + { + "columns": { + "workflow_id": { + "name": "workflow_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "transaction_id": { + "name": "transaction_id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "id": { + "name": "id", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "mappedType": "text" + }, + "execution": { + "name": "execution", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "context": { + "name": "context", + "type": "jsonb", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "json" + }, + "state": { + "name": "state", + "type": "text", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "enumItems": [ + "not_started", + "invoking", + "waiting_to_compensate", + "compensating", + "done", + "reverted", + "failed" + ], + "mappedType": "enum" + }, + "created_at": { + "name": "created_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "updated_at": { + "name": "updated_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": false, + "length": 6, + "default": "now()", + "mappedType": "datetime" + }, + "deleted_at": { + "name": "deleted_at", + "type": "timestamptz", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "length": 6, + "mappedType": "datetime" + } + }, + "name": "workflow_execution", + "schema": "public", + "indexes": [ + { + "keyName": "IDX_workflow_execution_deleted_at", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_deleted_at\" ON \"workflow_execution\" (deleted_at) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_id\" ON \"workflow_execution\" (id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_workflow_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_workflow_id\" ON \"workflow_execution\" (workflow_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_transaction_id", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_transaction_id\" ON \"workflow_execution\" (transaction_id) WHERE deleted_at IS NULL" + }, + { + "keyName": "IDX_workflow_execution_state", + "columnNames": [], + "composite": false, + "primary": false, + "unique": false, + "expression": "CREATE INDEX IF NOT EXISTS \"IDX_workflow_execution_state\" ON \"workflow_execution\" (state) WHERE deleted_at IS NULL" + }, + { + "keyName": "workflow_execution_pkey", + "columnNames": [ + "workflow_id", + "transaction_id" + ], + "composite": true, + "primary": true, + "unique": true + } + ], + "checks": [], + "foreignKeys": {} + } + ] +} diff --git a/packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts b/packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts new file mode 100644 index 0000000000000..50c0680ae90c5 --- /dev/null +++ b/packages/modules/workflow-engine-redis/src/migrations/Migration20241206123341.ts @@ -0,0 +1,27 @@ +import { Migration } from "@mikro-orm/migrations" + +export class Migration20241206123341 extends Migration { + async up(): Promise { + this.addSql( + `DROP INDEX IF EXISTS "IDX_workflow_execution_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_workflow_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_transaction_id"; + DROP INDEX IF EXISTS "IDX_workflow_execution_state";` + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_deleted_at" ON "workflow_execution" (deleted_at) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_id" ON "workflow_execution" (id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_workflow_id" ON "workflow_execution" (workflow_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_transaction_id" ON "workflow_execution" (transaction_id) WHERE deleted_at IS NULL;' + ) + this.addSql( + 'CREATE INDEX IF NOT EXISTS "IDX_workflow_execution_state" ON "workflow_execution" (state) WHERE deleted_at IS NULL;' + ) + } +} diff --git a/packages/modules/workflow-engine-redis/src/models/index.ts b/packages/modules/workflow-engine-redis/src/models/index.ts index 78fcbfa9214f9..fa5b8a3dd019e 100644 --- a/packages/modules/workflow-engine-redis/src/models/index.ts +++ b/packages/modules/workflow-engine-redis/src/models/index.ts @@ -1 +1 @@ -export { default as WorkflowExecution } from "./workflow-execution" +export { WorkflowExecution } from "./workflow-execution" diff --git a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts index 22e693d4283eb..c41bc8936ed1b 100644 --- a/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts +++ b/packages/modules/workflow-engine-redis/src/models/workflow-execution.ts @@ -1,76 +1,30 @@ import { TransactionState } from "@medusajs/framework/orchestration" -import { DALUtils, generateEntityId } from "@medusajs/framework/utils" -import { - BeforeCreate, - Entity, - Enum, - Filter, - Index, - OnInit, - OptionalProps, - PrimaryKey, - Property, - Unique, -} from "@mikro-orm/core" - -type OptionalFields = "deleted_at" - -@Entity() -@Unique({ - name: "IDX_workflow_execution_workflow_id_transaction_id_unique", - properties: ["workflow_id", "transaction_id"], -}) -@Filter(DALUtils.mikroOrmSoftDeletableFilterOptions) -export default class WorkflowExecution { - [OptionalProps]?: OptionalFields - - @Property({ columnType: "text", nullable: false }) - @Index({ name: "IDX_workflow_execution_id" }) - id!: string - - @Index({ name: "IDX_workflow_execution_workflow_id" }) - @PrimaryKey({ columnType: "text" }) - workflow_id: string - - @Index({ name: "IDX_workflow_execution_transaction_id" }) - @PrimaryKey({ columnType: "text" }) - transaction_id: string - - @Property({ columnType: "jsonb", nullable: true }) - execution: Record | null = null - - @Property({ columnType: "jsonb", nullable: true }) - context: Record | null = null - - @Index({ name: "IDX_workflow_execution_state" }) - @Enum(() => TransactionState) - state: TransactionState - - @Property({ - onCreate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", - }) - created_at: Date - - @Property({ - onCreate: () => new Date(), - onUpdate: () => new Date(), - columnType: "timestamptz", - defaultRaw: "now()", +import { model } from "@medusajs/framework/utils" + +export const WorkflowExecution = model + .define("workflow_execution", { + id: model.id({ prefix: "wf_exec" }), + workflow_id: model.text().primaryKey(), + transaction_id: model.text().primaryKey(), + execution: model.json().nullable(), + context: model.json().nullable(), + state: model.enum(TransactionState), }) - updated_at: Date - - @Property({ columnType: "timestamptz", nullable: true }) - deleted_at: Date | null = null - - @BeforeCreate() - onCreate() { - this.id = generateEntityId(this.id, "wf_exec") - } - - @OnInit() - onInit() { - this.id = generateEntityId(this.id, "wf_exec") - } -} + .indexes([ + { + on: ["id"], + where: "deleted_at IS NULL", + }, + { + on: ["workflow_id"], + where: "deleted_at IS NULL", + }, + { + on: ["transaction_id"], + where: "deleted_at IS NULL", + }, + { + on: ["state"], + where: "deleted_at IS NULL", + }, + ]) diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 37c371356ff59..7463623e94497 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -1,6 +1,7 @@ import { Context, DAL, + InferEntityType, InternalModuleDeclaration, ModulesSdkTypes, WorkflowsSdkTypes, @@ -25,9 +26,11 @@ type InjectedDependencies = { } export class WorkflowsModuleService< - TWorkflowExecution extends WorkflowExecution = WorkflowExecution + TWorkflowExecution extends InferEntityType< + typeof WorkflowExecution + > = InferEntityType > extends ModulesSdkUtils.MedusaService<{ - WorkflowExecution: { dto: WorkflowExecution } + WorkflowExecution: { dto: InferEntityType } }>({ WorkflowExecution }) { protected baseRepository_: DAL.RepositoryService protected workflowExecutionService_: ModulesSdkTypes.IMedusaInternalService