diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 2d6675e076c..223e8c445cb 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -1,12 +1,14 @@ 'use strict' -const { uid, states } = require('./constants') -const { failWebsocketConnection, parseExtensions } = require('./util') +const { uid, states, sentCloseFrameState, emptyBuffer, opcodes } = require('./constants') +const { failWebsocketConnection, parseExtensions, isClosed, isClosing, isEstablished, validateCloseCodeAndReason } = require('./util') const { channels } = require('../../core/diagnostics') const { makeRequest } = require('../fetch/request') const { fetching } = require('../fetch/index') const { Headers, getHeadersList } = require('../fetch/headers') const { getDecodeSplit } = require('../fetch/util') +const { WebsocketFrameSend } = require('./frame') +const assert = require('node:assert') /** @type {import('crypto')} */ let crypto @@ -214,13 +216,81 @@ function establishWebSocketConnection (url, protocols, client, handler, options) } /** - * @param {import('./websocket').Handler} handler - * @param {number} code - * @param {any} reason - * @param {number} reasonByteLength + * @see https://whatpr.org/websockets/48.html#close-the-websocket + * @param {import('./websocket').Handler} object + * @param {number} [code=null] + * @param {string} [reason=''] */ -function closeWebSocketConnection (handler, code, reason, reasonByteLength) { - handler.onClose(code, reason, reasonByteLength) +function closeWebSocketConnection (object, code, reason, validate = false) { + // 1. If code was not supplied, let code be null. + code ??= null + + // 2. If reason was not supplied, let reason be the empty string. + reason ??= '' + + // 3. Validate close code and reason with code and reason. + if (validate) validateCloseCodeAndReason(code, reason) + + // 4. Run the first matching steps from the following list: + // - If object’s ready state is CLOSING (2) or CLOSED (3) + // - If the WebSocket connection is not yet established [WSP] + // - If the WebSocket closing handshake has not yet been started [WSP] + // - Otherwise + if (isClosed(object.readyState) || isClosing(object.readyState)) { + // Do nothing. + } else if (!isEstablished(object.readyState)) { + // Fail the WebSocket connection and set object’s ready state to CLOSING (2). [WSP] + failWebsocketConnection(object) + object.readyState = states.CLOSING + } else if (!object.closeState.has(sentCloseFrameState.SENT) && !object.closeState.has(sentCloseFrameState.RECEIVED)) { + // Upon either sending or receiving a Close control frame, it is said + // that _The WebSocket Closing Handshake is Started_ and that the + // WebSocket connection is in the CLOSING state. + + const frame = new WebsocketFrameSend() + + // If neither code nor reason is present, the WebSocket Close + // message must not have a body. + + // If code is present, then the status code to use in the + // WebSocket Close message must be the integer given by code. + // If code is null and reason is the empty string, the WebSocket Close frame must not have a body. + // If reason is non-empty but code is null, then set code to 1000 ("Normal Closure"). + if (reason.length !== 0 && code === null) { + code = 1000 + } + + // If code is set, then the status code to use in the WebSocket Close frame must be the integer given by code. + assert(code === null || Number.isInteger(code)) + + if (code === null && reason.length === 0) { + frame.frameData = emptyBuffer + } else if (code !== null && reason === null) { + frame.frameData = Buffer.allocUnsafe(2) + frame.frameData.writeUInt16BE(code, 0) + } else if (code !== null && reason !== null) { + // If reason is also present, then reasonBytes must be + // provided in the Close message after the status code. + frame.frameData = Buffer.allocUnsafe(2 + Buffer.byteLength(reason)) + frame.frameData.writeUInt16BE(code, 0) + // the body MAY contain UTF-8-encoded data with value /reason/ + frame.frameData.write(reason, 2, 'utf-8') + } else { + frame.frameData = emptyBuffer + } + + object.socket.write(frame.createFrame(opcodes.CLOSE)) + + object.closeState.add(sentCloseFrameState.SENT) + + // Upon either sending or receiving a Close control frame, it is said + // that _The WebSocket Closing Handshake is Started_ and that the + // WebSocket connection is in the CLOSING state. + object.readyState = states.CLOSING + } else { + // Set object’s ready state to CLOSING (2). + object.readyState = states.CLOSING + } } module.exports = { diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 69cfa1b3da5..2638abe3bfd 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -225,7 +225,7 @@ class ByteParser extends Writable { } else { this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => { if (error) { - closeWebSocketConnection(this.#handler, 1007, error.message, error.message.length) + closeWebSocketConnection(this.#handler, 1007, error.message) return } @@ -356,7 +356,7 @@ class ByteParser extends Writable { // Upon receiving such a frame, the other peer sends a // Close frame in response, if it hasn't already sent one. - if (!this.#handler.closeState.has(sentCloseFrameState.SENT)) { + if (!this.#handler.closeState.has(sentCloseFrameState.SENT) && !this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) { // If an endpoint receives a Close frame and did not previously send a // Close frame, the endpoint MUST send a Close frame in response. (When // sending a Close frame in response, the endpoint typically echos the diff --git a/lib/web/websocket/stream/websocketerror.js b/lib/web/websocket/stream/websocketerror.js new file mode 100644 index 00000000000..ebe751efd71 --- /dev/null +++ b/lib/web/websocket/stream/websocketerror.js @@ -0,0 +1,81 @@ +'use strict' + +const { webidl } = require('../../fetch/webidl') +const { validateCloseCodeAndReason } = require('../util') +const { kConstruct } = require('../../../core/symbols') +const { kEnumerableProperty } = require('../../../core/util') + +class WebSocketError extends DOMException { + #closeCode + #reason + + constructor (message = '', init = undefined) { + message = webidl.converters.DOMString(message, 'WebSocketError', 'message') + + // 1. Set this 's name to " WebSocketError ". + // 2. Set this 's message to message . + super(message, 'WebSocketError') + + if (init === kConstruct) { + return + } else if (init !== null) { + init = webidl.converters.WebSocketCloseInfo(init) + } + + // 3. Let code be init [" closeCode "] if it exists , or null otherwise. + let code = init.closeCode ?? null + + // 4. Let reason be init [" reason "] if it exists , or the empty string otherwise. + const reason = init.reason ?? '' + + // 5. Validate close code and reason with code and reason . + validateCloseCodeAndReason(code, reason) + + // 6. If reason is non-empty, but code is not set, then set code to 1000 ("Normal Closure"). + if (reason.length !== 0 && code === null) { + code = 1000 + } + + // 7. Set this 's closeCode to code . + this.#closeCode = code + + // 8. Set this 's reason to reason . + this.#reason = reason + } + + get closeCode () { + return this.#closeCode + } + + get reason () { + return this.#reason + } + + /** + * @param {string} message + * @param {number|null} code + * @param {string} reason + */ + static createUnvalidatedWebSocketError (message, code, reason) { + const error = new WebSocketError(message, kConstruct) + error.#closeCode = code + error.#reason = reason + return error + } +} + +const { createUnvalidatedWebSocketError } = WebSocketError +delete WebSocketError.createUnvalidatedWebSocketError + +Object.defineProperties(WebSocketError.prototype, { + closeCode: kEnumerableProperty, + reason: kEnumerableProperty, + [Symbol.toStringTag]: { + value: 'WebSocketError', + writable: false, + enumerable: false, + configurable: true + } +}) + +module.exports = { WebSocketError, createUnvalidatedWebSocketError } diff --git a/lib/web/websocket/stream/websocketstream.js b/lib/web/websocket/stream/websocketstream.js new file mode 100644 index 00000000000..bd426f6af7a --- /dev/null +++ b/lib/web/websocket/stream/websocketstream.js @@ -0,0 +1,511 @@ +'use strict' + +const { createDeferredPromise, environmentSettingsObject } = require('../../fetch/util') +const { states, opcodes, sentCloseFrameState } = require('../constants') +const { webidl } = require('../../fetch/webidl') +const { getURLRecord, isValidSubprotocol, isEstablished, failWebsocketConnection, utf8Decode } = require('../util') +const { establishWebSocketConnection, closeWebSocketConnection } = require('../connection') +const { types } = require('node:util') +const { channels } = require('../../../core/diagnostics') +const { WebsocketFrameSend } = require('../frame') +const { ByteParser } = require('../receiver') +const { WebSocketError, createUnvalidatedWebSocketError } = require('./websocketerror') +const { utf8DecodeBytes } = require('../../fetch/util') +const { kEnumerableProperty } = require('../../../core/util') + +let emittedExperimentalWarning = false + +class WebSocketStream { + // Each WebSocketStream object has an associated url , which is a URL record . + /** @type {URL} */ + #url + + // Each WebSocketStream object has an associated opened promise , which is a promise. + /** @type {ReturnType} */ + #openedPromise + + // Each WebSocketStream object has an associated closed promise , which is a promise. + /** @type {ReturnType} */ + #closedPromise + + // Each WebSocketStream object has an associated readable stream , which is a ReadableStream . + /** @type {ReadableStream} */ + #readableStream + /** @type {ReadableStreamDefaultController} */ + #readableStreamController + + // Each WebSocketStream object has an associated writable stream , which is a WritableStream . + /** @type {WritableStream} */ + #writableStream + + // Each WebSocketStream object has an associated was ever connected , which is a boolean, initially false. + #wasEverConnected = false + + // Each WebSocketStream object has an associated boolean handshake aborted , which is initially false. + #handshakeAborted = false + + /** @type {import('../websocket').Handler} */ + #handler = { + // https://whatpr.org/websockets/48/7b748d3...d5570f3.html#feedback-to-websocket-stream-from-the-protocol + onConnectionEstablished: (response, extensions) => this.#onConnectionEstablished(response, extensions), + onFail: (reason) => this.#onFail(reason), + onMessage: (opcode, data) => this.#onMessage(opcode, data), + onParserError: (err) => {}, // eslint-disable-line + onParserDrain: () => this.#handler.socket.resume(), + onSocketData: (chunk) => { + if (!this.#parser.write(chunk)) { + this.#handler.socket.pause() + } + }, + onSocketError: (err) => { + this.#handler.readyState = states.CLOSING + + if (channels.socketError.hasSubscribers) { + channels.socketError.publish(err) + } + + this.#handler.socket.destroy() + }, + onSocketClose: () => this.#onSocketClose(), + + readyState: states.CONNECTING, + socket: null, + closeState: new Set(), + receivedClose: false + } + + /** @type {import('../receiver').ByteParser} */ + #parser + + /** @type {import('../../fetch/index').Fetch} */ + #controller + + constructor (url, options = undefined) { + if (!emittedExperimentalWarning) { + process.emitWarning('WebSocketStream is experimental! Expect it to change at any time.', { + code: 'UNDICI-WSS' + }) + emittedExperimentalWarning = true + } + + webidl.argumentLengthCheck(arguments, 1, 'WebSocket') + + url = webidl.converters.USVString(url) + if (options !== null) { + options = webidl.converters.WebSocketStreamOptions(options) + } + + // 1. Let baseURL be this 's relevant settings object 's API base URL . + const baseURL = environmentSettingsObject.settingsObject.baseUrl + + // 2. Let urlRecord be the result of getting a URL record given url and baseURL . + const urlRecord = getURLRecord(url, baseURL) + + // 3. Let protocols be options [" protocols "] if it exists , otherwise an empty sequence. + const protocols = options.protocols + + // 4. If any of the values in protocols occur more than once or otherwise fail to match the requirements for elements that comprise the value of ` Sec-WebSocket-Protocol ` fields as defined by The WebSocket Protocol , then throw a " SyntaxError " DOMException . [WSP] + if (protocols.length !== new Set(protocols.map(p => p.toLowerCase())).size) { + throw new DOMException('Invalid Sec-WebSocket-Protocol value', 'SyntaxError') + } + + if (protocols.length > 0 && !protocols.every(p => isValidSubprotocol(p))) { + throw new DOMException('Invalid Sec-WebSocket-Protocol value', 'SyntaxError') + } + + // 5. Set this 's url to urlRecord . + this.#url = urlRecord.toString() + + // 6. Set this 's opened promise and closed promise to new promises. + this.#openedPromise = createDeferredPromise() + this.#closedPromise = createDeferredPromise() + + // 7. Apply backpressure to the WebSocket. + // TODO + + // 8. If options [" signal "] exists , + if (options.signal != null) { + // 8.1. Let signal be options [" signal "]. + const signal = options.signal + + // 8.2. If signal is aborted , then reject this 's opened promise and closed promise with signal ’s abort reason + // and return. + if (signal.aborted) { + this.#openedPromise.reject(signal.reason) + this.#closedPromise.reject(signal.reason) + return + } + + // 8.3. Add the following abort steps to signal : + signal.addEventListener('abort', () => { + // 8.3.1. If the WebSocket connection is not yet established : [WSP] + if (!isEstablished(this.#handler.readyState)) { + // 8.3.1.1. Fail the WebSocket connection . + failWebsocketConnection(this.#handler) + + // Set this 's ready state to CLOSING . + this.#handler.readyState = states.CLOSING + + // Reject this 's opened promise and closed promise with signal ’s abort reason . + this.#openedPromise.reject(signal.reason) + this.#closedPromise.reject(signal.reason) + + // Set this 's handshake aborted to true. + this.#handshakeAborted = true + } + }, { once: true }) + } + + // 9. Let client be this 's relevant settings object . + const client = environmentSettingsObject.settingsObject + + // 10. Run this step in parallel : + // 10.1. Establish a WebSocket connection given urlRecord , protocols , and client . [FETCH] + this.#controller = establishWebSocketConnection( + urlRecord, + protocols, + client, + this.#handler, + options + ) + } + + // The url getter steps are to return this 's url , serialized . + get url () { + return this.#url.toString() + } + + // The opened getter steps are to return this 's opened promise . + get opened () { + return this.#openedPromise.promise + } + + // The closed getter steps are to return this 's closed promise . + get closed () { + return this.#closedPromise.promise + } + + // The close( closeInfo ) method steps are: + close (closeInfo = undefined) { + if (closeInfo !== null) { + closeInfo = webidl.converters.WebSocketCloseInfo(closeInfo) + } + + // 1. Let code be closeInfo [" closeCode "] if present, or null otherwise. + const code = closeInfo.closeCode ?? null + + // 2. Let reason be closeInfo [" reason "]. + const reason = closeInfo.reason + + // 3. Close the WebSocket with this , code , and reason . + closeWebSocketConnection(this.#handler, code, reason, true) + } + + #write (chunk) { + // 1. Let promise be a new promise created in stream ’s relevant realm . + const promise = createDeferredPromise() + + // 2. Let data be null. + let data = null + + // 3. Let opcode be null. + let opcode = null + + // 4. If chunk is a BufferSource , + if (ArrayBuffer.isView(chunk) || types.isArrayBuffer(chunk)) { + // 4.1. Set data to a copy of the bytes given chunk . + data = new Uint8Array(ArrayBuffer.isView(chunk) ? new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength) : chunk) + + // 4.2. Set opcode to a binary frame opcode. + opcode = opcodes.BINARY + } else { + // 5. Otherwise, + + // 5.1. Let string be the result of converting chunk to an IDL USVString . + // If this throws an exception, return a promise rejected with the exception. + let string + + try { + string = webidl.converters.DOMString(chunk) + } catch (e) { + promise.reject(e) + return + } + + // 5.2. Set data to the result of UTF-8 encoding string . + data = new TextEncoder().encode(string) + + // 5.3. Set opcode to a text frame opcode. + opcode = opcodes.TEXT + } + + // 6. In parallel, + // 6.1. Wait until there is sufficient buffer space in stream to send the message. + + // 6.2. If the closing handshake has not yet started , Send a WebSocket Message to stream comprised of data using opcode . + if (!this.#handler.closeState.has(sentCloseFrameState.SENT) && !this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) { + const frame = new WebsocketFrameSend(data) + + this.#handler.socket.write(frame.createFrame(opcode), () => { + promise.resolve(undefined) + }) + } + + // 6.3. Queue a global task on the WebSocket task source given stream ’s relevant global object to resolve promise with undefined. + return promise + } + + /** @type {import('../websocket').Handler['onConnectionEstablished']} */ + #onConnectionEstablished (response, parsedExtensions) { + this.#handler.socket = response.socket + + const parser = new ByteParser(this.#handler, parsedExtensions) + parser.on('drain', () => this.#handler.onParserDrain()) + parser.on('error', (err) => this.#handler.onParserError(err)) + + this.#parser = parser + + // 1. Change stream ’s ready state to OPEN (1). + this.#handler.readyState = states.OPEN + + // 2. Set stream ’s was ever connected to true. + this.#wasEverConnected = true + + // 3. Let extensions be the extensions in use . + const extensions = parsedExtensions ?? '' + + // 4. Let protocol be the subprotocol in use . + const protocol = response.headersList.get('sec-websocket-protocol') ?? '' + + // 5. Let pullAlgorithm be an action that pulls bytes from stream . + // 6. Let cancelAlgorithm be an action that cancels stream with reason , given reason . + // 7. Let readable be a new ReadableStream . + // 8. Set up readable with pullAlgorithm and cancelAlgorithm . + const readable = new ReadableStream({ + start: (controller) => { + this.#readableStreamController = controller + }, + pull (controller) { + let chunk + while (controller.desiredSize > 0 && (chunk = response.socket.read()) !== null) { + controller.enqueue(chunk) + } + }, + cancel: (reason) => this.#cancel(reason) + }) + + // 9. Let writeAlgorithm be an action that writes chunk to stream , given chunk . + // 10. Let closeAlgorithm be an action that closes stream . + // 11. Let abortAlgorithm be an action that aborts stream with reason , given reason . + // 12. Let writable be a new WritableStream . + // 13. Set up writable with writeAlgorithm , closeAlgorithm , and abortAlgorithm . + const writable = new WritableStream({ + write: (chunk) => this.#write(chunk), + close: () => closeWebSocketConnection(this.#handler, null, null), + abort: (reason) => this.#closeUsingReason(reason) + }) + + // Set stream ’s readable stream to readable . + this.#readableStream = readable + + // Set stream ’s writable stream to writable . + this.#writableStream = writable + + // Resolve stream ’s opened promise with WebSocketOpenInfo «[ " extensions " → extensions , " protocol " → protocol , " readable " → readable , " writable " → writable ]». + this.#openedPromise.resolve({ + extensions, + protocol, + readable, + writable + }) + } + + /** @type {import('../websocket').Handler['onMessage']} */ + #onMessage (type, data) { + // 1. If stream’s ready state is not OPEN (1), then return. + if (this.#handler.readyState !== states.OPEN) { + return + } + + // 2. Let chunk be determined by switching on type: + // - type indicates that the data is Text + // a new DOMString containing data + // - type indicates that the data is Binary + // a new Uint8Array object, created in the relevant Realm of the + // WebSocketStream object, whose contents are data + let chunk + + if (type === opcodes.TEXT) { + try { + chunk = utf8Decode(data) + } catch { + failWebsocketConnection(this.#handler, 'Received invalid UTF-8 in text frame.') + return + } + } else if (type === opcodes.BINARY) { + chunk = new Uint8Array(data.buffer, data.byteOffset, data.byteLength) + } + + // 3. Enqueue chunk into stream’s readable stream. + this.#readableStreamController.enqueue(chunk) + + // 4. Apply backpressure to the WebSocket. + } + + /** @type {import('../websocket').Handler['onSocketClose']} */ + #onSocketClose () { + const wasClean = + this.#handler.closeState.has(sentCloseFrameState.SENT) && + this.#handler.closeState.has(sentCloseFrameState.RECEIVED) + + // 1. Change the ready state to CLOSED (3). + this.#handler.readyState = states.CLOSED + + // 2. If stream ’s handshake aborted is true, then return. + if (this.#handshakeAborted) { + return + } + + // 3. If stream ’s was ever connected is false, then reject stream ’s opened promise with a new WebSocketError. + if (!this.#wasEverConnected) { + this.#openedPromise.reject(new WebSocketError('Socket never opened')) + } + + const result = this.#parser.closingInfo + + // 4. Let code be the WebSocket connection close code . + // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5 + // If this Close control frame contains no status code, _The WebSocket + // Connection Close Code_ is considered to be 1005. If _The WebSocket + // Connection is Closed_ and no Close control frame was received by the + // endpoint (such as could occur if the underlying transport connection + // is lost), _The WebSocket Connection Close Code_ is considered to be + // 1006. + let code = result?.code ?? 1005 + + if (!this.#handler.closeState.has(sentCloseFrameState.SENT) && !this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) { + code = 1006 + } + + // 5. Let reason be the result of applying UTF-8 decode without BOM to the WebSocket connection close reason . + const reason = result?.reason == null ? '' : utf8DecodeBytes(Buffer.from(result.reason)) + + // 6. If the connection was closed cleanly , + if (wasClean) { + // 6.1. Close stream ’s readable stream . + this.#readableStream.cancel().catch(() => {}) + + // 6.2. Error stream ’s writable stream with an " InvalidStateError " DOMException indicating that a closed WebSocketStream cannot be written to. + if (!this.#writableStream.locked) { + this.#writableStream.abort(new DOMException('A closed WebSocketStream cannot be written to', 'InvalidStateError')) + } + + // 6.3. Resolve stream ’s closed promise with WebSocketCloseInfo «[ " closeCode " → code , " reason " → reason ]». + this.#closedPromise.resolve({ + closeCode: code, + reason + }) + } else { + // 7. Otherwise, + + // 7.1. Let error be a new WebSocketError whose closeCode is code and reason is reason . + const error = createUnvalidatedWebSocketError('unclean close', code, reason) + + // 7.2. Error stream ’s readable stream with error . + this.#readableStreamController.error(error) + + // 7.3. Error stream ’s writable stream with error . + this.#writableStream.abort(error) + + // 7.4. Reject stream ’s closed promise with error . + this.#closedPromise.reject(error) + } + } + + #onFail (code, reason) { + // If _The WebSocket Connection is Established_ prior to the point where + // the endpoint is required to _Fail the WebSocket Connection_, the + // endpoint SHOULD send a Close frame with an appropriate status code + // (Section 7.4) before proceeding to _Close the WebSocket Connection_. + if (isEstablished(this.#handler.readyState)) { + closeWebSocketConnection(this.#handler, code, reason, false) + } + + this.#controller.abort() + + if (this.#handler.socket && !this.#handler.socket.destroyed) { + this.#handler.socket.destroy() + } + } + + #closeUsingReason (reason) { + // 1. Let code be null. + let code = null + + // 2. Let reasonString be the empty string. + let reasonString = '' + + // 3. If reason implements WebSocketError , + if (reason instanceof WebSocketError) { + // 3.1. Set code to reason ’s closeCode . + code = reason.closeCode + + // 3.2. Set reasonString to reason ’s reason . + reasonString = reason.reason + } + + // 4. Close the WebSocket with stream , code , and reasonString . If this throws an exception, + // discard code and reasonString and close the WebSocket with stream . + closeWebSocketConnection(this.#handler, code, reasonString) + } + + // To cancel a WebSocketStream stream given reason , close using reason giving stream and reason . + #cancel (reason) { + this.#closeUsingReason(reason) + } +} + +Object.defineProperties(WebSocketStream.prototype, { + url: kEnumerableProperty, + opened: kEnumerableProperty, + closed: kEnumerableProperty, + close: kEnumerableProperty, + [Symbol.toStringTag]: { + value: 'WebSocketStream', + writable: false, + enumerable: false, + configurable: true + } +}) + +webidl.converters.WebSocketStreamOptions = webidl.dictionaryConverter([ + { + key: 'protocols', + converter: webidl.sequenceConverter(webidl.converters.USVString), + defaultValue: () => [] + }, + { + key: 'signal', + converter: webidl.nullableConverter(webidl.converters.AbortSignal), + defaultValue: () => null + } +]) + +// TODO: take from request,hs +webidl.converters.AbortSignal ??= webidl.interfaceConverter( + AbortSignal +) + +webidl.converters.WebSocketCloseInfo = webidl.dictionaryConverter([ + { + key: 'closeCode', + converter: (V) => webidl.converters['unsigned short'](V, { enforceRange: true }) + }, + { + key: 'reason', + converter: webidl.converters.USVString, + defaultValue: () => '' + } +]) + +module.exports = { WebSocketStream } diff --git a/lib/web/websocket/util.js b/lib/web/websocket/util.js index 89ef3f316d9..6f43abf8361 100644 --- a/lib/web/websocket/util.js +++ b/lib/web/websocket/util.js @@ -247,6 +247,68 @@ function isValidClientWindowBits (value) { return true } +/** + * @see https://whatpr.org/websockets/48/7b748d3...d5570f3.html#get-a-url-record + * @param {string} url + * @param {string} [baseURL] + */ +function getURLRecord (url, baseURL) { + // 1. Let urlRecord be the result of applying the URL parser to url with baseURL . + // 2. If urlRecord is failure, then throw a " SyntaxError " DOMException . + let urlRecord + + try { + urlRecord = new URL(url, baseURL) + } catch (e) { + throw new DOMException(e, 'SyntaxError') + } + + // 3. If urlRecord ’s scheme is " http ", then set urlRecord ’s scheme to " ws ". + // 4. Otherwise, if urlRecord ’s scheme is " https ", set urlRecord ’s scheme to " wss ". + if (urlRecord.protocol === 'http:') { + urlRecord.protocol = 'ws:' + } else if (urlRecord.protocol === 'https:') { + urlRecord.protocol = 'wss:' + } + + // 5. If urlRecord ’s scheme is not " ws " or " wss ", then throw a " SyntaxError " DOMException . + if (urlRecord.protocol !== 'ws:' && urlRecord.protocol !== 'wss:') { + throw new DOMException('expected a ws: or wss: url', 'SyntaxError') + } + + // If urlRecord ’s fragment is non-null, then throw a " SyntaxError " DOMException . + if (urlRecord.hash.length || urlRecord.href.endsWith('#')) { + throw new DOMException('hash', 'SyntaxError') + } + + // Return urlRecord . + return urlRecord +} + +// https://whatpr.org/websockets/48.html#validate-close-code-and-reason +function validateCloseCodeAndReason (code, reason) { + // 1. If code is not null, but is neither an integer equal to + // 1000 nor an integer in the range 3000 to 4999, inclusive, + // throw an "InvalidAccessError" DOMException. + if (code !== null) { + if (code !== 1000 && (code < 3000 || code > 4999)) { + throw new DOMException('invalid code', 'InvalidAccessError') + } + } + + // 2. If reason is not null, then: + if (reason !== null) { + // 2.1. Let reasonBytes be the result of UTF-8 encoding reason. + // 2.2. If reasonBytes is longer than 123 bytes, then throw a + // "SyntaxError" DOMException. + const reasonBytesLength = Buffer.byteLength(reason) + + if (reasonBytesLength > 123) { + throw new DOMException(`Reason must be less than 123 bytes; received ${reasonBytesLength}`, 'SyntaxError') + } + } +} + /** * Converts a Buffer to utf-8, even on platforms without icu. * @type {(buffer: Buffer) => string} @@ -281,5 +343,7 @@ module.exports = { isValidOpcode, parseExtensions, isValidClientWindowBits, - toArrayBuffer + toArrayBuffer, + getURLRecord, + validateCloseCodeAndReason } diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index 6d68bda43b3..e04c6b6f921 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -3,7 +3,7 @@ const { webidl } = require('../fetch/webidl') const { URLSerializer } = require('../fetch/data-url') const { environmentSettingsObject } = require('../fetch/util') -const { staticPropertyDescriptors, states, sentCloseFrameState, sendHints, opcodes, emptyBuffer } = require('./constants') +const { staticPropertyDescriptors, states, sentCloseFrameState, sendHints, opcodes } = require('./constants') const { isConnecting, isEstablished, @@ -13,7 +13,7 @@ const { failWebsocketConnection, utf8Decode, toArrayBuffer, - isClosed + getURLRecord } = require('./util') const { establishWebSocketConnection, closeWebSocketConnection } = require('./connection') const { ByteParser } = require('./receiver') @@ -22,7 +22,6 @@ const { getGlobalDispatcher } = require('../../global') const { types } = require('node:util') const { ErrorEvent, CloseEvent, createFastMessageEvent } = require('./events') const { SendQueue } = require('./sender') -const { WebsocketFrameSend } = require('./frame') const { channels } = require('../../core/diagnostics') /** @@ -30,7 +29,6 @@ const { channels } = require('../../core/diagnostics') * @property {(response: any, extensions?: string[]) => void} onConnectionEstablished * @property {(code: number, reason: any) => void} onFail * @property {(opcode: number, data: Buffer) => void} onMessage - * @property {(code: number, reason: any, reasonByteLength: number) => void} onClose * @property {(error: Error) => void} onParserError * @property {() => void} onParserDrain * @property {(chunk: Buffer) => void} onSocketData @@ -63,7 +61,6 @@ class WebSocket extends EventTarget { onConnectionEstablished: (response, extensions) => this.#onConnectionEstablished(response, extensions), onFail: (code, reason) => this.#onFail(code, reason), onMessage: (opcode, data) => this.#onMessage(opcode, data), - onClose: (code, reason, reasonByteLength) => this.#onClose(code, reason, reasonByteLength), onParserError: (err) => this.#onParserError(err), onParserDrain: () => this.#onParserDrain(), onSocketData: (chunk) => { @@ -111,45 +108,16 @@ class WebSocket extends EventTarget { // 1. Let baseURL be this's relevant settings object's API base URL. const baseURL = environmentSettingsObject.settingsObject.baseUrl - // 1. Let urlRecord be the result of applying the URL parser to url with baseURL. - let urlRecord + // 2. Let urlRecord be the result of getting a URL record given url and baseURL. + const urlRecord = getURLRecord(url, baseURL) - try { - urlRecord = new URL(url, baseURL) - } catch (e) { - // 3. If urlRecord is failure, then throw a "SyntaxError" DOMException. - throw new DOMException(e, 'SyntaxError') - } - - // 4. If urlRecord’s scheme is "http", then set urlRecord’s scheme to "ws". - if (urlRecord.protocol === 'http:') { - urlRecord.protocol = 'ws:' - } else if (urlRecord.protocol === 'https:') { - // 5. Otherwise, if urlRecord’s scheme is "https", set urlRecord’s scheme to "wss". - urlRecord.protocol = 'wss:' - } - - // 6. If urlRecord’s scheme is not "ws" or "wss", then throw a "SyntaxError" DOMException. - if (urlRecord.protocol !== 'ws:' && urlRecord.protocol !== 'wss:') { - throw new DOMException( - `Expected a ws: or wss: protocol, got ${urlRecord.protocol}`, - 'SyntaxError' - ) - } - - // 7. If urlRecord’s fragment is non-null, then throw a "SyntaxError" - // DOMException. - if (urlRecord.hash || urlRecord.href.endsWith('#')) { - throw new DOMException('Got fragment', 'SyntaxError') - } - - // 8. If protocols is a string, set protocols to a sequence consisting + // 3. If protocols is a string, set protocols to a sequence consisting // of just that string. if (typeof protocols === 'string') { protocols = [protocols] } - // 9. If any of the values in protocols occur more than once or otherwise + // 4. If any of the values in protocols occur more than once or otherwise // fail to match the requirements for elements that comprise the value // of `Sec-WebSocket-Protocol` fields as defined by The WebSocket // protocol, then throw a "SyntaxError" DOMException. @@ -161,16 +129,15 @@ class WebSocket extends EventTarget { throw new DOMException('Invalid Sec-WebSocket-Protocol value', 'SyntaxError') } - // 10. Set this's url to urlRecord. + // 5. Set this's url to urlRecord. this.#url = new URL(urlRecord.href) - // 11. Let client be this's relevant settings object. + // 6. Let client be this's relevant settings object. const client = environmentSettingsObject.settingsObject - // 12. Run this step in parallel: - - // 1. Establish a WebSocket connection given urlRecord, protocols, - // and client. + // 7. Run this step in parallel: + // 7.1. Establish a WebSocket connection given urlRecord, protocols, + // and client. this.#controller = establishWebSocketConnection( urlRecord, protocols, @@ -211,34 +178,14 @@ class WebSocket extends EventTarget { reason = webidl.converters.USVString(reason) } - // 1. If code is present, but is neither an integer equal to 1000 nor an - // integer in the range 3000 to 4999, inclusive, throw an - // "InvalidAccessError" DOMException. - if (code !== undefined) { - if (code !== 1000 && (code < 3000 || code > 4999)) { - throw new DOMException('invalid code', 'InvalidAccessError') - } - } - - let reasonByteLength = 0 + // 1. If code is the special value "missing", then set code to null. + code ??= null - // 2. If reason is present, then run these substeps: - if (reason !== undefined) { - // 1. Let reasonBytes be the result of encoding reason. - // 2. If reasonBytes is longer than 123 bytes, then throw a - // "SyntaxError" DOMException. - reasonByteLength = Buffer.byteLength(reason) - - if (reasonByteLength > 123) { - throw new DOMException( - `Reason must be less than 123 bytes; received ${reasonByteLength}`, - 'SyntaxError' - ) - } - } + // 2. If reason is the special value "missing", then set reason to the empty string. + reason ??= '' - // 3. Run the first matching steps from the following list: - closeWebSocketConnection(this.#handler, code, reason, reasonByteLength) + // 3. Close the WebSocket with this, code, and reason. + closeWebSocketConnection(this.#handler, code, reason, true) } /** @@ -525,7 +472,7 @@ class WebSocket extends EventTarget { // endpoint SHOULD send a Close frame with an appropriate status code // (Section 7.4) before proceeding to _Close the WebSocket Connection_. if (isEstablished(this.#handler.readyState)) { - this.#onClose(code, reason, reason?.length) + closeWebSocketConnection(this.#handler, code, reason, false) } else { // If the WebSocket connection could not be established, it is also said // that _The WebSocket Connection is Closed_, but not _cleanly_. @@ -582,67 +529,6 @@ class WebSocket extends EventTarget { }) } - #onClose (code, reason, reasonByteLength) { - if (isClosing(this.#handler.readyState) || isClosed(this.#handler.readyState)) { - // If this's ready state is CLOSING (2) or CLOSED (3) - // Do nothing. - } else if (!isEstablished(this.#handler.readyState)) { - // If the WebSocket connection is not yet established - // Fail the WebSocket connection and set this's ready state - // to CLOSING (2). - failWebsocketConnection(this.#handler, 1002, 'Connection was closed before it was established.') - this.#handler.readyState = states.CLOSING - } else if (!this.#handler.closeState.has(sentCloseFrameState.SENT) && !this.#handler.closeState.has(sentCloseFrameState.RECEIVED)) { - // Upon either sending or receiving a Close control frame, it is said - // that _The WebSocket Closing Handshake is Started_ and that the - // WebSocket connection is in the CLOSING state. - - // If the WebSocket closing handshake has not yet been started - // Start the WebSocket closing handshake and set this's ready - // state to CLOSING (2). - // - If neither code nor reason is present, the WebSocket Close - // message must not have a body. - // - If code is present, then the status code to use in the - // WebSocket Close message must be the integer given by code. - // - If reason is also present, then reasonBytes must be - // provided in the Close message after the status code. - - const frame = new WebsocketFrameSend() - - // If neither code nor reason is present, the WebSocket Close - // message must not have a body. - - // If code is present, then the status code to use in the - // WebSocket Close message must be the integer given by code. - if (code !== undefined && reason === undefined) { - frame.frameData = Buffer.allocUnsafe(2) - frame.frameData.writeUInt16BE(code, 0) - } else if (code !== undefined && reason !== undefined) { - // If reason is also present, then reasonBytes must be - // provided in the Close message after the status code. - frame.frameData = Buffer.allocUnsafe(2 + reasonByteLength) - frame.frameData.writeUInt16BE(code, 0) - // the body MAY contain UTF-8-encoded data with value /reason/ - frame.frameData.write(reason, 2, 'utf-8') - } else { - frame.frameData = emptyBuffer - } - - this.#handler.socket.write(frame.createFrame(opcodes.CLOSE)) - - this.#handler.closeState.add(sentCloseFrameState.SENT) - - // Upon either sending or receiving a Close control frame, it is said - // that _The WebSocket Closing Handshake is Started_ and that the - // WebSocket connection is in the CLOSING state. - this.#handler.readyState = states.CLOSING - } else { - // Otherwise - // Set this's ready state to CLOSING (2). - this.#handler.readyState = states.CLOSING - } - } - #onParserError (err) { let message let code @@ -656,7 +542,7 @@ class WebSocket extends EventTarget { fireEvent('error', this, () => new ErrorEvent('error', { error: err, message })) - closeWebSocketConnection(this.#handler, code) + closeWebSocketConnection(this.#handler, code, null) } #onParserDrain () { diff --git a/test/wpt/runner/worker.mjs b/test/wpt/runner/worker.mjs index b4fea2ccf85..0bfdebff179 100644 --- a/test/wpt/runner/worker.mjs +++ b/test/wpt/runner/worker.mjs @@ -20,6 +20,9 @@ import { import { Cache } from '../../../lib/web/cache/cache.js' import { CacheStorage } from '../../../lib/web/cache/cachestorage.js' import { webcrypto } from 'node:crypto' +// TODO(@KhafraDev): move this import once its added to index +import { WebSocketStream } from '../../../lib/web/websocket/stream/websocketstream.js' +import { WebSocketError } from '../../../lib/web/websocket/stream/websocketerror.js' const { initScripts, meta, test, url, path } = workerData @@ -97,6 +100,14 @@ Object.defineProperties(globalThis, { EventSource: { ...globalPropertyDescriptors, value: EventSource + }, + WebSocketStream: { + ...globalPropertyDescriptors, + value: WebSocketStream + }, + WebSocketError: { + ...globalPropertyDescriptors, + value: WebSocketError } }) diff --git a/test/wpt/server/websocket.mjs b/test/wpt/server/websocket.mjs index 9bb05d12612..6f91e1e5131 100644 --- a/test/wpt/server/websocket.mjs +++ b/test/wpt/server/websocket.mjs @@ -7,11 +7,32 @@ import { server } from './server.mjs' // event, so I'm unsure if we can stop relying on server. const wss = new WebSocketServer({ - server, + noServer: true, handleProtocols: (protocols) => protocols.values().next().value }) wss.on('connection', (ws, request) => { + if (request.url === '/protocol_array') { + const line = request.headers['sec-websocket-protocol'] + const protocol = line.split(',')[0] + ws.send(protocol) + return + } else if (request.url === '/remote-close' || request.url.startsWith('/remote-close?')) { + const fullUrl = new URL(request.url, `ws://localhost:${server.address().port}`) + + const code = fullUrl.searchParams.has('code') ? Number(fullUrl.searchParams.get('code')) : undefined + const reason = fullUrl.searchParams.get('reason') ?? undefined + const abrupt = fullUrl.searchParams.get('abrupt') === '1' + + if (abrupt) { + ws._socket.end() + return + } + + ws.close(code, reason) + return + } + ws.on('message', (data, isBinary) => { const str = data.toString('utf-8') @@ -44,3 +65,15 @@ wss.on('connection', (ws, request) => { clearTimeout(timeout) }) }) + +server.on('upgrade', (request, socket, head) => { + if (request.url === '/404') { + socket.write('HTTP/1.1 404 Not Found\r\n\r\n') + socket.destroy() + return + } + + wss.handleUpgrade(request, socket, head, (ws) => { + wss.emit('connection', ws, request) + }) +}) diff --git a/test/wpt/status/websockets.status.json b/test/wpt/status/websockets.status.json index dd36014861b..920501e0d4c 100644 --- a/test/wpt/status/websockets.status.json +++ b/test/wpt/status/websockets.status.json @@ -1,7 +1,24 @@ { "stream": { "tentative": { - "skip": true + "backpressure-send.any.js": { + "note": "Deno also fails this test", + "fail": [ + "backpressure should be applied to sent messages" + ] + }, + "remote-close.any.js": { + "note": "Deno also fails this test", + "fail": [ + "close with unwritten data should not be considered clean" + ] + }, + "abort.any.js": { + "note": "Relies on exact timing - ie. aborting *during* the handshake.", + "flaky": [ + "abort during handshake should work" + ] + } } }, "interfaces": {