diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 785bbaab3a9935..1a585f74ce1198 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; +import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts"; import { Buffer } from "ext:deno_node/buffer.ts"; import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts"; @@ -16,9 +17,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; -import { upgradeHttpRaw } from "ext:deno_http/00_serve.js"; -import * as httpRuntime from "ext:runtime/40_http.js"; import { connResetException } from "ext:deno_node/internal/errors.ts"; +import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js"; enum STATUS_CODES { /** RFC 7231, 6.2.1 */ @@ -427,7 +427,7 @@ export class ServerResponse extends NodeWritable { finished = false; headersSent = false; #firstChunk: Chunk | null = null; - #reqEvent?: Deno.RequestEvent; + #resolve: (value: Response | PromiseLike) => void; static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) { if (typeof chunk === "string") { @@ -443,7 +443,7 @@ export class ServerResponse extends NodeWritable { return status === 101 || status === 204 || status === 205 || status === 304; } - constructor(reqEvent: undefined | Deno.RequestEvent) { + constructor(resolve: (value: Response | PromiseLike) => void) { let controller: ReadableByteStreamController; const readable = new ReadableStream({ start(c) { @@ -485,7 +485,7 @@ export class ServerResponse extends NodeWritable { }, }); this.#readable = readable; - this.#reqEvent = reqEvent; + this.#resolve = resolve; } setHeader(name: string, value: string) { @@ -536,16 +536,13 @@ export class ServerResponse extends NodeWritable { if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) { body = null; } - this.#reqEvent!.respondWith( + this.#resolve( new Response(body, { headers: this.#headers, status: this.statusCode, statusText: this.statusMessage, }), - ).catch(() => { - // TODO(bartlomieju): this error should be handled somehow - // ignore this error - }); + ); } // deno-lint-ignore no-explicit-any @@ -577,7 +574,7 @@ export class IncomingMessageForServer extends NodeReadable { // These properties are used by `npm:forwarded` for example. socket: { remoteAddress: string; remotePort: number }; - constructor(req: Request, conn: Deno.Conn) { + constructor(req: Request, remoteAddr: { hostname: string; port: number }) { // Check if no body (GET/HEAD/OPTIONS/...) const reader = req.body?.getReader(); super({ @@ -605,8 +602,8 @@ export class IncomingMessageForServer extends NodeReadable { this.url = req.url?.slice(req.url.indexOf("/", 8)); this.method = req.method; this.socket = { - remoteAddress: conn.remoteAddr.hostname, - remotePort: conn.remoteAddr.port, + remoteAddress: remoteAddr.hostname, + remotePort: remoteAddr.port, }; this.#req = req; } @@ -648,10 +645,17 @@ export function Server(handler?: ServerHandler): ServerImpl { class ServerImpl extends EventEmitter { #httpConnections: Set = new Set(); #listener?: Deno.Listener; + + #addr: Deno.NetAddr; + #hasClosed = false; + #ac?: AbortController; + #servePromise: Deferred; listening = false; constructor(handler?: ServerHandler) { super(); + this.#servePromise = deferred(); + this.#servePromise.then(() => this.emit("close")); if (handler !== undefined) { this.on("request", handler); } @@ -676,70 +680,52 @@ class ServerImpl extends EventEmitter { // TODO(bnoordhuis) Node prefers [::] when host is omitted, // we on the other hand default to 0.0.0.0. + const hostname = options.host ?? "0.0.0.0"; + this.#addr = { + hostname, + port, + } as Deno.NetAddr; this.listening = true; - const hostname = options.host ?? ""; - this.#listener = Deno.listen({ port, hostname }); - nextTick(() => this.#listenLoop()); + nextTick(() => this.#serve()); return this; } - async #listenLoop() { - const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => { - try { - for (;;) { - let reqEvent = null; - try { - // Note: httpConn.nextRequest() calls httpConn.close() on error. - reqEvent = await httpConn.nextRequest(); - } catch { - // Connection closed. - // TODO(bnoordhuis) Emit "clientError" event on the http.Server - // instance? Node emits it when request parsing fails and expects - // the listener to send a raw 4xx HTTP response on the underlying - // net.Socket but we don't have one to pass to the listener. - } - if (reqEvent === null) { - break; - } - const req = new IncomingMessageForServer(reqEvent.request, tcpConn); - if (req.upgrade && this.listenerCount("upgrade") > 0) { - const conn = await upgradeHttpRaw( - reqEvent.request, - tcpConn, - ) as Deno.Conn; - const socket = new Socket({ - handle: new TCP(constants.SERVER, conn), - }); - this.emit("upgrade", req, socket, Buffer.from([])); - return; - } else { - const res = new ServerResponse(reqEvent); - this.emit("request", req, res); - } - } - } finally { - this.#httpConnections.delete(httpConn); + #serve() { + const ac = new AbortController(); + const handler = (request: Request, info: Deno.ServeHandlerInfo) => { + const req = new IncomingMessageForServer(request, info.remoteAddr); + if (req.upgrade && this.listenerCount("upgrade") > 0) { + const { conn, response } = upgradeHttpRaw(request); + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + this.emit("upgrade", req, socket, Buffer.from([])); + return response; + } else { + return new Promise((resolve): void => { + const res = new ServerResponse(resolve); + this.emit("request", req, res); + }); } }; - const listener = this.#listener; - - if (listener !== undefined) { - this.emit("listening"); - - for await (const conn of listener) { - let httpConn: Deno.HttpConn; - try { - httpConn = httpRuntime.serveHttp(conn); - } catch { - continue; /// Connection closed. - } - - this.#httpConnections.add(httpConn); - go(conn, httpConn); - } + if (this.#hasClosed) { + return; } + this.#ac = ac; + serve( + { + handler: handler as Deno.ServeHandler, + ...this.#addr, + signal: ac.signal, + // @ts-ignore Might be any without `--unstable` flag + onListen: ({ port }) => { + this.#addr!.port = port; + this.emit("listening"); + }, + }, + ).then(() => this.#servePromise!.resolve()); } setTimeout() { @@ -750,6 +736,7 @@ class ServerImpl extends EventEmitter { const listening = this.listening; this.listening = false; + this.#hasClosed = true; if (typeof cb === "function") { if (listening) { this.once("close", cb); @@ -760,31 +747,20 @@ class ServerImpl extends EventEmitter { } } - nextTick(() => this.emit("close")); - - if (listening) { - this.#listener!.close(); - this.#listener = undefined; - - for (const httpConn of this.#httpConnections) { - try { - httpConn.close(); - } catch { - // Already closed. - } - } - - this.#httpConnections.clear(); + if (listening && this.#ac) { + this.#ac.abort(); + this.#ac = undefined; + } else { + this.#servePromise!.resolve(); } return this; } address() { - const addr = this.#listener!.addr as Deno.NetAddr; return { - port: addr.port, - address: addr.hostname, + port: this.#addr.port, + address: this.#addr.hostname, }; } }