Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions example/convex/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export declare const components: {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down Expand Up @@ -105,7 +105,7 @@ export declare const components: {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down Expand Up @@ -136,7 +136,7 @@ export declare const components: {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down Expand Up @@ -203,7 +203,7 @@ export declare const components: {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down
8 changes: 6 additions & 2 deletions src/client/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ export type WorkerResult =
export type StepRequest = {
name: string;
functionType: FunctionType;
function: FunctionReference<FunctionType, "internal">;
function: FunctionReference<FunctionType, "internal"> | undefined;
args: unknown;
retry: RetryBehavior | boolean | undefined;
schedulerOptions: SchedulerOptions;
pause: boolean | undefined;

resolve: (result: unknown) => void;
reject: (error: unknown) => void;
Expand Down Expand Up @@ -135,9 +136,12 @@ export class StepExecutor {
inProgress: true,
name: message.name,
functionType: message.functionType,
handle: await createFunctionHandle(message.function),
handle: message.function
? await createFunctionHandle(message.function)
: "",
args: message.args,
argsSize: valueSize(message.args as Value),
pause: message.pause,
outcome: undefined,
startedAt: this.now,
completedAt: undefined,
Expand Down
73 changes: 59 additions & 14 deletions src/client/stepContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import type {
FunctionArgs,
FunctionReturnType,
FunctionType,
DefaultFunctionArgs,
} from "convex/server";
import { safeFunctionName } from "./safeFunctionName.js";
import type { StepRequest } from "./step.js";
import type { RetryOption } from "@convex-dev/workpool";
import type { RunOptions, WorkflowStep } from "./types.js";
import type { WorkflowId } from "../types.js";
import type { Validator } from "convex/values";

export class StepContext implements WorkflowStep {
constructor(
Expand Down Expand Up @@ -41,30 +43,73 @@ export class StepContext implements WorkflowStep {
return this.runFunction("action", action, args, opts);
}

private async runFunction<
F extends FunctionReference<FunctionType, "internal">,
async pause<
Mutation extends FunctionReference<
"mutation",
"internal",
DefaultFunctionArgs,
void
>,
Returns = unknown,
>(
opts?: {
/**
* The name for the pause. By default, if you pass in api.foo.bar.baz,
* it will use "foo/bar:baz" as the name. If you pass in a function handle,
* it will use the function handle directly. Otherwise it will use "pause".
*/
name?: string;
returns: Validator<Returns, "required">;
} & (
| { onPause: Mutation; args: FunctionArgs<Mutation> }
| { onPause?: undefined; args?: undefined }
),
): Promise<Returns> {
if (opts?.onPause) {
return this.runFunction("mutation", opts.onPause, opts.args, {
name: opts.name,
pause: true,
}) as Promise<Returns>;
} else {
return this.run({
name: opts?.name ?? "pause",
functionType: "mutation",
function: undefined,
args: {},
retry: undefined,
pause: true,
schedulerOptions: {},
}) as Promise<Returns>;
}
}

private runFunction<F extends FunctionReference<FunctionType, "internal">>(
functionType: FunctionType,
f: F,
args: unknown,
opts?: RunOptions & RetryOption,
opts?: RunOptions & RetryOption & { pause?: true },
): Promise<unknown> {
let send: unknown;
const { name, ...rest } = opts ?? {};
const { retry, ...schedulerOptions } = rest;
const { name, retry, pause, ...schedulerOptions } = opts ?? {};
return this.run({
name: name ?? (f ? safeFunctionName(f) : "pause"),
functionType,
function: f,
args: args ?? {},
retry,
pause,
schedulerOptions,
});
}

private run(req: Omit<StepRequest, "resolve" | "reject">): Promise<unknown> {
let send: Promise<void>;
const p = new Promise<unknown>((resolve, reject) => {
send = this.sender.push({
name: name ?? safeFunctionName(f),
functionType,
function: f,
args,
retry,
schedulerOptions,
...req,
resolve,
reject,
});
});
await send;
return p;
return send!.then(() => p);
}
}
39 changes: 38 additions & 1 deletion src/client/types.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { RetryOption, WorkId } from "@convex-dev/workpool";
import type {
DefaultFunctionArgs,
Expand,
FunctionArgs,
FunctionReference,
FunctionReturnType,
} from "convex/server";
import type { api } from "../component/_generated/api.js";
import type { GenericId } from "convex/values";
import type { GenericId, Validator } from "convex/values";
import type { WorkflowId } from "../types.js";

export type WorkflowComponent = UseApi<typeof api>;
Expand Down Expand Up @@ -81,6 +82,42 @@ export type WorkflowStep = {
args: FunctionArgs<Action>,
opts?: RunOptions & RetryOption,
): Promise<FunctionReturnType<Action>>;

/**
* Pause the workflow, to be resumed asynchronously.
*
* It will be marked as paused in the same transaction as the pause handler
* is called. The pause handler must receive the arguments and return nothing.
*
* The return value is the value provided by the resume call, which must match
* the return validator provided.
*
* @param pauseHandler - The pause handler to run, like `internal.index.examplePause`.
* @param args - The arguments to the pause handler.
* @param opts - Options for retrying, scheduling and naming the pause.
*/
pause<
Mutation extends FunctionReference<
"mutation",
"internal",
DefaultFunctionArgs,
void
>,
Returns = unknown,
>(
opts?: {
/**
* The name for the pause. By default, if you pass in api.foo.bar.baz,
* it will use "foo/bar:baz" as the name. If you pass in a function handle,
* it will use the function handle directly. Otherwise it will use "pause".
*/
name?: string;
returns: Validator<Returns, "required">;
} & (
| { onPause: Mutation; args: FunctionArgs<Mutation> }
| { onPause?: undefined; args?: undefined }
),
): Promise<Returns>;
};

export type UseApi<API> = Expand<{
Expand Down
8 changes: 4 additions & 4 deletions src/component/_generated/api.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export type Mounts = {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down Expand Up @@ -99,7 +99,7 @@ export type Mounts = {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down Expand Up @@ -130,7 +130,7 @@ export type Mounts = {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down Expand Up @@ -197,7 +197,7 @@ export type Mounts = {
args: any;
argsSize: number;
completedAt?: number;
functionType: "query" | "mutation" | "action";
functionType: "query" | "mutation" | "action" | "pause";
handle: string;
inProgress: boolean;
name: string;
Expand Down
15 changes: 12 additions & 3 deletions src/component/journal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ export const startSteps = mutation({
generationNumber,
stepId,
};
let workId: WorkId;
let workId: WorkId | undefined = undefined;
// TODO: use enqueueBatch
switch (step.functionType) {
case "query": {
workId = await workpool.enqueueQuery(
Expand All @@ -115,6 +116,12 @@ export const startSteps = mutation({
);
break;
}
// Pause is a special mutation
case "pause":
if (!handle) {
break;
}
// fallthrough
case "mutation": {
workId = await workpool.enqueueMutation(
ctx,
Expand All @@ -134,8 +141,10 @@ export const startSteps = mutation({
break;
}
}
entry.step.workId = workId;
await ctx.db.replace(entry._id, entry);
if (workId) {
entry.step.workId = workId;
await ctx.db.replace(entry._id, entry);
}

console.event("started", {
workflowId: workflow._id,
Expand Down
12 changes: 12 additions & 0 deletions src/component/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ export const onComplete = internalMutation({
);
return;
}
if (
journalEntry.step.functionType === "pause" &&
args.result.kind === "success"
) {
console.event("stepPaused", {
workflowId,
workflowName: workflow.name,
stepName: journalEntry.step.name,
stepNumber: journalEntry.stepNumber,
});
return;
}
journalEntry.step.inProgress = false;
journalEntry.step.completedAt = Date.now();
switch (args.result.kind) {
Expand Down
2 changes: 1 addition & 1 deletion src/component/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export const step = v.object({
name: v.string(),
inProgress: v.boolean(),
workId: v.optional(vWorkIdValidator),
functionType: literals("query", "mutation", "action"),
functionType: literals("query", "mutation", "action", "pause"),
handle: v.string(),
argsSize: v.number(),
args: v.any(),
Expand Down