Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ext/node): migrate back to using "Deno.serve" API for HTTP server #18865

Merged
merged 13 commits into from
Apr 27, 2023
148 changes: 62 additions & 86 deletions ext/node/polyfills/http.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts";
import { Buffer } from "ext:deno_node/buffer.ts";
import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts";
Expand All @@ -16,9 +17,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs";
import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
import { upgradeHttpRaw } from "ext:deno_http/00_serve.js";
import * as httpRuntime from "ext:runtime/40_http.js";
import { connResetException } from "ext:deno_node/internal/errors.ts";
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js";

enum STATUS_CODES {
/** RFC 7231, 6.2.1 */
Expand Down Expand Up @@ -427,7 +427,7 @@ export class ServerResponse extends NodeWritable {
finished = false;
headersSent = false;
#firstChunk: Chunk | null = null;
#reqEvent?: Deno.RequestEvent;
#resolve: (value: Response | PromiseLike<Response>) => void;

static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) {
if (typeof chunk === "string") {
Expand All @@ -443,7 +443,7 @@ export class ServerResponse extends NodeWritable {
return status === 101 || status === 204 || status === 205 || status === 304;
}

constructor(reqEvent: undefined | Deno.RequestEvent) {
constructor(resolve: (value: Response | PromiseLike<Response>) => void) {
let controller: ReadableByteStreamController;
const readable = new ReadableStream({
start(c) {
Expand Down Expand Up @@ -485,7 +485,7 @@ export class ServerResponse extends NodeWritable {
},
});
this.#readable = readable;
this.#reqEvent = reqEvent;
this.#resolve = resolve;
}

setHeader(name: string, value: string) {
Expand Down Expand Up @@ -536,16 +536,13 @@ export class ServerResponse extends NodeWritable {
if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) {
body = null;
}
this.#reqEvent!.respondWith(
this.#resolve(
new Response(body, {
headers: this.#headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
).catch(() => {
// TODO(bartlomieju): this error should be handled somehow
// ignore this error
});
);
}

// deno-lint-ignore no-explicit-any
Expand Down Expand Up @@ -577,7 +574,7 @@ export class IncomingMessageForServer extends NodeReadable {
// These properties are used by `npm:forwarded` for example.
socket: { remoteAddress: string; remotePort: number };

constructor(req: Request, conn: Deno.Conn) {
constructor(req: Request, remoteAddr: { hostname: string; port: number }) {
// Check if no body (GET/HEAD/OPTIONS/...)
const reader = req.body?.getReader();
super({
Expand Down Expand Up @@ -605,8 +602,8 @@ export class IncomingMessageForServer extends NodeReadable {
this.url = req.url?.slice(req.url.indexOf("/", 8));
this.method = req.method;
this.socket = {
remoteAddress: conn.remoteAddr.hostname,
remotePort: conn.remoteAddr.port,
remoteAddress: remoteAddr.hostname,
remotePort: remoteAddr.port,
};
this.#req = req;
}
Expand Down Expand Up @@ -648,10 +645,17 @@ export function Server(handler?: ServerHandler): ServerImpl {
class ServerImpl extends EventEmitter {
#httpConnections: Set<Deno.HttpConn> = new Set();
#listener?: Deno.Listener;

#addr: Deno.NetAddr;
#hasClosed = false;
#ac?: AbortController;
#servePromise: Deferred<void>;
listening = false;

constructor(handler?: ServerHandler) {
super();
this.#servePromise = deferred();
this.#servePromise.then(() => this.emit("close"));
if (handler !== undefined) {
this.on("request", handler);
}
Expand All @@ -676,70 +680,52 @@ class ServerImpl extends EventEmitter {

// TODO(bnoordhuis) Node prefers [::] when host is omitted,
// we on the other hand default to 0.0.0.0.
const hostname = options.host ?? "0.0.0.0";
this.#addr = {
hostname,
port,
} as Deno.NetAddr;
this.listening = true;
const hostname = options.host ?? "";
this.#listener = Deno.listen({ port, hostname });
nextTick(() => this.#listenLoop());
nextTick(() => this.#serve());

return this;
}

async #listenLoop() {
const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => {
try {
for (;;) {
let reqEvent = null;
try {
// Note: httpConn.nextRequest() calls httpConn.close() on error.
reqEvent = await httpConn.nextRequest();
} catch {
// Connection closed.
// TODO(bnoordhuis) Emit "clientError" event on the http.Server
// instance? Node emits it when request parsing fails and expects
// the listener to send a raw 4xx HTTP response on the underlying
// net.Socket but we don't have one to pass to the listener.
}
if (reqEvent === null) {
break;
}
const req = new IncomingMessageForServer(reqEvent.request, tcpConn);
if (req.upgrade && this.listenerCount("upgrade") > 0) {
const conn = await upgradeHttpRaw(
reqEvent.request,
tcpConn,
) as Deno.Conn;
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from([]));
return;
} else {
const res = new ServerResponse(reqEvent);
this.emit("request", req, res);
}
}
} finally {
this.#httpConnections.delete(httpConn);
#serve() {
const ac = new AbortController();
const handler = (request: Request, info: Deno.ServeHandlerInfo) => {
const req = new IncomingMessageForServer(request, info.remoteAddr);
if (req.upgrade && this.listenerCount("upgrade") > 0) {
const { conn, response } = upgradeHttpRaw(request);
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from([]));
return response;
} else {
return new Promise<Response>((resolve): void => {
const res = new ServerResponse(resolve);
this.emit("request", req, res);
});
}
};

const listener = this.#listener;

if (listener !== undefined) {
this.emit("listening");

for await (const conn of listener) {
let httpConn: Deno.HttpConn;
try {
httpConn = httpRuntime.serveHttp(conn);
} catch {
continue; /// Connection closed.
}

this.#httpConnections.add(httpConn);
go(conn, httpConn);
}
if (this.#hasClosed) {
return;
}
this.#ac = ac;
serve(
{
handler: handler as Deno.ServeHandler,
...this.#addr,
signal: ac.signal,
// @ts-ignore Might be any without `--unstable` flag
onListen: ({ port }) => {
this.#addr!.port = port;
this.emit("listening");
},
},
).then(() => this.#servePromise!.resolve());
}

setTimeout() {
Expand All @@ -750,6 +736,7 @@ class ServerImpl extends EventEmitter {
const listening = this.listening;
this.listening = false;

this.#hasClosed = true;
if (typeof cb === "function") {
if (listening) {
this.once("close", cb);
Expand All @@ -760,31 +747,20 @@ class ServerImpl extends EventEmitter {
}
}

nextTick(() => this.emit("close"));

if (listening) {
this.#listener!.close();
this.#listener = undefined;

for (const httpConn of this.#httpConnections) {
try {
httpConn.close();
} catch {
// Already closed.
}
}

this.#httpConnections.clear();
if (listening && this.#ac) {
this.#ac.abort();
this.#ac = undefined;
} else {
this.#servePromise!.resolve();
}

return this;
}

address() {
const addr = this.#listener!.addr as Deno.NetAddr;
return {
port: addr.port,
address: addr.hostname,
port: this.#addr.port,
address: this.#addr.hostname,
};
}
}
Expand Down