Skip to content

Commit

Permalink
Fix probe response sig with streaming (#707)
Browse files Browse the repository at this point in the history
## Summary
Fix no response signature header when streaming is enabled.

## Checklist
- [x] Added changesets if applicable

---------

Co-authored-by: Jack Williams <1736957+jpwilliams@users.noreply.github.com>
  • Loading branch information
amh4r and jpwilliams authored Sep 12, 2024
1 parent 0e51903 commit 8c4b9ce
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/tame-lamps-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

Fix probe response sig with streaming
136 changes: 87 additions & 49 deletions packages/inngest/src/components/InngestCommHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,17 @@ export class InngestCommHandler<
* or not. Takes into account the user's preference and the platform's
* capabilities.
*/
private shouldStream(actions: HandlerResponseWithErrors): boolean {
private async shouldStream(
actions: HandlerResponseWithErrors
): Promise<boolean> {
const rawProbe = await actions.queryStringWithDefaults(
"testing for probe",
queryKeys.Probe
);
if (rawProbe !== undefined) {
return false;
}

// We must be able to stream responses to continue.
if (!actions.transformStreamingResponse) {
return false;
Expand Down Expand Up @@ -626,35 +636,55 @@ export class InngestCommHandler<
* This helps us provide high quality errors about what's going wrong for
* each access without having to wrap every access in a try/catch.
*/
const actions: HandlerResponseWithErrors = Object.entries(
rawActions
).reduce((acc, [key, value]) => {
if (typeof value !== "function") {
return acc;
}
const promisifiedActions: ActionHandlerResponseWithErrors =
Object.entries(rawActions).reduce((acc, [key, value]) => {
if (typeof value !== "function") {
return acc;
}

return {
...acc,
[key]: (reason: string, ...args: unknown[]) => {
const errMessage = [
`Failed calling \`${key}\` from serve handler`,
reason,
]
.filter(Boolean)
.join(" when ");

const fn = () =>
(value as (...args: unknown[]) => unknown)(...args);

return runAsPromise(fn)
.catch(rethrowError(errMessage))
.catch((err) => {
this.log("error", err);
throw err;
});
},
};
}, {} as HandlerResponseWithErrors);
return {
...acc,
[key]: (reason: string, ...args: unknown[]) => {
const errMessage = [
`Failed calling \`${key}\` from serve handler`,
reason,
]
.filter(Boolean)
.join(" when ");

const fn = () =>
(value as (...args: unknown[]) => unknown)(...args);

return runAsPromise(fn)
.catch(rethrowError(errMessage))
.catch((err) => {
this.log("error", err);
throw err;
});
},
};
}, {} as ActionHandlerResponseWithErrors);

/**
* Mapped promisified handlers from userland `serve()` function mixed in
* with some helpers.
*/
const actions: HandlerResponseWithErrors = {
...promisifiedActions,
queryStringWithDefaults: async (
reason: string,
key: string
): Promise<string | undefined> => {
const url = await actions.url(reason);

const ret =
(await actions.queryString?.(reason, key, url)) ||
url.searchParams.get(key) ||
undefined;

return ret;
},
};

const [env, expectedServerKind] = await Promise.all([
actions.env?.("starting to handle request"),
Expand Down Expand Up @@ -809,7 +839,7 @@ export class InngestCommHandler<
};
};

if (this.shouldStream(actions)) {
if (await this.shouldStream(actions)) {
const method = await actions.method("starting streaming response");

if (method === "POST") {
Expand Down Expand Up @@ -916,18 +946,6 @@ export class InngestCommHandler<
try {
const url = await actions.url("starting to handle request");

const getQuerystring = async (
reason: string,
key: string
): Promise<string | undefined> => {
const ret =
(await actions.queryString?.(reason, key, url)) ||
url.searchParams.get(key) ||
undefined;

return ret;
};

if (method === "POST") {
const validationResult = await signatureValidation;
if (!validationResult.success) {
Expand All @@ -941,7 +959,7 @@ export class InngestCommHandler<
};
}

const rawProbe = await getQuerystring(
const rawProbe = await actions.queryStringWithDefaults(
"testing for probe",
queryKeys.Probe
);
Expand Down Expand Up @@ -980,7 +998,7 @@ export class InngestCommHandler<
return probeActions[probe]();
}

const fnId = await getQuerystring(
const fnId = await actions.queryStringWithDefaults(
"processing run request",
queryKeys.FnId
);
Expand All @@ -990,8 +1008,10 @@ export class InngestCommHandler<
}

const stepId =
(await getQuerystring("processing run request", queryKeys.StepId)) ||
null;
(await actions.queryStringWithDefaults(
"processing run request",
queryKeys.StepId
)) || null;

const { version, result } = this.runStep({
functionId: fnId,
Expand Down Expand Up @@ -1148,7 +1168,7 @@ export class InngestCommHandler<
event_key_hash: this.hashedEventKey ?? null,
extra: {
...introspection.extra,
is_streaming: this.shouldStream(actions),
is_streaming: await this.shouldStream(actions),
},
framework: this.frameworkName,
sdk_language: "js",
Expand Down Expand Up @@ -1179,7 +1199,7 @@ export class InngestCommHandler<
}

if (method === "PUT") {
let deployId = await getQuerystring(
let deployId = await actions.queryStringWithDefaults(
"processing deployment request",
queryKeys.DeployId
);
Expand Down Expand Up @@ -1872,7 +1892,7 @@ export interface ActionResponse<
* This enables us to provide accurate errors for each access without having to
* wrap every access in a try/catch.
*/
export type HandlerResponseWithErrors = {
export type ActionHandlerResponseWithErrors = {
[K in keyof HandlerResponse]: NonNullable<HandlerResponse[K]> extends (
...args: infer Args
) => infer R
Expand All @@ -1881,3 +1901,21 @@ export type HandlerResponseWithErrors = {
: (errMessage: string, ...args: Args) => Promise<R>
: HandlerResponse[K];
};

/**
* A version of {@link ActionHandlerResponseWithErrors} that includes helper
* functions that provide sensible defaults on top of the direct access given
* from the bare response.
*/
export interface HandlerResponseWithErrors
extends ActionHandlerResponseWithErrors {
/**
* Fetch a query string value from the request. If no `querystring` action
* has been provided by the `serve()` handler, this will fall back to using
* the provided URL to fetch the query string instead.
*/
queryStringWithDefaults: (
reason: string,
key: string
) => Promise<string | undefined>;
}

0 comments on commit 8c4b9ce

Please sign in to comment.