diff --git a/benchmarks/benchmark-http2.js b/benchmarks/benchmark-http2.js new file mode 100644 index 00000000000..d8555de22b5 --- /dev/null +++ b/benchmarks/benchmark-http2.js @@ -0,0 +1,306 @@ +'use strict' + +const { connect } = require('http2') +const { createSecureContext } = require('tls') +const os = require('os') +const path = require('path') +const { readFileSync } = require('fs') +const { table } = require('table') +const { Writable } = require('stream') +const { WritableStream } = require('stream/web') +const { isMainThread } = require('worker_threads') + +const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..') + +const ca = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'ca.pem'), 'utf8') +const servername = 'agent1' + +const iterations = (parseInt(process.env.SAMPLES, 10) || 10) + 1 +const errorThreshold = parseInt(process.env.ERROR_THRESHOLD, 10) || 3 +const connections = parseInt(process.env.CONNECTIONS, 10) || 50 +const pipelining = parseInt(process.env.PIPELINING, 10) || 10 +const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100 +const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0 +const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0 +const dest = {} + +if (process.env.PORT) { + dest.port = process.env.PORT + dest.url = `https://localhost:${process.env.PORT}` +} else { + dest.url = 'https://localhost' + dest.socketPath = path.join(os.tmpdir(), 'undici.sock') +} + +const httpsBaseOptions = { + ca, + servername, + protocol: 'https:', + hostname: 'localhost', + method: 'GET', + path: '/', + query: { + frappucino: 'muffin', + goat: 'scone', + pond: 'moose', + foo: ['bar', 'baz', 'bal'], + bool: true, + numberKey: 256 + }, + ...dest +} + +const http2ClientOptions = { + secureContext: createSecureContext({ ca }), + servername +} + +const undiciOptions = { + path: '/', + method: 'GET', + headersTimeout, + bodyTimeout +} + +const Class = connections > 1 ? Pool : Client +const dispatcher = new Class(httpsBaseOptions.url, { + allowH2: true, + pipelining, + connections, + connect: { + rejectUnauthorized: false, + ca, + servername + }, + ...dest +}) + +setGlobalDispatcher(new Agent({ + allowH2: true, + pipelining, + connections, + connect: { + rejectUnauthorized: false, + ca, + servername + } +})) + +class SimpleRequest { + constructor (resolve) { + this.dst = new Writable({ + write (chunk, encoding, callback) { + callback() + } + }).on('finish', resolve) + } + + onConnect (abort) { } + + onHeaders (statusCode, headers, resume) { + this.dst.on('drain', resume) + } + + onData (chunk) { + return this.dst.write(chunk) + } + + onComplete () { + this.dst.end() + } + + onError (err) { + throw err + } +} + +function makeParallelRequests (cb) { + return Promise.all(Array.from(Array(parallelRequests)).map(() => new Promise(cb))) +} + +function printResults (results) { + // Sort results by least performant first, then compare relative performances and also printing padding + let last + + const rows = Object.entries(results) + // If any failed, put on the top of the list, otherwise order by mean, ascending + .sort((a, b) => (!a[1].success ? -1 : b[1].mean - a[1].mean)) + .map(([name, result]) => { + if (!result.success) { + return [name, result.size, 'Errored', 'N/A', 'N/A'] + } + + // Calculate throughput and relative performance + const { size, mean, standardError } = result + const relative = last !== 0 ? (last / mean - 1) * 100 : 0 + + // Save the slowest for relative comparison + if (typeof last === 'undefined') { + last = mean + } + + return [ + name, + size, + `${((connections * 1e9) / mean).toFixed(2)} req/sec`, + `± ${((standardError / mean) * 100).toFixed(2)} %`, + relative > 0 ? `+ ${relative.toFixed(2)} %` : '-' + ] + }) + + console.log(results) + + // Add the header row + rows.unshift(['Tests', 'Samples', 'Result', 'Tolerance', 'Difference with slowest']) + + return table(rows, { + columns: { + 0: { + alignment: 'left' + }, + 1: { + alignment: 'right' + }, + 2: { + alignment: 'right' + }, + 3: { + alignment: 'right' + }, + 4: { + alignment: 'right' + } + }, + drawHorizontalLine: (index, size) => index > 0 && index < size, + border: { + bodyLeft: '│', + bodyRight: '│', + bodyJoin: '│', + joinLeft: '|', + joinRight: '|', + joinJoin: '|' + } + }) +} + +const experiments = { + 'http2 - request' () { + return makeParallelRequests(resolve => { + connect(dest.url, http2ClientOptions, (session) => { + const headers = { + ':path': '/', + ':method': 'GET', + ':scheme': 'https', + ':authority': `localhost:${dest.port}` + } + + const request = session.request(headers) + + request.pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ).on('finish', resolve) + }) + }) + }, + 'undici - pipeline' () { + return makeParallelRequests(resolve => { + dispatcher + .pipeline(undiciOptions, data => { + return data.body + }) + .end() + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }, + 'undici - request' () { + return makeParallelRequests(resolve => { + try { + dispatcher.request(undiciOptions).then(({ body }) => { + body + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('error', (err) => { + console.log('undici - request - dispatcher.request - body - error', err) + }) + .on('finish', () => { + resolve() + }) + }) + } catch (err) { + console.error('undici - request - dispatcher.request - requestCount', err) + } + }) + }, + 'undici - stream' () { + return makeParallelRequests(resolve => { + return dispatcher + .stream(undiciOptions, () => { + return new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + }) + .then(resolve) + }) + }, + 'undici - dispatch' () { + return makeParallelRequests(resolve => { + dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve)) + }) + } +} + +if (process.env.PORT) { + // fetch does not support the socket + experiments['undici - fetch'] = () => { + return makeParallelRequests(resolve => { + fetch(dest.url, {}).then(res => { + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) + }).catch(console.log) + }) + } +} + +async function main () { + const { cronometro } = await import('cronometro') + + cronometro( + experiments, + { + iterations, + errorThreshold, + print: false + }, + (err, results) => { + if (err) { + throw err + } + + console.log(printResults(results)) + dispatcher.destroy() + } + ) +} + +if (isMainThread) { + main() +} else { + module.exports = main +} diff --git a/benchmarks/benchmark-https.js b/benchmarks/benchmark-https.js new file mode 100644 index 00000000000..a364f0a0c43 --- /dev/null +++ b/benchmarks/benchmark-https.js @@ -0,0 +1,319 @@ +'use strict' + +const https = require('https') +const os = require('os') +const path = require('path') +const { readFileSync } = require('fs') +const { table } = require('table') +const { Writable } = require('stream') +const { WritableStream } = require('stream/web') +const { isMainThread } = require('worker_threads') + +const { Pool, Client, fetch, Agent, setGlobalDispatcher } = require('..') + +const ca = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'ca.pem'), 'utf8') +const servername = 'agent1' + +const iterations = (parseInt(process.env.SAMPLES, 10) || 10) + 1 +const errorThreshold = parseInt(process.env.ERROR_TRESHOLD, 10) || 3 +const connections = parseInt(process.env.CONNECTIONS, 10) || 50 +const pipelining = parseInt(process.env.PIPELINING, 10) || 10 +const parallelRequests = parseInt(process.env.PARALLEL, 10) || 100 +const headersTimeout = parseInt(process.env.HEADERS_TIMEOUT, 10) || 0 +const bodyTimeout = parseInt(process.env.BODY_TIMEOUT, 10) || 0 +const dest = {} + +if (process.env.PORT) { + dest.port = process.env.PORT + dest.url = `https://localhost:${process.env.PORT}` +} else { + dest.url = 'https://localhost' + dest.socketPath = path.join(os.tmpdir(), 'undici.sock') +} + +const httpsBaseOptions = { + ca, + servername, + protocol: 'https:', + hostname: 'localhost', + method: 'GET', + path: '/', + query: { + frappucino: 'muffin', + goat: 'scone', + pond: 'moose', + foo: ['bar', 'baz', 'bal'], + bool: true, + numberKey: 256 + }, + ...dest +} + +const httpsNoKeepAliveOptions = { + ...httpsBaseOptions, + agent: new https.Agent({ + keepAlive: false, + maxSockets: connections, + // rejectUnauthorized: false, + ca, + servername + }) +} + +const httpsKeepAliveOptions = { + ...httpsBaseOptions, + agent: new https.Agent({ + keepAlive: true, + maxSockets: connections, + // rejectUnauthorized: false, + ca, + servername + }) +} + +const undiciOptions = { + path: '/', + method: 'GET', + headersTimeout, + bodyTimeout +} + +const Class = connections > 1 ? Pool : Client +const dispatcher = new Class(httpsBaseOptions.url, { + pipelining, + connections, + connect: { + // rejectUnauthorized: false, + ca, + servername + }, + ...dest +}) + +setGlobalDispatcher(new Agent({ + pipelining, + connections, + connect: { + // rejectUnauthorized: false, + ca, + servername + } +})) + +class SimpleRequest { + constructor (resolve) { + this.dst = new Writable({ + write (chunk, encoding, callback) { + callback() + } + }).on('finish', resolve) + } + + onConnect (abort) { } + + onHeaders (statusCode, headers, resume) { + this.dst.on('drain', resume) + } + + onData (chunk) { + return this.dst.write(chunk) + } + + onComplete () { + this.dst.end() + } + + onError (err) { + throw err + } +} + +function makeParallelRequests (cb) { + return Promise.all(Array.from(Array(parallelRequests)).map(() => new Promise(cb))) +} + +function printResults (results) { + // Sort results by least performant first, then compare relative performances and also printing padding + let last + + const rows = Object.entries(results) + // If any failed, put on the top of the list, otherwise order by mean, ascending + .sort((a, b) => (!a[1].success ? -1 : b[1].mean - a[1].mean)) + .map(([name, result]) => { + if (!result.success) { + return [name, result.size, 'Errored', 'N/A', 'N/A'] + } + + // Calculate throughput and relative performance + const { size, mean, standardError } = result + const relative = last !== 0 ? (last / mean - 1) * 100 : 0 + + // Save the slowest for relative comparison + if (typeof last === 'undefined') { + last = mean + } + + return [ + name, + size, + `${((connections * 1e9) / mean).toFixed(2)} req/sec`, + `± ${((standardError / mean) * 100).toFixed(2)} %`, + relative > 0 ? `+ ${relative.toFixed(2)} %` : '-' + ] + }) + + console.log(results) + + // Add the header row + rows.unshift(['Tests', 'Samples', 'Result', 'Tolerance', 'Difference with slowest']) + + return table(rows, { + columns: { + 0: { + alignment: 'left' + }, + 1: { + alignment: 'right' + }, + 2: { + alignment: 'right' + }, + 3: { + alignment: 'right' + }, + 4: { + alignment: 'right' + } + }, + drawHorizontalLine: (index, size) => index > 0 && index < size, + border: { + bodyLeft: '│', + bodyRight: '│', + bodyJoin: '│', + joinLeft: '|', + joinRight: '|', + joinJoin: '|' + } + }) +} + +const experiments = { + 'https - no keepalive' () { + return makeParallelRequests(resolve => { + https.get(httpsNoKeepAliveOptions, res => { + res + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'https - keepalive' () { + return makeParallelRequests(resolve => { + https.get(httpsKeepAliveOptions, res => { + res + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'undici - pipeline' () { + return makeParallelRequests(resolve => { + dispatcher + .pipeline(undiciOptions, data => { + return data.body + }) + .end() + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }, + 'undici - request' () { + return makeParallelRequests(resolve => { + dispatcher.request(undiciOptions).then(({ body }) => { + body + .pipe( + new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + ) + .on('finish', resolve) + }) + }) + }, + 'undici - stream' () { + return makeParallelRequests(resolve => { + return dispatcher + .stream(undiciOptions, () => { + return new Writable({ + write (chunk, encoding, callback) { + callback() + } + }) + }) + .then(resolve) + }) + }, + 'undici - dispatch' () { + return makeParallelRequests(resolve => { + dispatcher.dispatch(undiciOptions, new SimpleRequest(resolve)) + }) + } +} + +if (process.env.PORT) { + // fetch does not support the socket + experiments['undici - fetch'] = () => { + return makeParallelRequests(resolve => { + fetch(dest.url, {}).then(res => { + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) + }).catch(console.log) + }) + } +} + +async function main () { + const { cronometro } = await import('cronometro') + + cronometro( + experiments, + { + iterations, + errorThreshold, + print: false + }, + (err, results) => { + if (err) { + throw err + } + + console.log(printResults(results)) + dispatcher.destroy() + } + ) +} + +if (isMainThread) { + main() +} else { + module.exports = main +} diff --git a/benchmarks/benchmark.js b/benchmarks/benchmark.js index 4cf129f7b98..5bf3d2ede4f 100644 --- a/benchmarks/benchmark.js +++ b/benchmarks/benchmark.js @@ -73,7 +73,13 @@ const dispatcher = new Class(httpBaseOptions.url, { ...dest }) -setGlobalDispatcher(new Agent({ pipelining, connections })) +setGlobalDispatcher(new Agent({ + pipelining, + connections, + connect: { + rejectUnauthorized: false + } +})) class SimpleRequest { constructor (resolve) { @@ -84,7 +90,7 @@ class SimpleRequest { }).on('finish', resolve) } - onConnect (abort) {} + onConnect (abort) { } onHeaders (statusCode, headers, resume) { this.dst.on('drain', resume) @@ -260,7 +266,7 @@ if (process.env.PORT) { experiments['undici - fetch'] = () => { return makeParallelRequests(resolve => { fetch(dest.url).then(res => { - res.body.pipeTo(new WritableStream({ write () {}, close () { resolve() } })) + res.body.pipeTo(new WritableStream({ write () { }, close () { resolve() } })) }).catch(console.log) }) } diff --git a/benchmarks/server-http2.js b/benchmarks/server-http2.js new file mode 100644 index 00000000000..0be99cd2fd7 --- /dev/null +++ b/benchmarks/server-http2.js @@ -0,0 +1,49 @@ +'use strict' + +const { unlinkSync, readFileSync } = require('fs') +const { createSecureServer } = require('http2') +const os = require('os') +const path = require('path') +const cluster = require('cluster') + +const key = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'key.pem'), 'utf8') +const cert = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'cert.pem'), 'utf8') + +const socketPath = path.join(os.tmpdir(), 'undici.sock') + +const port = process.env.PORT || socketPath +const timeout = parseInt(process.env.TIMEOUT, 10) || 1 +const workers = parseInt(process.env.WORKERS) || os.cpus().length + +const sessionTimeout = 600e3 // 10 minutes + +if (cluster.isPrimary) { + try { + unlinkSync(socketPath) + } catch (_) { + // Do nothing if the socket does not exist + } + + for (let i = 0; i < workers; i++) { + cluster.fork() + } +} else { + const buf = Buffer.alloc(64 * 1024, '_') + const server = createSecureServer( + { + key, + cert, + allowHTTP1: true, + sessionTimeout + }, + (req, res) => { + setTimeout(() => { + res.end(buf) + }, timeout) + } + ) + + server.keepAliveTimeout = 600e3 + + server.listen(port) +} diff --git a/benchmarks/server-https.js b/benchmarks/server-https.js new file mode 100644 index 00000000000..f0275d9cbcc --- /dev/null +++ b/benchmarks/server-https.js @@ -0,0 +1,41 @@ +'use strict' + +const { unlinkSync, readFileSync } = require('fs') +const { createServer } = require('https') +const os = require('os') +const path = require('path') +const cluster = require('cluster') + +const key = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'key.pem'), 'utf8') +const cert = readFileSync(path.join(__dirname, '..', 'test', 'fixtures', 'cert.pem'), 'utf8') + +const socketPath = path.join(os.tmpdir(), 'undici.sock') + +const port = process.env.PORT || socketPath +const timeout = parseInt(process.env.TIMEOUT, 10) || 1 +const workers = parseInt(process.env.WORKERS) || os.cpus().length + +if (cluster.isPrimary) { + try { + unlinkSync(socketPath) + } catch (_) { + // Do nothing if the socket does not exist + } + + for (let i = 0; i < workers; i++) { + cluster.fork() + } +} else { + const buf = Buffer.alloc(64 * 1024, '_') + const server = createServer({ + key, + cert, + keepAliveTimeout: 600e3 + }, (req, res) => { + setTimeout(() => { + res.end(buf) + }, timeout) + }) + + server.listen(port) +} diff --git a/docs/api/Client.md b/docs/api/Client.md index fc7c5d26e8f..591fb97c44b 100644 --- a/docs/api/Client.md +++ b/docs/api/Client.md @@ -17,6 +17,8 @@ Returns: `Client` ### Parameter: `ClientOptions` +> ⚠️ Warning: The `H2` support is experimental. + * **bodyTimeout** `number | null` (optional) - Default: `300e3` - The timeout after which a request will time out, in milliseconds. Monitors time between receiving body data. Use `0` to disable it entirely. Defaults to 300 seconds. * **headersTimeout** `number | null` (optional) - Default: `300e3` - The amount of time the parser will wait to receive the complete HTTP headers while not sending the request. Defaults to 300 seconds. * **keepAliveMaxTimeout** `number | null` (optional) - Default: `600e3` - The maximum allowed `keepAliveTimeout` when overridden by *keep-alive* hints from the server. Defaults to 10 minutes. @@ -30,6 +32,8 @@ Returns: `Client` * **interceptors** `{ Client: DispatchInterceptor[] }` - Default: `[RedirectInterceptor]` - A list of interceptors that are applied to the dispatch method. Additional logic can be applied (such as, but not limited to: 302 status code handling, authentication, cookies, compression and caching). Note that the behavior of interceptors is Experimental and might change at any given time. * **autoSelectFamily**: `boolean` (optional) - Default: depends on local Node version, on Node 18.13.0 and above is `false`. Enables a family autodetection algorithm that loosely implements section 5 of [RFC 8305](https://tools.ietf.org/html/rfc8305#section-5). See [here](https://nodejs.org/api/net.html#socketconnectoptions-connectlistener) for more details. This option is ignored if not supported by the current Node version. * **autoSelectFamilyAttemptTimeout**: `number` - Default: depends on local Node version, on Node 18.13.0 and above is `250`. The amount of time in milliseconds to wait for a connection attempt to finish before trying the next address when using the `autoSelectFamily` option. See [here](https://nodejs.org/api/net.html#socketconnectoptions-connectlistener) for more details. +* **allowH2**: `boolean` - Default: `false`. Enables support for H2 if the server has assigned bigger priority to it through ALPN negotiation. +* **maxConcurrentStreams**: `number` - Default: `100`. Dictates the maximum number of concurrent streams for a single H2 session. It can be overriden by a SETTINGS remote frame. #### Parameter: `ConnectOptions` diff --git a/docs/api/Dispatcher.md b/docs/api/Dispatcher.md index a50642948aa..a04be8cff1e 100644 --- a/docs/api/Dispatcher.md +++ b/docs/api/Dispatcher.md @@ -202,6 +202,7 @@ Returns: `Boolean` - `false` if dispatcher is busy and further dispatch calls wo * **bodyTimeout** `number | null` (optional) - The timeout after which a request will time out, in milliseconds. Monitors time between receiving body data. Use `0` to disable it entirely. Defaults to 300 seconds. * **headersTimeout** `number | null` (optional) - The amount of time the parser will wait to receive the complete HTTP headers while not sending the request. Defaults to 300 seconds. * **throwOnError** `boolean` (optional) - Default: `false` - Whether Undici should throw an error upon receiving a 4xx or 5xx response from the server. +* **expectContinue** `boolean` (optional) - Default: `false` - For H2, it appends the expect: 100-continue header, and halts the request body until a 100-continue is received from the remote server #### Parameter: `DispatchHandler` diff --git a/lib/api/api-connect.js b/lib/api/api-connect.js index 0503b1a2f0e..fd2b6ad97a5 100644 --- a/lib/api/api-connect.js +++ b/lib/api/api-connect.js @@ -1,7 +1,7 @@ 'use strict' -const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors') const { AsyncResource } = require('async_hooks') +const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors') const util = require('../core/util') const { addSignal, removeSignal } = require('./abort-signal') @@ -50,7 +50,13 @@ class ConnectHandler extends AsyncResource { removeSignal(this) this.callback = null - const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) + + let headers = rawHeaders + // Indicates is an HTTP2Session + if (headers != null) { + headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) + } + this.runInAsyncScope(callback, null, null, { statusCode, headers, diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 71d7e926b4c..f130ecc9867 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -95,7 +95,6 @@ class RequestHandler extends AsyncResource { this.callback = null this.res = body - if (callback !== null) { if (this.throwOnError && statusCode >= 400) { this.runInAsyncScope(getResolveErrorBodyCallback, null, diff --git a/lib/client.js b/lib/client.js index 7d9ec8d7c27..ebc9a6afc11 100644 --- a/lib/client.js +++ b/lib/client.js @@ -6,6 +6,8 @@ const assert = require('assert') const net = require('net') +const http2 = require('http2') +const { pipeline } = require('stream') const util = require('./core/util') const timers = require('./timers') const Request = require('./core/request') @@ -67,8 +69,30 @@ const { kDispatch, kInterceptors, kLocalAddress, - kMaxResponseSize + kMaxResponseSize, + kHTTPConnVersion, + // HTTP2 + kHost, + kHTTP2Session, + kHTTP2SessionState, + kHTTP2BuildRequest, + kHTTP2CopyHeaders, + kHTTP1BuildRequest } = require('./core/symbols') +const { + constants: { + HTTP2_HEADER_AUTHORITY, + HTTP2_HEADER_METHOD, + HTTP2_HEADER_PATH, + HTTP2_HEADER_CONTENT_LENGTH, + HTTP2_HEADER_EXPECT, + HTTP2_HEADER_STATUS + } +} = http2 + +// Experimental +let h2ExperimentalWarned = false + const FastBuffer = Buffer[Symbol.species] const kClosedResolve = Symbol('kClosedResolve') @@ -122,7 +146,10 @@ class Client extends DispatcherBase { localAddress, maxResponseSize, autoSelectFamily, - autoSelectFamilyAttemptTimeout + autoSelectFamilyAttemptTimeout, + // h2 + allowH2, + maxConcurrentStreams } = {}) { super() @@ -205,10 +232,20 @@ class Client extends DispatcherBase { throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number') } + // h2 + if (allowH2 != null && typeof allowH2 !== 'boolean') { + throw new InvalidArgumentError('allowH2 must be a valid boolean value') + } + + if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) { + throw new InvalidArgumentError('maxConcurrentStreams must be a possitive integer, greater than 0') + } + if (typeof connect !== 'function') { connect = buildConnector({ ...tls, maxCachedSessions, + allowH2, socketPath, timeout: connectTimeout, ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined), @@ -240,6 +277,18 @@ class Client extends DispatcherBase { this[kMaxRequests] = maxRequestsPerClient this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 + this[kHTTPConnVersion] = 'h1' + + // HTTP/2 + this[kHTTP2Session] = null + this[kHTTP2SessionState] = !allowH2 + ? null + : { + // streams: null, // Fixed queue of streams - For future support of `push` + openStreams: 0, // Keep track of them to decide wether or not unref the session + maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server + } + this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -298,7 +347,9 @@ class Client extends DispatcherBase { [kDispatch] (opts, handler) { const origin = opts.origin || this[kUrl].origin - const request = new Request(origin, opts, handler) + const request = this[kHTTPConnVersion] === 'h2' + ? Request[kHTTP2BuildRequest](origin, opts, handler) + : Request[kHTTP1BuildRequest](origin, opts, handler) this[kQueue].push(request) if (this[kResuming]) { @@ -319,6 +370,8 @@ class Client extends DispatcherBase { } async [kClose] () { + // TODO: for H2 we need to gracefully flush the remaining enqueued + // request and close each stream. return new Promise((resolve) => { if (!this[kSize]) { resolve(null) @@ -345,6 +398,12 @@ class Client extends DispatcherBase { resolve() } + if (this[kHTTP2Session] != null) { + util.destroy(this[kHTTP2Session], err) + this[kHTTP2Session] = null + this[kHTTP2SessionState] = null + } + if (!this[kSocket]) { queueMicrotask(callback) } else { @@ -356,6 +415,64 @@ class Client extends DispatcherBase { } } +function onHttp2SessionError (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kSocket][kError] = err + + onError(this[kClient], err) +} + +function onHttp2FrameError (type, code, id) { + const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) + + if (id === 0) { + this[kSocket][kError] = err + onError(this[kClient], err) + } +} + +function onHttp2SessionEnd () { + util.destroy(this, new SocketError('other side closed')) + util.destroy(this[kSocket], new SocketError('other side closed')) +} + +function onHTTP2GoAway (code) { + const client = this[kClient] + const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`) + client[kSocket] = null + client[kHTTP2Session] = null + + if (client.destroyed) { + assert(this[kPending] === 0) + + // Fail entire queue. + const requests = client[kQueue].splice(client[kRunningIdx]) + for (let i = 0; i < requests.length; i++) { + const request = requests[i] + errorRequest(this, request, err) + } + } else if (client[kRunning] > 0) { + // Fail head of pipeline. + const request = client[kQueue][client[kRunningIdx]] + client[kQueue][client[kRunningIdx]++] = null + + errorRequest(client, request, err) + } + + client[kPendingIdx] = client[kRunningIdx] + + assert(client[kRunning] === 0) + + client.emit('disconnect', + client[kUrl], + [client], + err + ) + + resume(client) +} + const constants = require('./llhttp/constants') const createRedirectInterceptor = require('./interceptor/redirectInterceptor') const EMPTY_BUF = Buffer.alloc(0) @@ -946,16 +1063,18 @@ function onSocketReadable () { } function onSocketError (err) { - const { [kParser]: parser } = this + const { [kClient]: client, [kParser]: parser } = this assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded - // to the user. - if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so for as a valid response. - parser.onMessageComplete() - return + if (client[kHTTPConnVersion] !== 'h2') { + // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded + // to the user. + if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so for as a valid response. + parser.onMessageComplete() + return + } } this[kError] = err @@ -984,27 +1103,31 @@ function onError (client, err) { } function onSocketEnd () { - const { [kParser]: parser } = this + const { [kParser]: parser, [kClient]: client } = this - if (parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - return + if (client[kHTTPConnVersion] !== 'h2') { + if (parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + return + } } util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) } function onSocketClose () { - const { [kClient]: client } = this + const { [kClient]: client, [kParser]: parser } = this - if (!this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - this[kParser].onMessageComplete() - } + if (client[kHTTPConnVersion] === 'h1' && parser) { + if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + } - this[kParser].destroy() - this[kParser] = null + this[kParser].destroy() + this[kParser] = null + } const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) @@ -1092,24 +1215,54 @@ async function connect (client) { return } - if (!llhttpInstance) { - llhttpInstance = await llhttpPromise - llhttpPromise = null - } - client[kConnecting] = false assert(socket) - socket[kNoRef] = false - socket[kWriting] = false - socket[kReset] = false - socket[kBlocking] = false - socket[kError] = null - socket[kParser] = new Parser(client, socket, llhttpInstance) - socket[kClient] = client + const isH2 = socket.alpnProtocol === 'h2' + if (isH2) { + if (!h2ExperimentalWarned) { + h2ExperimentalWarned = true + process.emitWarning('H2 support is experimental, expect them to change at any time.', { + code: 'UNDICI-H2' + }) + } + + const session = http2.connect(client[kUrl], { + createConnection: () => socket, + peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams + }) + + client[kHTTPConnVersion] = 'h2' + session[kClient] = client + session[kSocket] = socket + session.on('error', onHttp2SessionError) + session.on('frameError', onHttp2FrameError) + session.on('end', onHttp2SessionEnd) + session.on('goaway', onHTTP2GoAway) + session.on('close', onSocketClose) + session.unref() + + client[kHTTP2Session] = session + socket[kHTTP2Session] = session + } else { + if (!llhttpInstance) { + llhttpInstance = await llhttpPromise + llhttpPromise = null + } + + socket[kNoRef] = false + socket[kWriting] = false + socket[kReset] = false + socket[kBlocking] = false + socket[kParser] = new Parser(client, socket, llhttpInstance) + } + socket[kCounter] = 0 socket[kMaxRequests] = client[kMaxRequests] + socket[kClient] = client + socket[kError] = null + socket .on('error', onSocketError) .on('readable', onSocketReadable) @@ -1208,7 +1361,7 @@ function _resume (client, sync) { const socket = client[kSocket] - if (socket && !socket.destroyed) { + if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') { if (client[kSize] === 0) { if (!socket[kNoRef] && socket.unref) { socket.unref() @@ -1273,7 +1426,7 @@ function _resume (client, sync) { return } - if (!socket) { + if (!socket && !client[kHTTP2Session]) { connect(client) return } @@ -1334,6 +1487,11 @@ function _resume (client, sync) { } function write (client, request) { + if (client[kHTTPConnVersion] === 'h2') { + writeH2(client, client[kHTTP2Session], request) + return + } + const { body, method, path, host, upgrade, headers, blocking, reset } = request // https://tools.ietf.org/html/rfc7231#section-4.3.1 @@ -1489,9 +1647,286 @@ function write (client, request) { return true } -function writeStream ({ body, client, request, socket, contentLength, header, expectsPayload }) { +function writeH2 (client, session, request) { + const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request + + let headers + if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim()) + else headers = reqHeaders + + if (upgrade) { + errorRequest(client, request, new Error('Upgrade not supported for H2')) + return false + } + + try { + // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event? + request.onConnect((err) => { + if (request.aborted || request.completed) { + return + } + + errorRequest(client, request, err || new RequestAbortedError()) + }) + } catch (err) { + errorRequest(client, request, err) + } + + if (request.aborted) { + return false + } + + let stream + const h2State = client[kHTTP2SessionState] + + headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost] + headers[HTTP2_HEADER_PATH] = path + + if (method === 'CONNECT') { + session.ref() + // we are already connected, streams are pending, first request + // will create a new stream. We trigger a request to create the stream and wait until + // `ready` event is triggered + stream = session.request(headers, { endStream: false, signal }) + + if (stream.id && !stream.pending) { + request.onUpgrade(null, null, stream) + ++h2State.openStreams + } else { + stream.once('ready', () => { + request.onUpgrade(null, null, stream) + ++h2State.openStreams + }) + } + + stream.once('close', () => { + h2State.openStreams -= 1 + // TODO(HTTP/2): unref only if current streams count is 0 + if (h2State.openStreams === 0) session.unref() + }) + + return true + } else { + headers[HTTP2_HEADER_METHOD] = method + } + + // https://tools.ietf.org/html/rfc7231#section-4.3.1 + // https://tools.ietf.org/html/rfc7231#section-4.3.2 + // https://tools.ietf.org/html/rfc7231#section-4.3.5 + + // Sending a payload body on a request that does not + // expect it can cause undefined behavior on some + // servers and corrupt connection state. Do not + // re-use the connection for further requests. + + const expectsPayload = ( + method === 'PUT' || + method === 'POST' || + method === 'PATCH' + ) + + if (body && typeof body.read === 'function') { + // Try to read EOF in order to get length. + body.read(0) + } + + let contentLength = util.bodyLength(body) + + if (contentLength == null) { + contentLength = request.contentLength + } + + if (contentLength === 0 || !expectsPayload) { + // https://tools.ietf.org/html/rfc7230#section-3.3.2 + // A user agent SHOULD NOT send a Content-Length header field when + // the request message does not contain a payload body and the method + // semantics do not anticipate such a body. + + contentLength = null + } + + if (request.contentLength != null && request.contentLength !== contentLength) { + if (client[kStrictContentLength]) { + errorRequest(client, request, new RequestContentLengthMismatchError()) + return false + } + + process.emitWarning(new RequestContentLengthMismatchError()) + } + + if (contentLength != null) { + assert(body, 'no body must not have content length') + headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}` + } + + session.ref() + + if (expectContinue) { + headers[HTTP2_HEADER_EXPECT] = '100-continue' + /** + * @type {import('node:http2').ClientHttp2Stream} + */ + stream = session.request(headers, { endStream: false, signal }) + + stream.once('continue', writeBodyH2) + } else { + /** @type {import('node:http2').ClientHttp2Stream} */ + stream = session.request(headers, { endStream: false, signal }) + writeBodyH2() + } + + // Increment counter as we have new several streams open + ++h2State.openStreams + + stream.once('response', headers => { + if (request.onHeaders(Number(headers[HTTP2_HEADER_STATUS]), headers, stream.resume.bind(stream), '') === false) { + stream.pause() + } + }) + + stream.once('end', () => { + request.onComplete([]) + }) + + stream.on('data', (chunk) => { + if (request.onData(chunk) === false) stream.pause() + }) + + stream.once('close', () => { + h2State.openStreams -= 1 + // TODO(HTTP/2): unref only if current streams count is 0 + if (h2State.openStreams === 0) session.unref() + }) + + stream.once('error', function (err) { + if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { + h2State.streams -= 1 + util.destroy(stream, err) + } + }) + + stream.once('frameError', (type, code) => { + const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) + errorRequest(client, request, err) + + if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { + h2State.streams -= 1 + util.destroy(stream, err) + } + }) + + // stream.on('aborted', () => { + // // TODO(HTTP/2): Support aborted + // }) + + // stream.on('timeout', () => { + // // TODO(HTTP/2): Support timeout + // }) + + // stream.on('push', headers => { + // // TODO(HTTP/2): Suppor push + // }) + + // stream.on('trailers', headers => { + // // TODO(HTTP/2): Support trailers + // }) + + return true + + function writeBodyH2 () { + /* istanbul ignore else: assertion */ + if (!body) { + request.onRequestSent() + } else if (util.isBuffer(body)) { + assert(contentLength === body.byteLength, 'buffer body must have content length') + stream.cork() + stream.write(body) + stream.uncork() + request.onBodySent(body) + request.onRequestSent() + } else if (util.isBlobLike(body)) { + if (typeof body.stream === 'function') { + writeIterable({ + client, + request, + contentLength, + h2stream: stream, + expectsPayload, + body: body.stream(), + socket: client[kSocket], + header: '' + }) + } else { + writeBlob({ + body, + client, + request, + contentLength, + expectsPayload, + h2stream: stream, + header: '', + socket: client[kSocket] + }) + } + } else if (util.isStream(body)) { + writeStream({ + body, + client, + request, + contentLength, + expectsPayload, + socket: client[kSocket], + h2stream: stream, + header: '' + }) + } else if (util.isIterable(body)) { + writeIterable({ + body, + client, + request, + contentLength, + expectsPayload, + header: '', + h2stream: stream, + socket: client[kSocket] + }) + } else { + assert(false) + } + } +} + +function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') + if (client[kHTTPConnVersion] === 'h2') { + // For HTTP/2, is enough to pipe the stream + const pipe = pipeline( + body, + h2stream, + (err) => { + if (err) { + util.destroy(body, err) + util.destroy(h2stream, err) + } else { + request.onRequestSent() + } + } + ) + + pipe.on('data', onPipeData) + pipe.once('end', () => { + pipe.removeListener('data', onPipeData) + util.destroy(pipe) + }) + + function onPipeData (chunk) { + request.onBodySent(chunk) + } + + return + } + let finished = false const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) @@ -1572,9 +2007,10 @@ function writeStream ({ body, client, request, socket, contentLength, header, ex .on('error', onFinished) } -async function writeBlob ({ body, client, request, socket, contentLength, header, expectsPayload }) { +async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength === body.size, 'blob body must have content length') + const isH2 = client[kHTTPConnVersion] === 'h2' try { if (contentLength != null && contentLength !== body.size) { throw new RequestContentLengthMismatchError() @@ -1582,10 +2018,16 @@ async function writeBlob ({ body, client, request, socket, contentLength, header const buffer = Buffer.from(await body.arrayBuffer()) - socket.cork() - socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') - socket.write(buffer) - socket.uncork() + if (isH2) { + h2stream.cork() + h2stream.write(buffer) + h2stream.uncork() + } else { + socket.cork() + socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') + socket.write(buffer) + socket.uncork() + } request.onBodySent(buffer) request.onRequestSent() @@ -1596,11 +2038,11 @@ async function writeBlob ({ body, client, request, socket, contentLength, header resume(client) } catch (err) { - util.destroy(socket, err) + util.destroy(isH2 ? h2stream : socket, err) } } -async function writeIterable ({ body, client, request, socket, contentLength, header, expectsPayload }) { +async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') let callback = null @@ -1622,6 +2064,33 @@ async function writeIterable ({ body, client, request, socket, contentLength, he } }) + if (client[kHTTPConnVersion] === 'h2') { + h2stream + .on('close', onDrain) + .on('drain', onDrain) + + try { + // It's up to the user to somehow abort the async iterable. + for await (const chunk of body) { + if (socket[kError]) { + throw socket[kError] + } + + if (!h2stream.write(chunk)) { + await waitForDrain() + } + } + } catch (err) { + h2stream.destroy(err) + } finally { + h2stream + .off('close', onDrain) + .off('drain', onDrain) + } + + return + } + socket .on('close', onDrain) .on('drain', onDrain) diff --git a/lib/core/connect.js b/lib/core/connect.js index f3b5cc33edd..bb71085a156 100644 --- a/lib/core/connect.js +++ b/lib/core/connect.js @@ -71,7 +71,7 @@ if (global.FinalizationRegistry) { } } -function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) { +function buildConnector ({ allowH2, maxCachedSessions, socketPath, timeout, ...opts }) { if (maxCachedSessions != null && (!Number.isInteger(maxCachedSessions) || maxCachedSessions < 0)) { throw new InvalidArgumentError('maxCachedSessions must be a positive integer or zero') } @@ -79,7 +79,7 @@ function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) { const options = { path: socketPath, ...opts } const sessionCache = new SessionCache(maxCachedSessions == null ? 100 : maxCachedSessions) timeout = timeout == null ? 10e3 : timeout - + allowH2 = allowH2 != null ? allowH2 : false return function connect ({ hostname, host, protocol, port, servername, localAddress, httpSocket }, callback) { let socket if (protocol === 'https:') { @@ -99,6 +99,8 @@ function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) { servername, session, localAddress, + // TODO(HTTP/2): Add support for h2c + ALPNProtocols: allowH2 ? ['http/1.1', 'h2'] : ['http/1.1'], socket: httpSocket, // upgrade socket connection port: port || 443, host: hostname diff --git a/lib/core/request.js b/lib/core/request.js index 6c9a24d5d59..3ddc7fdbe32 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -5,6 +5,7 @@ const { NotSupportedError } = require('./errors') const assert = require('assert') +const { kHTTP2BuildRequest, kHTTP2CopyHeaders, kHTTP1BuildRequest } = require('./symbols') const util = require('./util') // tokenRegExp and headerCharRegex have been lifted from @@ -62,7 +63,8 @@ class Request { headersTimeout, bodyTimeout, reset, - throwOnError + throwOnError, + expectContinue }, handler) { if (typeof path !== 'string') { throw new InvalidArgumentError('path must be a string') @@ -98,6 +100,10 @@ class Request { throw new InvalidArgumentError('invalid reset') } + if (expectContinue != null && typeof expectContinue !== 'boolean') { + throw new InvalidArgumentError('invalid expectContinue') + } + this.headersTimeout = headersTimeout this.bodyTimeout = bodyTimeout @@ -150,6 +156,9 @@ class Request { this.headers = '' + // Only for H2 + this.expectContinue = expectContinue != null ? expectContinue : false + if (Array.isArray(headers)) { if (headers.length % 2 !== 0) { throw new InvalidArgumentError('headers array must be even') @@ -269,13 +278,62 @@ class Request { return this[kHandler].onError(error) } + // TODO: adjust to support H2 addHeader (key, value) { processHeader(this, key, value) return this } + + static [kHTTP1BuildRequest] (origin, opts, handler) { + // TODO: Migrate header parsing here, to make Requests + // HTTP agnostic + return new Request(origin, opts, handler) + } + + static [kHTTP2BuildRequest] (origin, opts, handler) { + const headers = opts.headers + opts = { ...opts, headers: null } + + const request = new Request(origin, opts, handler) + + request.headers = {} + + if (Array.isArray(headers)) { + if (headers.length % 2 !== 0) { + throw new InvalidArgumentError('headers array must be even') + } + for (let i = 0; i < headers.length; i += 2) { + processHeader(request, headers[i], headers[i + 1], true) + } + } else if (headers && typeof headers === 'object') { + const keys = Object.keys(headers) + for (let i = 0; i < keys.length; i++) { + const key = keys[i] + processHeader(request, key, headers[key], true) + } + } else if (headers != null) { + throw new InvalidArgumentError('headers must be an object or an array') + } + + return request + } + + static [kHTTP2CopyHeaders] (raw) { + const rawHeaders = raw.split('\r\n') + + const headers = {} + for (const header of rawHeaders) { + const [key, value] = header.split(': ') + + if (headers[key]) headers[key] += `,${value}` + else headers[key] = value + } + + return headers + } } -function processHeaderValue (key, val) { +function processHeaderValue (key, val, skipAppend) { if (val && typeof val === 'object') { throw new InvalidArgumentError(`invalid ${key} header`) } @@ -286,10 +344,10 @@ function processHeaderValue (key, val) { throw new InvalidArgumentError(`invalid ${key} header`) } - return `${key}: ${val}\r\n` + return skipAppend ? val : `${key}: ${val}\r\n` } -function processHeader (request, key, val) { +function processHeader (request, key, val, skipAppend = false) { if (val && (typeof val === 'object' && !Array.isArray(val))) { throw new InvalidArgumentError(`invalid ${key} header`) } else if (val === undefined) { @@ -357,10 +415,16 @@ function processHeader (request, key, val) { } else { if (Array.isArray(val)) { for (let i = 0; i < val.length; i++) { - request.headers += processHeaderValue(key, val[i]) + if (skipAppend) { + if (request.headers[key]) request.headers[key] += `,${processHeaderValue(key, val[i], skipAppend)}` + else request.headers[key] = processHeaderValue(key, val[i], skipAppend) + } else { + request.headers += processHeaderValue(key, val[i]) + } } } else { - request.headers += processHeaderValue(key, val) + if (skipAppend) request.headers[key] = processHeaderValue(key, val, skipAppend) + else request.headers += processHeaderValue(key, val) } } } diff --git a/lib/core/symbols.js b/lib/core/symbols.js index c852107a72a..c2492f4355f 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -51,5 +51,11 @@ module.exports = { kProxy: Symbol('proxy agent options'), kCounter: Symbol('socket request counter'), kInterceptors: Symbol('dispatch interceptors'), - kMaxResponseSize: Symbol('max response size') + kMaxResponseSize: Symbol('max response size'), + kHTTP2Session: Symbol('http2Session'), + kHTTP2SessionState: Symbol('http2Session state'), + kHTTP2BuildRequest: Symbol('http2 build request'), + kHTTP1BuildRequest: Symbol('http1 build request'), + kHTTP2CopyHeaders: Symbol('http2 copy headers'), + kHTTPConnVersion: Symbol('http connection version') } diff --git a/lib/core/util.js b/lib/core/util.js index 88e34a90123..0e6197a4b29 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -199,6 +199,7 @@ function destroy (stream, err) { // See: https://github.com/nodejs/node/pull/38505/files stream.socket = null } + stream.destroy(err) } else if (err) { process.nextTick((stream, err) => { @@ -218,6 +219,9 @@ function parseKeepAliveTimeout (val) { } function parseHeaders (headers, obj = {}) { + // For H2 support + if (!Array.isArray(headers)) return headers + for (let i = 0; i < headers.length; i += 2) { const key = headers[i].toString().toLowerCase() let val = obj[key] diff --git a/lib/fetch/index.js b/lib/fetch/index.js index 8faae32a70f..50f1b9f3fcd 100644 --- a/lib/fetch/index.js +++ b/lib/fetch/index.js @@ -1979,19 +1979,37 @@ async function httpNetworkFetch ( let location = '' const headers = new Headers() - for (let n = 0; n < headersList.length; n += 2) { - const key = headersList[n + 0].toString('latin1') - const val = headersList[n + 1].toString('latin1') - if (key.toLowerCase() === 'content-encoding') { - // https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1 - // "All content-coding values are case-insensitive..." - codings = val.toLowerCase().split(',').map((x) => x.trim()).reverse() - } else if (key.toLowerCase() === 'location') { - location = val + // For H2, the headers are a plain JS object + // We distinguish between them and iterate accordingly + if (Array.isArray(headersList)) { + for (let n = 0; n < headersList.length; n += 2) { + const key = headersList[n + 0].toString('latin1') + const val = headersList[n + 1].toString('latin1') + if (key.toLowerCase() === 'content-encoding') { + // https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1 + // "All content-coding values are case-insensitive..." + codings = val.toLowerCase().split(',').map((x) => x.trim()) + } else if (key.toLowerCase() === 'location') { + location = val + } + + headers.append(key, val) } + } else { + const keys = Object.keys(headersList) + for (const key of keys) { + const val = headersList[key] + if (key.toLowerCase() === 'content-encoding') { + // https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1 + // "All content-coding values are case-insensitive..." + codings = val.toLowerCase().split(',').map((x) => x.trim()).reverse() + } else if (key.toLowerCase() === 'location') { + location = val + } - headers.append(key, val) + headers.append(key, val) + } } this.body = new Readable({ read: resume }) diff --git a/package.json b/package.json index 969aaf6680f..863e85604f4 100644 --- a/package.json +++ b/package.json @@ -133,4 +133,4 @@ "dependencies": { "busboy": "^1.6.0" } -} +} \ No newline at end of file diff --git a/test/fetch/encoding.js b/test/fetch/encoding.js index 5817924521a..75d8fc37d5b 100644 --- a/test/fetch/encoding.js +++ b/test/fetch/encoding.js @@ -17,10 +17,10 @@ test('content-encoding header is case-iNsENsITIve', async (t) => { res.setHeader('Content-Encoding', contentCodings) res.setHeader('Content-Type', 'text/plain') - gzip.pipe(brotli).pipe(res) + brotli.pipe(gzip).pipe(res) - gzip.write(text) - gzip.end() + brotli.write(text) + brotli.end() }).listen(0) t.teardown(server.close.bind(server)) @@ -43,10 +43,10 @@ test('response decompression according to content-encoding should be handled in res.setHeader('Content-Encoding', contentCodings) res.setHeader('Content-Type', 'text/plain') - deflate.pipe(gzip).pipe(res) + gzip.pipe(deflate).pipe(res) - deflate.write(text) - deflate.end() + gzip.write(text) + gzip.end() }).listen(0) t.teardown(server.close.bind(server)) diff --git a/test/fetch/http2.js b/test/fetch/http2.js new file mode 100644 index 00000000000..246c14e1eef --- /dev/null +++ b/test/fetch/http2.js @@ -0,0 +1,256 @@ +'use strict' + +const { createSecureServer } = require('node:http2') +const { createReadStream, readFileSync } = require('node:fs') +const { once } = require('node:events') +const { Blob } = require('node:buffer') + +const { test, plan } = require('tap') +const pem = require('https-pem') + +const { Client, fetch } = require('../..') + +plan(4) + +test('[Fetch] Should handle h2 request with body (string or buffer)', async t => { + const server = createSecureServer(pem) + const expectedBody = 'hello from client!' + const expectedRequestBody = 'hello h2!' + const requestBody = [] + + server.on('stream', async (stream, headers) => { + stream.on('data', chunk => requestBody.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end(expectedRequestBody) + }) + + t.plan(2) + + server.listen() + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + const response = await fetch( + `https://localhost:${server.address().port}/`, + // Needs to be passed to disable the reject unauthorized + { + method: 'POST', + dispatcher: client, + headers: { + 'x-my-header': 'foo', + 'content-type': 'text-plain' + }, + body: expectedBody + } + ) + + const responseBody = await response.text() + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + t.equal(Buffer.concat(requestBody).toString('utf-8'), expectedBody) + t.equal(responseBody, expectedRequestBody) +}) + +test('[Fetch] Should handle h2 request with body (stream)', async t => { + const server = createSecureServer(pem) + const expectedBody = readFileSync(__filename, 'utf-8') + const stream = createReadStream(__filename) + const requestChunks = [] + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'PUT') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.on('data', chunk => requestChunks.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end('hello h2!') + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await fetch( + `https://localhost:${server.address().port}/`, + // Needs to be passed to disable the reject unauthorized + { + method: 'PUT', + dispatcher: client, + headers: { + 'x-my-header': 'foo', + 'content-type': 'text-plain' + }, + body: stream, + duplex: 'half' + } + ) + + const responseBody = await response.text() + + t.equal(response.status, 200) + t.equal(response.headers.get('content-type'), 'text/plain; charset=utf-8') + t.equal(response.headers.get('x-custom-h2'), 'foo') + t.equal(responseBody, 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) +}) +test('Should handle h2 request with body (Blob)', { skip: !Blob }, async t => { + const server = createSecureServer(pem) + const expectedBody = 'asd' + const requestChunks = [] + const body = new Blob(['asd'], { + type: 'text/plain' + }) + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'POST') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.on('data', chunk => requestChunks.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end('hello h2!') + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await fetch( + `https://localhost:${server.address().port}/`, + // Needs to be passed to disable the reject unauthorized + { + body, + method: 'POST', + dispatcher: client, + headers: { + 'x-my-header': 'foo', + 'content-type': 'text-plain' + } + } + ) + + const responseBody = await response.arrayBuffer() + + t.equal(response.status, 200) + t.equal(response.headers.get('content-type'), 'text/plain; charset=utf-8') + t.equal(response.headers.get('x-custom-h2'), 'foo') + t.same(new TextDecoder().decode(responseBody).toString(), 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) +}) + +test( + 'Should handle h2 request with body (Blob:ArrayBuffer)', + { skip: !Blob }, + async t => { + const server = createSecureServer(pem) + const expectedBody = 'hello' + const requestChunks = [] + const expectedResponseBody = { hello: 'h2' } + const buf = Buffer.from(expectedBody) + const body = new ArrayBuffer(buf.byteLength) + + buf.copy(new Uint8Array(body)) + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'PUT') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.on('data', chunk => requestChunks.push(chunk)) + + stream.respond({ + 'content-type': 'application/json', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end(JSON.stringify(expectedResponseBody)) + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await fetch( + `https://localhost:${server.address().port}/`, + // Needs to be passed to disable the reject unauthorized + { + body, + method: 'PUT', + dispatcher: client, + headers: { + 'x-my-header': 'foo', + 'content-type': 'text-plain' + } + } + ) + + const responseBody = await response.json() + + t.equal(response.status, 200) + t.equal(response.headers.get('content-type'), 'application/json') + t.equal(response.headers.get('x-custom-h2'), 'foo') + t.same(responseBody, expectedResponseBody) + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) + } +) diff --git a/test/http2-alpn.js b/test/http2-alpn.js new file mode 100644 index 00000000000..04b8cb6abd8 --- /dev/null +++ b/test/http2-alpn.js @@ -0,0 +1,277 @@ +'use strict' + +const https = require('node:https') +const { once } = require('node:events') +const { createSecureServer } = require('node:http2') +const { readFileSync } = require('node:fs') +const { join } = require('node:path') +const { test } = require('tap') + +const { Client } = require('..') + +// get the crypto fixtures +const key = readFileSync(join(__dirname, 'fixtures', 'key.pem'), 'utf8') +const cert = readFileSync(join(__dirname, 'fixtures', 'cert.pem'), 'utf8') +const ca = readFileSync(join(__dirname, 'fixtures', 'ca.pem'), 'utf8') + +test('Should upgrade to HTTP/2 when HTTPS/1 is available for GET', async (t) => { + t.plan(10) + + const body = [] + const httpsBody = [] + + // create the server and server stream handler + const server = createSecureServer( + { + key, + cert, + allowHTTP1: true + }, + (req, res) => { + const { socket: { alpnProtocol } } = req.httpVersion === '2.0' ? req.stream.session : req + + // handle http/1 requests + res.writeHead(200, { + 'content-type': 'application/json; charset=utf-8', + 'x-custom-request-header': req.headers['x-custom-request-header'] || '', + 'x-custom-response-header': `using ${req.httpVersion}` + }) + res.end(JSON.stringify({ + alpnProtocol, + httpVersion: req.httpVersion + })) + } + ) + + server.listen(0) + await once(server, 'listening') + + // close the server on teardown + t.teardown(server.close.bind(server)) + + // set the port + const port = server.address().port + + // test undici against http/2 + const client = new Client(`https://localhost:${port}`, { + connect: { + ca, + servername: 'agent1' + }, + allowH2: true + }) + + // close the client on teardown + t.teardown(client.close.bind(client)) + + // make an undici request using where it wants http/2 + const response = await client.request({ + path: '/', + method: 'GET', + headers: { + 'x-custom-request-header': 'want 2.0' + } + }) + + response.body.on('data', chunk => { + body.push(chunk) + }) + + await once(response.body, 'end') + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'application/json; charset=utf-8') + t.equal(response.headers['x-custom-request-header'], 'want 2.0') + t.equal(response.headers['x-custom-response-header'], 'using 2.0') + t.equal(Buffer.concat(body).toString('utf8'), JSON.stringify({ + alpnProtocol: 'h2', + httpVersion: '2.0' + })) + + // make an https request for http/1 to confirm undici is using http/2 + const httpsOptions = { + ca, + servername: 'agent1', + headers: { + 'x-custom-request-header': 'want 1.1' + } + } + + const httpsResponse = await new Promise((resolve, reject) => { + const httpsRequest = https.get(`https://localhost:${port}/`, httpsOptions, (res) => { + res.on('data', (chunk) => { + httpsBody.push(chunk) + }) + + res.on('end', () => { + resolve(res) + }) + }).on('error', (err) => { + reject(err) + }) + + t.teardown(httpsRequest.destroy.bind(httpsRequest)) + }) + + t.equal(httpsResponse.statusCode, 200) + t.equal(httpsResponse.headers['content-type'], 'application/json; charset=utf-8') + t.equal(httpsResponse.headers['x-custom-request-header'], 'want 1.1') + t.equal(httpsResponse.headers['x-custom-response-header'], 'using 1.1') + t.equal(Buffer.concat(httpsBody).toString('utf8'), JSON.stringify({ + alpnProtocol: false, + httpVersion: '1.1' + })) +}) + +test('Should upgrade to HTTP/2 when HTTPS/1 is available for POST', async (t) => { + t.plan(15) + + const requestChunks = [] + const responseBody = [] + + const httpsRequestChunks = [] + const httpsResponseBody = [] + + const expectedBody = 'hello' + const buf = Buffer.from(expectedBody) + const body = new ArrayBuffer(buf.byteLength) + + buf.copy(new Uint8Array(body)) + + // create the server and server stream handler + const server = createSecureServer( + { + key, + cert, + allowHTTP1: true + }, + (req, res) => { + // use the stream handler for http2 + if (req.httpVersion === '2.0') { + return + } + + const { socket: { alpnProtocol } } = req + + req.on('data', (chunk) => { + httpsRequestChunks.push(chunk) + }) + + req.on('end', () => { + // handle http/1 requests + res.writeHead(201, { + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-request-header': req.headers['x-custom-request-header'] || '', + 'x-custom-alpn-protocol': alpnProtocol + }) + res.end('hello http/1!') + }) + } + ) + + server.on('stream', (stream, headers) => { + t.equal(headers[':method'], 'POST') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + const { socket: { alpnProtocol } } = stream.session + + stream.on('data', (chunk) => { + requestChunks.push(chunk) + }) + + stream.respond({ + ':status': 201, + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-request-header': headers['x-custom-request-header'] || '', + 'x-custom-alpn-protocol': alpnProtocol + }) + + stream.end('hello h2!') + }) + + server.listen(0) + await once(server, 'listening') + + // close the server on teardown + t.teardown(server.close.bind(server)) + + // set the port + const port = server.address().port + + // test undici against http/2 + const client = new Client(`https://localhost:${port}`, { + connect: { + ca, + servername: 'agent1' + }, + allowH2: true + }) + + // close the client on teardown + t.teardown(client.close.bind(client)) + + // make an undici request using where it wants http/2 + const response = await client.request({ + path: '/', + method: 'POST', + headers: { + 'x-custom-request-header': 'want 2.0' + }, + body + }) + + response.body.on('data', (chunk) => { + responseBody.push(chunk) + }) + + await once(response.body, 'end') + + t.equal(response.statusCode, 201) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-request-header'], 'want 2.0') + t.equal(response.headers['x-custom-alpn-protocol'], 'h2') + t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) + + // make an https request for http/1 to confirm undici is using http/2 + const httpsOptions = { + ca, + servername: 'agent1', + method: 'POST', + headers: { + 'content-type': 'text/plain; charset=utf-8', + 'content-length': Buffer.byteLength(body), + 'x-custom-request-header': 'want 1.1' + } + } + + const httpsResponse = await new Promise((resolve, reject) => { + const httpsRequest = https.request(`https://localhost:${port}/`, httpsOptions, (res) => { + res.on('data', (chunk) => { + httpsResponseBody.push(chunk) + }) + + res.on('end', () => { + resolve(res) + }) + }).on('error', (err) => { + reject(err) + }) + + httpsRequest.on('error', (err) => { + reject(err) + }) + + httpsRequest.write(Buffer.from(body)) + + t.teardown(httpsRequest.destroy.bind(httpsRequest)) + }) + + t.equal(httpsResponse.statusCode, 201) + t.equal(httpsResponse.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(httpsResponse.headers['x-custom-request-header'], 'want 1.1') + t.equal(httpsResponse.headers['x-custom-alpn-protocol'], 'false') + t.equal(Buffer.concat(httpsResponseBody).toString('utf-8'), 'hello http/1!') + t.equal(Buffer.concat(httpsRequestChunks).toString('utf-8'), expectedBody) +}) diff --git a/test/http2.js b/test/http2.js index ab8752a7816..8fd9c616bc3 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1,32 +1,949 @@ 'use strict' -const { test } = require('tap') -const { Client, errors } = require('..') -const { createSecureServer } = require('http2') +const { createSecureServer } = require('node:http2') +const { createReadStream, readFileSync } = require('node:fs') +const { once } = require('node:events') +const { Blob } = require('node:buffer') +const { Writable, pipeline, PassThrough, Readable } = require('node:stream') + +const { test, plan } = require('tap') const pem = require('https-pem') -test('throw http2 not supported error', (t) => { - t.plan(1) +const { Client } = require('..') + +const isGreaterThanv20 = Number(process.version.slice(1).split('.')[0]) >= 20 + +plan(18) + +test('Should support H2 connection', async t => { + const body = [] + const server = createSecureServer(pem) + + server.on('stream', (stream, headers) => { + t.equal(headers['x-my-header'], 'foo') + t.equal(headers[':method'], 'GET') + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + stream.end('hello h2!') + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.plan(6) + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + + response.body.on('data', chunk => { + body.push(chunk) + }) + + await once(response.body, 'end') + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'hello') + t.equal(Buffer.concat(body).toString('utf8'), 'hello h2!') +}) + +test('Should support H2 connection (headers as array)', async t => { + const body = [] + const server = createSecureServer(pem) + + server.on('stream', (stream, headers) => { + t.equal(headers['x-my-header'], 'foo') + t.equal(headers['x-my-drink'], 'coffee,tea') + t.equal(headers[':method'], 'GET') + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + stream.end('hello h2!') + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.plan(7) + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'GET', + headers: ['x-my-header', 'foo', 'x-my-drink', ['coffee', 'tea']] + }) + + response.body.on('data', chunk => { + body.push(chunk) + }) + + await once(response.body, 'end') + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'hello') + t.equal(Buffer.concat(body).toString('utf8'), 'hello h2!') +}) + +test('Should support H2 GOAWAY (server-side)', async t => { + const body = [] + const server = createSecureServer(pem) + + server.on('stream', (stream, headers) => { + t.equal(headers['x-my-header'], 'foo') + t.equal(headers[':method'], 'GET') + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + stream.end('hello h2!') + }) + + server.on('session', session => { + setTimeout(() => { + session.goaway(204) + }, 1000) + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.plan(9) + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + + response.body.on('data', chunk => { + body.push(chunk) + }) + + await once(response.body, 'end') + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'hello') + t.equal(Buffer.concat(body).toString('utf8'), 'hello h2!') + + const [url, disconnectClient, err] = await once(client, 'disconnect') + + t.type(url, URL) + t.same(disconnectClient, [client]) + t.equal(err.message, 'HTTP/2: "GOAWAY" frame received with code 204') +}) + +test('Should throw if bad allowH2 has been pased', async t => { + try { + // eslint-disable-next-line + new Client('https://localhost:1000', { + allowH2: 'true' + }) + t.fail() + } catch (error) { + t.equal(error.message, 'allowH2 must be a valid boolean value') + } +}) + +test('Should throw if bad maxConcurrentStreams has been pased', async t => { + try { + // eslint-disable-next-line + new Client('https://localhost:1000', { + allowH2: true, + maxConcurrentStreams: {} + }) + t.fail() + } catch (error) { + t.equal( + error.message, + 'maxConcurrentStreams must be a possitive integer, greater than 0' + ) + } + + try { + // eslint-disable-next-line + new Client('https://localhost:1000', { + allowH2: true, + maxConcurrentStreams: -1 + }) + t.fail() + } catch (error) { + t.equal( + error.message, + 'maxConcurrentStreams must be a possitive integer, greater than 0' + ) + } +}) + +test( + 'Request should fail if allowH2 is false and server advertises h1 only', + { skip: isGreaterThanv20 }, + async t => { + const server = createSecureServer( + { + ...pem, + allowHTTP1: false, + ALPNProtocols: ['http/1.1'] + }, + (req, res) => { + t.fail('Should not create a valid h2 stream') + } + ) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + allowH2: false, + connect: { + rejectUnauthorized: false + } + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) - const server = createSecureServer({ key: pem.key, cert: pem.cert }, (req, res) => { - res.stream.respond({ 'content-type': 'text/plain' }) - res.stream.end('hello') - }).on('unknownProtocol', (socket) => { - // continue sending data in http2 to our http1.1 client to trigger error - socket.write('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n') + const response = await client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + + t.equal(response.statusCode, 403) + } +) + +test( + '[v20] Request should fail if allowH2 is false and server advertises h1 only', + { skip: !isGreaterThanv20 }, + async t => { + const server = createSecureServer( + { + ...pem, + allowHTTP1: false, + ALPNProtocols: ['http/1.1'] + }, + (req, res) => { + t.fail('Should not create a valid h2 stream') + } + ) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + allowH2: false, + connect: { + rejectUnauthorized: false + } + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + t.plan(2) + + try { + await client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + } catch (error) { + t.equal( + error.message, + 'Client network socket disconnected before secure TLS connection was established' + ) + t.equal(error.code, 'ECONNRESET') + } + } +) + +test('Should handle h2 continue', async t => { + const requestBody = [] + const server = createSecureServer(pem, () => {}) + const responseBody = [] + + server.on('checkContinue', (request, response) => { + t.equal(request.headers.expect, '100-continue') + t.equal(request.headers['x-my-header'], 'foo') + t.equal(request.headers[':method'], 'POST') + response.writeContinue() + + request.on('data', chunk => requestBody.push(chunk)) + + response.writeHead(200, { + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'foo' + }) + response.end('hello h2!') + }) + + t.plan(7) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + expectContinue: true, + allowH2: true }) + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'POST', + headers: { + 'x-my-header': 'foo' + }, + expectContinue: true + }) + + response.body.on('data', chunk => { + responseBody.push(chunk) + }) + + await once(response.body, 'end') + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'foo') + t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!') +}) + +test('Dispatcher#Stream', t => { + const server = createSecureServer(pem) + const expectedBody = 'hello from client!' + const bufs = [] + let requestBody = '' + + server.on('stream', async (stream, headers) => { + stream.setEncoding('utf-8') + stream.on('data', chunk => { + requestBody += chunk + }) + + stream.respond({ ':status': 200, 'x-custom': 'custom-header' }) + stream.end('hello h2!') + }) + + t.plan(4) + + server.listen(0, async () => { + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + await client.stream( + { path: '/', opaque: { bufs }, method: 'POST', body: expectedBody }, + ({ statusCode, headers, opaque: { bufs } }) => { + t.equal(statusCode, 200) + t.equal(headers['x-custom'], 'custom-header') + + return new Writable({ + write (chunk, _encoding, cb) { + bufs.push(chunk) + cb() + } + }) + } + ) + + t.equal(Buffer.concat(bufs).toString('utf-8'), 'hello h2!') + t.equal(requestBody, expectedBody) + }) +}) + +test('Dispatcher#Pipeline', t => { + const server = createSecureServer(pem) + const expectedBody = 'hello from client!' + const bufs = [] + let requestBody = '' + + server.on('stream', async (stream, headers) => { + stream.setEncoding('utf-8') + stream.on('data', chunk => { + requestBody += chunk + }) + + stream.respond({ ':status': 200, 'x-custom': 'custom-header' }) + stream.end('hello h2!') + }) + + t.plan(5) + + server.listen(0, () => { + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + pipeline( + new Readable({ + read () { + this.push(Buffer.from(expectedBody)) + this.push(null) + } + }), + client.pipeline( + { path: '/', method: 'POST', body: expectedBody }, + ({ statusCode, headers, body }) => { + t.equal(statusCode, 200) + t.equal(headers['x-custom'], 'custom-header') + + return pipeline(body, new PassThrough(), () => {}) + } + ), + new Writable({ + write (chunk, _, cb) { + bufs.push(chunk) + cb() + } + }), + err => { + t.error(err) + t.equal(Buffer.concat(bufs).toString('utf-8'), 'hello h2!') + t.equal(requestBody, expectedBody) + } + ) + }) +}) + +test('Dispatcher#Connect', t => { + const server = createSecureServer(pem) + const expectedBody = 'hello from client!' + let requestBody = '' + + server.on('stream', async (stream, headers) => { + stream.setEncoding('utf-8') + stream.on('data', chunk => { + requestBody += chunk + }) + + stream.respond({ ':status': 200, 'x-custom': 'custom-header' }) + stream.end('hello h2!') + }) + + t.plan(6) server.listen(0, () => { const client = new Client(`https://localhost:${server.address().port}`, { - tls: { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + let result = '' + client.connect({ path: '/' }, (err, { socket }) => { + t.error(err) + socket.on('data', chunk => { + result += chunk + }) + socket.on('response', headers => { + t.equal(headers[':status'], 200) + t.equal(headers['x-custom'], 'custom-header') + t.notOk(socket.closed) + }) + + // We need to handle the error event although + // is not controlled by Undici, the fact that a session + // is destroyed and destroys subsequent streams, causes + // unhandled errors to surface if not handling this event. + socket.on('error', () => {}) + + socket.once('end', () => { + t.equal(requestBody, expectedBody) + t.equal(result, 'hello h2!') + }) + socket.end(expectedBody) + }) + }) +}) + +test('Dispatcher#Upgrade', t => { + const server = createSecureServer(pem) + + server.on('stream', async (stream, headers) => { + stream.end() + }) + + t.plan(1) + + server.listen(0, async () => { + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + try { + await client.upgrade({ path: '/' }) + } catch (error) { + t.equal(error.message, 'Upgrade not supported for H2') + } + }) +}) + +test('Dispatcher#destroy', async t => { + const promises = [] + const server = createSecureServer(pem) + + server.on('stream', (stream, headers) => { + setTimeout(stream.end.bind(stream), 1500) + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.plan(4) + t.teardown(server.close.bind(server)) + + promises.push( + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' } }) + ) + + promises.push( + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + ) + + promises.push( + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + ) + + promises.push( + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }) + ) + + await client.destroy() + + const results = await Promise.allSettled(promises) + + t.equal(results[0].status, 'rejected') + t.equal(results[1].status, 'rejected') + t.equal(results[2].status, 'rejected') + t.equal(results[3].status, 'rejected') +}) + +test('Should handle h2 request with body (string or buffer) - dispatch', t => { + const server = createSecureServer(pem) + const expectedBody = 'hello from client!' + const response = [] + const requestBody = [] + + server.on('stream', async (stream, headers) => { + stream.on('data', chunk => requestBody.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end('hello h2!') + }) + + t.plan(7) + + server.listen(0, () => { + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) t.teardown(client.close.bind(client)) - client.request({ path: '/', method: 'GET' }, (err, data) => { - t.type(err, errors.HTTPParserError) + client.dispatch( + { + path: '/', + method: 'POST', + headers: { + 'x-my-header': 'foo', + 'content-type': 'text/plain' + }, + body: expectedBody + }, + { + onConnect () { + t.ok(true) + }, + onError (err) { + t.error(err) + }, + onHeaders (statusCode, headers) { + t.equal(statusCode, 200) + t.equal(headers['content-type'], 'text/plain; charset=utf-8') + t.equal(headers['x-custom-h2'], 'foo') + }, + onData (chunk) { + response.push(chunk) + }, + onBodySent (body) { + t.equal(body.toString('utf-8'), expectedBody) + }, + onComplete () { + t.equal(Buffer.concat(response).toString('utf-8'), 'hello h2!') + t.equal( + Buffer.concat(requestBody).toString('utf-8'), + 'hello from client!' + ) + } + } + ) + }) +}) + +test('Should handle h2 request with body (stream)', async t => { + const server = createSecureServer(pem) + const expectedBody = readFileSync(__filename, 'utf-8') + const stream = createReadStream(__filename) + const requestChunks = [] + const responseBody = [] + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'PUT') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + for await (const chunk of stream) { + requestChunks.push(chunk) + } + + stream.end('hello h2!') + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'PUT', + headers: { + 'x-my-header': 'foo' + }, + body: stream + }) + + for await (const chunk of response.body) { + responseBody.push(chunk) + } + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'foo') + t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) +}) + +test('Should handle h2 request with body (iterable)', async t => { + const server = createSecureServer(pem) + const expectedBody = 'hello' + const requestChunks = [] + const responseBody = [] + const iterableBody = { + [Symbol.iterator]: function * () { + const end = expectedBody.length - 1 + for (let i = 0; i < end + 1; i++) { + yield expectedBody[i] + } + + return expectedBody[end] + } + } + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'POST') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.on('data', chunk => requestChunks.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 }) + + stream.end('hello h2!') + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'POST', + headers: { + 'x-my-header': 'foo' + }, + body: iterableBody + }) + + response.body.on('data', chunk => { + responseBody.push(chunk) }) + + await once(response.body, 'end') + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'foo') + t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) }) + +test('Should handle h2 request with body (Blob)', { skip: !Blob }, async t => { + const server = createSecureServer(pem) + const expectedBody = 'asd' + const requestChunks = [] + const responseBody = [] + const body = new Blob(['asd'], { + type: 'application/json' + }) + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'POST') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.on('data', chunk => requestChunks.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end('hello h2!') + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'POST', + headers: { + 'x-my-header': 'foo' + }, + body + }) + + response.body.on('data', chunk => { + responseBody.push(chunk) + }) + + await once(response.body, 'end') + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'foo') + t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) +}) + +test( + 'Should handle h2 request with body (Blob:ArrayBuffer)', + { skip: !Blob }, + async t => { + const server = createSecureServer(pem) + const expectedBody = 'hello' + const requestChunks = [] + const responseBody = [] + const buf = Buffer.from(expectedBody) + const body = new ArrayBuffer(buf.byteLength) + + buf.copy(new Uint8Array(body)) + + server.on('stream', async (stream, headers) => { + t.equal(headers[':method'], 'POST') + t.equal(headers[':path'], '/') + t.equal(headers[':scheme'], 'https') + + stream.on('data', chunk => requestChunks.push(chunk)) + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': headers['x-my-header'], + ':status': 200 + }) + + stream.end('hello h2!') + }) + + t.plan(8) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t.teardown(server.close.bind(server)) + t.teardown(client.close.bind(client)) + + const response = await client.request({ + path: '/', + method: 'POST', + headers: { + 'x-my-header': 'foo' + }, + body + }) + + response.body.on('data', chunk => { + responseBody.push(chunk) + }) + + await once(response.body, 'end') + + t.equal(response.statusCode, 200) + t.equal(response.headers['content-type'], 'text/plain; charset=utf-8') + t.equal(response.headers['x-custom-h2'], 'foo') + t.equal(Buffer.concat(responseBody).toString('utf-8'), 'hello h2!') + t.equal(Buffer.concat(requestChunks).toString('utf-8'), expectedBody) + } +) diff --git a/types/client.d.ts b/types/client.d.ts index 56074a15ae7..3d348f9427b 100644 --- a/types/client.d.ts +++ b/types/client.d.ts @@ -72,6 +72,16 @@ export declare namespace Client { autoSelectFamily?: boolean; /** The amount of time in milliseconds to wait for a connection attempt to finish before trying the next address when using the `autoSelectFamily` option. */ autoSelectFamilyAttemptTimeout?: number; + /** + * @description Enables support for H2 if the server has assigned bigger priority to it through ALPN negotiation. + * @default false + */ + allowH2?: boolean; + /** + * @description Dictates the maximum number of concurrent streams for a single H2 session. It can be overriden by a SETTINGS remote frame. + * @default 100 + */ + maxConcurrentStreams?: number } export interface SocketInfo { localAddress?: string diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 7f621371f86..42a78ba0834 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -117,6 +117,8 @@ declare namespace Dispatcher { reset?: boolean; /** Whether Undici should throw an error upon receiving a 4xx or 5xx response from the server. Defaults to false */ throwOnError?: boolean; + /** For H2, it appends the expect: 100-continue header, and halts the request body until a 100-continue is received from the remote server*/ + expectContinue?: boolean; } export interface ConnectOptions { path: string;