Skip to content

Commit

Permalink
feat: handle stream and Uint8Array for lambda presets (#1624)
Browse files Browse the repository at this point in the history
* feat: handle stream and Uint8Array for lambda presets

* skip stream test for bun for now
  • Loading branch information
pi0 authored Aug 23, 2023
1 parent 8c7ca39 commit 84ac362
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/runtime/entries/aws-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ export async function handler(
cookies,
statusCode: r.status,
headers: normalizeLambdaOutgoingHeaders(r.headers, true),
body: normalizeLambdaOutgoingBody(r.body, r.headers),
body: await normalizeLambdaOutgoingBody(r.body, r.headers),
};
}

return {
statusCode: r.status,
headers: normalizeLambdaOutgoingHeaders(r.headers),
body: normalizeLambdaOutgoingBody(r.body, r.headers),
body: await normalizeLambdaOutgoingBody(r.body, r.headers),
};
}
2 changes: 1 addition & 1 deletion src/runtime/entries/netlify-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export async function lambda(
return {
statusCode: r.status,
headers: normalizeLambdaOutgoingHeaders(r.headers, true),
body: normalizeLambdaOutgoingBody(r.body, r.headers),
body: await normalizeLambdaOutgoingBody(r.body, r.headers),
multiValueHeaders: {
...(cookies.length > 0 ? { "set-cookie": cookies } : {}),
},
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/entries/stormkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const handler: Handler<StormkitEvent, StormkitResponse> =
body: event.body,
});

const normalizedBody = normalizeLambdaOutgoingBody(
const normalizedBody = await normalizeLambdaOutgoingBody(
response.body,
response.headers
);
Expand Down
45 changes: 42 additions & 3 deletions src/runtime/utils.lambda.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Readable } from "node:stream";
import type { APIGatewayProxyEventHeaders } from "aws-lambda";

export function normalizeLambdaIncomingHeaders(
Expand Down Expand Up @@ -27,16 +28,17 @@ export function normalizeLambdaOutgoingHeaders(
// AWS Lambda proxy integrations requires base64 encoded buffers
// binaryMediaTypes should be */*
// see https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-payload-encodings.html
export function normalizeLambdaOutgoingBody(
body: BodyInit,
export async function normalizeLambdaOutgoingBody(
body: BodyInit | ReadableStream | Buffer | Readable | Uint8Array,
headers: Record<string, number | string | string[] | undefined>
): string {
): Promise<string> {
if (typeof body === "string") {
return body;
}
if (!body) {
return "";
}
body = await _toBuffer(body as any);
if (Buffer.isBuffer(body)) {
const contentType = (headers["content-type"] as string) || "";
if (isTextType(contentType)) {
Expand All @@ -47,6 +49,43 @@ export function normalizeLambdaOutgoingBody(
throw new Error(`Unsupported body type: ${typeof body}`);
}

function _toBuffer(data: ReadableStream | Readable | Uint8Array) {
if ("pipeTo" in data && typeof data.pipeTo === "function") {
return new Promise<Buffer>((resolve, reject) => {
const chunks: Buffer[] = [];
data
.pipeTo(
new WritableStream({
write(chunk) {
chunks.push(chunk);
},
close() {
resolve(Buffer.concat(chunks));
},
abort(reason) {
reject(reason);
},
})
)
.catch(reject);
});
}
if ("pipe" in data && typeof data.pipe === "function") {
return new Promise<Buffer>((resolve, reject) => {
const chunks: Buffer[] = [];
data
.on("data", (chunk: any) => {
chunks.push(chunk);
})
.on("end", () => {
resolve(Buffer.concat(chunks));
})
.on("error", reject);
});
}
return Buffer.from(data as unknown as Uint16Array);
}

// -- Internal --

const TEXT_TYPE_RE = /^text\/|\/(json|xml)|utf-?8/;
Expand Down
12 changes: 12 additions & 0 deletions test/fixture/routes/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
export default eventHandler(() => {
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode("nitro"));
controller.enqueue(encoder.encode("is"));
controller.enqueue(encoder.encode("awesome"));
controller.close();
},
});
return stream;
});
1 change: 1 addition & 0 deletions test/presets/cloudflare-module.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe("nitro:preset:cloudflare-module", async () => {
modules: true,
scriptPath: resolve(ctx.outDir, "server/index.mjs"),
sitePath: resolve(ctx.outDir, "public"),
compatibilityFlags: ["streams_enable_constructors"],
globals: { __env__: {} },
bindings: {
...ctx.env,
Expand Down
1 change: 1 addition & 0 deletions test/presets/cloudflare-pages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ describe("nitro:preset:cloudflare-pages", async () => {
modules: true,
scriptPath: resolve(ctx.outDir, "_worker.js"),
globals: { __env__: {} },
compatibilityFlags: ["streams_enable_constructors"],
bindings: {
...ctx.env,
ASSETS: {
Expand Down
1 change: 1 addition & 0 deletions test/presets/cloudflare.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ describe("nitro:preset:cloudflare", async () => {
const mf = new Miniflare({
scriptPath: resolve(ctx.outDir, "server/index.mjs"),
globals: { ...ctx.env },
compatibilityFlags: ["streams_enable_constructors"],
});
return async ({ url, headers, method, body }) => {
const res = await mf.dispatchFetch("http://localhost" + url, {
Expand Down
15 changes: 12 additions & 3 deletions test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ export function testNitro(
});

// aws lambda requires buffer responses to be base 64
const LambdaPresets = ["netlify", "aws-lambda"];
it.runIf(LambdaPresets.includes(ctx.preset))(
const LambdaPresets = new Set(["netlify", "aws-lambda"]);
it.runIf(LambdaPresets.has(ctx.preset))(
"buffer image responses",
async () => {
const { data } = await callHandler({ url: "/icon.png" });
Expand Down Expand Up @@ -409,7 +409,7 @@ export function testNitro(
additionalTests(ctx, callHandler);
}

it.skipIf(ctx.preset === "netlify" /* TODO */)("runtime proxy", async () => {
it("runtime proxy", async () => {
const { data } = await callHandler({
url: "/api/proxy?foo=bar",
headers: {
Expand All @@ -420,6 +420,15 @@ export function testNitro(
expect(data.url).toBe("/api/echo?foo=bar");
});

it.skipIf(ctx.preset === "bun" /* TODO */)("stream", async () => {
const { data } = await callHandler({
url: "/stream",
});
expect(data).toBe(
LambdaPresets.has(ctx.preset) ? btoa("nitroisawesome") : "nitroisawesome"
);
});

it("config", async () => {
const { data } = await callHandler({
url: "/config",
Expand Down

0 comments on commit 84ac362

Please sign in to comment.