diff --git a/lib/internal/quic/binding.js b/lib/internal/quic/binding.js index a2c05a5e2cae88..7866004d19d8b3 100644 --- a/lib/internal/quic/binding.js +++ b/lib/internal/quic/binding.js @@ -67,8 +67,12 @@ function onStreamHeaders() {} function onStreamBlocked() {} +let initialized = false; + module.exports = { initializeBinding() { + if (initialized) return; + initialized = true; initializeCallbacks({ onEndpointClose, onEndpointDone, diff --git a/lib/internal/quic/config.js b/lib/internal/quic/config.js index 73ec45d732a92a..f3913f5c2e7677 100644 --- a/lib/internal/quic/config.js +++ b/lib/internal/quic/config.js @@ -118,7 +118,6 @@ const kRandomConnectionIdStrategy = new RandomConnectionIDStrategy(); * @property {string} [ccAlgorithm] * @property {UDPOptions} [udp] * @property {ArrayBuffer|TypedArray|DataView} [resetTokenSecret] - * @property {AbortSignal} [signal] * * @typedef {Object} PreferredAddress * @property {SocketAddressOrOptions} [ipv4] @@ -176,6 +175,22 @@ const kRandomConnectionIdStrategy = new RandomConnectionIDStrategy(); * @property {AbortSignal} [signal] * @typedef {EndpointConfig|EndpointConfigOptions} EndpointConfigOrOptions * @typedef {SessionConfig|SessionConfigOptions} SessionConfigOrOptions + * + * @typedef {import('../blob.js').Blob} Blob + * @typedef {import('stream').Readable} Readable + * @typedef {ArrayBuffer|TypedArray|DataView|Blob|Readable|string} StreamPayload + * @typedef {Object} StreamOptionsInit + * @property {boolean} [unidirectional] + * @property {Object|Map} [headers] + * @property {Object|Map} [trailers] + * @property {StreamPayload|Promise} [body] + * @property {string} [encoding] + * + * @typedef {Object} ResponseOptionsInit + * @property {Object|Map} [headers] + * @property {Object|Map} [trailers] + * @property {StreamPayload|Promise} [body] + * @property {string} [encoding] */ class EndpointConfig { [kType] = 'endpoint-config'; @@ -214,7 +229,6 @@ class EndpointConfig { ccAlgorithm, udp, resetTokenSecret, - signal, } = options; if (!SocketAddress.isSocketAddress(address)) { @@ -377,13 +391,6 @@ class EndpointConfig { } } - if (signal !== undefined) { - validateAbortSignal(signal, 'options.signal'); - if (signal.aborted) - throw new AbortError(); - } - - this[kOptions] = { address, retryTokenExpiration, @@ -408,7 +415,6 @@ class EndpointConfig { sendBufferSize, ttl, resetTokenSecret: resetTokenSecret || '(generated)', - signal, }; this[kHandle] = new ConfigObject( @@ -455,11 +461,6 @@ class EndpointConfig { return `EndpointConfig ${inspect(this[kOptions], opts)}`; } - - /** @type {AbortSignal} */ - get signal() { - return this[kOptions].signal; - } } class SessionConfig { @@ -863,6 +864,124 @@ class SessionConfig { } } +class StreamOptions { + [kType] = 'stream-options'; + + /** + * @param {*} val + * @returns + */ + static isStreamOptions(val) { + return val?.[kType] === 'stream-options'; + } + + /** + * @param {StreamOptionsInit} [options] + */ + constructor(options = {}) { + validateObject(options, 'options'); + const { + unidirectional = false, + headers, + trailers, + body, + encoding, + } = options; + + this[kOptions] = { + unidirectional, + headers, + trailers, + body, + encoding, + }; + } + + get unidirectional() { + return this[kOptions].unidirectional; + } + + get headers() { + return this[kOptions].headers; + } + + get body() { + return this[kOptions].body; + } + + get encoding() { + return this[kOptions].encoding; + } + + [kInspect](depth, options) { + if (depth < 0) + return this; + + const opts = { + ...options, + depth: options.depth == null ? null : options.depth - 1 + }; + + return `StreamOptions ${inspect(this[kOptions], opts)}`; + } +} + +class ResponseOptions { + [kType] = 'response-options'; + + /** + * @param {*} val + * @returns + */ + static isResponseOptions(val) { + return val?.[kType] === 'response-options'; + } + + /** + * @param {ResponseOptionsInit} [options] + */ + constructor(options = {}) { + validateObject(options, 'options'); + const { + headers, + trailers, + body, + encoding, + } = options; + + this[kOptions] = { + headers, + trailers, + body, + encoding, + }; + } + + get headers() { + return this[kOptions].headers; + } + + get body() { + return this[kOptions].body; + } + + get encoding() { + return this[kOptions].encoding; + } + + [kInspect](depth, options) { + if (depth < 0) + return this; + + const opts = { + ...options, + depth: options.depth == null ? null : options.depth - 1 + }; + + return `ResponseOptions ${inspect(this[kOptions], opts)}`; + } +} + /** * @param {ArrayBuffer|TypedArray|DataView} sessionTicket * @param {ArrayBuffer|TypedArray|DataView} transportParams @@ -900,6 +1019,8 @@ function validateResumeOptions(sessionTicket, transportParams) { module.exports = { EndpointConfig, SessionConfig, + StreamOptions, + ResponseOptions, validateResumeOptions, kSecureContext, kHandle, diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 8b35e8e80d87c5..9a14d5b9bd687e 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -1,8 +1,10 @@ 'use strict'; const { + ObjectSetPrototypeOf, Promise, PromiseReject, + SafeMap, SafeSet, Symbol, TypeError, @@ -18,6 +20,7 @@ const { if (_createEndpoint === undefined) return; +const kInit = Symbol('kInit'); const kState = Symbol('kState'); const kStats = Symbol('kStats'); @@ -33,9 +36,22 @@ const { initializeBinding, } = require('internal/quic/binding'); +const { + makeTransferable, + kClone, + kDeserialize, +} = require('internal/worker/js_transferable'); + +const { + defineEventHandler, + NodeEventTarget, +} = require('internal/event_target'); + const { EndpointConfig, SessionConfig, + StreamOptions, + ResponseOptions, kHandle, kSecureContext, validateResumeOptions, @@ -93,26 +109,7 @@ function deferred() { }; } -function onAbort() { - this.destroy(new AbortError()); - this[kState].onAbort = undefined; - this[kState].signal = undefined; -} - -class Endpoint { - [kState] = { - onAbort: undefined, - signal: undefined, - close: { - promise: undefined, - resolve: undefined, - reject: undefined, - }, - destroyed: false, - listening: false, - sessions: new SafeSet(), - }; - +class Endpoint extends NodeEventTarget { /** * @param {EndpointConfigOrOptions} [options] */ @@ -127,20 +124,26 @@ class Endpoint { options = new EndpointConfig(options); } - this[kHandle] = _createEndpoint(options[kHandle]); + super(); + this[kInit](_createEndpoint(options[kHandle])); + return makeTransferable(this); + } + + [kInit](handle) { + this[kState] = { + close: { + promise: undefined, + resolve: undefined, + reject: undefined, + }, + destroyed: false, + listening: false, + sessions: new SafeSet(), + }; + + this[kHandle] = handle; this[kHandle][owner_symbol] = this; this[kStats] = new EndpointStats(this[kHandle].stats); - - if (options.signal !== undefined) { - if (options.signal.aborted) - throw new AbortError(); - this[kState].onAbort = onAbort.bind(this); - this[kState].signal = options.signal; - this[kState].signal.addEventListener( - 'abort', - this[kState].onAbort, - { once: true }); - } } /** @@ -302,12 +305,6 @@ class Endpoint { this[kStats][kDetachStats](); this[kHandle] = undefined; - if (state.signal !== undefined && state.onAbort !== undefined) { - state.signal.removeEventListener('abort', state.onAbort); - state.signal = undefined; - state.onAbort = undefined; - } - if (error != null && typeof state.close.reject === 'function') state.close.reject(error); else if (typeof state.close.resolve === 'function') @@ -348,8 +345,36 @@ class Endpoint { stats: this.stats, }, opts)}`; } + + [kClone]() { + const handle = this[kHandle]; + return { + data: { handle }, + deserializeInfo: 'internal/quic/quic:InternalEndpoint', + }; + } + + [kDeserialize]({ handle }) { + this[kInit](handle); + } +} + +class InternalEndpoint extends NodeEventTarget { + constructor() { + super(); + return makeTransferable(this); + } } +defineEventHandler(Endpoint.prototype, 'close'); +defineEventHandler(Endpoint.prototype, 'error'); +defineEventHandler(Endpoint.prototype, 'session'); + +InternalEndpoint.prototype.constructor = Endpoint; +ObjectSetPrototypeOf( + InternalEndpoint.prototype, + Endpoint.prototype); + /** * @typedef {import('../abort_controller').AbortSignal} AbortSignal * @param {EndpointWrap} handle @@ -358,22 +383,158 @@ class Endpoint { */ function createSession(handle, signal) {} -class Session { +class Session extends NodeEventTarget { + [kState] = { + streams: new SafeMap(), + }; + constructor() { // eslint-disable-next-line no-restricted-syntax throw new TypeError('illegal constructor'); } + + openStream(options = new StreamOptions()) { + if (!StreamOptions.isStreamOptions(options)) { + if (options === null || typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', [ + 'StreamOptions', + 'Object', + ], options); + } + options = new StreamOptions(options); + } + } + + close() {} + + destroy(error) {} + + /** @type {boolean} */ + get closing() {} + + /** @type {boolean} */ + get destroyed() {} + + get stats() {} + + [kInspect](depth, options) { + if (depth < 0) + return this; + + const opts = { + ...options, + depth: options.depth == null ? null : options.depth - 1, + }; + + return `Session ${inspect({ + closing: this.closing, + destroyed: this.destroyed, + listening: this.listening, + stats: this.stats, + }, opts)}`; + } } -class Stream { +defineEventHandler(Session.prototype, 'close'); +defineEventHandler(Session.prototype, 'error'); +defineEventHandler(Session.prototype, 'stream'); + +class Stream extends NodeEventTarget { constructor() { // eslint-disable-next-line no-restricted-syntax throw new TypeError('illegal constructor'); } + + /** @type {number} */ + get id() {} + + /** @type {boolean} */ + get unidirectional() {} + + /** @type {boolean} */ + get peerInitiated() {} + + /** + * @param {Error} [error] + */ + destroy(error) {} + + /** @type {boolean} */ + get closing() {} + + /** @type {boolean} */ + get destroyed() {} + + get stats() {} + + /** + * When supported by the protocol, sends response hints + * (for instance, HTTP 1xx status headers) that preceed + * the response. If respondWith is called first, or the + * protocol does not support hints, responseHints() will + * throw. + * @param {Object|Map} headers + */ + responseHints(headers) {} + + /** + * Initiates a response on this stream. Returns a Promise + * that is fulfilled when the response has been completed. + * If the Stream is peerInitiated and unidirectional, or if + * respondWith has already been caled, respondWith will reject + * immediately. + * @returns {Promise} + */ + respondWith(options = new ResponseOptions()) { + if (!ResponseOptions.isResponseOptions(options)) { + if (options === null || typeof options !== 'object') { + throw new ERR_INVALID_ARG_TYPE('options', [ + 'ResponseOptions', + 'Object', + ], options); + } + options = new ResponseOptions(options); + } + } + + /** + * Return a Promise that is fulfilled with the stream data + * contained in an ArrayBuffer + * @returns {Promise} + */ + arrayBuffer() {} + + /** + * @typedef {import('../blob').Blob} Blob + * Return a Promise that is fulfilled with the stream data + * contained in a Blob. + * @returns {Promise} + */ + blob() {} + + /** + * @typedef {import('stream').Readable} Readable + * Return a stream.Readable that may be used for consuming + * the stream data. + * @returns {Readable} + */ + stream() {} + + /** + * Return a Promise that is fulfilled with the stream data + * contained as a string. + * @returns {Promise} + */ + text() {} } +defineEventHandler(Stream.prototype, 'close'); +defineEventHandler(Stream.prototype, 'error'); +defineEventHandler(Stream.prototype, 'headers'); + module.exports = { Endpoint, + InternalEndpoint, Session, Stream, EndpointConfig,