From cb3ce0d99565e264c1ef6bf908395b1b7959f14a Mon Sep 17 00:00:00 2001 From: Daniele Fedeli Date: Mon, 19 Feb 2024 16:50:34 +0100 Subject: [PATCH] Add #injectWS (#276) * Add #injectWS Now is possible to invoke an websocket handler without listening * Up-to-date documentation Added testing paragraph * Removed unused dependency * remove ws.terminate() in the plugin The user must manually close the ws in the test * add upgradeContext parameter in injectWS It allows to enhance the request made for upgrading the socket * Rejects if websocket upgrade failed * remove useless line in docs * rejects with 'Unexpected server response: ' Implementation as close as possibile to ws connectiong error * Fix test * Fix types --------- Signed-off-by: Daniele Fedeli Co-authored-by: Daniele Fedeli --- README.md | 72 ++++++++++++++++++++++++ index.js | 60 +++++++++++++++++++- package.json | 1 + test/inject.test.js | 134 ++++++++++++++++++++++++++++++++++++++++++++ types/index.d.ts | 9 ++- 5 files changed, 270 insertions(+), 6 deletions(-) create mode 100644 test/inject.test.js diff --git a/README.md b/README.md index 9bac2c3..4860ba6 100644 --- a/README.md +++ b/README.md @@ -256,6 +256,78 @@ fastify.register(require('@fastify/websocket'), { }) ``` +### Testing + +Testing the ws handler can be quite tricky, luckily `fastify-websocket` decorates fastify instance with `injectWS`. +It allows to test easily a websocket endpoint. + +The signature of injectWS is the following: `([path], [upgradeContext])`. + +#### App.js + +```js +'use strict' + +const Fastify = require('fastify') +const FastifyWebSocket = require('@fastify/websocket') + +const App = Fastify() + +App.register(FastifyWebSocket); + +App.register(async function(fastify) { + fastify.addHook('preValidation', async (request, reply) => { + if (request.headers['api-key'] !== 'some-random-key') { + return reply.code(401).send() + } + }) + + fastify.get('/', { websocket: true }, (connection) => { + connection.socket.on('message', message => { + connection.socket.send('hi from server') + }) + }) +}) + +module.exports = App +``` + +#### App.test.js + +```js +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const App = require('./app.js') + +test('connect to /', async (t) => { + t.plan(1) + + const fastify = Fastify() + fastify.register(App) + t.teardown(fastify.close.bind(fastify)) + + const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }}) + let resolve; + const promise = new Promise(r => { resolve = r }) + + ws.on('message', (data) => { + resolve(data.toString()); + }) + ws.send('hi from client') + + t.assert(await promise, 'hi from server') + // Remember to close the ws at the end + ws.terminate() +}) +``` + +#### Things to know +- Websocket need to be closed manually at the end of each test. +- `fastify.ready()` needs to be awaited to ensure that fastify has been decorated. +- You need to register the event listener before sending the message if you need to process server response. + ## Options `@fastify/websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) : diff --git a/index.js b/index.js index 6b555c6..ef639ef 100644 --- a/index.js +++ b/index.js @@ -1,8 +1,11 @@ 'use strict' const { ServerResponse } = require('node:http') +const { PassThrough } = require('node:stream') +const { randomBytes } = require('node:crypto') const fp = require('fastify-plugin') const WebSocket = require('ws') +const Duplexify = require('duplexify') const kWs = Symbol('ws-socket') const kWsHead = Symbol('ws-head') @@ -47,6 +50,60 @@ function fastifyWebsocket (fastify, opts, next) { const wss = new WebSocket.Server(wssOptions) fastify.decorate('websocketServer', wss) + async function injectWS (path = '/', upgradeContext = {}) { + const server2Client = new PassThrough() + const client2Server = new PassThrough() + + const serverStream = new Duplexify(server2Client, client2Server) + const clientStream = new Duplexify(client2Server, server2Client) + + const ws = new WebSocket(null, undefined, { isServer: false }) + const head = Buffer.from([]) + + let resolve, reject + const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject }) + + ws.on('open', () => { + clientStream.removeListener('data', onData) + resolve(ws) + }) + + const onData = (chunk) => { + if (chunk.toString().includes('HTTP/1.1 101 Switching Protocols')) { + ws._isServer = false + ws.setSocket(clientStream, head, { maxPayload: 0 }) + } else { + clientStream.removeListener('data', onData) + const statusCode = Number(chunk.toString().match(/HTTP\/1.1 (\d+)/)[1]) + reject(new Error('Unexpected server response: ' + statusCode)) + } + } + + clientStream.on('data', onData) + + const req = { + ...upgradeContext, + method: 'GET', + headers: { + ...upgradeContext.headers, + connection: 'upgrade', + upgrade: 'websocket', + 'sec-websocket-version': 13, + 'sec-websocket-key': randomBytes(16).toString('base64') + }, + httpVersion: '1.1', + url: path, + [kWs]: serverStream, + [kWsHead]: head + } + + websocketListenServer.emit('upgrade', req, req[kWs], req[kWsHead]) + + return promise + } + + fastify.decorate('injectWS', injectWS) + function onUpgrade (rawRequest, socket, head) { // Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket. rawRequest[kWs] = socket @@ -164,6 +221,7 @@ function fastifyWebsocket (fastify, opts, next) { client.close() } } + fastify.server.removeListener('upgrade', onUpgrade) server.close(done) @@ -181,7 +239,7 @@ function fastifyWebsocket (fastify, opts, next) { // Since we already handled the error, adding this listener prevents the ws // library from emitting the error and causing an uncaughtException // Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35 - conn.on('error', _ => {}) + conn.on('error', _ => { }) request.log.error(error) conn.destroy(error) } diff --git a/package.json b/package.json index 7207bcf..55bce2f 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "tsd": "^0.30.1" }, "dependencies": { + "duplexify": "^4.1.2", "fastify-plugin": "^4.0.0", "ws": "^8.0.0" }, diff --git a/test/inject.test.js b/test/inject.test.js new file mode 100644 index 0000000..bb012c0 --- /dev/null +++ b/test/inject.test.js @@ -0,0 +1,134 @@ +'use strict' + +const { test } = require('tap') +const Fastify = require('fastify') +const fastifyWebsocket = require('..') + +function buildFastify (t) { + const fastify = Fastify() + t.teardown(() => { fastify.close() }) + fastify.register(fastifyWebsocket) + return fastify +} + +test('routes correctly the message', async (t) => { + const fastify = buildFastify(t) + const message = 'hi from client' + + let _resolve + const promise = new Promise((resolve) => { _resolve = resolve }) + + fastify.register( + async function (instance) { + instance.get('/ws', { websocket: true }, function (conn) { + conn.once('data', chunk => { + _resolve(chunk.toString()) + }) + }) + }) + + await fastify.ready() + const ws = await fastify.injectWS('/ws') + ws.send(message) + t.same(await promise, message) + ws.terminate() +}) + +test('redirect on / if no path specified', async (t) => { + const fastify = buildFastify(t) + const message = 'hi from client' + + let _resolve + const promise = new Promise((resolve) => { _resolve = resolve }) + + fastify.register( + async function (instance) { + instance.get('/', { websocket: true }, function (conn) { + conn.once('data', chunk => { + _resolve(chunk.toString()) + }) + }) + }) + + await fastify.ready() + const ws = await fastify.injectWS() + ws.send(message) + t.same(await promise, message) + ws.terminate() +}) + +test('routes correctly the message between two routes', async (t) => { + const fastify = buildFastify(t) + const message = 'hi from client' + + let _resolve + let _reject + const promise = new Promise((resolve, reject) => { _resolve = resolve; _reject = reject }) + + fastify.register( + async function (instance) { + instance.get('/ws', { websocket: true }, function (conn) { + conn.once('data', () => { + _reject('wrong-route') + }) + }) + + instance.get('/ws-2', { websocket: true }, function (conn) { + conn.once('data', chunk => { + _resolve(chunk.toString()) + }) + }) + }) + + await fastify.ready() + const ws = await fastify.injectWS('/ws-2') + ws.send(message) + t.same(await promise, message) + ws.terminate() +}) + +test('use the upgrade context to upgrade if there is some hook', async (t) => { + const fastify = buildFastify(t) + const message = 'hi from client' + + let _resolve + const promise = new Promise((resolve) => { _resolve = resolve }) + + fastify.register( + async function (instance) { + instance.addHook('preValidation', async (request, reply) => { + if (request.headers['api-key'] !== 'some-random-key') { + return reply.code(401).send() + } + }) + + instance.get('/', { websocket: true }, function (conn) { + conn.once('data', chunk => { + _resolve(chunk.toString()) + }) + }) + }) + + await fastify.ready() + const ws = await fastify.injectWS('/', { headers: { 'api-key': 'some-random-key' } }) + ws.send(message) + t.same(await promise, message) + ws.terminate() +}) + +test('rejects if the websocket is not upgraded', async (t) => { + const fastify = buildFastify(t) + + fastify.register( + async function (instance) { + instance.addHook('preValidation', async (request, reply) => { + return reply.code(401).send() + }) + + instance.get('/', { websocket: true }, function (conn) { + }) + }) + + await fastify.ready() + t.rejects(fastify.injectWS('/'), 'Unexpected server response: 401') +}) diff --git a/types/index.d.ts b/types/index.d.ts index cd4fc29..44420ea 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -27,9 +27,12 @@ declare module 'fastify' { websocket?: boolean; } + type InjectWSFn = + ((path?: string, upgradeContext?: Partial) => Promise) + interface FastifyInstance { - get: RouteShorthandMethod, websocketServer: WebSocket.Server, + injectWS: InjectWSFn } interface FastifyRequest { @@ -67,7 +70,6 @@ type FastifyWebsocket = FastifyPluginCallback { } - export type WebsocketHandler< RawServer extends RawServerBase = RawServerDefault, RawRequest extends RawRequestDefaultExpression = RawRequestDefaultExpression, @@ -81,18 +83,15 @@ declare namespace fastifyWebsocket { connection: SocketStream, request: FastifyRequest ) => void | Promise; - export interface SocketStream extends Duplex { socket: WebSocket; } - export interface WebsocketPluginOptions { errorHandler?: (this: FastifyInstance, error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply) => void; options?: WebSocketServerOptions; connectionOptions?: DuplexOptions; preClose?: preCloseHookHandler | preCloseAsyncHookHandler; } - export interface RouteOptions< RawServer extends RawServerBase = RawServerDefault, RawRequest extends RawRequestDefaultExpression = RawRequestDefaultExpression,