From ed070b16e34f26aa8c7e856a9b35a23c7ebf088e Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Tue, 13 May 2025 13:03:02 -0700 Subject: [PATCH 1/6] add pause --- example/convex/_generated/api.d.ts | 8 ++++---- src/client/step.ts | 2 ++ src/client/stepContext.ts | 20 +++++++++++++++++--- src/client/types.ts | 30 +++++++++++++++++++++++++++++- src/component/_generated/api.d.ts | 8 ++++---- src/component/journal.ts | 2 ++ src/component/pool.ts | 12 ++++++++++++ src/component/schema.ts | 2 +- 8 files changed, 71 insertions(+), 13 deletions(-) diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 0d97802..1664a30 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -57,7 +57,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -105,7 +105,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -136,7 +136,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -203,7 +203,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; diff --git a/src/client/step.ts b/src/client/step.ts index 0e8a837..a175ce4 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -30,6 +30,7 @@ export type StepRequest = { args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; + pause: boolean | undefined; resolve: (result: unknown) => void; reject: (error: unknown) => void; @@ -138,6 +139,7 @@ export class StepExecutor { handle: await createFunctionHandle(message.function), args: message.args, argsSize: valueSize(message.args as Value), + pause: message.pause, outcome: undefined, startedAt: this.now, completedAt: undefined, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 8145672..33980b7 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -41,17 +41,30 @@ export class StepContext implements WorkflowStep { return this.runFunction("action", action, args, opts); } + async pause< + Mutation extends FunctionReference<"mutation", any, any, void>, + Returns = void, + >( + pauseHandler: Mutation, + args: FunctionArgs, + opts?: RunOptions & { returns: Validator }, + ): Promise { + return this.runFunction("mutation", pauseHandler, args, { + ...opts, + pause: true, + }) as Promise; + } + private async runFunction< F extends FunctionReference, >( functionType: FunctionType, f: F, args: unknown, - opts?: RunOptions & RetryOption, + opts?: RunOptions & RetryOption & { pause?: true }, ): Promise { let send: unknown; - const { name, ...rest } = opts ?? {}; - const { retry, ...schedulerOptions } = rest; + const { name, retry, pause, ...schedulerOptions } = opts ?? {}; const p = new Promise((resolve, reject) => { send = this.sender.push({ name: name ?? safeFunctionName(f), @@ -59,6 +72,7 @@ export class StepContext implements WorkflowStep { function: f, args, retry, + pause, schedulerOptions, resolve, reject, diff --git a/src/client/types.ts b/src/client/types.ts index 9e90065..ebd8162 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,12 +1,13 @@ import type { RetryOption, WorkId } from "@convex-dev/workpool"; import type { + DefaultFunctionArgs, Expand, FunctionArgs, FunctionReference, FunctionReturnType, } from "convex/server"; import type { api } from "../component/_generated/api.js"; -import type { GenericId } from "convex/values"; +import type { GenericId, Validator } from "convex/values"; import type { WorkflowId } from "../types.js"; export type WorkflowComponent = UseApi; @@ -81,6 +82,33 @@ export type WorkflowStep = { args: FunctionArgs, opts?: RunOptions & RetryOption, ): Promise>; + + /** + * Pause the workflow, to be resumed asynchronously. + * + * It will be marked as paused in the same transaction as the pause handler + * is called. The pause handler must receive the arguments and return nothing. + * + * The return value is the value provided by the resume call, which must match + * the return validator provided. + * + * @param pauseHandler - The pause handler to run, like `internal.index.examplePause`. + * @param args - The arguments to the pause handler. + * @param opts - Options for retrying, scheduling and naming the pause. + */ + pause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = void, + >( + pauseHandler: Mutation, + args: FunctionArgs, + opts?: RunOptions & { returns: Validator }, + ): Promise; }; export type UseApi = Expand<{ diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 2caf60c..36426a4 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -51,7 +51,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -99,7 +99,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -130,7 +130,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -197,7 +197,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; diff --git a/src/component/journal.ts b/src/component/journal.ts index df9642e..ab67399 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -115,6 +115,8 @@ export const startSteps = mutation({ ); break; } + // Pause is a special mutation + case "pause": case "mutation": { workId = await workpool.enqueueMutation( ctx, diff --git a/src/component/pool.ts b/src/component/pool.ts index 781ad68..334cd35 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -114,6 +114,18 @@ export const onComplete = internalMutation({ ); return; } + if ( + journalEntry.step.functionType === "pause" && + args.result.kind === "success" + ) { + console.event("stepPaused", { + workflowId, + workflowName: workflow.name, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + }); + return; + } journalEntry.step.inProgress = false; journalEntry.step.completedAt = Date.now(); switch (args.result.kind) { diff --git a/src/component/schema.ts b/src/component/schema.ts index c96c0af..a863d6c 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -63,7 +63,7 @@ export const step = v.object({ name: v.string(), inProgress: v.boolean(), workId: v.optional(vWorkIdValidator), - functionType: literals("query", "mutation", "action"), + functionType: literals("query", "mutation", "action", "pause"), handle: v.string(), argsSize: v.number(), args: v.any(), From 134cda454bceef88af6b2cc8e9046eb079d9c160 Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Mon, 22 Sep 2025 20:18:17 -0700 Subject: [PATCH 2/6] maybe support not passing a function --- src/client/step.ts | 6 ++-- src/client/stepContext.ts | 73 ++++++++++++++++++++++++++++----------- src/client/types.ts | 17 ++++++--- src/component/journal.ts | 13 +++++-- 4 files changed, 80 insertions(+), 29 deletions(-) diff --git a/src/client/step.ts b/src/client/step.ts index a175ce4..b05f7df 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -26,7 +26,7 @@ export type WorkerResult = export type StepRequest = { name: string; functionType: FunctionType; - function: FunctionReference; + function: FunctionReference | undefined; args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; @@ -136,7 +136,9 @@ export class StepExecutor { inProgress: true, name: message.name, functionType: message.functionType, - handle: await createFunctionHandle(message.function), + handle: message.function + ? await createFunctionHandle(message.function) + : "", args: message.args, argsSize: valueSize(message.args as Value), pause: message.pause, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 33980b7..d56aec5 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -4,6 +4,7 @@ import type { FunctionArgs, FunctionReturnType, FunctionType, + DefaultFunctionArgs, } from "convex/server"; import { safeFunctionName } from "./safeFunctionName.js"; import type { StepRequest } from "./step.js"; @@ -42,38 +43,70 @@ export class StepContext implements WorkflowStep { } async pause< - Mutation extends FunctionReference<"mutation", any, any, void>, - Returns = void, + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, >( - pauseHandler: Mutation, - args: FunctionArgs, - opts?: RunOptions & { returns: Validator }, + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), ): Promise { - return this.runFunction("mutation", pauseHandler, args, { - ...opts, - pause: true, - }) as Promise; + if (opts?.onPause) { + return this.runFunction("mutation", opts.onPause, opts.args, { + name: opts.name, + pause: true, + }) as Promise; + } else { + return this.run({ + name: opts?.name ?? "pause", + functionType: "mutation", + function: undefined, + args: {}, + retry: undefined, + pause: true, + schedulerOptions: {}, + }) as Promise; + } } - private async runFunction< - F extends FunctionReference, - >( + private runFunction>( functionType: FunctionType, f: F, args: unknown, opts?: RunOptions & RetryOption & { pause?: true }, ): Promise { - let send: unknown; const { name, retry, pause, ...schedulerOptions } = opts ?? {}; + return this.run({ + name: name ?? (f ? safeFunctionName(f) : "pause"), + functionType, + function: f, + args: args ?? {}, + retry, + pause, + schedulerOptions, + }); + } + + private async run( + req: Omit, + ): Promise { + let send: unknown; const p = new Promise((resolve, reject) => { send = this.sender.push({ - name: name ?? safeFunctionName(f), - functionType, - function: f, - args, - retry, - pause, - schedulerOptions, + ...req, resolve, reject, }); diff --git a/src/client/types.ts b/src/client/types.ts index ebd8162..04aac99 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -103,11 +103,20 @@ export type WorkflowStep = { DefaultFunctionArgs, void >, - Returns = void, + Returns = unknown, >( - pauseHandler: Mutation, - args: FunctionArgs, - opts?: RunOptions & { returns: Validator }, + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), ): Promise; }; diff --git a/src/component/journal.ts b/src/component/journal.ts index ab67399..25d7c8f 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -104,7 +104,8 @@ export const startSteps = mutation({ generationNumber, stepId, }; - let workId: WorkId; + let workId: WorkId | undefined = undefined; + // TODO: use enqueueBatch switch (step.functionType) { case "query": { workId = await workpool.enqueueQuery( @@ -117,6 +118,10 @@ export const startSteps = mutation({ } // Pause is a special mutation case "pause": + if (!handle) { + break; + } + // fallthrough case "mutation": { workId = await workpool.enqueueMutation( ctx, @@ -136,8 +141,10 @@ export const startSteps = mutation({ break; } } - entry.step.workId = workId; - await ctx.db.replace(entry._id, entry); + if (workId) { + entry.step.workId = workId; + await ctx.db.replace(entry._id, entry); + } console.event("started", { workflowId: workflow._id, From d7181eb9e7328261f034e5000662d6882d1ec67d Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Mon, 22 Sep 2025 20:19:17 -0700 Subject: [PATCH 3/6] async not needed --- src/client/stepContext.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index d56aec5..a5005bf 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -18,7 +18,7 @@ export class StepContext implements WorkflowStep { private sender: BaseChannel, ) {} - async runQuery>( + runQuery>( query: Query, args: FunctionArgs, opts?: RunOptions, @@ -26,7 +26,7 @@ export class StepContext implements WorkflowStep { return this.runFunction("query", query, args, opts); } - async runMutation>( + runMutation>( mutation: Mutation, args: FunctionArgs, opts?: RunOptions, @@ -34,7 +34,7 @@ export class StepContext implements WorkflowStep { return this.runFunction("mutation", mutation, args, opts); } - async runAction>( + runAction>( action: Action, args: FunctionArgs, opts?: RunOptions & RetryOption, @@ -42,7 +42,7 @@ export class StepContext implements WorkflowStep { return this.runFunction("action", action, args, opts); } - async pause< + pause< Mutation extends FunctionReference< "mutation", "internal", From 9bf3173e8a7c4ca2d90309d239553149867e7a5c Mon Sep 17 00:00:00 2001 From: a-fatah Date: Fri, 3 Oct 2025 17:16:49 +0200 Subject: [PATCH 4/6] implement resume --- src/client/index.ts | 31 ++++++++++++++++ src/client/step.ts | 3 +- src/client/stepContext.ts | 7 ++-- src/client/types.ts | 4 +- src/component/journal.ts | 78 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 8 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 2cc5c5b..24cc570 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -204,6 +204,37 @@ export class WorkflowManager { workflowId, }); } + + /** + * Resume a paused workflow with a type-safe value. + * + * @param ctx - The Convex context. + * @param workflow - The workflow function reference for type safety. + * @param workflowId - The workflow ID. + * @param resumeValue - The value to pass to the paused step. + * @param opts - Options including the validator for type inference and optional step name. + */ + async resume< + F extends FunctionReference<"mutation", "internal">, + V extends Validator, + >( + ctx: RunMutationCtx, + workflow: F, + workflowId: WorkflowId, + resumeValue: unknown, + opts?: { + returns?: V; + name?: string; + }, + ): Promise { + const handle = await createFunctionHandle(workflow); + await ctx.runMutation(this.component.journal.resume, { + workflowHandle: handle, + workflowId, + resumeValue, + name: opts?.name, + }); + } } type RunQueryCtx = { diff --git a/src/client/step.ts b/src/client/step.ts index b05f7df..0cb81a1 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -25,7 +25,7 @@ export type WorkerResult = export type StepRequest = { name: string; - functionType: FunctionType; + functionType: FunctionType | "pause"; function: FunctionReference | undefined; args: unknown; retry: RetryBehavior | boolean | undefined; @@ -141,7 +141,6 @@ export class StepExecutor { : "", args: message.args, argsSize: valueSize(message.args as Value), - pause: message.pause, outcome: undefined, startedAt: this.now, completedAt: undefined, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index a5005bf..559bd42 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -6,6 +6,7 @@ import type { FunctionType, DefaultFunctionArgs, } from "convex/server"; +import type { Validator } from "convex/values"; import { safeFunctionName } from "./safeFunctionName.js"; import type { StepRequest } from "./step.js"; import type { RetryOption } from "@convex-dev/workpool"; @@ -65,14 +66,14 @@ export class StepContext implements WorkflowStep { ), ): Promise { if (opts?.onPause) { - return this.runFunction("mutation", opts.onPause, opts.args, { + return this.runFunction("pause", opts.onPause, opts.args, { name: opts.name, pause: true, }) as Promise; } else { return this.run({ name: opts?.name ?? "pause", - functionType: "mutation", + functionType: "pause", function: undefined, args: {}, retry: undefined, @@ -83,7 +84,7 @@ export class StepContext implements WorkflowStep { } private runFunction>( - functionType: FunctionType, + functionType: FunctionType | "pause", f: F, args: unknown, opts?: RunOptions & RetryOption & { pause?: true }, diff --git a/src/client/types.ts b/src/client/types.ts index 04aac99..d7f8eec 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -99,9 +99,7 @@ export type WorkflowStep = { pause< Mutation extends FunctionReference< "mutation", - "internal", - DefaultFunctionArgs, - void + "internal" >, Returns = unknown, >( diff --git a/src/component/journal.ts b/src/component/journal.ts index 25d7c8f..a5d009d 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -158,3 +158,81 @@ export const startSteps = mutation({ return entries; }, }); + +export const resume = mutation({ + args: { + workflowHandle: v.string(), + workflowId: v.id("workflows"), + resumeValue: v.any(), + name: v.optional(v.string()), + }, + returns: v.null(), + handler: async (ctx, args) => { + const console = await getDefaultLogger(ctx); + const workflow = await getWorkflow(ctx, args.workflowId, null); + + if (workflow.runResult) { + throw new Error(`Workflow not running: ${args.workflowId}`); + } + + if (workflow.workflowHandle !== args.workflowHandle) { + throw new Error( + `Workflow handle mismatch: expected ${workflow.workflowHandle}, got ${args.workflowHandle}`, + ); + } + + let query = ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", args.workflowId), + ) + .filter((q) => q.eq(q.field("step.functionType"), "pause")); + + if (args.name) { + query = query.filter((q) => q.eq(q.field("step.name"), args.name)); + } + + const pausedStep = await query.first(); + + if (!pausedStep) { + const message = args.name + ? `No paused step with name "${args.name}" found for workflow: ${args.workflowId}` + : `No paused step found for workflow: ${args.workflowId}`; + throw new Error(message); + } + + pausedStep.step.inProgress = false; + pausedStep.step.completedAt = Date.now(); + pausedStep.step.runResult = { + kind: "success", + returnValue: args.resumeValue, + }; + + await ctx.db.replace(pausedStep._id, pausedStep); + + console.event("stepResumed", { + workflowId: workflow._id, + workflowName: workflow.name, + stepName: pausedStep.step.name, + stepNumber: pausedStep.stepNumber, + }); + + const workpool = await getWorkpool(ctx, {}); + await workpool.enqueueMutation( + ctx, + workflow.workflowHandle as FunctionHandle<"mutation">, + { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }, + { + name: workflow.name, + onComplete: internal.pool.handlerOnComplete, + context: { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }, + }, + ); + }, +}); From 5d3eaefb4c5ac60dff454ace02c99df6a605dd41 Mon Sep 17 00:00:00 2001 From: a-fatah Date: Fri, 3 Oct 2025 17:52:51 +0200 Subject: [PATCH 5/6] refactor: use discriminated union for step types - Replace FunctionType | 'pause' with proper discriminated union - Separate ExecutionStep and PauseStep types with 'type' discriminator - Remove nullable function references - execution steps always have functions - Clean separation between control flow (pause) and execution (query/mutation/action) - Improved type safety with explicit step type checking --- src/client/index.ts | 1 + src/client/step.ts | 75 +++++++++++++++++++++++++++------------ src/client/stepContext.ts | 68 ++++++++++++++++++++++------------- src/client/types.ts | 1 - src/component/journal.ts | 69 ++++++++++++++++++----------------- src/component/pool.ts | 2 +- src/component/schema.ts | 39 ++++++++++++++++---- 7 files changed, 167 insertions(+), 88 deletions(-) diff --git a/src/client/index.ts b/src/client/index.ts index 24cc570..2978921 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -216,6 +216,7 @@ export class WorkflowManager { */ async resume< F extends FunctionReference<"mutation", "internal">, + // eslint-disable-next-line @typescript-eslint/no-explicit-any V extends Validator, >( ctx: RunMutationCtx, diff --git a/src/client/step.ts b/src/client/step.ts index 0cb81a1..79b3902 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -23,19 +23,30 @@ export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } | { type: "executorBlocked" }; -export type StepRequest = { +export type ExecutionStepRequest = { + type: "execution"; name: string; - functionType: FunctionType | "pause"; - function: FunctionReference | undefined; + functionType: FunctionType; + function: FunctionReference; args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; - pause: boolean | undefined; + resolve: (result: unknown) => void; + reject: (error: unknown) => void; +}; +export type PauseStepRequest = { + type: "pause"; + name: string; + onPauseFunction?: FunctionReference<"mutation", "internal">; + args: unknown; + schedulerOptions: SchedulerOptions; resolve: (result: unknown) => void; reject: (error: unknown) => void; }; +export type StepRequest = ExecutionStepRequest | PauseStepRequest; + const MAX_JOURNAL_SIZE = 8 << 20; export class StepExecutor { @@ -132,24 +143,44 @@ export class StepExecutor { async startSteps(messages: StepRequest[]): Promise { const steps = await Promise.all( messages.map(async (message) => { - const step = { - inProgress: true, - name: message.name, - functionType: message.functionType, - handle: message.function - ? await createFunctionHandle(message.function) - : "", - args: message.args, - argsSize: valueSize(message.args as Value), - outcome: undefined, - startedAt: this.now, - completedAt: undefined, - }; - return { - retry: message.retry, - schedulerOptions: message.schedulerOptions, - step, - }; + if (message.type === "execution") { + const step = { + type: "execution" as const, + inProgress: true, + name: message.name, + functionType: message.functionType, + handle: await createFunctionHandle(message.function), + args: message.args, + argsSize: valueSize(message.args as Value), + runResult: undefined, + startedAt: this.now, + completedAt: undefined, + }; + return { + retry: message.retry, + schedulerOptions: message.schedulerOptions, + step, + }; + } else { + const step = { + type: "pause" as const, + inProgress: true, + name: message.name, + onPauseHandle: message.onPauseFunction + ? await createFunctionHandle(message.onPauseFunction) + : undefined, + args: message.args, + argsSize: valueSize(message.args as Value), + runResult: undefined, + startedAt: this.now, + completedAt: undefined, + }; + return { + retry: undefined, + schedulerOptions: message.schedulerOptions, + step, + }; + } }), ); const entries = (await this.ctx.runMutation( diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 559bd42..b102f23 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -8,7 +8,7 @@ import type { } from "convex/server"; import type { Validator } from "convex/values"; import { safeFunctionName } from "./safeFunctionName.js"; -import type { StepRequest } from "./step.js"; +import type { StepRequest, ExecutionStepRequest } from "./step.js"; import type { RetryOption } from "@convex-dev/workpool"; import type { RunOptions, WorkflowStep } from "./types.js"; import type { WorkflowId } from "../types.js"; @@ -65,44 +65,62 @@ export class StepContext implements WorkflowStep { | { onPause?: undefined; args?: undefined } ), ): Promise { - if (opts?.onPause) { - return this.runFunction("pause", opts.onPause, opts.args, { - name: opts.name, - pause: true, - }) as Promise; - } else { - return this.run({ - name: opts?.name ?? "pause", - functionType: "pause", - function: undefined, - args: {}, - retry: undefined, - pause: true, - schedulerOptions: {}, - }) as Promise; - } + return this.runPause(opts); } private runFunction>( - functionType: FunctionType | "pause", + functionType: FunctionType, f: F, args: unknown, - opts?: RunOptions & RetryOption & { pause?: true }, + opts?: RunOptions & RetryOption, ): Promise { - const { name, retry, pause, ...schedulerOptions } = opts ?? {}; - return this.run({ - name: name ?? (f ? safeFunctionName(f) : "pause"), + const { name, retry, ...schedulerOptions } = opts ?? {}; + return this.runExecution({ + type: "execution", + name: name ?? safeFunctionName(f), functionType, function: f, args: args ?? {}, retry, - pause, schedulerOptions, }); } - private async run( - req: Omit, + private runPause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, + >( + opts?: { + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), + ): Promise { + let send: unknown; + const p = new Promise((resolve, reject) => { + send = this.sender.push({ + type: "pause" as const, + name: opts?.name ?? (opts?.onPause ? safeFunctionName(opts.onPause) : "pause"), + onPauseFunction: opts?.onPause, + args: opts?.args ?? {}, + schedulerOptions: {}, + resolve: resolve as (result: unknown) => void, + reject, + }); + }); + void send; + return p; + } + + private async runExecution( + req: Omit, ): Promise { let send: unknown; const p = new Promise((resolve, reject) => { diff --git a/src/client/types.ts b/src/client/types.ts index d7f8eec..bccd77f 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,6 +1,5 @@ import type { RetryOption, WorkId } from "@convex-dev/workpool"; import type { - DefaultFunctionArgs, Expand, FunctionArgs, FunctionReference, diff --git a/src/component/journal.ts b/src/component/journal.ts index a5d009d..fc31be7 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -91,7 +91,6 @@ export const startSteps = mutation({ const entries = await Promise.all( args.steps.map(async (stepArgs, index) => { const { step, retry, schedulerOptions } = stepArgs; - const { name, handle, args } = step; const stepNumber = stepNumberBase + index; const stepId = await ctx.db.insert("steps", { workflowId: workflow._id, @@ -105,42 +104,48 @@ export const startSteps = mutation({ stepId, }; let workId: WorkId | undefined = undefined; - // TODO: use enqueueBatch - switch (step.functionType) { - case "query": { - workId = await workpool.enqueueQuery( - ctx, - handle as FunctionHandle<"query">, - args, - { context, onComplete, name, ...schedulerOptions }, - ); - break; - } - // Pause is a special mutation - case "pause": - if (!handle) { + + if (step.type === "execution") { + switch (step.functionType) { + case "query": { + workId = await workpool.enqueueQuery( + ctx, + step.handle as FunctionHandle<"query">, + step.args, + { context, onComplete, name: step.name, ...schedulerOptions }, + ); + break; + } + case "mutation": { + workId = await workpool.enqueueMutation( + ctx, + step.handle as FunctionHandle<"mutation">, + step.args, + { context, onComplete, name: step.name, ...schedulerOptions }, + ); + break; + } + case "action": { + workId = await workpool.enqueueAction( + ctx, + step.handle as FunctionHandle<"action">, + step.args, + { context, onComplete, name: step.name, retry, ...schedulerOptions }, + ); break; } - // fallthrough - case "mutation": { - workId = await workpool.enqueueMutation( - ctx, - handle as FunctionHandle<"mutation">, - args, - { context, onComplete, name, ...schedulerOptions }, - ); - break; } - case "action": { - workId = await workpool.enqueueAction( + } else if (step.type === "pause") { + if (step.onPauseHandle) { + workId = await workpool.enqueueMutation( ctx, - handle as FunctionHandle<"action">, - args, - { context, onComplete, name, retry, ...schedulerOptions }, + step.onPauseHandle as FunctionHandle<"mutation">, + step.args, + { context, onComplete, name: step.name, ...schedulerOptions }, ); - break; } } + if (workId) { entry.step.workId = workId; await ctx.db.replace(entry._id, entry); @@ -149,7 +154,7 @@ export const startSteps = mutation({ console.event("started", { workflowId: workflow._id, workflowName: workflow.name, - stepName: name, + stepName: step.name, stepNumber, }); return entry; @@ -186,7 +191,7 @@ export const resume = mutation({ .withIndex("inProgress", (q) => q.eq("step.inProgress", true).eq("workflowId", args.workflowId), ) - .filter((q) => q.eq(q.field("step.functionType"), "pause")); + .filter((q) => q.eq(q.field("step.type"), "pause")); if (args.name) { query = query.filter((q) => q.eq(q.field("step.name"), args.name)); diff --git a/src/component/pool.ts b/src/component/pool.ts index 334cd35..b8b55b1 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -115,7 +115,7 @@ export const onComplete = internalMutation({ return; } if ( - journalEntry.step.functionType === "pause" && + journalEntry.step.type === "pause" && args.result.kind === "success" ) { console.event("stepPaused", { diff --git a/src/component/schema.ts b/src/component/schema.ts index a863d6c..725f509 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -59,20 +59,36 @@ export const workflowDocument = v.object({ }); export type Workflow = Infer; -export const step = v.object({ +const baseStepFields = { name: v.string(), inProgress: v.boolean(), workId: v.optional(vWorkIdValidator), - functionType: literals("query", "mutation", "action", "pause"), + runResult: v.optional(vResultValidator), + startedAt: v.number(), + completedAt: v.optional(v.number()), +}; + +const executionStep = v.object({ + ...baseStepFields, + type: v.literal("execution"), + functionType: literals("query", "mutation", "action"), handle: v.string(), argsSize: v.number(), args: v.any(), - runResult: v.optional(vResultValidator), +}); - startedAt: v.number(), - completedAt: v.optional(v.number()), +const pauseStep = v.object({ + ...baseStepFields, + type: v.literal("pause"), + onPauseHandle: v.optional(v.string()), + argsSize: v.number(), + args: v.any(), }); + +export const step = v.union(executionStep, pauseStep); export type Step = Infer; +export type ExecutionStep = Infer; +export type PauseStep = Infer; function stepSize(step: Step): number { let size = 0; @@ -81,8 +97,17 @@ function stepSize(step: Step): number { if (step.workId) { size += step.workId.length; } - size += step.functionType.length; - size += step.handle.length; + size += step.type.length; + + if (step.type === "execution") { + size += step.functionType.length; + size += step.handle.length; + } else if (step.type === "pause") { + if (step.onPauseHandle) { + size += step.onPauseHandle.length; + } + } + size += 8 + step.argsSize; if (step.runResult) { size += resultSize(step.runResult); From 94e4f07b567be9fbedb2402c63e5e04189d0a9d7 Mon Sep 17 00:00:00 2001 From: a-fatah Date: Fri, 3 Oct 2025 18:09:18 +0200 Subject: [PATCH 6/6] re-generate types --- example/convex/_generated/api.d.ts | 199 ++++++++++++++++++++--------- example/convex/setup.test.ts | 2 +- src/component/_generated/api.d.ts | 199 ++++++++++++++++++++--------- 3 files changed, 279 insertions(+), 121 deletions(-) diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 1664a30..ea05f8f 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -53,21 +53,38 @@ export declare const components: { journalEntries: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; @@ -91,6 +108,17 @@ export declare const components: { }; } >; + resume: FunctionReference< + "mutation", + "internal", + { + name?: string; + resumeValue: any; + workflowHandle: string; + workflowId: string; + }, + null + >; startSteps: FunctionReference< "mutation", "internal", @@ -101,21 +129,38 @@ export declare const components: { | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; schedulerOptions?: { runAt?: number } | { runAfter?: number }; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; }>; workflowId: string; workpoolOptions?: { @@ -132,21 +177,38 @@ export declare const components: { Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }> @@ -199,21 +261,38 @@ export declare const components: { inProgress: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; diff --git a/example/convex/setup.test.ts b/example/convex/setup.test.ts index a1a4451..50f1287 100644 --- a/example/convex/setup.test.ts +++ b/example/convex/setup.test.ts @@ -5,7 +5,7 @@ import schema from "./schema"; export const modules = import.meta.glob("./**/*.*s"); // Sorry about everything -import componentSchema from "../../node_modules/@convex-dev/workflow/src/component/schema"; +import componentSchema from "../../src/component/schema"; export { componentSchema }; export const componentModules = import.meta.glob( "../../node_modules/@convex-dev/workflow/src/component/**/*.ts", diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 36426a4..de60b69 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -47,21 +47,38 @@ export type Mounts = { journalEntries: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; @@ -85,6 +102,17 @@ export type Mounts = { }; } >; + resume: FunctionReference< + "mutation", + "public", + { + name?: string; + resumeValue: any; + workflowHandle: string; + workflowId: string; + }, + null + >; startSteps: FunctionReference< "mutation", "public", @@ -95,21 +123,38 @@ export type Mounts = { | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; schedulerOptions?: { runAt?: number } | { runAfter?: number }; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; }>; workflowId: string; workpoolOptions?: { @@ -126,21 +171,38 @@ export type Mounts = { Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }> @@ -193,21 +255,38 @@ export type Mounts = { inProgress: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action" | "pause"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>;