diff --git a/packages/inngest/src/components/InngestCommHandler.ts b/packages/inngest/src/components/InngestCommHandler.ts index 39b4837c7..d90edc75a 100644 --- a/packages/inngest/src/components/InngestCommHandler.ts +++ b/packages/inngest/src/components/InngestCommHandler.ts @@ -786,6 +786,7 @@ export class InngestCommHandler< timer, isFailureHandler: fn.onFailure, disableImmediateExecution: fndata.value.disable_immediate_execution, + stepCompletionOrder: ctx?.stack?.stack ?? [], }); return execution.start(); diff --git a/packages/inngest/src/components/InngestExecution.ts b/packages/inngest/src/components/InngestExecution.ts index 1a7d7a505..637fbe43c 100644 --- a/packages/inngest/src/components/InngestExecution.ts +++ b/packages/inngest/src/components/InngestExecution.ts @@ -61,6 +61,7 @@ export interface InngestExecutionOptions { fn: AnyInngestFunction; data: unknown; stepState: Record; + stepCompletionOrder: string[]; requestedRunStep?: string; timer?: ServerTiming; isFailureHandler?: boolean; @@ -90,7 +91,7 @@ export class InngestExecution { this.options = options; this.#userFnToRun = this.#getUserFnToRun(); - this.state = this.#createExecutionState(this.options.stepState); + this.state = this.#createExecutionState(this.options); this.fnArg = this.#createFnArg(this.state); this.checkpointHandlers = this.#createCheckpointHandlers(); this.#initializeTimer(this.state); @@ -185,6 +186,20 @@ export class InngestExecution { * them back to Inngest. */ "steps-found": async ({ steps }) => { + /** + * Iterate over state stack first, resolving promises in order to support + * racing. + * + * Then, find all remaining steps and handle them too. + * + * If we "handle" any steps in this pass, we must wait for the next + * checkpoint before taking any action. This is because we could be + * seeing a mix of previously-reported steps and new steps, which we + * can't differentiate. + * + * Because of this, we must also roll up any steps not necessarily found + * in only this checkpoint. + */ const stepResult = await this.#tryExecuteStep(steps); if (stepResult) { const transformResult = await this.#transformOutput(stepResult); @@ -200,7 +215,9 @@ export class InngestExecution { return transformResult; } - const newSteps = await this.#filterNewSteps(steps); + const newSteps = await this.#filterNewSteps( + Object.values(this.state.steps) + ); if (newSteps) { return { type: "steps-found", @@ -460,9 +477,10 @@ export class InngestExecution { return { type: "function-resolved", data }; } - #createExecutionState( - stepState: InngestExecutionOptions["stepState"] - ): ExecutionState { + #createExecutionState({ + stepState, + stepCompletionOrder, + }: InngestExecutionOptions): ExecutionState { let { promise: checkpointPromise, resolve: checkpointResolve } = createDeferredPromise(); @@ -485,13 +503,14 @@ export class InngestExecution { steps: {}, loop, hasSteps: Boolean(Object.keys(stepState).length), + stepCompletionOrder, setCheckpoint: (checkpoint: Checkpoint) => { ({ promise: checkpointPromise, resolve: checkpointResolve } = checkpointResolve(checkpoint)); }, allStateUsed: () => { return Object.values(state.stepState).every((step) => { - return step.fulfilled; + return step.seen; }); }, }; @@ -626,6 +645,7 @@ export type ExecutionResultHandlers = { export interface MemoizedOp extends IncomingOp { fulfilled?: boolean; + seen?: boolean; } export interface ExecutionState { @@ -680,4 +700,10 @@ export interface ExecutionState { * fulfill found steps. */ allStateUsed: () => boolean; + + /** + * An ordered list of step IDs that represents the order in which their + * execution was completed. + */ + stepCompletionOrder: string[]; } diff --git a/packages/inngest/src/components/InngestFunction.test.ts b/packages/inngest/src/components/InngestFunction.test.ts index 4ee093fcb..4a4c4593c 100644 --- a/packages/inngest/src/components/InngestFunction.test.ts +++ b/packages/inngest/src/components/InngestFunction.test.ts @@ -109,6 +109,7 @@ describe("runFn", () => { const execution = fn["createExecution"]({ data: { event: { name: "foo", data: { foo: "foo" } } }, stepState: {}, + stepCompletionOrder: [], }); ret = await execution.start(); @@ -146,6 +147,7 @@ describe("runFn", () => { const execution = fn["createExecution"]({ data: { event: { name: "foo", data: { foo: "foo" } } }, stepState: {}, + stepCompletionOrder: [], }); const ret = await execution.start(); @@ -170,12 +172,14 @@ describe("runFn", () => { runStep?: string; onFailure?: boolean; event?: EventPayload; + stackOrder?: InngestExecutionOptions["stepCompletionOrder"]; disableImmediateExecution?: boolean; } ) => { const execution = fn["createExecution"]({ data: { event: opts?.event || { name: "foo", data: {} } }, stepState, + stepCompletionOrder: opts?.stackOrder ?? Object.keys(stepState), isFailureHandler: Boolean(opts?.onFailure), requestedRunStep: opts?.runStep, timer, @@ -210,6 +214,7 @@ describe("runFn", () => { string, { stack?: InngestExecutionOptions["stepState"]; + stackOrder?: InngestExecutionOptions["stepCompletionOrder"]; onFailure?: boolean; runStep?: string; expectedReturn?: Awaited>; @@ -246,6 +251,7 @@ describe("runFn", () => { beforeAll(async () => { ret = await runFnWithStack(tools.fn, t.stack || {}, { + stackOrder: t.stackOrder, runStep: t.runStep, onFailure: t.onFailure || tools.onFailure, event: t.event || tools.event, @@ -625,6 +631,225 @@ describe("runFn", () => { }) ); + testFn( + "Promise.race", + () => { + const A = jest.fn(() => Promise.resolve("A")); + const B = jest.fn(() => Promise.resolve("B")); + const AWins = jest.fn(() => Promise.resolve("A wins")); + const BWins = jest.fn(() => Promise.resolve("B wins")); + + const fn = inngest.createFunction( + { id: "name" }, + { event: "foo" }, + async ({ step: { run } }) => { + const winner = await Promise.race([run("A", A), run("B", B)]); + + if (winner === "A") { + await run("A wins", AWins); + } else if (winner === "B") { + await run("B wins", BWins); + } + } + ); + + return { fn, steps: { A, B, AWins, BWins } }; + }, + { + A: "A", + B: "B", + AWins: "A wins", + BWins: "B wins", + }, + ({ A, B, AWins, BWins }) => ({ + "first run reports A and B steps": { + expectedReturn: { + type: "steps-found", + steps: [ + expect.objectContaining({ + id: A, + name: "A", + op: StepOpCode.StepPlanned, + }), + expect.objectContaining({ + id: B, + name: "B", + op: StepOpCode.StepPlanned, + }), + ], + }, + }, + + "requesting to run B runs B": { + runStep: B, + expectedReturn: { + type: "step-ran", + step: expect.objectContaining({ + id: B, + name: "B", + op: StepOpCode.RunStep, + data: "B", + }), + }, + expectedStepsRun: ["B"], + disableImmediateExecution: true, + }, + + "request following B reports 'A' and 'B wins' steps": { + stack: { [B]: { id: B, data: "B" } }, + expectedReturn: { + type: "steps-found", + steps: [ + expect.objectContaining({ + id: A, + name: "A", + op: StepOpCode.StepPlanned, + }), + expect.objectContaining({ + id: BWins, + name: "B wins", + op: StepOpCode.StepPlanned, + }), + ], + }, + disableImmediateExecution: true, + }, + + "requesting to run A runs A": { + runStep: A, + expectedReturn: { + type: "step-ran", + step: expect.objectContaining({ + id: A, + name: "A", + op: StepOpCode.RunStep, + data: "A", + }), + }, + expectedStepsRun: ["A"], + disableImmediateExecution: true, + }, + + "request following 'B wins' resolves": { + stack: { + [B]: { id: B, data: "B" }, + [BWins]: { id: BWins, data: "B wins" }, + }, + stackOrder: [B, BWins], + expectedReturn: { type: "function-resolved", data: undefined }, + disableImmediateExecution: true, + }, + + "request following A completion resolves": { + stack: { + [A]: { id: A, data: "A" }, + [B]: { id: B, data: "B" }, + [BWins]: { id: BWins, data: "B wins" }, + }, + stackOrder: [B, BWins, A], + expectedReturn: { type: "function-resolved", data: undefined }, + disableImmediateExecution: true, + }, + + "request if 'A' is complete reports 'B' and 'A wins' steps": { + stack: { [A]: { id: A, data: "A" } }, + expectedReturn: { + type: "steps-found", + steps: [ + expect.objectContaining({ + id: B, + name: "B", + op: StepOpCode.StepPlanned, + }), + expect.objectContaining({ + id: AWins, + name: "A wins", + op: StepOpCode.StepPlanned, + }), + ], + }, + disableImmediateExecution: true, + }, + }) + ); + + testFn( + "Deep Promise.race", + () => { + const A = jest.fn(() => Promise.resolve("A")); + const B = jest.fn(() => Promise.resolve("B")); + const B2 = jest.fn(() => Promise.resolve("B2")); + const AWins = jest.fn(() => Promise.resolve("A wins")); + const BWins = jest.fn(() => Promise.resolve("B wins")); + + const fn = inngest.createFunction( + { id: "name" }, + { event: "foo" }, + async ({ step: { run } }) => { + const winner = await Promise.race([ + run("A", A), + run("B", B).then(() => run("B2", B2)), + ]); + + if (winner === "A") { + await run("A wins", AWins); + } else if (winner === "B2") { + await run("B wins", BWins); + } + } + ); + + return { fn, steps: { A, B, B2, AWins, BWins } }; + }, + { + A: "A", + B: "B", + B2: "B2", + AWins: "A wins", + BWins: "B wins", + }, + ({ A, B, B2, BWins }) => ({ + "if B chain wins without 'A', reports 'A' and 'B wins' steps": { + stack: { [B]: { id: B, data: "B" }, [B2]: { id: B2, data: "B2" } }, + expectedReturn: { + type: "steps-found", + steps: [ + expect.objectContaining({ + id: A, + name: "A", + op: StepOpCode.StepPlanned, + }), + expect.objectContaining({ + id: BWins, + name: "B wins", + op: StepOpCode.StepPlanned, + }), + ], + }, + disableImmediateExecution: true, + }, + "if B chain wins after with 'A' afterwards, reports 'B wins' step": { + stack: { + [B]: { id: B, data: "B" }, + [B2]: { id: B2, data: "B2" }, + [A]: { id: A, data: "A" }, + }, + stackOrder: [B, B2, A], + expectedReturn: { + type: "steps-found", + steps: [ + expect.objectContaining({ + id: BWins, + name: "B wins", + op: StepOpCode.StepPlanned, + }), + ], + }, + disableImmediateExecution: true, + }, + }) + ); + testFn( "silently handle step error", () => { diff --git a/packages/inngest/src/components/InngestStepTools.test.ts b/packages/inngest/src/components/InngestStepTools.test.ts index 3649921d0..1716f7022 100644 --- a/packages/inngest/src/components/InngestStepTools.test.ts +++ b/packages/inngest/src/components/InngestStepTools.test.ts @@ -36,6 +36,7 @@ const getStepTools = ({ fn, data: {}, stepState, + stepCompletionOrder: Object.keys(stepState), }); const tools = createStepTools(client, execution.state); diff --git a/packages/inngest/src/components/InngestStepTools.ts b/packages/inngest/src/components/InngestStepTools.ts index 58b548b52..805b8746a 100644 --- a/packages/inngest/src/components/InngestStepTools.ts +++ b/packages/inngest/src/components/InngestStepTools.ts @@ -3,7 +3,7 @@ import { sha1 } from "hash.js"; import { type Jsonify } from "type-fest"; import { ErrCode, prettyError } from "../helpers/errors"; import { - createFrozenPromise, + createDeferredPromise, resolveAfterPending, runAsPromise, } from "../helpers/promises"; @@ -28,6 +28,13 @@ import { type ExecutionState } from "./InngestExecution"; export interface FoundStep extends HashedOp { fn?: (...args: unknown[]) => unknown; fulfilled: boolean; + handled: boolean; + + /** + * Returns a boolean representing whether or not the step was handled on this + * invocation. + */ + handle: () => boolean; } /** @@ -49,20 +56,42 @@ export const createStepTools = < let foundStepsToReport: FoundStep[] = []; let foundStepsReportPromise: Promise | undefined; - const pushStepToReport = (step: FoundStep) => { - foundStepsToReport.push(step); + const reportNextTick = () => { + // Being explicit instead of using `??=` to appease TypeScript. + if (foundStepsReportPromise) { + return; + } + + foundStepsReportPromise = resolveAfterPending().then(() => { + foundStepsReportPromise = undefined; - if (!foundStepsReportPromise) { - foundStepsReportPromise = resolveAfterPending().then(() => { - const steps = [...foundStepsToReport] as [FoundStep, ...FoundStep[]]; + for (let i = 0; i < state.stepCompletionOrder.length; i++) { + const handled = foundStepsToReport + .find((step) => { + return step.id === state.stepCompletionOrder[i]; + }) + ?.handle(); + + if (handled) { + return void reportNextTick(); + } + } - // Reset - foundStepsToReport = []; - foundStepsReportPromise = undefined; + // If we've handled no steps in this "tick," roll up everything we've + // found and report it. + const steps = [...foundStepsToReport] as [FoundStep, ...FoundStep[]]; + foundStepsToReport = []; - void state.setCheckpoint({ type: "steps-found", steps }); + return void state.setCheckpoint({ + type: "steps-found", + steps: steps, }); - } + }); + }; + + const pushStepToReport = (step: FoundStep) => { + foundStepsToReport.push(step); + reportNextTick(); }; /** @@ -168,36 +197,51 @@ export const createStepTools = < throw new Error("TODO: Step already exists?"); } - const step = (state.steps[opId.id] = { + const { promise, resolve, reject } = createDeferredPromise(); + const stepState = state.stepState[opId.id]; + if (stepState) { + stepState.seen = true; + } + + const step: FoundStep = (state.steps[opId.id] = { ...opId, fn: opts?.fn ? () => opts.fn?.(...args) : undefined, - fulfilled: Boolean(state.stepState[opId.id]), - }); - state.hasSteps = true; + fulfilled: Boolean(stepState), + handled: !stepState, + handle: () => { + if (step.handled) { + return false; + } - pushStepToReport(step); + step.handled = true; - const stepState = state.stepState[opId.id]; + if (stepState) { + stepState.fulfilled = true; - if (stepState) { - stepState.fulfilled = true; - /** - * If this is the last piece of state we had, we've now finished - * memoizing. - */ - if (state.allStateUsed()) { - await state.hooks?.afterMemoization?.(); - await state.hooks?.beforeExecution?.(); - } + if (typeof stepState.data !== "undefined") { + resolve(stepState.data); + } else { + reject(stepState.error); + } + } - if (typeof stepState?.data !== "undefined") { - return Promise.resolve(stepState?.data); - } else { - return Promise.reject(stepState?.error); - } + return true; + }, + }); + + /** + * If this is the last piece of state we had, we've now finished + * memoizing. + */ + if (state.allStateUsed()) { + await state.hooks?.afterMemoization?.(); + await state.hooks?.beforeExecution?.(); } - return createFrozenPromise(); + state.hasSteps = true; + pushStepToReport(step); + + return promise; }) as T; }; diff --git a/packages/inngest/src/helpers/promises.ts b/packages/inngest/src/helpers/promises.ts index 127011e7e..46dd46d90 100644 --- a/packages/inngest/src/helpers/promises.ts +++ b/packages/inngest/src/helpers/promises.ts @@ -65,27 +65,35 @@ export const resolveAfterPending = (): Promise => { type DeferredPromiseReturn = { promise: Promise; resolve: (value: T) => DeferredPromiseReturn; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reject: (reason: any) => DeferredPromiseReturn; }; /** - * Creates and returns Promise that can be resolved with the returned resolve - * function. + * Creates and returns Promise that can be resolved or rejected with the + * returned `resolve` and `reject` functions. * - * Resolving the function will return a new set of Promise and resolve function. - * These can be ignored if the original Promise is all that's needed. + * Resolving or rejecting the function will return a new set of Promise control + * functions. These can be ignored if the original Promise is all that's needed. */ export const createDeferredPromise = (): DeferredPromiseReturn => { - let resolve: (value: T) => DeferredPromiseReturn; + let resolve: DeferredPromiseReturn["resolve"]; + let reject: DeferredPromiseReturn["reject"]; - const promise = new Promise((_resolve) => { + const promise = new Promise((_resolve, _reject) => { resolve = (value: T) => { _resolve(value); return createDeferredPromise(); }; + + reject = (reason) => { + _reject(reason); + return createDeferredPromise(); + }; }); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - return { promise, resolve: resolve! }; + return { promise, resolve: resolve!, reject: reject! }; }; interface TimeoutPromise extends Promise {