Skip to content

Commit

Permalink
Added proper version variance handling to spin up multiple engines
Browse files Browse the repository at this point in the history
  • Loading branch information
jpwilliams committed Sep 26, 2023
1 parent bc5bb14 commit ead2856
Show file tree
Hide file tree
Showing 12 changed files with 225 additions and 119 deletions.
26 changes: 20 additions & 6 deletions packages/inngest/src/components/InngestCommHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import {
skipDevServer,
} from "../helpers/env";
import { rethrowError, serializeError } from "../helpers/errors";
import { parseFnData } from "../helpers/functions";
import { fetchAllFnData, parseFnData } from "../helpers/functions";
import { runAsPromise } from "../helpers/promises";
import { createStream } from "../helpers/stream";
import { hashSigningKey, stringify } from "../helpers/strings";
import { type MaybePromise } from "../helpers/types";
import {
logLevels,
type EventPayload,
type FunctionConfig,
type IntrospectRequest,
type LogLevel,
Expand All @@ -41,6 +42,7 @@ import {
type InngestFunction,
} from "./InngestFunction";
import {
ExecutionVersion,
type ExecutionResult,
type ExecutionResultHandler,
type ExecutionResultHandlers,
Expand Down Expand Up @@ -505,6 +507,7 @@ export class InngestCommHandler<
client: this.client,
extras: {
"Server-Timing": timer.getHeader(),
[headerKeys.RequestVersion]: ExecutionVersion.V1.toString(),
},
});

Expand Down Expand Up @@ -774,11 +777,14 @@ export class InngestCommHandler<
throw new Error(`Could not find function with ID "${functionId}"`);
}

const fndata = await parseFnData(data, this.client["inngestApi"]);
const fndata = await fetchAllFnData(
parseFnData(data),
this.client["inngestApi"]
);
if (!fndata.ok) {
throw new Error(fndata.error);
}
const { event, events, steps, ctx } = fndata.value;
const { event, events, steps, ctx, version } = fndata.value;

const stepState = Object.entries(steps ?? {}).reduce<
InngestExecutionOptions["stepState"]
Expand All @@ -790,14 +796,22 @@ export class InngestCommHandler<
};
}, {});

