Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for step-based Promise.race() #315

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/inngest/src/components/InngestCommHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ export class InngestCommHandler<
timer,
isFailureHandler: fn.onFailure,
disableImmediateExecution: fndata.value.disable_immediate_execution,
stepCompletionOrder: ctx?.stack?.stack ?? [],
});

return execution.start();
Expand Down
38 changes: 32 additions & 6 deletions packages/inngest/src/components/InngestExecution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export interface InngestExecutionOptions {
fn: AnyInngestFunction;
data: unknown;
stepState: Record<string, MemoizedOp>;
stepCompletionOrder: string[];
requestedRunStep?: string;
timer?: ServerTiming;
isFailureHandler?: boolean;
Expand Down Expand Up @@ -90,7 +91,7 @@ export class InngestExecution {
this.options = options;

this.#userFnToRun = this.#getUserFnToRun();
this.state = this.#createExecutionState(this.options.stepState);
this.state = this.#createExecutionState(this.options);
this.fnArg = this.#createFnArg(this.state);
this.checkpointHandlers = this.#createCheckpointHandlers();
this.#initializeTimer(this.state);
Expand Down Expand Up @@ -185,6 +186,20 @@ export class InngestExecution {
* them back to Inngest.
*/
"steps-found": async ({ steps }) => {
/**
* Iterate over state stack first, resolving promises in order to support
* racing.
*
* Then, find all remaining steps and handle them too.
*
* If we "handle" any steps in this pass, we must wait for the next
* checkpoint before taking any action. This is because we could be
* seeing a mix of previously-reported steps and new steps, which we
* can't differentiate.
*
* Because of this, we must also roll up any steps not necessarily found
* in only this checkpoint.
*/
const stepResult = await this.#tryExecuteStep(steps);
if (stepResult) {
const transformResult = await this.#transformOutput(stepResult);
Expand All @@ -200,7 +215,9 @@ export class InngestExecution {
return transformResult;
}

const newSteps = await this.#filterNewSteps(steps);
const newSteps = await this.#filterNewSteps(
Object.values(this.state.steps)
);
if (newSteps) {
return {
type: "steps-found",
Expand Down Expand Up @@ -460,9 +477,10 @@ export class InngestExecution {
return { type: "function-resolved", data };
}

#createExecutionState(
stepState: InngestExecutionOptions["stepState"]
): ExecutionState {
#createExecutionState({
stepState,
stepCompletionOrder,
}: InngestExecutionOptions): ExecutionState {
let { promise: checkpointPromise, resolve: checkpointResolve } =
createDeferredPromise<Checkpoint>();

Expand All @@ -485,13 +503,14 @@ export class InngestExecution {
steps: {},
loop,
hasSteps: Boolean(Object.keys(stepState).length),
stepCompletionOrder,
setCheckpoint: (checkpoint: Checkpoint) => {
({ promise: checkpointPromise, resolve: checkpointResolve } =
checkpointResolve(checkpoint));
},
allStateUsed: () => {
return Object.values(state.stepState).every((step) => {
return step.fulfilled;
return step.seen;
});
},
};
Expand Down Expand Up @@ -626,6 +645,7 @@ export type ExecutionResultHandlers<T = ActionResponse> = {

export interface MemoizedOp extends IncomingOp {
fulfilled?: boolean;
seen?: boolean;
}

export interface ExecutionState {
Expand Down Expand Up @@ -680,4 +700,10 @@ export interface ExecutionState {
* fulfill found steps.
*/
allStateUsed: () => boolean;

/**
* An ordered list of step IDs that represents the order in which their
* execution was completed.
*/
stepCompletionOrder: string[];
}
225 changes: 225 additions & 0 deletions packages/inngest/src/components/InngestFunction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ describe("runFn", () => {
const execution = fn["createExecution"]({
data: { event: { name: "foo", data: { foo: "foo" } } },
stepState: {},
stepCompletionOrder: [],
});

ret = await execution.start();
Expand Down Expand Up @@ -146,6 +147,7 @@ describe("runFn", () => {
const execution = fn["createExecution"]({
data: { event: { name: "foo", data: { foo: "foo" } } },
stepState: {},
stepCompletionOrder: [],
});

const ret = await execution.start();
Expand All @@ -170,12 +172,14 @@ describe("runFn", () => {
runStep?: string;
onFailure?: boolean;
event?: EventPayload;
stackOrder?: InngestExecutionOptions["stepCompletionOrder"];
disableImmediateExecution?: boolean;
}
) => {
const execution = fn["createExecution"]({
data: { event: opts?.event || { name: "foo", data: {} } },
stepState,
stepCompletionOrder: opts?.stackOrder ?? Object.keys(stepState),
isFailureHandler: Boolean(opts?.onFailure),
requestedRunStep: opts?.runStep,
timer,
Expand Down Expand Up @@ -210,6 +214,7 @@ describe("runFn", () => {
string,
{
stack?: InngestExecutionOptions["stepState"];
stackOrder?: InngestExecutionOptions["stepCompletionOrder"];
onFailure?: boolean;
runStep?: string;
expectedReturn?: Awaited<ReturnType<typeof runFnWithStack>>;
Expand Down Expand Up @@ -246,6 +251,7 @@ describe("runFn", () => {

beforeAll(async () => {
ret = await runFnWithStack(tools.fn, t.stack || {}, {
stackOrder: t.stackOrder,
runStep: t.runStep,
onFailure: t.onFailure || tools.onFailure,
event: t.event || tools.event,
Expand Down Expand Up @@ -625,6 +631,225 @@ describe("runFn", () => {
})
);

testFn(
"Promise.race",
() => {
const A = jest.fn(() => Promise.resolve("A"));
const B = jest.fn(() => Promise.resolve("B"));
const AWins = jest.fn(() => Promise.resolve("A wins"));
const BWins = jest.fn(() => Promise.resolve("B wins"));

const fn = inngest.createFunction(
{ id: "name" },
{ event: "foo" },
async ({ step: { run } }) => {
const winner = await Promise.race([run("A", A), run("B", B)]);

if (winner === "A") {
await run("A wins", AWins);
} else if (winner === "B") {
await run("B wins", BWins);
}
}
);

return { fn, steps: { A, B, AWins, BWins } };
},
{
A: "A",
B: "B",
AWins: "A wins",
BWins: "B wins",
},
({ A, B, AWins, BWins }) => ({
"first run reports A and B steps": {
expectedReturn: {
type: "steps-found",
steps: [
expect.objectContaining({
id: A,
name: "A",
op: StepOpCode.StepPlanned,
}),
expect.objectContaining({
id: B,
name: "B",
op: StepOpCode.StepPlanned,
}),
],
},
},

"requesting to run B runs B": {
runStep: B,
expectedReturn: {
type: "step-ran",
step: expect.objectContaining({
id: B,
name: "B",
op: StepOpCode.RunStep,
data: "B",
}),
},
expectedStepsRun: ["B"],
disableImmediateExecution: true,
},

"request following B reports 'A' and 'B wins' steps": {
stack: { [B]: { id: B, data: "B" } },
expectedReturn: {
type: "steps-found",
steps: [
expect.objectContaining({
id: A,
name: "A",
op: StepOpCode.StepPlanned,
}),
expect.objectContaining({
id: BWins,
name: "B wins",
op: StepOpCode.StepPlanned,
}),
],
},
disableImmediateExecution: true,
},

"requesting to run A runs A": {
runStep: A,
expectedReturn: {
type: "step-ran",
step: expect.objectContaining({
id: A,
name: "A",
op: StepOpCode.RunStep,
data: "A",
}),
},
expectedStepsRun: ["A"],
disableImmediateExecution: true,
},

"request following 'B wins' resolves": {
stack: {
[B]: { id: B, data: "B" },
[BWins]: { id: BWins, data: "B wins" },
},
stackOrder: [B, BWins],
expectedReturn: { type: "function-resolved", data: undefined },
disableImmediateExecution: true,
},

"request following A completion resolves": {
stack: {
[A]: { id: A, data: "A" },
[B]: { id: B, data: "B" },
[BWins]: { id: BWins, data: "B wins" },
},
stackOrder: [B, BWins, A],
expectedReturn: { type: "function-resolved", data: undefined },
disableImmediateExecution: true,
},

"request if 'A' is complete reports 'B' and 'A wins' steps": {
stack: { [A]: { id: A, data: "A" } },
expectedReturn: {
type: "steps-found",
steps: [
expect.objectContaining({
id: B,
name: "B",
op: StepOpCode.StepPlanned,
}),
expect.objectContaining({
id: AWins,
name: "A wins",
op: StepOpCode.StepPlanned,
}),
],
},
disableImmediateExecution: true,
},
})
);

testFn(
"Deep Promise.race",
() => {
const A = jest.fn(() => Promise.resolve("A"));
const B = jest.fn(() => Promise.resolve("B"));
const B2 = jest.fn(() => Promise.resolve("B2"));
const AWins = jest.fn(() => Promise.resolve("A wins"));
const BWins = jest.fn(() => Promise.resolve("B wins"));

const fn = inngest.createFunction(
{ id: "name" },
{ event: "foo" },
async ({ step: { run } }) => {
const winner = await Promise.race([
run("A", A),
run("B", B).then(() => run("B2", B2)),
]);

if (winner === "A") {
await run("A wins", AWins);
} else if (winner === "B2") {
await run("B wins", BWins);
}
}
);

return { fn, steps: { A, B, B2, AWins, BWins } };
},
{
A: "A",
B: "B",
B2: "B2",
AWins: "A wins",
BWins: "B wins",
},
({ A, B, B2, BWins }) => ({
"if B chain wins without 'A', reports 'A' and 'B wins' steps": {
stack: { [B]: { id: B, data: "B" }, [B2]: { id: B2, data: "B2" } },
expectedReturn: {
type: "steps-found",
steps: [
expect.objectContaining({
id: A,
name: "A",
op: StepOpCode.StepPlanned,
}),
expect.objectContaining({
id: BWins,
name: "B wins",
op: StepOpCode.StepPlanned,
}),
],
},
disableImmediateExecution: true,
},
"if B chain wins after with 'A' afterwards, reports 'B wins' step": {
stack: {
[B]: { id: B, data: "B" },
[B2]: { id: B2, data: "B2" },
[A]: { id: A, data: "A" },
},
stackOrder: [B, B2, A],
expectedReturn: {
type: "steps-found",
steps: [
expect.objectContaining({
id: BWins,
name: "B wins",
op: StepOpCode.StepPlanned,
}),
],
},
disableImmediateExecution: true,
},
})
);

testFn(
"silently handle step error",
() => {
Expand Down
1 change: 1 addition & 0 deletions packages/inngest/src/components/InngestStepTools.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const getStepTools = ({
fn,
data: {},
stepState,
stepCompletionOrder: Object.keys(stepState),
});

const tools = createStepTools(client, execution.state);
Expand Down
Loading