diff --git a/packages/inngest/src/components/InngestStepTools.ts b/packages/inngest/src/components/InngestStepTools.ts index f32295e4..ea8147ad 100644 --- a/packages/inngest/src/components/InngestStepTools.ts +++ b/packages/inngest/src/components/InngestStepTools.ts @@ -33,6 +33,7 @@ import { InngestFunctionReference } from "./InngestFunctionReference.js"; import { gemini, openai, type AiAdapter } from "./ai/index.js"; import { type InngestExecution } from "./execution/InngestExecution.js"; +import { prettyError } from "../helpers/errors.js"; export interface FoundStep extends HashedOp { hashedId: string; @@ -136,18 +137,377 @@ export const getStepOptions = (options: StepOptionsOrId): StepOptions => { export const STEP_INDEXING_SUFFIX = ":"; /** - * Create a new set of step function tools ready to be used in a step function. - * This function should be run and a fresh set of tools provided every time a - * function is run. + * Define the set of tools the user has access to for their step functions. * - * An op stack (function state) is passed in as well as some mutable properties - * that the tools can use to submit a new op. + * Each key is the function name and is expected to run `createTool` and pass a + * generic type for that function as it will appear in the user's code. */ -export const createStepTools = ( - client: TClient, - execution: InngestExecution, - stepHandler: StepHandler -) => { +export class StepTools { + _disabled = false; + + /** + * Define the set of tools the user has access to for their step functions. + * + * Each key is the function name and is expected to run `createTool` and pass + * a generic type for that function as it will appear in the user's code. + */ + constructor( + private _client: TClient, + private _execution: InngestExecution, + private _stepHandler: StepHandler + ) {} + + /** + * Send one or many events to Inngest. Should always be used in place of + * `inngest.send()` to ensure that the event send is successfully retried and + * not sent multiple times due to memoisation. + * + * @example + * ```ts + * await step.sendEvent("emit-user-creation", { + * name: "app/user.created", + * data: { id: 123 }, + * }); + * + * await step.sendEvent("emit-user-updates", [ + * { + * name: "app/user.created", + * data: { id: 123 }, + * }, + * { + * name: "app/user.feed.created", + * data: { id: 123 }, + * }, + * ]); + * ``` + * + * Returns a promise that will resolve once the event has been sent. + */ + public sendEvent = this._createTool<{ + >>( + idOrOptions: StepOptionsOrId, + payload: Payload + ): Promise>>; + }>( + ({ id, name }) => { + return { + id, + op: StepOpCode.StepPlanned, + name: "sendEvent", + displayName: name ?? id, + }; + }, + { + fn: (idOrOptions, payload) => { + return this._client["_send"]({ + payload, + headers: this._execution["options"]["headers"], + }); + }, + } + ); + + /** + * Wait for a particular event to be received before continuing. When the + * event is received, it will be returned. + * + * You can also provide options to control the particular event that is + * received, for example to ensure that a user ID matches between two events, + * or to only wait a maximum amount of time before giving up and returning + * `null` instead of any event data. + */ + public waitForEvent = this._createTool< + >>( + idOrOptions: StepOptionsOrId, + opts: WaitForEventOpts, IncomingEvent> + ) => Promise< + IncomingEvent extends WithoutInternalStr> + ? GetEvents[IncomingEvent] | null + : IncomingEvent | null + > + >( + ( + { id, name }, + + /** + * Options to control the event we're waiting for. + */ + opts + ) => { + const matchOpts: { timeout: string; if?: string } = { + timeout: timeStr(typeof opts === "string" ? opts : opts.timeout), + }; + + if (typeof opts !== "string") { + if (opts?.match) { + matchOpts.if = `event.${opts.match} == async.${opts.match}`; + } else if (opts?.if) { + matchOpts.if = opts.if; + } + } + + return { + id, + op: StepOpCode.WaitForEvent, + name: opts.event, + opts: matchOpts, + displayName: name ?? id, + }; + } + ); + + /** + * Use this tool to run business logic. Each call to `run` will be retried + * individually, meaning you can compose complex workflows that safely retry + * dependent asynchronous actions. + * + * The function you pass to `run` will be called only when this "step" is to + * be executed and can be synchronous or asynchronous. + * + * In either case, the return value of the function will be the return value + * of the `run` tool, meaning you can return and reason about return data for + * next steps. + */ + public run = this._createStepRun(); + + /** + * AI tooling for running AI models and other AI-related tasks. + */ + public ai = { + /** + * Use this tool to have Inngest make your AI calls. Useful for agentic + * workflows. + * + * Input is also tracked for this tool, meaning you can pass input to the + * function and it will be displayed and editable in the UI. + */ + infer: this._createTool< + ( + idOrOptions: StepOptionsOrId, + options: AiInferOpts + ) => Promise> + >(({ id, name }, options) => { + const modelCopy = { ...options.model }; + + // Allow the model to mutate options and body for this call + options.model.onCall?.(modelCopy, options.body); + + return { + id, + op: StepOpCode.AiGateway, + displayName: name ?? id, + opts: { + type: "step.ai.infer", + url: modelCopy.url, + headers: modelCopy.headers, + auth_key: modelCopy.authKey, + format: modelCopy.format, + body: options.body, + }, + }; + }), + + /** + * Use this tool to wrap AI models and other AI-related tasks. Each call to + * `wrap` will be retried individually, meaning you can compose complex + * workflows that safely retry dependent asynchronous actions. + * + * Input is also tracked for this tool, meaning you can pass input to the + * function and it will be displayed and editable in the UI. + */ + wrap: this._createStepRun("step.ai.wrap"), + + /** + * Models for AI inference and other AI-related tasks. + */ + models: { + /** + * Create an OpenAI model using the OpenAI chat format. + * + * By default it targets the `https://api.openai.com/v1/` base URL. + */ + openai, + + /** + * Create a Gemini model using the OpenAI chat format. + * + * By default it targets the + * `https://generativelanguage.googleapis.com/v1beta/` base URL. + */ + gemini, + }, + }; + + /** + * Wait a specified amount of time before continuing. + * + * The time to wait can be specified using a `number` of milliseconds or an + * `ms`-compatible time string like `"1 hour"`, `"30 mins"`, or `"2.5d"`. + * + * {@link https://npm.im/ms} + * + * To wait until a particular date, use `sleepUntil` instead. + */ + public sleep = this._createTool< + ( + idOrOptions: StepOptionsOrId, + + /** + * The amount of time to wait before continuing. + */ + time: number | string + ) => Promise + >(({ id, name }, time) => { + /** + * The presence of this operation in the returned stack indicates that the + * sleep is over and we should continue execution. + */ + return { + id, + op: StepOpCode.Sleep, + name: timeStr(time), + displayName: name ?? id, + }; + }); + + /** + * Wait until a particular date before continuing by passing a `Date`. + * + * To wait for a particular amount of time from now, always use `sleep` + * instead. + */ + public sleepUntil = this._createTool< + ( + idOrOptions: StepOptionsOrId, + + /** + * The date to wait until before continuing. + */ + time: Date | string + ) => Promise + >(({ id, name }, time) => { + const date = typeof time === "string" ? new Date(time) : time; + + /** + * The presence of this operation in the returned stack indicates that the + * sleep is over and we should continue execution. + */ + try { + return { + id, + op: StepOpCode.Sleep, + name: date.toISOString(), + displayName: name ?? id, + }; + } catch (err) { + /** + * If we're here, it's because the date is invalid. We'll throw a custom + * error here to standardise this response. + */ + // TODO PrettyError + console.warn("Invalid date or date string passed to sleepUntil;", err); + + // TODO PrettyError + throw new Error( + `Invalid date or date string passed to sleepUntil: ${time.toString()}` + ); + } + }); + + /** + * Invoke a passed Inngest `function` with the given `data`. Returns the + * result of the returned value of the function or `null` if the function does + * not return a value. + * + * A string ID can also be passed to reference functions outside of the + * current app. + */ + public invoke = this._createTool< + ( + idOrOptions: StepOptionsOrId, + opts: InvocationOpts + ) => InvocationResult> + >(({ id, name }, invokeOpts) => { + // Create a discriminated union to operate on based on the input types + // available for this tool. + const optsSchema = invokePayloadSchema.extend({ + timeout: z.union([z.number(), z.string(), z.date()]).optional(), + }); + + const parsedFnOpts = optsSchema + .extend({ + _type: z.literal("fullId").optional().default("fullId"), + function: z.string().min(1), + }) + .or( + optsSchema.extend({ + _type: z.literal("fnInstance").optional().default("fnInstance"), + function: z.instanceof(InngestFunction), + }) + ) + .or( + optsSchema.extend({ + _type: z.literal("refInstance").optional().default("refInstance"), + function: z.instanceof(InngestFunctionReference), + }) + ) + .safeParse(invokeOpts); + + if (!parsedFnOpts.success) { + throw new Error( + `Invalid invocation options passed to invoke; must include either a function or functionId.` + ); + } + + const { _type, function: fn, data, user, v, timeout } = parsedFnOpts.data; + const payload = { data, user, v } satisfies MinimalEventPayload; + const opts: { + payload: MinimalEventPayload; + function_id: string; + timeout?: string; + } = { + payload, + function_id: "", + timeout: typeof timeout === "undefined" ? undefined : timeStr(timeout), + }; + + switch (_type) { + case "fnInstance": + opts.function_id = fn.id(fn["client"].id); + break; + + case "fullId": + console.warn( + `${logPrefix} Invoking function with \`function: string\` is deprecated and will be removed in v4.0.0; use an imported function or \`referenceFunction()\` instead. See https://innge.st/ts-referencing-functions` + ); + opts.function_id = fn; + break; + + case "refInstance": + opts.function_id = [ + fn.opts.appId || this._client.id, + fn.opts.functionId, + ] + .filter(Boolean) + .join("-"); + break; + } + + return { + id, + op: StepOpCode.InvokeFunction, + displayName: name ?? id, + opts, + }; + }); + + /** + * Disable the tools once the execution is complete. + */ + private _disable() { + this._disabled = true; + } + /** * A local helper used to create tools that can be used to submit an op. * @@ -155,7 +515,7 @@ export const createStepTools = ( * function signature exposed to the user. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any - const createTool = Promise>( + private _createTool Promise>( /** * A function that returns an ID for this op. This is used to ensure that * the op stack is correctly filled, submitted, and retrieved with the same @@ -167,25 +527,36 @@ export const createStepTools = ( */ matchOp: MatchOpFn, opts?: StepToolOptions - ): T => { + ): T { return (async (...args: Parameters): Promise => { + if (this._disabled) { + console.warn( + prettyError({ + type: "warn", + whatHappened: "A step tool was run after execution finished", + why: "This is likely due to something holding on to a `step` reference after execution finished.", + consequences: "This call will produce a hanging promise.", + }) + ); + } + const parsedArgs = args as unknown as [StepOptionsOrId, ...unknown[]]; - return stepHandler({ args: parsedArgs, matchOp, opts }); + return this._stepHandler({ args: parsedArgs, matchOp, opts }); }) as T; - }; + } /** * Create a new step run tool that can be used to run a step function using * `step.run()` as a shim. */ - const createStepRun = ( + private _createStepRun( /** * The sub-type of this step tool, exposed via `opts.type` when the op is * reported. */ type?: string - ) => { - return createTool< + ) { + return this._createTool< ) => unknown>( idOrOptions: StepOptionsOrId, @@ -239,357 +610,23 @@ export const createStepTools = ( fn: (_, fn, ...input) => fn(...input), } ); - }; - - /** - * Define the set of tools the user has access to for their step functions. - * - * Each key is the function name and is expected to run `createTool` and pass - * a generic type for that function as it will appear in the user's code. - */ - const tools = { - /** - * Send one or many events to Inngest. Should always be used in place of - * `inngest.send()` to ensure that the event send is successfully retried - * and not sent multiple times due to memoisation. - * - * @example - * ```ts - * await step.sendEvent("emit-user-creation", { - * name: "app/user.created", - * data: { id: 123 }, - * }); - * - * await step.sendEvent("emit-user-updates", [ - * { - * name: "app/user.created", - * data: { id: 123 }, - * }, - * { - * name: "app/user.feed.created", - * data: { id: 123 }, - * }, - * ]); - * ``` - * - * Returns a promise that will resolve once the event has been sent. - */ - sendEvent: createTool<{ - >>( - idOrOptions: StepOptionsOrId, - payload: Payload - ): Promise>>; - }>( - ({ id, name }) => { - return { - id, - op: StepOpCode.StepPlanned, - name: "sendEvent", - displayName: name ?? id, - }; - }, - { - fn: (idOrOptions, payload) => { - return client["_send"]({ - payload, - headers: execution["options"]["headers"], - }); - }, - } - ), - - /** - * Wait for a particular event to be received before continuing. When the - * event is received, it will be returned. - * - * You can also provide options to control the particular event that is - * received, for example to ensure that a user ID matches between two - * events, or to only wait a maximum amount of time before giving up and - * returning `null` instead of any event data. - */ - waitForEvent: createTool< - >>( - idOrOptions: StepOptionsOrId, - opts: WaitForEventOpts, IncomingEvent> - ) => Promise< - IncomingEvent extends WithoutInternalStr> - ? GetEvents[IncomingEvent] | null - : IncomingEvent | null - > - >( - ( - { id, name }, - - /** - * Options to control the event we're waiting for. - */ - opts - ) => { - const matchOpts: { timeout: string; if?: string } = { - timeout: timeStr(typeof opts === "string" ? opts : opts.timeout), - }; - - if (typeof opts !== "string") { - if (opts?.match) { - matchOpts.if = `event.${opts.match} == async.${opts.match}`; - } else if (opts?.if) { - matchOpts.if = opts.if; - } - } - - return { - id, - op: StepOpCode.WaitForEvent, - name: opts.event, - opts: matchOpts, - displayName: name ?? id, - }; - } - ), - - /** - * Use this tool to run business logic. Each call to `run` will be retried - * individually, meaning you can compose complex workflows that safely - * retry dependent asynchronous actions. - * - * The function you pass to `run` will be called only when this "step" is to - * be executed and can be synchronous or asynchronous. - * - * In either case, the return value of the function will be the return value - * of the `run` tool, meaning you can return and reason about return data - * for next steps. - */ - run: createStepRun(), - - /** - * AI tooling for running AI models and other AI-related tasks. - */ - ai: { - /** - * Use this tool to have Inngest make your AI calls. Useful for agentic workflows. - * - * Input is also tracked for this tool, meaning you can pass input to the - * function and it will be displayed and editable in the UI. - */ - infer: createTool< - ( - idOrOptions: StepOptionsOrId, - options: AiInferOpts - ) => Promise> - >(({ id, name }, options) => { - const modelCopy = { ...options.model }; - - // Allow the model to mutate options and body for this call - options.model.onCall?.(modelCopy, options.body); - - return { - id, - op: StepOpCode.AiGateway, - displayName: name ?? id, - opts: { - type: "step.ai.infer", - url: modelCopy.url, - headers: modelCopy.headers, - auth_key: modelCopy.authKey, - format: modelCopy.format, - body: options.body, - }, - }; - }), - - /** - * Use this tool to wrap AI models and other AI-related tasks. Each call - * to `wrap` will be retried individually, meaning you can compose complex - * workflows that safely retry dependent asynchronous actions. - * - * Input is also tracked for this tool, meaning you can pass input to the - * function and it will be displayed and editable in the UI. - */ - wrap: createStepRun("step.ai.wrap"), - - /** - * Models for AI inference and other AI-related tasks. - */ - models: { - /** - * Create an OpenAI model using the OpenAI chat format. - * - * By default it targets the `https://api.openai.com/v1/` base URL. - */ - openai, - - /** - * Create a Gemini model using the OpenAI chat format. - * - * By default it targets the `https://generativelanguage.googleapis.com/v1beta/` - * base URL. - */ - gemini, - }, - }, - - /** - * Wait a specified amount of time before continuing. - * - * The time to wait can be specified using a `number` of milliseconds or an - * `ms`-compatible time string like `"1 hour"`, `"30 mins"`, or `"2.5d"`. - * - * {@link https://npm.im/ms} - * - * To wait until a particular date, use `sleepUntil` instead. - */ - sleep: createTool< - ( - idOrOptions: StepOptionsOrId, - - /** - * The amount of time to wait before continuing. - */ - time: number | string - ) => Promise - >(({ id, name }, time) => { - /** - * The presence of this operation in the returned stack indicates that the - * sleep is over and we should continue execution. - */ - return { - id, - op: StepOpCode.Sleep, - name: timeStr(time), - displayName: name ?? id, - }; - }), - - /** - * Wait until a particular date before continuing by passing a `Date`. - * - * To wait for a particular amount of time from now, always use `sleep` - * instead. - */ - sleepUntil: createTool< - ( - idOrOptions: StepOptionsOrId, - - /** - * The date to wait until before continuing. - */ - time: Date | string - ) => Promise - >(({ id, name }, time) => { - const date = typeof time === "string" ? new Date(time) : time; - - /** - * The presence of this operation in the returned stack indicates that the - * sleep is over and we should continue execution. - */ - try { - return { - id, - op: StepOpCode.Sleep, - name: date.toISOString(), - displayName: name ?? id, - }; - } catch (err) { - /** - * If we're here, it's because the date is invalid. We'll throw a custom - * error here to standardise this response. - */ - // TODO PrettyError - console.warn("Invalid date or date string passed to sleepUntil;", err); - - // TODO PrettyError - throw new Error( - `Invalid date or date string passed to sleepUntil: ${time.toString()}` - ); - } - }), - - /** - * Invoke a passed Inngest `function` with the given `data`. Returns the - * result of the returned value of the function or `null` if the function - * does not return a value. - * - * A string ID can also be passed to reference functions outside of the - * current app. - */ - invoke: createTool< - ( - idOrOptions: StepOptionsOrId, - opts: InvocationOpts - ) => InvocationResult> - >(({ id, name }, invokeOpts) => { - // Create a discriminated union to operate on based on the input types - // available for this tool. - const optsSchema = invokePayloadSchema.extend({ - timeout: z.union([z.number(), z.string(), z.date()]).optional(), - }); - - const parsedFnOpts = optsSchema - .extend({ - _type: z.literal("fullId").optional().default("fullId"), - function: z.string().min(1), - }) - .or( - optsSchema.extend({ - _type: z.literal("fnInstance").optional().default("fnInstance"), - function: z.instanceof(InngestFunction), - }) - ) - .or( - optsSchema.extend({ - _type: z.literal("refInstance").optional().default("refInstance"), - function: z.instanceof(InngestFunctionReference), - }) - ) - .safeParse(invokeOpts); - - if (!parsedFnOpts.success) { - throw new Error( - `Invalid invocation options passed to invoke; must include either a function or functionId.` - ); - } - - const { _type, function: fn, data, user, v, timeout } = parsedFnOpts.data; - const payload = { data, user, v } satisfies MinimalEventPayload; - const opts: { - payload: MinimalEventPayload; - function_id: string; - timeout?: string; - } = { - payload, - function_id: "", - timeout: typeof timeout === "undefined" ? undefined : timeStr(timeout), - }; - - switch (_type) { - case "fnInstance": - opts.function_id = fn.id(fn["client"].id); - break; - - case "fullId": - console.warn( - `${logPrefix} Invoking function with \`function: string\` is deprecated and will be removed in v4.0.0; use an imported function or \`referenceFunction()\` instead. See https://innge.st/ts-referencing-functions` - ); - opts.function_id = fn; - break; - - case "refInstance": - opts.function_id = [fn.opts.appId || client.id, fn.opts.functionId] - .filter(Boolean) - .join("-"); - break; - } - - return { - id, - op: StepOpCode.InvokeFunction, - displayName: name ?? id, - opts, - }; - }), - }; + } +} - return tools; +/** + * Create a new set of step function tools ready to be used in a step function. + * This function should be run and a fresh set of tools provided every time a + * function is run. + * + * An op stack (function state) is passed in as well as some mutable properties + * that the tools can use to submit a new op. + */ +export const createStepTools = ( + client: TClient, + execution: InngestExecution, + stepHandler: StepHandler +) => { + return new StepTools(client, execution, stepHandler); }; /** diff --git a/packages/inngest/src/components/execution/v0.ts b/packages/inngest/src/components/execution/v0.ts index 9495b0ab..4acacd0c 100644 --- a/packages/inngest/src/components/execution/v0.ts +++ b/packages/inngest/src/components/execution/v0.ts @@ -70,10 +70,16 @@ export class V0InngestExecution public start() { this.debug("starting V0 execution"); - return (this.execution ??= this._start().then((result) => { - this.debug("result:", result); - return result; - })); + return (this.execution ??= this._start() + .then((result) => { + this.debug("result:", result); + return result; + }) + .finally(() => { + // Disable step tools after execution; only needed to produce warnings + // for if something is keeping hold of these references. + this.fnArg?.step?.["_disable"]?.(); + })); } private async _start(): Promise { diff --git a/packages/inngest/src/components/execution/v1.ts b/packages/inngest/src/components/execution/v1.ts index 145be514..dcf6f701 100644 --- a/packages/inngest/src/components/execution/v1.ts +++ b/packages/inngest/src/components/execution/v1.ts @@ -95,10 +95,16 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution { public start() { this.debug("starting V1 execution"); - return (this.execution ??= this._start().then((result) => { - this.debug("result:", result); - return result; - })); + return (this.execution ??= this._start() + .then((result) => { + this.debug("result:", result); + return result; + }) + .finally(() => { + // Disable step tools after execution; only needed to produce warnings + // for if something is keeping hold of these references. + this.fnArg?.step?.["_disable"]?.(); + })); } /**