const execution = fn.fn["createExecution"]({
const execution = fn.fn["createExecution"](version, {
runId: ctx?.run_id || "",
data: { event, events, runId: ctx?.run_id, attempt: ctx?.attempt },
data: {
event: event as EventPayload,
events: events as [EventPayload, ...EventPayload[]],
runId: ctx?.run_id || "",
attempt: ctx?.attempt ?? 0,
},
stepState,
requestedRunStep: stepId === "step" ? undefined : stepId || undefined,
timer,
isFailureHandler: fn.onFailure,
disableImmediateExecution: fndata.value.ctx?.disable_immediate_execution,
disableImmediateExecution:
fndata.value.version === 1
? fndata.value.ctx?.disable_immediate_execution
: undefined,
stepCompletionOrder: ctx?.stack?.stack ?? [],
});

Expand Down
18 changes: 12 additions & 6 deletions packages/inngest/src/components/InngestFunction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { InngestFunction } from "@local/components/InngestFunction";
import { STEP_INDEXING_SUFFIX } from "@local/components/InngestStepTools";
import { NonRetriableError } from "@local/components/NonRetriableError";
import {
ExecutionVersion,
type ExecutionResult,
type ExecutionResults,
type InngestExecutionOptions,
Expand All @@ -34,6 +35,7 @@ import {
type FailureEventPayload,
type OutgoingOp,
} from "@local/types";
import { fromPartial } from "@total-typescript/shoehorn";
import { type IsEqual } from "type-fest";
import { assertType } from "type-plus";
import { createClient } from "../test/helpers";
Expand Down Expand Up @@ -144,8 +146,10 @@ describe("runFn", () => {
flowFn
);

const execution = fn["createExecution"]({
data: { event: { name: "foo", data: { foo: "foo" } } },
const execution = fn["createExecution"](ExecutionVersion.V1, {
data: fromPartial({
event: { name: "foo", data: { foo: "foo" } },
}),
runId: "run",
stepState: {},
stepCompletionOrder: [],
Expand Down Expand Up @@ -183,8 +187,10 @@ describe("runFn", () => {
});

test("wrap thrown error", async () => {
const execution = fn["createExecution"]({
data: { event: { name: "foo", data: { foo: "foo" } } },
const execution = fn["createExecution"](ExecutionVersion.V1, {
data: fromPartial({
event: { name: "foo", data: { foo: "foo" } },
}),
stepState: {},
runId: "run",
stepCompletionOrder: [],
Expand Down Expand Up @@ -216,8 +222,8 @@ describe("runFn", () => {
disableImmediateExecution?: boolean;
}
) => {
const execution = fn["createExecution"]({
data: { event: opts?.event || { name: "foo", data: {} } },
const execution = fn["createExecution"](ExecutionVersion.V1, {
data: fromPartial({ event: opts?.event || { name: "foo", data: {} } }),
runId: "run",
stepState,
stepCompletionOrder: opts?.stackOrder ?? Object.keys(stepState),
Expand Down
18 changes: 14 additions & 4 deletions packages/inngest/src/components/InngestFunction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import {
import { type EventsFromOpts, type Inngest } from "./Inngest";
import { type MiddlewareRegisterReturn } from "./InngestMiddleware";
import {
ExecutionVersion,
type IInngestExecution,
type InngestExecutionOptions,
} from "./execution/InngestExecution";
import { createV0InngestExecution } from "./execution/v0";
import { createV1InngestExecution } from "./execution/v1";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -193,12 +195,20 @@ export class InngestFunction<
}

private createExecution(
options: Omit<InngestExecutionOptions, "client" | "fn">
version: ExecutionVersion,
partialOptions: Omit<InngestExecutionOptions, "client" | "fn">
): IInngestExecution {
return createV1InngestExecution({
const options: InngestExecutionOptions = {
client: this.#client,
fn: this,
...options,
});
...partialOptions,
};

const versionHandlers = {
[ExecutionVersion.V1]: () => createV1InngestExecution(options),
[ExecutionVersion.V0]: () => createV0InngestExecution(options),
} satisfies Record<ExecutionVersion, () => IInngestExecution>;

return versionHandlers[version]();
}
}
8 changes: 1 addition & 7 deletions packages/inngest/src/components/InngestStepTools.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import { createClient } from "../test/helpers";
const getStepTools = () => {
const step = createStepTools(
createClient({ id: "test" }),
{},
({ args, matchOp }) => {
const stepOptions = getStepOptions(args[0]);
return Promise.resolve(matchOp(stepOptions, ...args.slice(1)));
Expand Down Expand Up @@ -422,12 +421,7 @@ describe("sendEvent", () => {
});

const sendEvent: ReturnType<
typeof createStepTools<
typeof opts,
EventsFromOpts<typeof opts>,
"foo",
{}
>
typeof createStepTools<typeof opts, EventsFromOpts<typeof opts>, "foo">
// eslint-disable-next-line @typescript-eslint/no-explicit-any
>["sendEvent"] = (() => undefined) as any;

Expand Down
4 changes: 1 addition & 3 deletions packages/inngest/src/components/InngestStepTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,9 @@ export const STEP_INDEXING_SUFFIX = ":";
export const createStepTools = <
TOpts extends ClientOptions,
Events extends EventsFromOpts<TOpts>,
TriggeringEvent extends keyof Events & string,
TState
TriggeringEvent extends keyof Events & string
>(
client: Inngest<TOpts>,
state: TState,
stepHandler: StepHandler
) => {
/**
Expand Down
9 changes: 7 additions & 2 deletions packages/inngest/src/components/execution/InngestExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Debug, { type Debugger } from "debug";
import { type Simplify } from "type-fest";
import { type MaybePromise } from "type-plus";
import { type ServerTiming } from "../../helpers/ServerTiming";
import { type IncomingOp, type OutgoingOp } from "../../types";
import { type AnyContext, type IncomingOp, type OutgoingOp } from "../../types";
import { type AnyInngest } from "../Inngest";
import { type ActionResponse } from "../InngestCommHandler";
import { type AnyInngestFunction } from "../InngestFunction";
Expand Down Expand Up @@ -35,14 +35,19 @@ export interface MemoizedOp extends IncomingOp {
seen?: boolean;
}

export enum ExecutionVersion {
V0 = 0,
V1 = 1,
}

/**
* Options for creating a new {@link InngestExecution} instance.
*/
export interface InngestExecutionOptions {
client: AnyInngest;
fn: AnyInngestFunction;
runId: string;
data: unknown;
data: Omit<AnyContext, "step">;
stepState: Record<string, MemoizedOp>;
stepCompletionOrder: string[];
requestedRunStep?: string;
Expand Down
2 changes: 1 addition & 1 deletion packages/inngest/src/components/execution/v0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ export class V0InngestExecution
});
};

const step = createStepTools(this.options.client, this.#state, stepHandler);
const step = createStepTools(this.options.client, stepHandler);

const fnArg = {
...(this.options.data as { event: EventPayload }),
Expand Down
9 changes: 2 additions & 7 deletions packages/inngest/src/components/execution/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,12 +517,7 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution {
}

#createStepTools(): ReturnType<
typeof createStepTools<
ClientOptions,
Record<string, EventPayload>,
string,
V1ExecutionState
>
typeof createStepTools<ClientOptions, Record<string, EventPayload>, string>
> {
/**
* A list of steps that have been found and are being rolled up before being
Expand Down Expand Up @@ -758,7 +753,7 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution {
return promise;
};

return createStepTools(this.options.client, this.#state, stepHandler);
return createStepTools(this.options.client, stepHandler);
}

#getUserFnToRun(): AnyHandler {
Expand Down
1 change: 0 additions & 1 deletion packages/inngest/src/helpers/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ export const inngestHeaders = (opts?: {
"Content-Type": "application/json",
"User-Agent": sdkVersion,
[headerKeys.SdkVersion]: sdkVersion,
[headerKeys.RequestVersion]: "1",
};

if (opts?.framework) {
Expand Down
36 changes: 26 additions & 10 deletions packages/inngest/src/helpers/functions.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { InngestApi } from "@local/api/api";
import { parseFnData } from "@local/helpers/functions";
import { parseFnData, type FnData } from "@local/helpers/functions";
import { type EventPayload } from "@local/types";

const randomstr = (): string => {
Expand All @@ -16,17 +15,23 @@ const generateEvent = (): EventPayload => {
};

describe("#parseFnData", () => {
const API = new InngestApi({ signingKey: "something" });

[
const specs: {
name: string;
data: Extract<FnData, { version: 1 }>;
isOk: boolean;
}[] = [
{
name: "should parse successfully for valid data",
data: {
version: 1,
event: generateEvent(),
events: [...Array(5).keys()].map(() => generateEvent()),
steps: {},
ctx: {
run_id: randomstr(),
attempt: 0,
disable_immediate_execution: false,
use_api: false,
stack: {
stack: [randomstr()],
current: 0,
Expand All @@ -37,13 +42,18 @@ describe("#parseFnData", () => {
},
{
name: "should return an error for missing event",
// @ts-expect-error No `event`
data: {
version: 1,
events: [...Array(5).keys()].map(() => generateEvent()),
steps: {},
ctx: {
run_id: randomstr(),
attempt: 0,
disable_immediate_execution: false,
use_api: false,
stack: {
stack: [],
stack: [randomstr()],
current: 0,
},
},
Expand All @@ -52,13 +62,19 @@ describe("#parseFnData", () => {
},
{
name: "should return an error with empty object",
// @ts-expect-error No data at all
data: {},
isOk: false,
},
].forEach((test) => {
it(test.name, async () => {
const result = await parseFnData(test.data, API);
expect(result.ok).toEqual(test.isOk);
];

specs.forEach((test) => {
it(test.name, () => {
if (test.isOk) {
return expect(() => parseFnData(test.data)).not.toThrow();
} else {
return expect(() => parseFnData(test.data)).toThrow();
}
});
});
});
Loading

0 comments on commit ead2856

Please sign in to comment.