Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
tsctx committed Aug 20, 2024
1 parent 0e0852f commit 6a6b336
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 36 deletions.
2 changes: 1 addition & 1 deletion bench.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ await run()

for (const ws of connections) {
ws.close()
}
}
128 changes: 128 additions & 0 deletions benchmarks/_util/runner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
//@ts-check

Check failure on line 1 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Expected space or tab after '//' in comment

"use strict";

Check failure on line 3 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Strings must use singlequote

Check failure on line 3 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Extra semicolon

class Info {
/**@type {string} */

Check failure on line 6 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Expected exception block, space or tab after '/**' in comment
#name;

Check failure on line 7 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Extra semicolon
/**@type {bigint} */

Check failure on line 8 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Expected exception block, space or tab after '/**' in comment
#current;

Check failure on line 9 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Extra semicolon
/**@type {bigint} */

Check failure on line 10 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Expected exception block, space or tab after '/**' in comment
#finish;

Check failure on line 11 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Extra semicolon
/**@type {(...args: any[]) => any} */

Check failure on line 12 in benchmarks/_util/runner.js

View workflow job for this annotation

GitHub Actions / Lint

Expected exception block, space or tab after '/**' in comment
#callback;
/**@type {boolean} */
#finalized = false;

/**
* @param {string} name
* @param {(...args: any[]) => any} callback
*/
constructor(name, callback) {
this.#name = name;
this.#callback = callback;
}

get name() {
return this.#name;
}

start() {
if (this.#finalized) {
throw new TypeError("called after finished.");
}
this.#current = process.hrtime.bigint();
}

end() {
if (this.#finalized) {
throw new TypeError("called after finished.");
}
this.#finish = process.hrtime.bigint();
this.#finalized = true;
this.#callback();
}

diff() {
return Number(this.#finish - this.#current);
}
}

/**
* @typedef BenchMarkHandler
* @type {(ev: { name: string; start(): void; end(): void; }) => any}
*/

/**
* @param {Record<string, BenchMarkHandler>} experiments
* @param {{}} [options]
* @returns {Promise<{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]>}
*/
async function bench(experiments, options = {}) {
const names = Object.keys(experiments);

/**@type {{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]} */
const results = [];

async function waitMaybePromiseLike(p) {
if (
(typeof p === "object" || typeof p === "function") &&
p !== null &&
typeof p.then === "function"
) {
await p;
}
}

for (let i = 0; i < names.length; ++i) {
const name = names[i];
const fn = experiments[name];
const samples = [];

let timing = 0;

for (let j = 0; j < 128 || timing < 800_000_000; ++j) {
let resolve = (value) => {},
reject = (reason) => {},
promise = new Promise(
(done, fail) => ((resolve = done), (reject = fail))
);

const info = new Info(name, resolve);

try {
const p = fn(info);

await waitMaybePromiseLike(p);
} catch (err) {
reject(err);
}

await promise;

samples.push({ time: 1e6 * info.diff() });

timing += info.diff();
}

const average =
samples.map((v) => v.time).reduce((a, b) => a + b, 0) / samples.length;

results.push({
name: names[i],
average: average,
samples: samples.length,
fn: fn,
min: samples.reduce((a, acc) => Math.min(a, acc.time), samples[0].time),
max: samples.reduce((a, acc) => Math.max(a, acc.time), samples[0].time),
});
}

return results;
}

function print() {

}

module.exports = { bench };
44 changes: 12 additions & 32 deletions benchmarks/_util/websocket-simple-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,6 @@ const stream = require('node:stream')

const uid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

function safeBufferConcat (chunks, length) {
if (length === undefined) {
length = 0
for (let i = 0; i < chunks.length; ++i) {
length += chunks[i].length
}
}
const buffer = Buffer.allocUnsafeSlow(length)

let offset = 0
for (let i = 0; i < chunks.length; ++i) {
const chunk = chunks[i]
buffer.set(chunk, offset)
offset += chunk.length
}

if (length !== offset) {
buffer.fill(0, offset, buffer.length)
}

return buffer
}

class ws {
/**
* @param {number} opcode
Expand Down Expand Up @@ -72,10 +49,10 @@ class ws {
}

/**
* @param {Uint8Array} buffer
* @param {Uint8Array | null} buffer
*/
static parseFrame (buffer) {
if (buffer.length < 2) {
if (buffer === null || buffer.length < 2) {
return null
}

Expand Down Expand Up @@ -125,6 +102,7 @@ class ws {
}

static Stream = class extends stream.Writable {
/** @type {Uint8Array | null} */
#head = null
#receivedLength = 0

Expand All @@ -133,7 +111,7 @@ class ws {
if (this.#head === null) {
this.#head = chunk
} else {
this.#head = safeBufferConcat([this.#head, chunk])
this.#head = Buffer.concat([this.#head, chunk])
}
const head = this.#head
const parsed = ws.parseFrame(head)
Expand Down Expand Up @@ -172,7 +150,7 @@ class ws {
if (this.#head === null) {
this.#head = chunk
} else if (this.#head.length < 2) {
this.#head = safeBufferConcat([this.#head, chunk])
this.#head = Buffer.concat([this.#head, chunk])
merged = true
} else {
this.#receivedLength += chunk.length
Expand All @@ -187,7 +165,7 @@ class ws {
const start = length - (parsed.offset + parsed.length)
if (chunk.length < start) {
if (merged) throw new Error('fatal error')
this.#head = safeBufferConcat([this.#head, chunk]).subarray(
this.#head = Buffer.concat([this.#head, chunk]).subarray(
start
)
} else {
Expand Down Expand Up @@ -268,21 +246,21 @@ class ws {

writeFrame (frame) {
if (this.#socket.writable) {
return new Promise((resolve, reject) => {
return /** @type {Promise<void>} */(new Promise((resolve, reject) => {
this.#socket.write(frame, (err) => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}))
}
}

async close () {
if (this.#socket.writable) {
await this.writeFrame(ws.createFrame(ws.opcode.CLOSE))
await this.writeFrame(ws.createFrame(ws.opcode.CLOSE, new Uint8Array(0)))
this.#socket.end()
}
}
Expand All @@ -297,7 +275,9 @@ class ws {
*/
function setup ({ onConnection, parseBody }) {
const server = http.createServer((_req, res) => {
res.end('')
// Http handler
res.writeHead(404)
res.end('404 Not Found')
})

server.on('upgrade', (req, socket, _head) => {
Expand Down
1 change: 1 addition & 0 deletions benchmarks/server-websocket.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import './websocket/server/simple.mjs'
95 changes: 93 additions & 2 deletions benchmarks/websocket-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,94 @@
import { cronometro } from "cronometro"
//@ts-check

// TODO
import { bench } from "./_util/runner.js";
import { WebSocket } from "../index.js";
import { randomBytes } from "node:crypto";
const __BINARY_SIZE__ = 1024 * 256;

const binary = randomBytes(__BINARY_SIZE__);

/**
* @param {{
* send(buffer: Uint8Array | string): void;
* addEventListener(name: string, listener: (...args: any) => void, options?: { once: boolean }): void
* }} socket
* @param {Uint8Array | string} data
* @param {(...args: any) => any} callback
*/
function waitWrite(socket, data, callback) {
return /** @type {Promise<void>} */(new Promise((resolve, reject) => {
socket.send(data);

socket.addEventListener(
"message",
(ev) => {
resolve();
callback();
},
{ once: true }
);
}))
}

/**@type {Record<string, import('./_util/runner.js').BenchMarkHandler} */
const experiments = {};

/**@type {WebSocket | null} */
let ws;

experiments["sending 256Kib (undici)"] = async function (ev) {
if (ws === null) {
throw new Error("called before initialized");
}
ev.start();
await waitWrite(ws, binary, () => {
ev.end();
});
};

async function connect() {
ws = new WebSocket("ws://localhost:5001");

await /**@type {Promise<void>} */ (
new Promise((resolve, reject) => {
if (ws === null) {
return void reject(new Error("called before initialized"));
}
function onOpen() {
resolve();
this.removeEventListener("open", onOpen);
this.removeEventListener("error", onError);
}
function onError(err) {
reject(err);
this.removeEventListener("open", onOpen);
this.removeEventListener("error", onError);
}
ws.addEventListener("open", onOpen);
ws.addEventListener("error", onError);
})
);
}

connect()
.then(() => bench(experiments, {}))
.then((results) => {
if (ws === null) {
throw new Error("called before initialized");
}

ws.close();

console.log(results);
})
.catch((err) => {
process.nextTick((err) => {
throw err;
}, err);
});

function print(results) {

}

export {};
2 changes: 1 addition & 1 deletion benchmarks/websocket/server/simple.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if (cluster.isPrimary) {
cluster.fork()
}
} else {
const emptyFrame = ws.createFrame(ws.opcode.BINARY, Buffer.allocUnsafe(0))
const emptyFrame = ws.createFrame(ws.opcode.BINARY, new Uint8Array(0))

const server = setup({
onConnection (ctrl) {
Expand Down

0 comments on commit 6a6b336

Please sign in to comment.