Skip to content

Commit

Permalink
feat(websocket): handle ping/pong frames & fix fragmented frames (#1809)
Browse files Browse the repository at this point in the history
* feat(websocket): handle ping/pong frames & fix bugs

* fix: don't quit parsing on ping/pong frame

* fix: parse fragmented frames

* fix: remove hack in tests
  • Loading branch information
KhafraDev authored Dec 17, 2022
1 parent 7e12397 commit 585cdf6
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 27 deletions.
26 changes: 26 additions & 0 deletions docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,29 @@ diagnosticsChannel.channel('undici:websocket:socket_error').subscribe((error) =>
console.log(error)
})
```

## `undici:websocket:ping`

This message is published after the client receives a ping frame, if the connection is not closing.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload }) => {
// a Buffer or undefined, containing the optional application data of the frame
console.log(payload)
})
```

## `undici:websocket:pong`

This message is published after the client receives a pong frame.

```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
// a Buffer or undefined, containing the optional application data of the frame
console.log(payload)
})
```
2 changes: 0 additions & 2 deletions lib/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ const { fetching } = require('../fetch/index')
const { getGlobalDispatcher } = require('../..')

const channels = {}
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
channels.open = diagnosticsChannel.channel('undici:websocket:open')
channels.close = diagnosticsChannel.channel('undici:websocket:close')
channels.socketError = diagnosticsChannel.channel('undici:websocket:socket_error')
Expand Down
94 changes: 85 additions & 9 deletions lib/websocket/receiver.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
'use strict'

const { Writable } = require('stream')
const diagnosticsChannel = require('diagnostics_channel')
const { parserStates, opcodes, states } = require('./constants')
const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
const { WebsocketFrameSend } = require('./frame')

const channels = {}
channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
channels.pong = diagnosticsChannel.channel('undici:websocket:pong')

class ByteParser extends Writable {
#buffers = []
#byteOffset = 0

#state = parserStates.INFO

#info = {}
#fragments = []

constructor (ws) {
super()
Expand Down Expand Up @@ -48,9 +54,13 @@ class ByteParser extends Writable {
this.#info.fin = (buffer[0] & 0x80) !== 0
this.#info.opcode = buffer[0] & 0x0F

const fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
// If we receive a fragmented message, we use the type of the first
// frame to parse the full message as binary/text, when it's terminated
this.#info.originalOpcode ??= this.#info.opcode

this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION

if (fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
// Only text and binary frames can be fragmented
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
return
Expand All @@ -67,7 +77,11 @@ class ByteParser extends Writable {
this.#state = parserStates.PAYLOADLENGTH_64
}

if (
if (this.#info.fragmented && payloadLength > 125) {
// A fragmented frame can't be fragmented itself
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
return
} else if (
(this.#info.opcode === opcodes.PING ||
this.#info.opcode === opcodes.PONG ||
this.#info.opcode === opcodes.CLOSE) &&
Expand Down Expand Up @@ -109,10 +123,63 @@ class ByteParser extends Writable {
// that _The WebSocket Closing Handshake is Started_ and that the
// WebSocket connection is in the CLOSING state.
this.ws[kReadyState] = states.CLOSING

this.ws[kReceivedClose] = true

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength

this.end()

return
} else if (this.#info.opcode === opcodes.PING) {
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
// response, unless it already received a Close frame.
// A Pong frame sent in response to a Ping frame must have identical
// "Application data"

if (!this.ws[kReceivedClose]) {
const body = payloadLength === 0 ? undefined : buffer.subarray(2, payloadLength + 2)
const frame = new WebsocketFrameSend(body)

this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body
})
}
}

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength
this.#state = parserStates.INFO

if (this.#byteOffset > 0) {
return this.run(callback)
} else {
callback()
return
}
} else if (this.#info.opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: buffer.subarray(2, payloadLength + 2)
})
}

this.#buffers = [buffer.subarray(2 + payloadLength)]
this.#byteOffset -= 2 + payloadLength

if (this.#byteOffset > 0) {
return this.run(callback)
} else {
callback()
return
}
}

// TODO: handle control frames here. Since they are unfragmented, and can
Expand Down Expand Up @@ -154,19 +221,28 @@ class ByteParser extends Writable {
// If the server sent multiple frames in a single chunk
const buffer = Buffer.concat(this.#buffers, this.#byteOffset)

this.#info.data = buffer.subarray(0, this.#info.payloadLength)
const body = buffer.subarray(0, this.#info.payloadLength)
this.#fragments.push(body)

if (this.#byteOffset > this.#info.payloadLength) {
this.#buffers = [buffer.subarray(this.#info.data.length)]
this.#byteOffset -= this.#info.data.length
this.#buffers = [buffer.subarray(body.length)]
this.#byteOffset -= body.length
} else {
this.#buffers.length = 0
this.#byteOffset = 0
}

websocketMessageReceived(this.ws, this.#info.opcode, this.#info.data)
// If the frame is unfragmented, or a fragmented frame was terminated,
// a message was received
if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
const fullMessage = Buffer.concat(this.#fragments)

websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)

this.#info = {}
this.#fragments.length = 0
}

this.#info = {}
this.#state = parserStates.INFO
}
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@
"build:wasm": "node build/wasm.js --docker",
"lint": "standard | snazzy",
"lint:fix": "standard --fix | snazzy",
"test": "npm run test:tap && npm run test:node-fetch && npm run test:fetch && npm run test:wpt && npm run test:jest && tsd",
"test": "npm run test:tap && npm run test:node-fetch && npm run test:fetch && npm run test:wpt && npm run test:websocket && npm run test:jest && tsd",
"test:node-fetch": "node scripts/verifyVersion.js 16 || mocha test/node-fetch",
"test:fetch": "node scripts/verifyVersion.js 16 || (npm run build:node && tap test/fetch/*.js && tap test/webidl/*.js)",
"test:jest": "node scripts/verifyVersion.js 14 || jest",
"test:tap": "tap test/*.js test/diagnostics-channel/*.js",
"test:tdd": "tap test/*.js test/diagnostics-channel/*.js -w",
"test:typescript": "tsd && tsc test/imports/undici-import.ts",
"test:websocket": "node scripts/verifyVersion.js 18 || tap test/websocket/*.js",
"test:wpt": "node scripts/verifyVersion 18 || (node test/wpt/start-fetch.mjs && node test/wpt/start-FileAPI.mjs && node test/wpt/start-mimesniff.mjs && node test/wpt/start-xhr.mjs && node test/wpt/start-websockets.mjs)",
"coverage": "nyc --reporter=text --reporter=html npm run test",
"coverage:ci": "nyc --reporter=lcov npm run test",
Expand Down
40 changes: 40 additions & 0 deletions test/websocket/fragments.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'

const { test } = require('tap')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const diagnosticsChannel = require('diagnostics_channel')

test('Fragmented frame with a ping frame in the middle of it', (t) => {
t.plan(2)

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
const socket = ws._socket

socket.write(Buffer.from([0x01, 0x03, 0x48, 0x65, 0x6c])) // Text frame "Hel"
socket.write(Buffer.from([0x89, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f])) // ping "Hello"
socket.write(Buffer.from([0x80, 0x02, 0x6c, 0x6f])) // Text frame "lo"
})

t.teardown(() => {
for (const client of server.clients) {
client.close()
}

server.close()
})

const ws = new WebSocket(`ws://localhost:${server.address().port}`)

ws.addEventListener('message', ({ data }) => {
t.same(data, 'Hello')

ws.close()
})

diagnosticsChannel.channel('undici:websocket:ping').subscribe(
({ payload }) => t.same(payload, Buffer.from('Hello'))
)
})
46 changes: 46 additions & 0 deletions test/websocket/opening-handshake.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const { test } = require('tap')
const { createServer } = require('http')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')

test('WebSocket connecting to server that isn\'t a Websocket server', (t) => {
t.plan(5)

const server = createServer((req, res) => {
t.equal(req.headers.connection, 'upgrade')
t.equal(req.headers.upgrade, 'websocket')
t.ok(req.headers['sec-websocket-key'])
t.equal(req.headers['sec-websocket-version'], '13')

res.end()
server.unref()
}).listen(0, () => {
const ws = new WebSocket(`ws://localhost:${server.address().port}`)

// Server isn't a websocket server
ws.onmessage = ws.onopen = t.fail

ws.addEventListener('error', t.pass)
})

t.teardown(server.close.bind(server))
})

test('Open event is emitted', (t) => {
t.plan(1)

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
ws.close(1000)
})

t.teardown(server.close.bind(server))

const ws = new WebSocket(`ws://localhost:${server.address().port}`)

ws.onmessage = ws.onerror = t.fail
ws.addEventListener('open', t.pass)
})
46 changes: 46 additions & 0 deletions test/websocket/ping-pong.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'

