From 892cbcc4f06a1b9f44816e305333b6939271271c Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Tue, 13 May 2025 13:03:02 -0700 Subject: [PATCH 1/3] add pause --- example/convex/_generated/api.d.ts | 8 ++++---- src/client/step.ts | 2 ++ src/client/stepContext.ts | 21 ++++++++++++++++++--- src/client/types.ts | 30 +++++++++++++++++++++++++++++- src/component/_generated/api.d.ts | 8 ++++---- src/component/journal.ts | 2 ++ src/component/pool.ts | 12 ++++++++++++ src/component/schema.ts | 2 +- 8 files changed, 72 insertions(+), 13 deletions(-) diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 0d97802..1664a30 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -57,7 +57,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -105,7 +105,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -136,7 +136,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -203,7 +203,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; diff --git a/src/client/step.ts b/src/client/step.ts index 0e8a837..a175ce4 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -30,6 +30,7 @@ export type StepRequest = { args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; + pause: boolean | undefined; resolve: (result: unknown) => void; reject: (error: unknown) => void; @@ -138,6 +139,7 @@ export class StepExecutor { handle: await createFunctionHandle(message.function), args: message.args, argsSize: valueSize(message.args as Value), + pause: message.pause, outcome: undefined, startedAt: this.now, completedAt: undefined, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 8145672..8a989eb 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -10,6 +10,7 @@ import type { StepRequest } from "./step.js"; import type { RetryOption } from "@convex-dev/workpool"; import type { RunOptions, WorkflowStep } from "./types.js"; import type { WorkflowId } from "../types.js"; +import type { Validator } from "convex/values"; export class StepContext implements WorkflowStep { constructor( @@ -41,17 +42,30 @@ export class StepContext implements WorkflowStep { return this.runFunction("action", action, args, opts); } + async pause< + Mutation extends FunctionReference<"mutation", any, any, void>, + Returns = void, + >( + pauseHandler: Mutation, + args: FunctionArgs, + opts?: RunOptions & { returns: Validator }, + ): Promise { + return this.runFunction("mutation", pauseHandler, args, { + ...opts, + pause: true, + }) as Promise; + } + private async runFunction< F extends FunctionReference, >( functionType: FunctionType, f: F, args: unknown, - opts?: RunOptions & RetryOption, + opts?: RunOptions & RetryOption & { pause?: true }, ): Promise { let send: unknown; - const { name, ...rest } = opts ?? {}; - const { retry, ...schedulerOptions } = rest; + const { name, retry, pause, ...schedulerOptions } = opts ?? {}; const p = new Promise((resolve, reject) => { send = this.sender.push({ name: name ?? safeFunctionName(f), @@ -59,6 +73,7 @@ export class StepContext implements WorkflowStep { function: f, args, retry, + pause, schedulerOptions, resolve, reject, diff --git a/src/client/types.ts b/src/client/types.ts index 9e90065..ebd8162 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,12 +1,13 @@ import type { RetryOption, WorkId } from "@convex-dev/workpool"; import type { + DefaultFunctionArgs, Expand, FunctionArgs, FunctionReference, FunctionReturnType, } from "convex/server"; import type { api } from "../component/_generated/api.js"; -import type { GenericId } from "convex/values"; +import type { GenericId, Validator } from "convex/values"; import type { WorkflowId } from "../types.js"; export type WorkflowComponent = UseApi; @@ -81,6 +82,33 @@ export type WorkflowStep = { args: FunctionArgs, opts?: RunOptions & RetryOption, ): Promise>; + + /** + * Pause the workflow, to be resumed asynchronously. + * + * It will be marked as paused in the same transaction as the pause handler + * is called. The pause handler must receive the arguments and return nothing. + * + * The return value is the value provided by the resume call, which must match + * the return validator provided. + * + * @param pauseHandler - The pause handler to run, like `internal.index.examplePause`. + * @param args - The arguments to the pause handler. + * @param opts - Options for retrying, scheduling and naming the pause. + */ + pause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = void, + >( + pauseHandler: Mutation, + args: FunctionArgs, + opts?: RunOptions & { returns: Validator }, + ): Promise; }; export type UseApi = Expand<{ diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 2caf60c..36426a4 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -51,7 +51,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -99,7 +99,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -130,7 +130,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -197,7 +197,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; diff --git a/src/component/journal.ts b/src/component/journal.ts index df9642e..ab67399 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -115,6 +115,8 @@ export const startSteps = mutation({ ); break; } + // Pause is a special mutation + case "pause": case "mutation": { workId = await workpool.enqueueMutation( ctx, diff --git a/src/component/pool.ts b/src/component/pool.ts index 781ad68..334cd35 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -114,6 +114,18 @@ export const onComplete = internalMutation({ ); return; } + if ( + journalEntry.step.functionType === "pause" && + args.result.kind === "success" + ) { + console.event("stepPaused", { + workflowId, + workflowName: workflow.name, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + }); + return; + } journalEntry.step.inProgress = false; journalEntry.step.completedAt = Date.now(); switch (args.result.kind) { diff --git a/src/component/schema.ts b/src/component/schema.ts index c96c0af..a863d6c 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -63,7 +63,7 @@ export const step = v.object({ name: v.string(), inProgress: v.boolean(), workId: v.optional(vWorkIdValidator), - functionType: literals("query", "mutation", "action"), + functionType: literals("query", "mutation", "action", "pause"), handle: v.string(), argsSize: v.number(), args: v.any(), From 9fe869c436a5b342f237f0836fb5be4a08e24f4e Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Mon, 22 Sep 2025 20:18:17 -0700 Subject: [PATCH 2/3] maybe support not passing a function --- src/client/step.ts | 6 ++-- src/client/stepContext.ts | 73 ++++++++++++++++++++++++++++----------- src/client/types.ts | 17 ++++++--- src/component/journal.ts | 13 +++++-- 4 files changed, 80 insertions(+), 29 deletions(-) diff --git a/src/client/step.ts b/src/client/step.ts index a175ce4..b05f7df 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -26,7 +26,7 @@ export type WorkerResult = export type StepRequest = { name: string; functionType: FunctionType; - function: FunctionReference; + function: FunctionReference | undefined; args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; @@ -136,7 +136,9 @@ export class StepExecutor { inProgress: true, name: message.name, functionType: message.functionType, - handle: await createFunctionHandle(message.function), + handle: message.function + ? await createFunctionHandle(message.function) + : "", args: message.args, argsSize: valueSize(message.args as Value), pause: message.pause, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 8a989eb..5fd8385 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -4,6 +4,7 @@ import type { FunctionArgs, FunctionReturnType, FunctionType, + DefaultFunctionArgs, } from "convex/server"; import { safeFunctionName } from "./safeFunctionName.js"; import type { StepRequest } from "./step.js"; @@ -43,38 +44,70 @@ export class StepContext implements WorkflowStep { } async pause< - Mutation extends FunctionReference<"mutation", any, any, void>, - Returns = void, + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, >( - pauseHandler: Mutation, - args: FunctionArgs, - opts?: RunOptions & { returns: Validator }, + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), ): Promise { - return this.runFunction("mutation", pauseHandler, args, { - ...opts, - pause: true, - }) as Promise; + if (opts?.onPause) { + return this.runFunction("mutation", opts.onPause, opts.args, { + name: opts.name, + pause: true, + }) as Promise; + } else { + return this.run({ + name: opts?.name ?? "pause", + functionType: "mutation", + function: undefined, + args: {}, + retry: undefined, + pause: true, + schedulerOptions: {}, + }) as Promise; + } } - private async runFunction< - F extends FunctionReference, - >( + private runFunction>( functionType: FunctionType, f: F, args: unknown, opts?: RunOptions & RetryOption & { pause?: true }, ): Promise { - let send: unknown; const { name, retry, pause, ...schedulerOptions } = opts ?? {}; + return this.run({ + name: name ?? (f ? safeFunctionName(f) : "pause"), + functionType, + function: f, + args: args ?? {}, + retry, + pause, + schedulerOptions, + }); + } + + private async run( + req: Omit, + ): Promise { + let send: unknown; const p = new Promise((resolve, reject) => { send = this.sender.push({ - name: name ?? safeFunctionName(f), - functionType, - function: f, - args, - retry, - pause, - schedulerOptions, + ...req, resolve, reject, }); diff --git a/src/client/types.ts b/src/client/types.ts index ebd8162..04aac99 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -103,11 +103,20 @@ export type WorkflowStep = { DefaultFunctionArgs, void >, - Returns = void, + Returns = unknown, >( - pauseHandler: Mutation, - args: FunctionArgs, - opts?: RunOptions & { returns: Validator }, + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), ): Promise; }; diff --git a/src/component/journal.ts b/src/component/journal.ts index ab67399..25d7c8f 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -104,7 +104,8 @@ export const startSteps = mutation({ generationNumber, stepId, }; - let workId: WorkId; + let workId: WorkId | undefined = undefined; + // TODO: use enqueueBatch switch (step.functionType) { case "query": { workId = await workpool.enqueueQuery( @@ -117,6 +118,10 @@ export const startSteps = mutation({ } // Pause is a special mutation case "pause": + if (!handle) { + break; + } + // fallthrough case "mutation": { workId = await workpool.enqueueMutation( ctx, @@ -136,8 +141,10 @@ export const startSteps = mutation({ break; } } - entry.step.workId = workId; - await ctx.db.replace(entry._id, entry); + if (workId) { + entry.step.workId = workId; + await ctx.db.replace(entry._id, entry); + } console.event("started", { workflowId: workflow._id, From cc9cb1ba2f65edd3c61f30d2a7dde4d9c5c16fc0 Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Wed, 1 Oct 2025 09:56:05 -0700 Subject: [PATCH 3/3] types for run, and maybe omit from stack trace --- src/client/stepContext.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 5fd8385..ff87ea4 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -101,10 +101,8 @@ export class StepContext implements WorkflowStep { }); } - private async run( - req: Omit, - ): Promise { - let send: unknown; + private run(req: Omit): Promise { + let send: Promise; const p = new Promise((resolve, reject) => { send = this.sender.push({ ...req, @@ -112,7 +110,6 @@ export class StepContext implements WorkflowStep { reject, }); }); - await send; - return p; + return send!.then(() => p); } }