From 6a6b336b98ee946d8c705301621fff360d0420e9 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:27:41 +0900 Subject: [PATCH] update --- bench.mjs | 2 +- benchmarks/_util/runner.js | 128 ++++++++++++++++++++ benchmarks/_util/websocket-simple-server.js | 44 ++----- benchmarks/server-websocket.mjs | 1 + benchmarks/websocket-benchmark.mjs | 95 ++++++++++++++- benchmarks/websocket/server/simple.mjs | 2 +- 6 files changed, 236 insertions(+), 36 deletions(-) create mode 100644 benchmarks/_util/runner.js create mode 100644 benchmarks/server-websocket.mjs diff --git a/bench.mjs b/bench.mjs index fb8cfb0e5c6..d5405395372 100644 --- a/bench.mjs +++ b/bench.mjs @@ -76,4 +76,4 @@ await run() for (const ws of connections) { ws.close() -} \ No newline at end of file +} diff --git a/benchmarks/_util/runner.js b/benchmarks/_util/runner.js new file mode 100644 index 00000000000..5798f41ed81 --- /dev/null +++ b/benchmarks/_util/runner.js @@ -0,0 +1,128 @@ +//@ts-check + +"use strict"; + +class Info { + /**@type {string} */ + #name; + /**@type {bigint} */ + #current; + /**@type {bigint} */ + #finish; + /**@type {(...args: any[]) => any} */ + #callback; + /**@type {boolean} */ + #finalized = false; + + /** + * @param {string} name + * @param {(...args: any[]) => any} callback + */ + constructor(name, callback) { + this.#name = name; + this.#callback = callback; + } + + get name() { + return this.#name; + } + + start() { + if (this.#finalized) { + throw new TypeError("called after finished."); + } + this.#current = process.hrtime.bigint(); + } + + end() { + if (this.#finalized) { + throw new TypeError("called after finished."); + } + this.#finish = process.hrtime.bigint(); + this.#finalized = true; + this.#callback(); + } + + diff() { + return Number(this.#finish - this.#current); + } +} + +/** + * @typedef BenchMarkHandler + * @type {(ev: { name: string; start(): void; end(): void; }) => any} + */ + +/** + * @param {Record} experiments + * @param {{}} [options] + * @returns {Promise<{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]>} + */ +async function bench(experiments, options = {}) { + const names = Object.keys(experiments); + + /**@type {{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]} */ + const results = []; + + async function waitMaybePromiseLike(p) { + if ( + (typeof p === "object" || typeof p === "function") && + p !== null && + typeof p.then === "function" + ) { + await p; + } + } + + for (let i = 0; i < names.length; ++i) { + const name = names[i]; + const fn = experiments[name]; + const samples = []; + + let timing = 0; + + for (let j = 0; j < 128 || timing < 800_000_000; ++j) { + let resolve = (value) => {}, + reject = (reason) => {}, + promise = new Promise( + (done, fail) => ((resolve = done), (reject = fail)) + ); + + const info = new Info(name, resolve); + + try { + const p = fn(info); + + await waitMaybePromiseLike(p); + } catch (err) { + reject(err); + } + + await promise; + + samples.push({ time: 1e6 * info.diff() }); + + timing += info.diff(); + } + + const average = + samples.map((v) => v.time).reduce((a, b) => a + b, 0) / samples.length; + + results.push({ + name: names[i], + average: average, + samples: samples.length, + fn: fn, + min: samples.reduce((a, acc) => Math.min(a, acc.time), samples[0].time), + max: samples.reduce((a, acc) => Math.max(a, acc.time), samples[0].time), + }); + } + + return results; +} + +function print() { + +} + +module.exports = { bench }; diff --git a/benchmarks/_util/websocket-simple-server.js b/benchmarks/_util/websocket-simple-server.js index d75a86a28dd..6d08c82a07e 100644 --- a/benchmarks/_util/websocket-simple-server.js +++ b/benchmarks/_util/websocket-simple-server.js @@ -6,29 +6,6 @@ const stream = require('node:stream') const uid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11' -function safeBufferConcat (chunks, length) { - if (length === undefined) { - length = 0 - for (let i = 0; i < chunks.length; ++i) { - length += chunks[i].length - } - } - const buffer = Buffer.allocUnsafeSlow(length) - - let offset = 0 - for (let i = 0; i < chunks.length; ++i) { - const chunk = chunks[i] - buffer.set(chunk, offset) - offset += chunk.length - } - - if (length !== offset) { - buffer.fill(0, offset, buffer.length) - } - - return buffer -} - class ws { /** * @param {number} opcode @@ -72,10 +49,10 @@ class ws { } /** - * @param {Uint8Array} buffer + * @param {Uint8Array | null} buffer */ static parseFrame (buffer) { - if (buffer.length < 2) { + if (buffer === null || buffer.length < 2) { return null } @@ -125,6 +102,7 @@ class ws { } static Stream = class extends stream.Writable { + /** @type {Uint8Array | null} */ #head = null #receivedLength = 0 @@ -133,7 +111,7 @@ class ws { if (this.#head === null) { this.#head = chunk } else { - this.#head = safeBufferConcat([this.#head, chunk]) + this.#head = Buffer.concat([this.#head, chunk]) } const head = this.#head const parsed = ws.parseFrame(head) @@ -172,7 +150,7 @@ class ws { if (this.#head === null) { this.#head = chunk } else if (this.#head.length < 2) { - this.#head = safeBufferConcat([this.#head, chunk]) + this.#head = Buffer.concat([this.#head, chunk]) merged = true } else { this.#receivedLength += chunk.length @@ -187,7 +165,7 @@ class ws { const start = length - (parsed.offset + parsed.length) if (chunk.length < start) { if (merged) throw new Error('fatal error') - this.#head = safeBufferConcat([this.#head, chunk]).subarray( + this.#head = Buffer.concat([this.#head, chunk]).subarray( start ) } else { @@ -268,7 +246,7 @@ class ws { writeFrame (frame) { if (this.#socket.writable) { - return new Promise((resolve, reject) => { + return /** @type {Promise} */(new Promise((resolve, reject) => { this.#socket.write(frame, (err) => { if (err) { reject(err) @@ -276,13 +254,13 @@ class ws { resolve() } }) - }) + })) } } async close () { if (this.#socket.writable) { - await this.writeFrame(ws.createFrame(ws.opcode.CLOSE)) + await this.writeFrame(ws.createFrame(ws.opcode.CLOSE, new Uint8Array(0))) this.#socket.end() } } @@ -297,7 +275,9 @@ class ws { */ function setup ({ onConnection, parseBody }) { const server = http.createServer((_req, res) => { - res.end('') + // Http handler + res.writeHead(404) + res.end('404 Not Found') }) server.on('upgrade', (req, socket, _head) => { diff --git a/benchmarks/server-websocket.mjs b/benchmarks/server-websocket.mjs new file mode 100644 index 00000000000..b0c4ea6be7a --- /dev/null +++ b/benchmarks/server-websocket.mjs @@ -0,0 +1 @@ +import './websocket/server/simple.mjs' diff --git a/benchmarks/websocket-benchmark.mjs b/benchmarks/websocket-benchmark.mjs index 0aaaa13386c..fea04b3c4e6 100644 --- a/benchmarks/websocket-benchmark.mjs +++ b/benchmarks/websocket-benchmark.mjs @@ -1,3 +1,94 @@ -import { cronometro } from "cronometro" +//@ts-check -// TODO +import { bench } from "./_util/runner.js"; +import { WebSocket } from "../index.js"; +import { randomBytes } from "node:crypto"; +const __BINARY_SIZE__ = 1024 * 256; + +const binary = randomBytes(__BINARY_SIZE__); + +/** + * @param {{ + * send(buffer: Uint8Array | string): void; + * addEventListener(name: string, listener: (...args: any) => void, options?: { once: boolean }): void + * }} socket + * @param {Uint8Array | string} data + * @param {(...args: any) => any} callback + */ +function waitWrite(socket, data, callback) { + return /** @type {Promise} */(new Promise((resolve, reject) => { + socket.send(data); + + socket.addEventListener( + "message", + (ev) => { + resolve(); + callback(); + }, + { once: true } + ); + })) +} + +/**@type {Record { + ev.end(); + }); +}; + +async function connect() { + ws = new WebSocket("ws://localhost:5001"); + + await /**@type {Promise} */ ( + new Promise((resolve, reject) => { + if (ws === null) { + return void reject(new Error("called before initialized")); + } + function onOpen() { + resolve(); + this.removeEventListener("open", onOpen); + this.removeEventListener("error", onError); + } + function onError(err) { + reject(err); + this.removeEventListener("open", onOpen); + this.removeEventListener("error", onError); + } + ws.addEventListener("open", onOpen); + ws.addEventListener("error", onError); + }) + ); +} + +connect() + .then(() => bench(experiments, {})) + .then((results) => { + if (ws === null) { + throw new Error("called before initialized"); + } + + ws.close(); + + console.log(results); + }) + .catch((err) => { + process.nextTick((err) => { + throw err; + }, err); + }); + +function print(results) { + +} + +export {}; diff --git a/benchmarks/websocket/server/simple.mjs b/benchmarks/websocket/server/simple.mjs index 377697dd128..eac1e70cf40 100644 --- a/benchmarks/websocket/server/simple.mjs +++ b/benchmarks/websocket/server/simple.mjs @@ -8,7 +8,7 @@ if (cluster.isPrimary) { cluster.fork() } } else { - const emptyFrame = ws.createFrame(ws.opcode.BINARY, Buffer.allocUnsafe(0)) + const emptyFrame = ws.createFrame(ws.opcode.BINARY, new Uint8Array(0)) const server = setup({ onConnection (ctrl) {