Skip to content

Commit

Permalink
refactor(ext/node): migrate "http" module to use "Deno.serveHttp" API (
Browse files Browse the repository at this point in the history
…denoland#18552)

This commit changes "node:http" module to use "Deno.serveHttp" API
instead of "Deno.serve" API.

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
  • Loading branch information
bartlomieju and mmastrac authored Apr 2, 2023
1 parent 513dada commit 3cd7abf
Showing 1 changed file with 48 additions and 141 deletions.
189 changes: 48 additions & 141 deletions ext/node/polyfills/http.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

const core = globalThis.Deno.core;
const ops = core.ops;
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts";
import { Buffer } from "ext:deno_node/buffer.ts";
import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts";
Expand All @@ -19,7 +16,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs";
import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
import * as flash from "ext:deno_flash/01_http.js";
import * as denoHttp from "ext:deno_http/01_http.js";
import * as httpRuntime from "ext:runtime/40_http.js";

enum STATUS_CODES {
/** RFC 7231, 6.2.1 */
Expand Down Expand Up @@ -191,9 +189,6 @@ const METHODS = [

type Chunk = string | Buffer | Uint8Array;

const DenoServe = flash.createServe(ops.op_node_unstable_flash_serve);
const DenoUpgradeHttpRaw = flash.upgradeHttpRaw;

const ENCODER = new TextEncoder();

export interface RequestOptions {
Expand Down Expand Up @@ -411,11 +406,7 @@ export class ServerResponse extends NodeWritable {
finished = false;
headersSent = false;
#firstChunk: Chunk | null = null;
// Used if --unstable flag IS NOT present
#reqEvent?: Deno.RequestEvent;
// Used if --unstable flag IS present
#resolve?: (value: Response | PromiseLike<Response>) => void;
#isFlashRequest: boolean;

static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) {
// TODO(kt3k): This is a workaround for denoland/deno#17194
Expand All @@ -436,10 +427,7 @@ export class ServerResponse extends NodeWritable {
return status === 101 || status === 204 || status === 205 || status === 304;
}

constructor(
reqEvent: undefined | Deno.RequestEvent,
resolve: undefined | ((value: Response | PromiseLike<Response>) => void),
) {
constructor(reqEvent: undefined | Deno.RequestEvent) {
let controller: ReadableByteStreamController;
const readable = new ReadableStream({
start(c) {
Expand Down Expand Up @@ -481,9 +469,7 @@ export class ServerResponse extends NodeWritable {
},
});
this.#readable = readable;
this.#resolve = resolve;
this.#reqEvent = reqEvent;
this.#isFlashRequest = typeof resolve !== "undefined";
}

setHeader(name: string, value: string) {
Expand Down Expand Up @@ -519,9 +505,8 @@ export class ServerResponse extends NodeWritable {
this.statusCode = 200;
this.statusMessage = "OK";
}
// Only taken if --unstable IS NOT present
if (
!this.#isFlashRequest && typeof singleChunk === "string" &&
typeof singleChunk === "string" &&
!this.hasHeader("content-type")
) {
this.setHeader("content-type", "text/plain;charset=UTF-8");
Expand All @@ -535,35 +520,22 @@ export class ServerResponse extends NodeWritable {
if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) {
body = null;
}
if (this.#isFlashRequest) {
this.#resolve!(
new Response(body, {
headers: this.#headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
);
} else {
this.#reqEvent!.respondWith(
new Response(body, {
headers: this.#headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
).catch(() => {
// ignore this error
});
}
this.#reqEvent!.respondWith(
new Response(body, {
headers: this.#headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
).catch(() => {
// TODO(bartlomieju): this error should be handled somehow
// ignore this error
});
}

// deno-lint-ignore no-explicit-any
override end(chunk?: any, encoding?: any, cb?: any): this {
this.finished = true;
if (this.#isFlashRequest) {
// Flash sets both of these headers.
this.#headers.delete("transfer-encoding");
this.#headers.delete("content-length");
} else if (!chunk && this.#headers.has("transfer-encoding")) {
if (!chunk && this.#headers.has("transfer-encoding")) {
// FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e.,
// the trailing "0\r\n", but respondWith() just hangs when I try that.
this.#headers.set("content-length", "0");
Expand Down Expand Up @@ -641,25 +613,12 @@ export function Server(handler?: ServerHandler): ServerImpl {
}

class ServerImpl extends EventEmitter {
#isFlashServer: boolean;

#httpConnections: Set<Deno.HttpConn> = new Set();
#listener?: Deno.Listener;

#addr?: Deno.NetAddr;
#hasClosed = false;
#ac?: AbortController;
#servePromise?: Deferred<void>;
listening = false;

constructor(handler?: ServerHandler) {
super();
// @ts-ignore Might be undefined without `--unstable` flag
this.#isFlashServer = typeof DenoServe == "function";
if (this.#isFlashServer) {
this.#servePromise = deferred();
this.#servePromise.then(() => this.emit("close"));
}
if (handler !== undefined) {
this.on("request", handler);
}
Expand All @@ -684,26 +643,16 @@ class ServerImpl extends EventEmitter {

// TODO(bnoordhuis) Node prefers [::] when host is omitted,
// we on the other hand default to 0.0.0.0.
if (this.#isFlashServer) {
const hostname = options.host ?? "0.0.0.0";
this.#addr = {
hostname,
port,
} as Deno.NetAddr;
this.listening = true;
nextTick(() => this.#serve());
} else {
this.listening = true;
const hostname = options.host ?? "";
this.#listener = Deno.listen({ port, hostname });
nextTick(() => this.#listenLoop());
}
this.listening = true;
const hostname = options.host ?? "";
this.#listener = Deno.listen({ port, hostname });
nextTick(() => this.#listenLoop());

return this;
}

async #listenLoop() {
const go = async (httpConn: Deno.HttpConn) => {
const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => {
try {
for (;;) {
let reqEvent = null;
Expand All @@ -721,8 +670,20 @@ class ServerImpl extends EventEmitter {
break;
}
const req = new IncomingMessageForServer(reqEvent.request);
const res = new ServerResponse(reqEvent, undefined);
this.emit("request", req, res);
if (req.upgrade && this.listenerCount("upgrade") > 0) {
const conn = await denoHttp.upgradeHttpRaw(
reqEvent.request,
tcpConn,
) as Deno.Conn;
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from([]));
return;
} else {
const res = new ServerResponse(reqEvent);
this.emit("request", req, res);
}
}
} finally {
this.#httpConnections.delete(httpConn);
Expand All @@ -737,56 +698,17 @@ class ServerImpl extends EventEmitter {
for await (const conn of listener) {
let httpConn: Deno.HttpConn;
try {
httpConn = Deno.serveHttp(conn);
httpConn = httpRuntime.serveHttp(conn);
} catch {
continue; /// Connection closed.
}

this.#httpConnections.add(httpConn);
go(httpConn);
go(conn, httpConn);
}
}
}

#serve() {
const ac = new AbortController();
const handler = (request: Request) => {
const req = new IncomingMessageForServer(request);
if (req.upgrade && this.listenerCount("upgrade") > 0) {
const [conn, head] = DenoUpgradeHttpRaw(request) as [
Deno.Conn,
Uint8Array,
];
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from(head));
} else {
return new Promise<Response>((resolve): void => {
const res = new ServerResponse(undefined, resolve);
this.emit("request", req, res);
});
}
};

if (this.#hasClosed) {
return;
}
this.#ac = ac;
DenoServe(
{
handler: handler as Deno.ServeHandler,
...this.#addr,
signal: ac.signal,
// @ts-ignore Might be any without `--unstable` flag
onListen: ({ port }) => {
this.#addr!.port = port;
this.emit("listening");
},
},
).then(() => this.#servePromise!.resolve());
}

setTimeout() {
console.error("Not implemented: Server.setTimeout()");
}
Expand All @@ -795,7 +717,6 @@ class ServerImpl extends EventEmitter {
const listening = this.listening;
this.listening = false;

this.#hasClosed = true;
if (typeof cb === "function") {
if (listening) {
this.once("close", cb);
Expand All @@ -806,42 +727,28 @@ class ServerImpl extends EventEmitter {
}
}

if (this.#isFlashServer) {
if (listening && this.#ac) {
this.#ac.abort();
this.#ac = undefined;
} else {
this.#servePromise!.resolve();
}
} else {
nextTick(() => this.emit("close"));
nextTick(() => this.emit("close"));

if (listening) {
this.#listener!.close();
this.#listener = undefined;
if (listening) {
this.#listener!.close();
this.#listener = undefined;

for (const httpConn of this.#httpConnections) {
try {
httpConn.close();
} catch {
// Already closed.
}
for (const httpConn of this.#httpConnections) {
try {
httpConn.close();
} catch {
// Already closed.
}

this.#httpConnections.clear();
}

this.#httpConnections.clear();
}

return this;
}

address() {
let addr;
if (this.#isFlashServer) {
addr = this.#addr!;
} else {
addr = this.#listener!.addr as Deno.NetAddr;
}
const addr = this.#listener!.addr as Deno.NetAddr;
return {
port: addr.port,
address: addr.hostname,
Expand Down

0 comments on commit 3cd7abf

Please sign in to comment.