From 018545fbeb67311653156f45fa6eb3d99f4cb211 Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 00:17:15 -0400 Subject: [PATCH 01/20] handshake --- lib/web/fetch/data-url.js | 1 + lib/web/websocket/connection.js | 21 +++++++++++++-------- lib/web/websocket/receiver.js | 17 +++++++++++++++-- lib/web/websocket/util.js | 28 +++++++++++++++++++++++++++- lib/web/websocket/websocket.js | 6 +++--- 5 files changed, 59 insertions(+), 14 deletions(-) diff --git a/lib/web/fetch/data-url.js b/lib/web/fetch/data-url.js index 3f42e2eb6b2..7a74db6bde8 100644 --- a/lib/web/fetch/data-url.js +++ b/lib/web/fetch/data-url.js @@ -737,6 +737,7 @@ module.exports = { collectAnHTTPQuotedString, serializeAMimeType, removeChars, + removeHTTPWhitespace, minimizeSupportedMimeType, HTTP_TOKEN_CODEPOINTS, isomorphicDecode diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 664fc3f0780..88be10c87da 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -8,7 +8,7 @@ const { kReceivedClose, kResponse } = require('./symbols') -const { fireEvent, failWebsocketConnection, isClosing, isClosed, isEstablished } = require('./util') +const { fireEvent, failWebsocketConnection, isClosing, isClosed, isEstablished, parseExtensions } = require('./util') const { channels } = require('../../core/diagnostics') const { CloseEvent } = require('./events') const { makeRequest } = require('../fetch/request') @@ -31,7 +31,7 @@ try { * @param {URL} url * @param {string|string[]} protocols * @param {import('./websocket').WebSocket} ws - * @param {(response: any) => void} onEstablish + * @param {(response: any, extensions: string[] | undefined) => void} onEstablish * @param {Partial} options */ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, options) { @@ -92,11 +92,11 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, // "permessage-deflate" extension header value. // https://github.com/mozilla/gecko-dev/blob/ce78234f5e653a5d3916813ff990f053510227bc/netwerk/protocol/websocket/WebSocketChannel.cpp#L2673 // TODO: enable once permessage-deflate is supported - const permessageDeflate = '' // 'permessage-deflate; 15' + const permessageDeflate = 'permessage-deflate' // 'permessage-deflate; 15' // 10. Append (`Sec-WebSocket-Extensions`, permessageDeflate) to // request’s header list. - // request.headersList.append('sec-websocket-extensions', permessageDeflate) + request.headersList.set('sec-websocket-extensions', permessageDeflate) // 11. Fetch request with useParallelQueue set to true, and // processResponse given response being these steps: @@ -167,10 +167,15 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, // header field to determine which extensions are requested is // discussed in Section 9.1.) const secExtension = response.headersList.get('Sec-WebSocket-Extensions') + let extensions - if (secExtension !== null && secExtension !== permessageDeflate) { - failWebsocketConnection(ws, 'Received different permessage-deflate than the one set.') - return + if (secExtension !== null) { + extensions = parseExtensions(secExtension) + + if (!extensions.has(permessageDeflate)) { + failWebsocketConnection(ws, 'Sec-WebSocket-Extensions header does not match.') + return + } } // 6. If the response includes a |Sec-WebSocket-Protocol| header field @@ -206,7 +211,7 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, }) } - onEstablish(response) + onEstablish(response, extensions) } }) diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 85b6edf649c..748d14123c5 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -33,10 +33,14 @@ class ByteParser extends Writable { #info = {} #fragments = [] - constructor (ws) { + /** @type {Set} */ + #extensions + + constructor (ws, extensions) { super() this.ws = ws + this.#extensions = new Set(extensions) } /** @@ -91,7 +95,16 @@ class ByteParser extends Writable { // the negotiated extensions defines the meaning of such a nonzero // value, the receiving endpoint MUST _Fail the WebSocket // Connection_. - if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) { + // This document allocates the RSV1 bit of the WebSocket header for + // PMCEs and calls the bit the "Per-Message Compressed" bit. On a + // WebSocket connection where a PMCE is in use, this bit indicates + // whether a message is compressed or not. + if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) { + failWebsocketConnection(this.ws, 'Expected RSV1 to be clear.') + return + } + + if (rsv2 !== 0 || rsv3 !== 0) { failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear') return } diff --git a/lib/web/websocket/util.js b/lib/web/websocket/util.js index ea5b29d3549..844cff2d007 100644 --- a/lib/web/websocket/util.js +++ b/lib/web/websocket/util.js @@ -4,6 +4,7 @@ const { kReadyState, kController, kResponse, kBinaryType, kWebSocketURL } = requ const { states, opcodes } = require('./constants') const { ErrorEvent, createFastMessageEvent } = require('./events') const { isUtf8 } = require('node:buffer') +const { collectASequenceOfCodePointsFast, removeHTTPWhitespace } = require('../fetch/data-url') /* globals Blob */ @@ -234,6 +235,30 @@ function isValidOpcode (opcode) { return isTextBinaryFrame(opcode) || isContinuationFrame(opcode) || isControlFrame(opcode) } +/** + * Parses a Sec-WebSocket-Extensions header value. + * @param {string} extensions + * @returns {Map} + */ +function parseExtensions (extensions) { + const position = { position: 0 } + const extensionList = new Map() + + while (position.position < extensions.length) { + const pair = collectASequenceOfCodePointsFast(';', extensions, position) + const [name, value = ''] = pair.split('=') + + extensionList.set( + removeHTTPWhitespace(name, true, false), + removeHTTPWhitespace(value, false, true) + ) + + position.position++ + } + + return extensionList +} + // https://nodejs.org/api/intl.html#detecting-internationalization-support const hasIntl = typeof process.versions.icu === 'string' const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined @@ -265,5 +290,6 @@ module.exports = { isControlFrame, isContinuationFrame, isTextBinaryFrame, - isValidOpcode + isValidOpcode, + parseExtensions } diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index 7b62dde43c6..aa197e6b16c 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -135,7 +135,7 @@ class WebSocket extends EventTarget { protocols, client, this, - (response) => this.#onConnectionEstablished(response), + (response, extensions) => this.#onConnectionEstablished(response, extensions), options ) @@ -458,12 +458,12 @@ class WebSocket extends EventTarget { /** * @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol */ - #onConnectionEstablished (response) { + #onConnectionEstablished (response, parsedExtensions) { // processResponse is called when the "response’s header list has been received and initialized." // once this happens, the connection is open this[kResponse] = response - const parser = new ByteParser(this) + const parser = new ByteParser(this, parsedExtensions) parser.on('drain', onParserDrain) parser.on('error', onParserError.bind(this)) From 142832246a5a21212190b9720cc1d13f8a35f69a Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 00:19:21 -0400 Subject: [PATCH 02/20] fixup --- lib/web/websocket/connection.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 88be10c87da..1c2ef9132fe 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -91,12 +91,11 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, // 9. Let permessageDeflate be a user-agent defined // "permessage-deflate" extension header value. // https://github.com/mozilla/gecko-dev/blob/ce78234f5e653a5d3916813ff990f053510227bc/netwerk/protocol/websocket/WebSocketChannel.cpp#L2673 - // TODO: enable once permessage-deflate is supported - const permessageDeflate = 'permessage-deflate' // 'permessage-deflate; 15' + const permessageDeflate = 'permessage-deflate' // 10. Append (`Sec-WebSocket-Extensions`, permessageDeflate) to // request’s header list. - request.headersList.set('sec-websocket-extensions', permessageDeflate) + request.headersList.append('sec-websocket-extensions', permessageDeflate) // 11. Fetch request with useParallelQueue set to true, and // processResponse given response being these steps: From c7c92ddf52c7310b7bf463686e7dc915ddb55cd7 Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 11:25:24 -0400 Subject: [PATCH 03/20] +85 autobahn test passes --- lib/web/websocket/permessage-deflate.js | 39 +++++++++++++++++++++++++ lib/web/websocket/receiver.js | 36 +++++++++++++++-------- 2 files changed, 63 insertions(+), 12 deletions(-) create mode 100644 lib/web/websocket/permessage-deflate.js diff --git a/lib/web/websocket/permessage-deflate.js b/lib/web/websocket/permessage-deflate.js new file mode 100644 index 00000000000..6b808e7239e --- /dev/null +++ b/lib/web/websocket/permessage-deflate.js @@ -0,0 +1,39 @@ +'use strict' + +const { createInflateRaw } = require('node:zlib') + +const tail = Buffer.from([0x00, 0x00, 0xff, 0xff]) +const kBuffer = Symbol('kBuffer') + +class PerMessageDeflate { + /** @type {import('node:zlib').InflateRaw} */ + #inflate + + decompress (chunk, fin, callback) { + // An endpoint uses the following algorithm to decompress a message. + // 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the + // payload of the message. + // 2. Decompress the resulting data using DEFLATE. + + if (!this.#inflate) { + this.#inflate = createInflateRaw(/* TODO */) + this.#inflate[kBuffer] = [] + + this.#inflate.on('data', (data) => this.#inflate[kBuffer].push(data)) + } + + this.#inflate.write(chunk) + if (fin) { + this.#inflate.write(tail) + } + + this.#inflate.flush(() => { + const full = Buffer.concat(this.#inflate[kBuffer]) + callback(full) + + this.#inflate[kBuffer].length = 0 + }) + } +} + +module.exports = { PerMessageDeflate } diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 748d14123c5..ea94bd50b53 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -17,6 +17,7 @@ const { } = require('./util') const { WebsocketFrameSend } = require('./frame') const { closeWebSocketConnection } = require('./connection') +const { PerMessageDeflate } = require('./permessage-deflate') // This code was influenced by ws released under the MIT license. // Copyright (c) 2011 Einar Otto Stangvik @@ -33,14 +34,18 @@ class ByteParser extends Writable { #info = {} #fragments = [] - /** @type {Set} */ + /** @type {Map} */ #extensions constructor (ws, extensions) { super() this.ws = ws - this.#extensions = new Set(extensions) + this.#extensions = extensions == null ? new Map() : extensions + + if (this.#extensions.has('permessage-deflate')) { + this.#extensions.set('permessage-deflate', new PerMessageDeflate()) + } } /** @@ -157,6 +162,7 @@ class ByteParser extends Writable { this.#info.masked = masked this.#info.fin = fin this.#info.fragmented = fragmented + this.#info.compressed = rsv1 !== 0 } else if (this.#state === parserStates.PAYLOADLENGTH_16) { if (this.#byteOffset < 2) { return callback() @@ -199,16 +205,22 @@ class ByteParser extends Writable { if (isControlFrame(this.#info.opcode)) { this.#loop = this.parseControlFrame(body) } else { - this.#fragments.push(body) - - // If the frame is not fragmented, a message has been received. - // If the frame is fragmented, it will terminate with a fin bit set - // and an opcode of 0 (continuation), therefore we handle that when - // parsing continuation frames, not here. - if (!this.#info.fragmented && this.#info.fin) { - const fullMessage = Buffer.concat(this.#fragments) - websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) - this.#fragments.length = 0 + if (!this.#info.compressed) { + this.#fragments.push(body) + + // If the frame is not fragmented, a message has been received. + // If the frame is fragmented, it will terminate with a fin bit set + // and an opcode of 0 (continuation), therefore we handle that when + // parsing continuation frames, not here. + if (!this.#info.fragmented && this.#info.fin) { + const fullMessage = Buffer.concat(this.#fragments) + websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) + this.#fragments.length = 0 + } + } else { + this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (data) => { + websocketMessageReceived(this.ws, this.#info.binaryType, data) + }) } } From 1c04f05516dd2b5974e1d9bb127c7014046338dc Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 11:59:50 -0400 Subject: [PATCH 04/20] fixup --- lib/web/websocket/permessage-deflate.js | 10 ++++++++-- lib/web/websocket/receiver.js | 5 +++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/web/websocket/permessage-deflate.js b/lib/web/websocket/permessage-deflate.js index 6b808e7239e..521330223a8 100644 --- a/lib/web/websocket/permessage-deflate.js +++ b/lib/web/websocket/permessage-deflate.js @@ -4,6 +4,7 @@ const { createInflateRaw } = require('node:zlib') const tail = Buffer.from([0x00, 0x00, 0xff, 0xff]) const kBuffer = Symbol('kBuffer') +const kLength = Symbol('kLength') class PerMessageDeflate { /** @type {import('node:zlib').InflateRaw} */ @@ -18,8 +19,12 @@ class PerMessageDeflate { if (!this.#inflate) { this.#inflate = createInflateRaw(/* TODO */) this.#inflate[kBuffer] = [] + this.#inflate[kLength] = 0 - this.#inflate.on('data', (data) => this.#inflate[kBuffer].push(data)) + this.#inflate.on('data', (data) => { + this.#inflate[kBuffer].push(data) + this.#inflate[kLength] += data.length + }) } this.#inflate.write(chunk) @@ -28,10 +33,11 @@ class PerMessageDeflate { } this.#inflate.flush(() => { - const full = Buffer.concat(this.#inflate[kBuffer]) + const full = Buffer.concat(this.#inflate[kBuffer], this.#inflate[kLength]) callback(full) this.#inflate[kBuffer].length = 0 + this.#inflate[kLength] = 0 }) } } diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index ea94bd50b53..663ac51686e 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -220,7 +220,12 @@ class ByteParser extends Writable { } else { this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (data) => { websocketMessageReceived(this.ws, this.#info.binaryType, data) + + this.#state = parserStates.INFO + callback() }) + + return } } From e0729eb0040f2731a6f2770bb0db8e66aaf30a56 Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 18:34:55 -0400 Subject: [PATCH 05/20] fixup --- lib/web/websocket/receiver.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 663ac51686e..ae67f3eba55 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -140,7 +140,7 @@ class ByteParser extends Writable { return } - if (isContinuationFrame(opcode) && this.#fragments.length === 0) { + if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) { failWebsocketConnection(this.ws, 'Unexpected continuation frame') return } @@ -156,13 +156,13 @@ class ByteParser extends Writable { if (isTextBinaryFrame(opcode)) { this.#info.binaryType = opcode + this.#info.compressed = rsv1 !== 0 } this.#info.opcode = opcode this.#info.masked = masked this.#info.fin = fin this.#info.fragmented = fragmented - this.#info.compressed = rsv1 !== 0 } else if (this.#state === parserStates.PAYLOADLENGTH_16) { if (this.#byteOffset < 2) { return callback() From 00e4713201209de3c4be15b20783c6124c61713b Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 18:42:30 -0400 Subject: [PATCH 06/20] fixup --- lib/web/websocket/receiver.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index ae67f3eba55..48de3e19569 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -221,11 +221,11 @@ class ByteParser extends Writable { this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (data) => { websocketMessageReceived(this.ws, this.#info.binaryType, data) - this.#state = parserStates.INFO - callback() + this.#loop = true + this.run(callback) }) - return + this.#loop = false } } From f3d6c7f46e0d13a3a14ae9cdbfc4f2daf3560526 Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 18:56:24 -0400 Subject: [PATCH 07/20] fixup --- lib/web/websocket/connection.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 1c2ef9132fe..f8a9231763e 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -283,7 +283,9 @@ function closeWebSocketConnection (ws, code, reason, reasonByteLength) { * @param {Buffer} chunk */ function onSocketData (chunk) { - if (!this.ws[kByteParser].write(chunk)) { + const parser = this.ws[kByteParser] + + if (parser.writable && !parser.write(chunk)) { this.pause() } } From d960de647919b7d43f65e685abc5490e613294d2 Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 15 May 2024 20:28:26 -0400 Subject: [PATCH 08/20] fixup --- lib/web/websocket/connection.js | 4 +--- lib/web/websocket/permessage-deflate.js | 15 ++++++++++++++- lib/web/websocket/receiver.js | 10 +++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index f8a9231763e..1c2ef9132fe 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -283,9 +283,7 @@ function closeWebSocketConnection (ws, code, reason, reasonByteLength) { * @param {Buffer} chunk */ function onSocketData (chunk) { - const parser = this.ws[kByteParser] - - if (parser.writable && !parser.write(chunk)) { + if (!this.ws[kByteParser].write(chunk)) { this.pause() } } diff --git a/lib/web/websocket/permessage-deflate.js b/lib/web/websocket/permessage-deflate.js index 521330223a8..3918fd436c0 100644 --- a/lib/web/websocket/permessage-deflate.js +++ b/lib/web/websocket/permessage-deflate.js @@ -10,6 +10,12 @@ class PerMessageDeflate { /** @type {import('node:zlib').InflateRaw} */ #inflate + #options = {} + + constructor (extensions) { + this.#options.clientNoContextTakeover = extensions.has('client_no_context_takeover') + } + decompress (chunk, fin, callback) { // An endpoint uses the following algorithm to decompress a message. // 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the @@ -25,6 +31,8 @@ class PerMessageDeflate { this.#inflate[kBuffer].push(data) this.#inflate[kLength] += data.length }) + + this.#inflate.on('error', (err) => callback(err)) } this.#inflate.write(chunk) @@ -34,10 +42,15 @@ class PerMessageDeflate { this.#inflate.flush(() => { const full = Buffer.concat(this.#inflate[kBuffer], this.#inflate[kLength]) - callback(full) this.#inflate[kBuffer].length = 0 this.#inflate[kLength] = 0 + + callback(null, full) + + if (fin && this.#options.clientNoContextTakeover) { + this.#inflate.reset() + } }) } } diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 48de3e19569..b499f2b7fda 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -44,7 +44,7 @@ class ByteParser extends Writable { this.#extensions = extensions == null ? new Map() : extensions if (this.#extensions.has('permessage-deflate')) { - this.#extensions.set('permessage-deflate', new PerMessageDeflate()) + this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions)) } } @@ -218,7 +218,12 @@ class ByteParser extends Writable { this.#fragments.length = 0 } } else { - this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (data) => { + this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => { + if (error) { + closeWebSocketConnection(this.ws, 1007, error.message, error.message.length) + return + } + websocketMessageReceived(this.ws, this.#info.binaryType, data) this.#loop = true @@ -363,7 +368,6 @@ class ByteParser extends Writable { this.ws[kReadyState] = states.CLOSING this.ws[kReceivedClose] = true - this.end() return false } else if (opcode === opcodes.PING) { // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in From ef8e1e017bde5083725373e9f100d8e78f72d9d3 Mon Sep 17 00:00:00 2001 From: Khafra Date: Thu, 16 May 2024 10:25:09 -0400 Subject: [PATCH 09/20] fixup --- lib/web/websocket/permessage-deflate.js | 17 +++++++++++++++-- lib/web/websocket/util.js | 20 +++++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/lib/web/websocket/permessage-deflate.js b/lib/web/websocket/permessage-deflate.js index 3918fd436c0..d349863888f 100644 --- a/lib/web/websocket/permessage-deflate.js +++ b/lib/web/websocket/permessage-deflate.js @@ -1,6 +1,7 @@ 'use strict' -const { createInflateRaw } = require('node:zlib') +const { createInflateRaw, Z_DEFAULT_WINDOWBITS } = require('node:zlib') +const { isValidClientWindowBits } = require('./util') const tail = Buffer.from([0x00, 0x00, 0xff, 0xff]) const kBuffer = Symbol('kBuffer') @@ -14,6 +15,7 @@ class PerMessageDeflate { constructor (extensions) { this.#options.clientNoContextTakeover = extensions.has('client_no_context_takeover') + this.#options.clientMaxWindowBits = extensions.get('client_max_window_bits') } decompress (chunk, fin, callback) { @@ -23,7 +25,18 @@ class PerMessageDeflate { // 2. Decompress the resulting data using DEFLATE. if (!this.#inflate) { - this.#inflate = createInflateRaw(/* TODO */) + let windowBits = Z_DEFAULT_WINDOWBITS + + if (this.#options.clientMaxWindowBits) { // empty values default to Z_DEFAULT_WINDOWBITS + if (!isValidClientWindowBits(this.#options.clientMaxWindowBits)) { + callback(new Error('Invalid client_max_window_bits')) + return + } + + windowBits = Number.parseInt(this.#options.clientMaxWindowBits) + } + + this.#inflate = createInflateRaw({ windowBits }) this.#inflate[kBuffer] = [] this.#inflate[kLength] = 0 diff --git a/lib/web/websocket/util.js b/lib/web/websocket/util.js index 844cff2d007..8a2bae5de45 100644 --- a/lib/web/websocket/util.js +++ b/lib/web/websocket/util.js @@ -259,6 +259,23 @@ function parseExtensions (extensions) { return extensionList } +/** + * @see https://www.rfc-editor.org/rfc/rfc7692#section-7.1.2.2 + * @description "client-max-window-bits = 1*DIGIT" + * @param {string} value + */ +function isValidClientWindowBits (value) { + for (let i = 0; i < value.length; i++) { + const byte = value.charCodeAt(i) + + if (byte < 0x30 || byte > 0x39) { + return false + } + } + + return true +} + // https://nodejs.org/api/intl.html#detecting-internationalization-support const hasIntl = typeof process.versions.icu === 'string' const fatalDecoder = hasIntl ? new TextDecoder('utf-8', { fatal: true }) : undefined @@ -291,5 +308,6 @@ module.exports = { isContinuationFrame, isTextBinaryFrame, isValidOpcode, - parseExtensions + parseExtensions, + isValidClientWindowBits } From b50e9d9d04eada4e9cfd466c5d1f5cc7a6f6bb4e Mon Sep 17 00:00:00 2001 From: Khafra Date: Thu, 16 May 2024 16:24:00 -0400 Subject: [PATCH 10/20] fixup --- lib/web/websocket/connection.js | 2 +- lib/web/websocket/websocket.js | 15 ++++++++++++++- test/autobahn/client.js | 12 +++++++++--- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 1c2ef9132fe..4a3eec6c5bd 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -91,7 +91,7 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, // 9. Let permessageDeflate be a user-agent defined // "permessage-deflate" extension header value. // https://github.com/mozilla/gecko-dev/blob/ce78234f5e653a5d3916813ff990f053510227bc/netwerk/protocol/websocket/WebSocketChannel.cpp#L2673 - const permessageDeflate = 'permessage-deflate' + const permessageDeflate = options.node?.['client-extensions'] ?? '' // 10. Append (`Sec-WebSocket-Extensions`, permessageDeflate) to // request’s header list. diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index aa197e6b16c..a0e66f38d66 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -549,6 +549,14 @@ webidl.converters['DOMString or sequence'] = function (V, prefix, arg return webidl.converters.DOMString(V, prefix, argument) } +webidl.converters.WebSocketInitNodeOptions = webidl.dictionaryConverter([ + { + key: 'client-extensions', + converter: webidl.converters.DOMString, + defaultValue: () => '' + } +]) + // This implements the proposal made in https://github.com/whatwg/websockets/issues/42 webidl.converters.WebSocketInit = webidl.dictionaryConverter([ { @@ -558,12 +566,17 @@ webidl.converters.WebSocketInit = webidl.dictionaryConverter([ }, { key: 'dispatcher', - converter: (V) => V, + converter: webidl.converters.any, defaultValue: () => getGlobalDispatcher() }, { key: 'headers', converter: webidl.nullableConverter(webidl.converters.HeadersInit) + }, + { + key: 'node', + converter: webidl.converters.WebSocketInitNodeOptions, + defaultValue: () => ({}) } ]) diff --git a/test/autobahn/client.js b/test/autobahn/client.js index 41bf1d61063..a2c74522c71 100644 --- a/test/autobahn/client.js +++ b/test/autobahn/client.js @@ -6,19 +6,25 @@ let currentTest = 1 let testCount const autobahnFuzzingserverUrl = process.env.FUZZING_SERVER_URL || 'ws://localhost:9001' +const options = { + node: { + 'client-extensions': 'permessage-deflate' + } +} function nextTest () { let ws if (currentTest > testCount) { - ws = new WebSocket(`${autobahnFuzzingserverUrl}/updateReports?agent=undici`) + ws = new WebSocket(`${autobahnFuzzingserverUrl}/updateReports?agent=undici`, options) return } console.log(`Running test case ${currentTest}/${testCount}`) ws = new WebSocket( - `${autobahnFuzzingserverUrl}/runCase?case=${currentTest}&agent=undici` + `${autobahnFuzzingserverUrl}/runCase?case=${currentTest}&agent=undici`, + options ) ws.addEventListener('message', (data) => { ws.send(data.data) @@ -32,7 +38,7 @@ function nextTest () { }) } -const ws = new WebSocket(`${autobahnFuzzingserverUrl}/getCaseCount`) +const ws = new WebSocket(`${autobahnFuzzingserverUrl}/getCaseCount`, options) ws.addEventListener('message', (data) => { testCount = parseInt(data.data) }) From f41e2fff070936cb414a73d3f6398d47f8c54d2d Mon Sep 17 00:00:00 2001 From: Khafra Date: Fri, 17 May 2024 10:15:59 -0400 Subject: [PATCH 11/20] fixup --- lib/web/websocket/connection.js | 5 +++++ lib/web/websocket/util.js | 1 + 2 files changed, 6 insertions(+) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 4a3eec6c5bd..3a9c649aaac 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -294,6 +294,11 @@ function onSocketData (chunk) { */ function onSocketClose () { const { ws } = this + const { [kResponse]: response } = ws + + response.socket.off('data', onSocketData) + response.socket.off('close', onSocketClose) + response.socket.off('error', onSocketError) // If the TCP connection was closed after the // WebSocket closing handshake was completed, the WebSocket connection diff --git a/lib/web/websocket/util.js b/lib/web/websocket/util.js index 8a2bae5de45..e5ce7899752 100644 --- a/lib/web/websocket/util.js +++ b/lib/web/websocket/util.js @@ -240,6 +240,7 @@ function isValidOpcode (opcode) { * @param {string} extensions * @returns {Map} */ +// TODO(@Uzlopak, @KhafraDev): make compliant https://datatracker.ietf.org/doc/html/rfc6455#section-9.1 function parseExtensions (extensions) { const position = { position: 0 } const extensionList = new Map() From dc7ab819113c99a2e7d3128c87d8c9db439bf826 Mon Sep 17 00:00:00 2001 From: Khafra Date: Fri, 17 May 2024 10:25:48 -0400 Subject: [PATCH 12/20] fixup --- lib/web/websocket/connection.js | 2 +- lib/web/websocket/websocket.js | 13 ------------- test/autobahn/client.js | 12 +++--------- 3 files changed, 4 insertions(+), 23 deletions(-) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index 3a9c649aaac..ce067f74d69 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -91,7 +91,7 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, // 9. Let permessageDeflate be a user-agent defined // "permessage-deflate" extension header value. // https://github.com/mozilla/gecko-dev/blob/ce78234f5e653a5d3916813ff990f053510227bc/netwerk/protocol/websocket/WebSocketChannel.cpp#L2673 - const permessageDeflate = options.node?.['client-extensions'] ?? '' + const permessageDeflate = 'permessage-deflate' // 10. Append (`Sec-WebSocket-Extensions`, permessageDeflate) to // request’s header list. diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index a0e66f38d66..4243f6c5830 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -549,14 +549,6 @@ webidl.converters['DOMString or sequence'] = function (V, prefix, arg return webidl.converters.DOMString(V, prefix, argument) } -webidl.converters.WebSocketInitNodeOptions = webidl.dictionaryConverter([ - { - key: 'client-extensions', - converter: webidl.converters.DOMString, - defaultValue: () => '' - } -]) - // This implements the proposal made in https://github.com/whatwg/websockets/issues/42 webidl.converters.WebSocketInit = webidl.dictionaryConverter([ { @@ -572,11 +564,6 @@ webidl.converters.WebSocketInit = webidl.dictionaryConverter([ { key: 'headers', converter: webidl.nullableConverter(webidl.converters.HeadersInit) - }, - { - key: 'node', - converter: webidl.converters.WebSocketInitNodeOptions, - defaultValue: () => ({}) } ]) diff --git a/test/autobahn/client.js b/test/autobahn/client.js index a2c74522c71..41bf1d61063 100644 --- a/test/autobahn/client.js +++ b/test/autobahn/client.js @@ -6,25 +6,19 @@ let currentTest = 1 let testCount const autobahnFuzzingserverUrl = process.env.FUZZING_SERVER_URL || 'ws://localhost:9001' -const options = { - node: { - 'client-extensions': 'permessage-deflate' - } -} function nextTest () { let ws if (currentTest > testCount) { - ws = new WebSocket(`${autobahnFuzzingserverUrl}/updateReports?agent=undici`, options) + ws = new WebSocket(`${autobahnFuzzingserverUrl}/updateReports?agent=undici`) return } console.log(`Running test case ${currentTest}/${testCount}`) ws = new WebSocket( - `${autobahnFuzzingserverUrl}/runCase?case=${currentTest}&agent=undici`, - options + `${autobahnFuzzingserverUrl}/runCase?case=${currentTest}&agent=undici` ) ws.addEventListener('message', (data) => { ws.send(data.data) @@ -38,7 +32,7 @@ function nextTest () { }) } -const ws = new WebSocket(`${autobahnFuzzingserverUrl}/getCaseCount`, options) +const ws = new WebSocket(`${autobahnFuzzingserverUrl}/getCaseCount`) ws.addEventListener('message', (data) => { testCount = parseInt(data.data) }) From b2c4514d31f7f624f16dda56c5c0f538d0b27039 Mon Sep 17 00:00:00 2001 From: Khafra Date: Fri, 17 May 2024 18:55:02 -0400 Subject: [PATCH 13/20] fixup --- lib/web/websocket/receiver.js | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index b499f2b7fda..83c00681338 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -122,10 +122,10 @@ class ByteParser extends Writable { // If we are already parsing a text/binary frame and do not receive either // a continuation frame or close frame, fail the connection. - if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) { - failWebsocketConnection(this.ws, 'Expected continuation frame') - return - } + // if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) { + // failWebsocketConnection(this.ws, 'Expected continuation frame') + // return + // } if (this.#info.fragmented && fragmented) { // A fragmented frame can't be fragmented itself @@ -204,6 +204,7 @@ class ByteParser extends Writable { if (isControlFrame(this.#info.opcode)) { this.#loop = this.parseControlFrame(body) + this.#state = parserStates.INFO } else { if (!this.#info.compressed) { this.#fragments.push(body) @@ -217,6 +218,8 @@ class ByteParser extends Writable { websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) this.#fragments.length = 0 } + + this.#state = parserStates.INFO } else { this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => { if (error) { @@ -224,17 +227,27 @@ class ByteParser extends Writable { return } - websocketMessageReceived(this.ws, this.#info.binaryType, data) + this.#fragments.push(data) + + if (!this.#info.fin) { + this.#state = parserStates.INFO + this.#loop = true + this.run(callback) + return + } + + websocketMessageReceived(this.ws, this.#info.binaryType, Buffer.concat(this.#fragments)) this.#loop = true + this.#state = parserStates.INFO this.run(callback) + this.#fragments.length = 0 }) this.#loop = false + break } } - - this.#state = parserStates.INFO } } } From 679e199c99396f07a7a824e2537a573c06637842 Mon Sep 17 00:00:00 2001 From: Khafra Date: Fri, 17 May 2024 19:42:11 -0400 Subject: [PATCH 14/20] fixup --- lib/web/websocket/receiver.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/web/websocket/receiver.js b/lib/web/websocket/receiver.js index 83c00681338..3a8b2abb611 100644 --- a/lib/web/websocket/receiver.js +++ b/lib/web/websocket/receiver.js @@ -122,10 +122,10 @@ class ByteParser extends Writable { // If we are already parsing a text/binary frame and do not receive either // a continuation frame or close frame, fail the connection. - // if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) { - // failWebsocketConnection(this.ws, 'Expected continuation frame') - // return - // } + if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) { + failWebsocketConnection(this.ws, 'Expected continuation frame') + return + } if (this.#info.fragmented && fragmented) { // A fragmented frame can't be fragmented itself From 24eda3adb44bdb98ce2425bf11ffae39ea457069 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sat, 18 May 2024 11:07:08 -0400 Subject: [PATCH 15/20] fixup --- lib/web/websocket/connection.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/web/websocket/connection.js b/lib/web/websocket/connection.js index ce067f74d69..bb87d361e4b 100644 --- a/lib/web/websocket/connection.js +++ b/lib/web/websocket/connection.js @@ -91,7 +91,7 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, // 9. Let permessageDeflate be a user-agent defined // "permessage-deflate" extension header value. // https://github.com/mozilla/gecko-dev/blob/ce78234f5e653a5d3916813ff990f053510227bc/netwerk/protocol/websocket/WebSocketChannel.cpp#L2673 - const permessageDeflate = 'permessage-deflate' + const permessageDeflate = 'permessage-deflate; client_max_window_bits' // 10. Append (`Sec-WebSocket-Extensions`, permessageDeflate) to // request’s header list. @@ -171,7 +171,7 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish, if (secExtension !== null) { extensions = parseExtensions(secExtension) - if (!extensions.has(permessageDeflate)) { + if (!extensions.has('permessage-deflate')) { failWebsocketConnection(ws, 'Sec-WebSocket-Extensions header does not match.') return } From 63ac9eede645062db986e3e2d7a925c782221259 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sat, 18 May 2024 16:02:41 -0400 Subject: [PATCH 16/20] add basic send queue --- lib/web/websocket/constants.js | 10 +++- lib/web/websocket/sender.js | 86 ++++++++++++++++++++++++++++++++++ lib/web/websocket/websocket.js | 67 ++++++++++---------------- 3 files changed, 119 insertions(+), 44 deletions(-) create mode 100644 lib/web/websocket/sender.js diff --git a/lib/web/websocket/constants.js b/lib/web/websocket/constants.js index d5de91460f5..2019b5b67a7 100644 --- a/lib/web/websocket/constants.js +++ b/lib/web/websocket/constants.js @@ -46,6 +46,13 @@ const parserStates = { const emptyBuffer = Buffer.allocUnsafe(0) +const sendHints = { + string: 1, + typedArray: 2, + arrayBuffer: 3, + blob: 4 +} + module.exports = { uid, sentCloseFrameState, @@ -54,5 +61,6 @@ module.exports = { opcodes, maxUnsigned16Bit, parserStates, - emptyBuffer + emptyBuffer, + sendHints } diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js new file mode 100644 index 00000000000..25d3f34147a --- /dev/null +++ b/lib/web/websocket/sender.js @@ -0,0 +1,86 @@ +'use strict' + +const { WebsocketFrameSend } = require('./frame') +const { opcodes, sendHints } = require('./constants') + +/** @type {Uint8Array} */ +const FastBuffer = Buffer[Symbol.species] + +class SendQueue { + #queued = new Set() + #size = 0 + + /** @type {import('net').Socket} */ + #socket + + constructor (socket) { + this.#socket = socket + } + + add (item, cb, hint) { + if (hint !== sendHints.blob) { + if (this.#size === 0) { + this.#dispatch(item, cb, hint) + } else { + this.#queued.add([item, cb, true, hint]) + this.#size++ + + this.#run() + } + + return + } + + const promise = item.arrayBuffer() + const queue = [null, cb, false, hint] + promise.then((ab) => { + queue[0] = ab + queue[2] = true + + this.#run() + }) + + this.#queued.add(queue) + this.#size++ + } + + #run () { + for (const queued of this.#queued) { + const [data, cb, done, hint] = queued + + if (!done) return + + this.#queued.delete(queued) + this.#size-- + + this.#dispatch(data, cb, hint) + } + } + + #dispatch (data, cb, hint) { + let value + + switch (hint) { + case sendHints.string: + value = Buffer.from(data) + break + case sendHints.arrayBuffer: + case sendHints.blob: + value = new FastBuffer(data) + break + case sendHints.typedArray: + value = new FastBuffer(data.buffer, data.byteOffset, data.byteLength) + break + } + + const frame = new WebsocketFrameSend() + const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY + + frame.frameData = value + const buffer = frame.createFrame(opcode) + + this.#socket.write(buffer, cb) + } +} + +module.exports = { SendQueue } diff --git a/lib/web/websocket/websocket.js b/lib/web/websocket/websocket.js index 4243f6c5830..83d4ee94e30 100644 --- a/lib/web/websocket/websocket.js +++ b/lib/web/websocket/websocket.js @@ -3,7 +3,7 @@ const { webidl } = require('../fetch/webidl') const { URLSerializer } = require('../fetch/data-url') const { environmentSettingsObject } = require('../fetch/util') -const { staticPropertyDescriptors, states, sentCloseFrameState, opcodes } = require('./constants') +const { staticPropertyDescriptors, states, sentCloseFrameState, sendHints } = require('./constants') const { kWebSocketURL, kReadyState, @@ -21,17 +21,15 @@ const { fireEvent } = require('./util') const { establishWebSocketConnection, closeWebSocketConnection } = require('./connection') -const { WebsocketFrameSend } = require('./frame') const { ByteParser } = require('./receiver') const { kEnumerableProperty, isBlobLike } = require('../../core/util') const { getGlobalDispatcher } = require('../../global') const { types } = require('node:util') const { ErrorEvent, CloseEvent } = require('./events') +const { SendQueue } = require('./sender') let experimentalWarned = false -const FastBuffer = Buffer[Symbol.species] - // https://websockets.spec.whatwg.org/#interface-definition class WebSocket extends EventTarget { #events = { @@ -45,6 +43,9 @@ class WebSocket extends EventTarget { #protocol = '' #extensions = '' + /** @type {SendQueue} */ + #sendQueue + /** * @param {string} url * @param {string|string[]} protocols @@ -229,9 +230,6 @@ class WebSocket extends EventTarget { return } - /** @type {import('stream').Duplex} */ - const socket = this[kResponse].socket - // If data is a string if (typeof data === 'string') { // If the WebSocket connection is established and the WebSocket @@ -245,14 +243,12 @@ class WebSocket extends EventTarget { // the bufferedAmount attribute by the number of bytes needed to // express the argument as UTF-8. - const value = Buffer.from(data) - const frame = new WebsocketFrameSend(value) - const buffer = frame.createFrame(opcodes.TEXT) + const length = Buffer.byteLength(data) - this.#bufferedAmount += value.byteLength - socket.write(buffer, () => { - this.#bufferedAmount -= value.byteLength - }) + this.#bufferedAmount += length + this.#sendQueue.add(data, () => { + this.#bufferedAmount -= length + }, sendHints.string) } else if (types.isArrayBuffer(data)) { // If the WebSocket connection is established, and the WebSocket // closing handshake has not yet started, then the user agent must @@ -266,14 +262,10 @@ class WebSocket extends EventTarget { // increase the bufferedAmount attribute by the length of the // ArrayBuffer in bytes. - const value = new FastBuffer(data) - const frame = new WebsocketFrameSend(value) - const buffer = frame.createFrame(opcodes.BINARY) - - this.#bufferedAmount += value.byteLength - socket.write(buffer, () => { - this.#bufferedAmount -= value.byteLength - }) + this.#bufferedAmount += data.byteLength + this.#sendQueue.add(data, () => { + this.#bufferedAmount -= data.byteLength + }, sendHints.arrayBuffer) } else if (ArrayBuffer.isView(data)) { // If the WebSocket connection is established, and the WebSocket // closing handshake has not yet started, then the user agent must @@ -287,15 +279,10 @@ class WebSocket extends EventTarget { // not throw an exception must increase the bufferedAmount attribute // by the length of data’s buffer in bytes. - const ab = new FastBuffer(data.buffer, data.byteOffset, data.byteLength) - - const frame = new WebsocketFrameSend(ab) - const buffer = frame.createFrame(opcodes.BINARY) - - this.#bufferedAmount += ab.byteLength - socket.write(buffer, () => { - this.#bufferedAmount -= ab.byteLength - }) + this.#bufferedAmount += data.byteLength + this.#sendQueue.add(data, () => { + this.#bufferedAmount -= data.byteLength + }, sendHints.typedArray) } else if (isBlobLike(data)) { // If the WebSocket connection is established, and the WebSocket // closing handshake has not yet started, then the user agent must @@ -308,18 +295,10 @@ class WebSocket extends EventTarget { // an exception must increase the bufferedAmount attribute by the size // of the Blob object’s raw data, in bytes. - const frame = new WebsocketFrameSend() - - data.arrayBuffer().then((ab) => { - const value = new FastBuffer(ab) - frame.frameData = value - const buffer = frame.createFrame(opcodes.BINARY) - - this.#bufferedAmount += value.byteLength - socket.write(buffer, () => { - this.#bufferedAmount -= value.byteLength - }) - }) + this.#bufferedAmount += data.size + this.#sendQueue.add(data, () => { + this.#bufferedAmount -= data.size + }, sendHints.blob) } } @@ -470,6 +449,8 @@ class WebSocket extends EventTarget { response.socket.ws = this this[kByteParser] = parser + this.#sendQueue = new SendQueue(response.socket) + // 1. Change the ready state to OPEN (1). this[kReadyState] = states.OPEN From 842e2e9649f9379fdde45132224b71e38b9d2b00 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sat, 18 May 2024 17:22:53 -0400 Subject: [PATCH 17/20] fixup --- test/autobahn/client.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/autobahn/client.js b/test/autobahn/client.js index 41bf1d61063..53bc17e722c 100644 --- a/test/autobahn/client.js +++ b/test/autobahn/client.js @@ -12,6 +12,7 @@ function nextTest () { if (currentTest > testCount) { ws = new WebSocket(`${autobahnFuzzingserverUrl}/updateReports?agent=undici`) + ws.addEventListener('close', () => require('./report')) return } From fee7c380cf66ca7bac37643bd69cabc1ea7b5468 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sun, 19 May 2024 14:19:53 -0400 Subject: [PATCH 18/20] fixup --- lib/web/websocket/sender.js | 37 +++++++++++++++++----------------- test/websocket/send-mutable.js | 34 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 19 deletions(-) create mode 100644 test/websocket/send-mutable.js diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index 25d3f34147a..b9fc7a72364 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -19,10 +19,12 @@ class SendQueue { add (item, cb, hint) { if (hint !== sendHints.blob) { + const data = clone(item, hint) + if (this.#size === 0) { - this.#dispatch(item, cb, hint) + this.#dispatch(data, cb, hint) } else { - this.#queued.add([item, cb, true, hint]) + this.#queued.add([data, cb, true, hint]) this.#size++ this.#run() @@ -34,7 +36,7 @@ class SendQueue { const promise = item.arrayBuffer() const queue = [null, cb, false, hint] promise.then((ab) => { - queue[0] = ab + queue[0] = clone(ab, hint) queue[2] = true this.#run() @@ -58,29 +60,26 @@ class SendQueue { } #dispatch (data, cb, hint) { - let value - - switch (hint) { - case sendHints.string: - value = Buffer.from(data) - break - case sendHints.arrayBuffer: - case sendHints.blob: - value = new FastBuffer(data) - break - case sendHints.typedArray: - value = new FastBuffer(data.buffer, data.byteOffset, data.byteLength) - break - } - const frame = new WebsocketFrameSend() const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY - frame.frameData = value + frame.frameData = data const buffer = frame.createFrame(opcode) this.#socket.write(buffer, cb) } } +function clone (data, hint) { + switch (hint) { + case sendHints.string: + return Buffer.from(data) + case sendHints.arrayBuffer: + case sendHints.blob: + return new FastBuffer(data) + case sendHints.typedArray: + return Buffer.copyBytesFrom(data) + } +} + module.exports = { SendQueue } diff --git a/test/websocket/send-mutable.js b/test/websocket/send-mutable.js new file mode 100644 index 00000000000..fa1cc86aecc --- /dev/null +++ b/test/websocket/send-mutable.js @@ -0,0 +1,34 @@ +'use strict' + +const { test } = require('node:test') +const { WebSocketServer } = require('ws') +const { WebSocket } = require('../..') +const { tspl } = require('@matteo.collina/tspl') + +test('check cloned', async (t) => { + const assert = tspl(t, { plan: 2 }) + + const server = new WebSocketServer({ port: 0 }) + const buffer = new Uint8Array([0x61]) + + server.on('connection', (ws) => { + ws.on('message', (data) => { + assert.deepStrictEqual(data, Buffer.from([0x61])) + }) + }) + + const ws = new WebSocket(`ws://localhost:${server.address().port}`) + + ws.addEventListener('open', () => { + ws.send(new Blob([buffer])) + ws.send(buffer) + buffer[0] = 1 + }) + + t.after(() => { + server.close() + ws.close() + }) + + await assert.completed +}) From f24467be431aa1bca0b7809946c1b6e79e8cfa24 Mon Sep 17 00:00:00 2001 From: Khafra Date: Sun, 19 May 2024 22:31:04 -0400 Subject: [PATCH 19/20] fixup --- lib/web/websocket/permessage-deflate.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/web/websocket/permessage-deflate.js b/lib/web/websocket/permessage-deflate.js index d349863888f..18988f59642 100644 --- a/lib/web/websocket/permessage-deflate.js +++ b/lib/web/websocket/permessage-deflate.js @@ -62,7 +62,7 @@ class PerMessageDeflate { callback(null, full) if (fin && this.#options.clientNoContextTakeover) { - this.#inflate.reset() + // this.#inflate.reset() } }) } From eab6f933ebcd4dec2d6a8586d87ab6de1e60d3ba Mon Sep 17 00:00:00 2001 From: Khafra Date: Mon, 20 May 2024 00:36:10 -0400 Subject: [PATCH 20/20] fix EVERY FAILURE!!!! --- lib/web/websocket/permessage-deflate.js | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/lib/web/websocket/permessage-deflate.js b/lib/web/websocket/permessage-deflate.js index 18988f59642..76cb366d5e5 100644 --- a/lib/web/websocket/permessage-deflate.js +++ b/lib/web/websocket/permessage-deflate.js @@ -14,8 +14,8 @@ class PerMessageDeflate { #options = {} constructor (extensions) { - this.#options.clientNoContextTakeover = extensions.has('client_no_context_takeover') - this.#options.clientMaxWindowBits = extensions.get('client_max_window_bits') + this.#options.serverNoContextTakeover = extensions.has('server_no_context_takeover') + this.#options.serverMaxWindowBits = extensions.get('server_max_window_bits') } decompress (chunk, fin, callback) { @@ -27,13 +27,13 @@ class PerMessageDeflate { if (!this.#inflate) { let windowBits = Z_DEFAULT_WINDOWBITS - if (this.#options.clientMaxWindowBits) { // empty values default to Z_DEFAULT_WINDOWBITS - if (!isValidClientWindowBits(this.#options.clientMaxWindowBits)) { - callback(new Error('Invalid client_max_window_bits')) + if (this.#options.serverMaxWindowBits) { // empty values default to Z_DEFAULT_WINDOWBITS + if (!isValidClientWindowBits(this.#options.serverMaxWindowBits)) { + callback(new Error('Invalid server_max_window_bits')) return } - windowBits = Number.parseInt(this.#options.clientMaxWindowBits) + windowBits = Number.parseInt(this.#options.serverMaxWindowBits) } this.#inflate = createInflateRaw({ windowBits }) @@ -45,7 +45,10 @@ class PerMessageDeflate { this.#inflate[kLength] += data.length }) - this.#inflate.on('error', (err) => callback(err)) + this.#inflate.on('error', (err) => { + this.#inflate = null + callback(err) + }) } this.#inflate.write(chunk) @@ -60,10 +63,6 @@ class PerMessageDeflate { this.#inflate[kLength] = 0 callback(null, full) - - if (fin && this.#options.clientNoContextTakeover) { - // this.#inflate.reset() - } }) } }