From f6246f09051c9dfb3ce00a66a7abc498f9fbb61d Mon Sep 17 00:00:00 2001 From: Michael Kaufman <2073135+mkaufmaner@users.noreply.github.com> Date: Mon, 17 Jul 2023 04:49:15 -0400 Subject: [PATCH] chore: http/2 benchmark (#35) Co-authored-by: Carlos Fuentes --- benchmarks/benchmark-http2.js | 306 ++++++++++++++++++++++++++++++++ benchmarks/benchmark-https.js | 319 ++++++++++++++++++++++++++++++++++ benchmarks/benchmark.js | 12 +- benchmarks/server-http2.js | 49 ++++++ benchmarks/server-https.js | 41 +++++ lib/client.js | 5 +- package.json | 2 +- 7 files changed, 727 insertions(+), 7 deletions(-) create mode 100644 benchmarks/benchmark-http2.js create mode 100644 benchmarks/benchmark-https.js create mode 100644 benchmarks/server-http2.js create mode 100644 benchmarks/server-https.js diff --git a/benchmarks/benchmark-http2.js b/benchmarks/benchmark-http2.js new file mode 100644 index 00000000000..d8555de22b5 --- /dev/null +++ b/benchmarks/benchmark-http2.js @@ -0,0 +1,306 @@ +'use strict' + +const { connect } = require('http2') +const { createSecureContext } = require('tls') +const os = require('os') +const path = require('path') +const { readFileSync } = require('fs') +const { table } = require('table') +const { Writable } = require('stream') +const { WritableStream } = require('stream/web') +const { isMainThread } = require('worker_threads') + +const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..') + +const ca = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'ca.pem'), 'utf8') +const servername = 'agent1' + +const iterations = (parseInt(process.env.SAMPLES, 10) || 10) + 1 +const errorThreshold = parseInt(process.env.ERROR_THRESHOLD, 10) || 3 +const connections = parseInt(process.env.CONNECTIONS, 10) || 50 +const pipelining = parseInt(process.env.PIPELINING, 10) || 10 +const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100 +const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0 +const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0 +const dest = {} + +if (process.env.PORT) { + dest.port = process.env.PORT + dest.url = `https://localhost:${process.env.PORT}` +} else { + dest.url = 'https://localhost' + dest.socketPath = path.join(os.tmpdir(), 'undici.sock') +} + +const httpsBaseOptions = { + ca, + servername, + protocol: 'https:', + hostname: 'localhost', + method: 'GET', + path: '/', + query: { + frappucino: 'muffin', + goat: 'scone', + pond: 'moose', + foo: ['bar', 'baz', 'bal'], + bool: true, + numberKey: 256 + }, + ...dest +} + +const http2ClientOptions = { + secureContext: createSecureContext({ ca }), + servername +} + +const undiciOptions = { + path: '/', + method: 'GET', + headersTimeout, + bodyTimeout +} + +const Class = connections > 1 ? Pool : Client +const dispatcher = new Class(httpsBaseOptions.url, { + allowH2: true, + pipelining, + connections, + connect: { + rejectUnauthorized: false, + ca, + servername + }, + ...dest +}) + +setGlobalDispatcher(new Agent({ + allowH2: true, + pipelining, + connections, + connect: { + rejectUnauthorized: false, + ca, + servername + } +})) + +class SimpleRequest { + constructor (resolve) { + this.dst = new Writable({ + write (chunk, encoding, callback) { + callback() + } + }).on('finish', resolve) + } + + onConnect (abort) { } + + onHeaders (statusCode, headers, resume) { + this.dst.on('drain', resume) + } + + onData (chunk) { + return this.dst.write(chunk) + } + + onComplete () { + this.dst.end() + } + + onError (err) { + throw err + } +} + +function makeParallelRequests (cb) { + return Promise.all(Array.from(Array(parallelRequests)).map(() => new Promise(cb))) +} + +function printResults (results) { + // Sort results by least performant first, then compare relative performances and also printing padding + let last + + const rows = Object.entries(results) + // If any failed, put on the top of the list, otherwise order by mean, ascending + .sort((a, b) => (!a[1].success ? -1 : b[1].mean - a[1].mean)) + .map(([name, result]) => { + if (!result.success) { + return [name, result.size, 'Errored', 'N/A', 'N/A'] + } + + // Calculate throughput and relative performance + const { size, mean, standardError } = result + const relative = last !== 0 ? (last / mean - 1) * 100 : 0 + + // Save the slowest for relative comparison + if (typeof last === 'undefined') { + last = mean + } + + return [ + name, + size, + `${((connections * 1e9) / mean).toFixed(2)} req/sec`, + `± ${((standardError / mean) * 100).toFixed(2)} %`, + relative > 0 ? `+ ${relative.toFixed(2)} %` : '-' + ] + }) + + console.log(results) + + // Add the header row + rows.unshift(['Tests', 'Samples', 'Result', 'Tolerance', 'Difference with slowest']) + + return table(rows, { + columns: { + 0: { + alignment: 'left' + }, + 1: { + alignment: 'right' + }, + 2: { + alignment: 'right' + }, + 3: { + alignment: 'right' + }, + 4: { + alignment: 'right' + } + }, + drawHorizontalLine: (index, size) => index > 0 && index < size, + border: { + bodyLeft: '│', + bodyRight: '│', + bodyJoin: '│', + joinLeft: '|', + joinRight: '|', + joinJoin: '|' + } + }) +} + +const experiments = { + 'http2 - request' () { + return makeParallelRequests(resolve => { + connect(dest.url, http2ClientOptions, (session) => { + const headers = { + ':path': '/', + ':method': 'GET', + ':scheme': 'https', + ':authority': `localhost:${dest.port}` + } + + const request = session.request(headers) + + request.pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ).on('finish', resolve) + }) + }) + }, + 'undici - pipeline' () { + return makeParallelRequests(resolve => { + dispatcher + .pipeline(undiciOptions, data => { + return data.body + }) + .end() + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }, + 'undici - request' () { + return makeParallelRequests(resolve => { + try { + dispatcher.request(undiciOptions).then(({ body }) => { + body + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('error', (err) => { + console.log('undici - request - dispatcher.request - body - error', err) + }) + .on('finish', () => { + resolve() + }) + }) + } catch (err) { + console.error('undici - request - dispatcher.request - requestCount', err) + } + }) + }, + 'undici - stream' () { + return makeParallelRequests(resolve => { + return dispatcher + .stream(undiciOptions, () => { + return new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + }) + .then(resolve) + }) + }, + 'undici - dispatch' () { + return makeParallelRequests(resolve => { + dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve)) + }) + } +} + +if (process.env.PORT) { + // fetch does not support the socket + experiments['undici - fetch'] = () => { + return makeParallelRequests(resolve => { + fetch(dest.url, {}).then(res => { + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) + }).catch(console.log) + }) + } +} + +async function main () { + const { cronometro } = await import('cronometro') + + cronometro( + experiments, + { + iterations, + errorThreshold, + print: false + }, + (err, results) => { + if (err) { + throw err + } + + console.log(printResults(results)) + dispatcher.destroy() + } + ) +} + +if (isMainThread) { + main() +} else { + module.exports = main +} diff --git a/benchmarks/benchmark-https.js b/benchmarks/benchmark-https.js new file mode 100644 index 00000000000..a364f0a0c43 --- /dev/null +++ b/benchmarks/benchmark-https.js @@ -0,0 +1,319 @@ +'use strict' + +const https = require('https') +const os = require('os') +const path = require('path') +const { readFileSync } = require('fs') +const { table } = require('table') +const { Writable } = require('stream') +const { WritableStream } = require('stream/web') +const { isMainThread } = require('worker_threads') + +const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..') + +const ca = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'ca.pem'), 'utf8') +const servername = 'agent1' + +const iterations = (parseInt(process.env.SAMPLES, 10) || 10) + 1 +const errorThreshold = parseInt(process.env.ERROR_TRESHOLD, 10) || 3 +const connections = parseInt(process.env.CONNECTIONS, 10) || 50 +const pipelining = parseInt(process.env.PIPELINING, 10) || 10 +const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100 +const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0 +const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0 +const dest = {} + +if (process.env.PORT) { + dest.port = process.env.PORT + dest.url = `https://localhost:${process.env.PORT}` +} else { + dest.url = 'https://localhost' + dest.socketPath = path.join(os.tmpdir(), 'undici.sock') +} + +const httpsBaseOptions = { + ca, + servername, + protocol: 'https:', + hostname: 'localhost', + method: 'GET', + path: '/', + query: { + frappucino: 'muffin', + goat: 'scone', + pond: 'moose', + foo: ['bar', 'baz', 'bal'], + bool: true, + numberKey: 256 + }, + ...dest +} + +const httpsNoKeepAliveOptions = { + ...httpsBaseOptions, + agent: new https.Agent({ + keepAlive: false, + maxSockets: connections, + // rejectUnauthorized: false, + ca, + servername + }) +} + +const httpsKeepAliveOptions = { + ...httpsBaseOptions, + agent: new https.Agent({ + keepAlive: true, + maxSockets: connections, + // rejectUnauthorized: false, + ca, + servername + }) +} + +const undiciOptions = { + path: '/', + method: 'GET', + headersTimeout, + bodyTimeout +} + +const Class = connections > 1 ? Pool : Client +const dispatcher = new Class(httpsBaseOptions.url, { + pipelining, + connections, + connect: { + // rejectUnauthorized: false, + ca, + servername + }, + ...dest +}) + +setGlobalDispatcher(new Agent({ + pipelining, + connections, + connect: { + // rejectUnauthorized: false, + ca, + servername + } +})) + +class SimpleRequest { + constructor (resolve) { + this.dst = new Writable({ + write (chunk, encoding, callback) { + callback() + } + }).on('finish', resolve) + } + + onConnect (abort) { } + + onHeaders (statusCode, headers, resume) { + this.dst.on('drain', resume) + } + + onData (chunk) { + return this.dst.write(chunk) + } + + onComplete () { + this.dst.end() + } + + onError (err) { + throw err + } +} + +function makeParallelRequests (cb) { + return Promise.all(Array.from(Array(parallelRequests)).map(() => new Promise(cb))) +} + +function printResults (results) { + // Sort results by least performant first, then compare relative performances and also printing padding + let last + + const rows = Object.entries(results) + // If any failed, put on the top of the list, otherwise order by mean, ascending + .sort((a, b) => (!a[1].success ? -1 : b[1].mean - a[1].mean)) + .map(([name, result]) => { + if (!result.success) { + return [name, result.size, 'Errored', 'N/A', 'N/A'] + } + + // Calculate throughput and relative performance + const { size, mean, standardError } = result + const relative = last !== 0 ? (last / mean - 1) * 100 : 0 + + // Save the slowest for relative comparison + if (typeof last === 'undefined') { + last = mean + } + + return [ + name, + size, + `${((connections * 1e9) / mean).toFixed(2)} req/sec`, + `± ${((standardError / mean) * 100).toFixed(2)} %`, + relative > 0 ? `+ ${relative.toFixed(2)} %` : '-' + ] + }) + + console.log(results) + + // Add the header row + rows.unshift(['Tests', 'Samples', 'Result', 'Tolerance', 'Difference with slowest']) + + return table(rows, { + columns: { + 0: { + alignment: 'left' + }, + 1: { + alignment: 'right' + }, + 2: { + alignment: 'right' + }, + 3: { + alignment: 'right' + }, + 4: { + alignment: 'right' + } + }, + drawHorizontalLine: (index, size) => index > 0 && index < size, + border: { + bodyLeft: '│', + bodyRight: '│', + bodyJoin: '│', + joinLeft: '|', + joinRight: '|', + joinJoin: '|' + } + }) +} + +const experiments = { + 'https - no keepalive' () { + return makeParallelRequests(resolve => { + https.get(httpsNoKeepAliveOptions, res => { + res + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'https - keepalive' () { + return makeParallelRequests(resolve => { + https.get(httpsKeepAliveOptions, res => { + res + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'undici - pipeline' () { + return makeParallelRequests(resolve => { + dispatcher + .pipeline(undiciOptions, data => { + return data.body + }) + .end() + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }, + 'undici - request' () { + return makeParallelRequests(resolve => { + dispatcher.request(undiciOptions).then(({ body }) => { + body + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'undici - stream' () { + return makeParallelRequests(resolve => { + return dispatcher + .stream(undiciOptions, () => { + return new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + }) + .then(resolve) + }) + }, + 'undici - dispatch' () { + return makeParallelRequests(resolve => { + dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve)) + }) + } +} + +if (process.env.PORT) { + // fetch does not support the socket + experiments['undici - fetch'] = () => { + return makeParallelRequests(resolve => { + fetch(dest.url, {}).then(res => { + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) + }).catch(console.log) + }) + } +} + +async function main () { + const { cronometro } = await import('cronometro') + + cronometro( + experiments, + { + iterations, + errorThreshold, + print: false + }, + (err, results) => { + if (err) { + throw err + } + + console.log(printResults(results)) + dispatcher.destroy() + } + ) +} + +if (isMainThread) { + main() +} else { + module.exports = main +} diff --git a/benchmarks/benchmark.js b/benchmarks/benchmark.js index 4cf129f7b98..5bf3d2ede4f 100644 --- a/benchmarks/benchmark.js +++ b/benchmarks/benchmark.js @@ -73,7 +73,13 @@ const dispatcher = new Class(httpBaseOptions.url, { ...dest }) -setGlobalDispatcher(new Agent({ pipelining, connections })) +setGlobalDispatcher(new Agent({ + pipelining, + connections, + connect: { + rejectUnauthorized: false + } +})) class SimpleRequest { constructor (resolve) { @@ -84,7 +90,7 @@ class SimpleRequest { }).on('finish', resolve) } - onConnect (abort) {} + onConnect (abort) { } onHeaders (statusCode, headers, resume) { this.dst.on('drain', resume) @@ -260,7 +266,7 @@ if (process.env.PORT) { experiments['undici - fetch'] = () => { return makeParallelRequests(resolve => { fetch(dest.url).then(res => { - res.body.pipeTo(new WritableStream({ write () {}, close () { resolve() } })) + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) }).catch(console.log) }) } diff --git a/benchmarks/server-http2.js b/benchmarks/server-http2.js new file mode 100644 index 00000000000..0be99cd2fd7 --- /dev/null +++ b/benchmarks/server-http2.js @@ -0,0 +1,49 @@ +'use strict' + +const { unlinkSync, readFileSync } = require('fs') +const { createSecureServer } = require('http2') +const os = require('os') +const path = require('path') +const cluster = require('cluster') + +const key = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'key.pem'), 'utf8') +const cert = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'cert.pem'), 'utf8') + +const socketPath = path.join(os.tmpdir(), 'undici.sock') + +const port = process.env.PORT || socketPath +const timeout = parseInt(process.env.TIMEOUT, 10) || 1 +const workers = parseInt(process.env.WORKERS) || os.cpus().length + +const sessionTimeout = 600e3 // 10 minutes + +if (cluster.isPrimary) { + try { + unlinkSync(socketPath) + } catch (_) { + // Do nothing if the socket does not exist + } + + for (let i = 0; i < workers; i++) { + cluster.fork() + } +} else { + const buf = Buffer.alloc(64 * 1024, '_') + const server = createSecureServer( + { + key, + cert, + allowHTTP1: true, + sessionTimeout + }, + (req, res) => { + setTimeout(() => { + res.end(buf) + }, timeout) + } + ) + + server.keepAliveTimeout = 600e3 + + server.listen(port) +} diff --git a/benchmarks/server-https.js b/benchmarks/server-https.js new file mode 100644 index 00000000000..f0275d9cbcc --- /dev/null +++ b/benchmarks/server-https.js @@ -0,0 +1,41 @@ +'use strict' + +const { unlinkSync, readFileSync } = require('fs') +const { createServer } = require('https') +const os = require('os') +const path = require('path') +const cluster = require('cluster') + +const key = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'key.pem'), 'utf8') +const cert = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'cert.pem'), 'utf8') + +const socketPath = path.join(os.tmpdir(), 'undici.sock') + +const port = process.env.PORT || socketPath +const timeout = parseInt(process.env.TIMEOUT, 10) || 1 +const workers = parseInt(process.env.WORKERS) || os.cpus().length + +if (cluster.isPrimary) { + try { + unlinkSync(socketPath) + } catch (_) { + // Do nothing if the socket does not exist + } + + for (let i = 0; i < workers; i++) { + cluster.fork() + } +} else { + const buf = Buffer.alloc(64 * 1024, '_') + const server = createServer({ + key, + cert, + keepAliveTimeout: 600e3 + }, (req, res) => { + setTimeout(() => { + res.end(buf) + }, timeout) + }) + + server.listen(port) +} diff --git a/lib/client.js b/lib/client.js index 543c6925fcb..46d374d9213 100644 --- a/lib/client.js +++ b/lib/client.js @@ -83,8 +83,7 @@ const { HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, HTTP2_HEADER_CONTENT_LENGTH, - HTTP2_HEADER_EXPECT, - HTTP2_HEADER_STATUS + HTTP2_HEADER_EXPECT } } = http2 @@ -1158,7 +1157,7 @@ async function connect (client) { }) if (client.destroyed) { - util.destroy(socket.on('error', () => {}), new ClientDestroyedError()) + util.destroy(socket.on('error', () => { }), new ClientDestroyedError()) return } diff --git a/package.json b/package.json index d5e312a6828..091f900af4e 100644 --- a/package.json +++ b/package.json @@ -133,4 +133,4 @@ "dependencies": { "busboy": "^1.6.0" } -} +} \ No newline at end of file