const { test } = require('tap')
const { WebSocketServer } = require('ws')
const diagnosticsChannel = require('diagnostics_channel')
const { WebSocket } = require('../..')

test('Receives ping and parses body', (t) => {
t.plan(1)

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
ws.ping('Hello, world')
})

t.teardown(server.close.bind(server))

const ws = new WebSocket(`ws://localhost:${server.address().port}`)
ws.onerror = ws.onmessage = t.fail

diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload }) => {
t.same(payload, Buffer.from('Hello, world'))
ws.close()
})
})

test('Receives pong and parses body', (t) => {
t.plan(1)

const server = new WebSocketServer({ port: 0 })

server.on('connection', (ws) => {
ws.pong('Pong')
})

t.teardown(server.close.bind(server))

const ws = new WebSocket(`ws://localhost:${server.address().port}`)
ws.onerror = ws.onmessage = t.fail

diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
t.same(payload, Buffer.from('Pong'))
ws.close()
})
})
17 changes: 2 additions & 15 deletions test/wpt/server/websocket.mjs
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
import { WebSocketServer } from 'ws'
import { server } from './server.mjs'

// When sending a buffer to ws' send method, it auto
// sets the type to binary. This breaks some tests.
const textData = [
'¥¥¥¥¥¥',
'Message to send',
'𐐇',
'\ufffd',
'',
'null',
'c'.repeat(65000)
]

// The file router server handles sending the url, closing,
// and sending messages back to the main process for us.
// The types for WebSocketServer don't include a `request`
Expand All @@ -24,7 +12,7 @@ const wss = new WebSocketServer({
})

wss.on('connection', (ws) => {
ws.on('message', (data) => {
ws.on('message', (data, isBinary) => {
const str = data.toString('utf-8')

if (str === 'Goodbye') {
Expand All @@ -34,8 +22,7 @@ wss.on('connection', (ws) => {
return
}

const binary = !textData.includes(str)
ws.send(data, { binary })
ws.send(data, { binary: isBinary })
})

// Some tests, such as `Create-blocked-port.any.js` do NOT
Expand Down

0 comments on commit 585cdf6

Please sign in to comment.