From 07d32174af6ef8e11053bf43373a8a64e9d7d499 Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Sat, 6 Mar 2021 17:01:34 -0500 Subject: [PATCH] Don't lose messages when hooks are asynchronous Before this change, fastify-websocket processed the upgrade request from the client before dispatching the request from the router. This means it created the websocket object and the duplex stream wrapping it before invoking all the userland code that might want to do stuff in hooks like authentication, or loading a session, or whatever. If those hooks that run after the creation of the websocket are asynchronous, that means that the websocket that has already been set up and connected can receive messages during the execution of those async hooks. Generally I think developers would attach onMessage handlers in their route handler functions. If they do that, those message handler functions wouldn't be attached until after all the pre-route handlers have been run, which would mean any messages that arrive during hook processing won't trigger any message handlers and get silently dropped! Instead, we should buffer any incoming messages until the userland route handler is ready to work with the WebSocket, and give it the chance to synchronously attach event listeners. The `net.Socket` will buffer any messages, and then the `ws` library will dispatch them as messages on the next process tick after setting up the `WebSocket` instance. The handler will run in between there and get a chance to register listeners. Hooks won't have access to the websocket if we do this, but they already didn't, and I think this is a big enough gotcha that that's fine. To implement this, we stop using the `connection` event of the `ws` server, and instead listen to the `upgrade` event of the `http.Server`, and still dispatch that request through fastify. Then, in the route handler, we use `wss.handleUpgradeRequest` to create the WebSocket. This means that the `ws` server runs without actually attaching to an `http.Server`, which I think makes sense since we want finer control over when the upgrade is actually handled. Because this requires some special options to be passed to the `ws` server, I opted to remove support for the `noServer` option to the ws server. I think this is fine -- it seems kind of strange to use a http sever plugin to register a websocket system that you don't want listening using that http server, but it is a breaking change. We still support the custom `server` option that gets passed through to `ws`. --- README.md | 29 ++++++++++--- index.js | 113 +++++++++++++++++++++++++++++++------------------- test/base.js | 53 ++--------------------- test/hooks.js | 42 +++++++++++++++++++ 4 files changed, 141 insertions(+), 96 deletions(-) diff --git a/README.md b/README.md index c461662..6c47cf5 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,25 @@ fastify.listen(3000, err => { } }) ``` -**NB:** + +### Attaching event handlers + +It is important that websocket route handlers attach event handlers synchronously during handler execution to avoid accidentally dropping messages. If you want to do any async work in your websocket handler, say to authenticate a user or load data from a datastore, ensure you attach any `on('message')` handlers *before* you trigger this async work. Otherwise, messages might arrive while this async work is underway, and if there is no handler listening for this data, it will be silently dropped. + +Here's an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, and then attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing. + +```javascript +fastify.get('/*', { websocket: true }, (connection, request) => { + const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise + + connection.socket.on('message', async (message) => { + const session = await sessionPromise() + // do somthing with the message and session + }) +}) +``` + +**NB:** This plugin uses the same router as the fastify instance, this has a few implications to take into account: - Websocket route handlers follow the usual `fastify` request lifecycle. @@ -150,7 +168,7 @@ fastify.listen(3000, err => { ### Custom error handler: -You can optionally provide a custom errorHandler that will be used to handle any cleaning up: +You can optionally provide a custom errorHandler that will be used to handle any cleaning up: ```js 'use strict' @@ -189,7 +207,7 @@ fastify.listen(3000, err => { }) ``` -## Options : +## Options `fastify-websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) : @@ -200,7 +218,6 @@ fastify.listen(3000, err => { - `server` - A pre-created Node.js HTTP/S server. - `verifyClient` - A function which can be used to validate incoming connections. - `handleProtocols` - A function which can be used to handle the WebSocket subprotocols. -- `noServer` - Enable no server mode. - `clientTracking` - Specifies whether or not to track clients. - `perMessageDeflate` - Enable/disable permessage-deflate. - `maxPayload` - The maximum allowed message size in bytes. @@ -209,7 +226,9 @@ For more informations you can check [`ws` options documentation](https://github. _**NB:** By default if you do not provide a `server` option `fastify-websocket` will bind your websocket server instance to the scoped `fastify` instance._ -_**NB:** `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_ +_**NB:** the `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_ + +_**NB:** the `noServer` option from `ws` shouldn't be provided since the point of fastify-websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_ ## Acknowledgements diff --git a/index.js b/index.js index 703258f..d08519b 100644 --- a/index.js +++ b/index.js @@ -4,7 +4,8 @@ const { ServerResponse } = require('http') const fp = require('fastify-plugin') const WebSocket = require('ws') -const kWs = Symbol('ws') +const kWs = Symbol('ws-socket') +const kWsHead = Symbol('ws-head') function fastifyWebsocket (fastify, opts, next) { let errorHandler = defaultErrorHandler @@ -16,25 +17,65 @@ function fastifyWebsocket (fastify, opts, next) { errorHandler = opts.errorHandler } - const options = Object.assign({}, opts.options) - if (options.path) { - fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead') + if (opts.options && opts.options.noServer) { + return next(new Error("fastify-websocket doesn't support the ws noServer option. If you want to create a websocket server detatched from fastify, use the ws library directly.")) } - if (!options.server && !options.noServer) { - options.server = fastify.server + + const wssOptions = Object.assign({ noServer: true }, opts.options) + + if (wssOptions.path) { + fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead') } - const wss = new WebSocket.Server(options) - wss.on('connection', handleRouting) + // We always handle upgrading ourselves in this library so that we can dispatch through the fastify stack before actually upgrading + // For this reason, we run the WebSocket.Server in noServer mode, and prevent the user from passing in a http.Server instance for it to attach to. + // Usually, we listen to the upgrade event of the `fastify.server`, but we do still support this server option by just listening to upgrades on it if passed. + const websocketListenServer = wssOptions.server || fastify.server + delete wssOptions.server + const wss = new WebSocket.Server(wssOptions) fastify.decorate('websocketServer', wss) + websocketListenServer.on('upgrade', (rawRequest, socket, head) => { + // Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket. + rawRequest[kWs] = socket + rawRequest[kWsHead] = head + + if (closing) { + handleUpgrade(rawRequest, (connection) => { + connection.socket.close(1001) + }) + } else { + const rawResponse = new ServerResponse(rawRequest) + fastify.routing(rawRequest, rawResponse) + } + }) + + const handleUpgrade = (rawRequest, callback) => { + wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => { + wss.emit('connection', socket, rawRequest) + + const connection = WebSocket.createWebSocketStream(socket) + connection.socket = socket + + connection.socket.on('newListener', event => { + if (event === 'message') { + connection.resume() + } + }) + + callback(connection) + }) + } + fastify.addHook('onError', (request, reply, error, done) => { if (request.raw[kWs]) { // Hijack reply to prevent fastify from sending the error after onError hooks are done running reply.hijack() - // Handle the error - errorHandler.call(this, error, request.raw[kWs], request, reply) + handleUpgrade(request.raw, connection => { + // Handle the error + errorHandler.call(this, error, connection, request, reply) + }) } done() }) @@ -63,18 +104,23 @@ function fastifyWebsocket (fastify, opts, next) { } } + // we always override the route handler so we can close websocket connections to routes to handlers that don't support websocket connections routeOptions.handler = (request, reply) => { + // within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so if (request.raw[kWs]) { reply.hijack() - let result - if (isWebsocketRoute) { - result = wsHandler.call(fastify, request.raw[kWs], request) - } else { - result = noHandle.call(fastify, request.raw[kWs], request.raw) - } - if (result && typeof result.catch === 'function') { - result.catch(err => errorHandler.call(this, err, request.raw[kWs], request, reply)) - } + handleUpgrade(request.raw, connection => { + let result + if (isWebsocketRoute) { + result = wsHandler.call(fastify, connection, request) + } else { + result = noHandle.call(fastify, connection, request) + } + + if (result && typeof result.catch === 'function') { + result.catch(err => errorHandler.call(this, err, connection, request, reply)) + } + }) } else { return handler.call(fastify, request, reply) } @@ -102,9 +148,9 @@ function fastifyWebsocket (fastify, opts, next) { } } - function noHandle (conn, req) { - this.log.info({ path: req.url }, 'closed incoming websocket connection') - req[kWs].socket.close() + function noHandle (connection, rawRequest) { + this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler') + connection.socket.close() } function defaultErrorHandler (error, conn, request, reply) { @@ -120,31 +166,14 @@ function fastifyWebsocket (fastify, opts, next) { const oldDefaultRoute = fastify.getDefaultRoute() fastify.setDefaultRoute(function (req, res) { if (req[kWs]) { - noHandle.call(fastify, req[kWs], req) + handleUpgrade(req, (connection) => { + noHandle.call(fastify, connection, req) + }) } else { return oldDefaultRoute(req, res) } }) - function handleRouting (connection, request) { - if (closing) { - connection.close(1001) - return - } - - const response = new ServerResponse(request) - request[kWs] = WebSocket.createWebSocketStream(connection) - request[kWs].socket = connection - - request[kWs].socket.on('newListener', event => { - if (event === 'message') { - request[kWs].resume() - } - }) - - fastify.routing(request, response) - } - next() } diff --git a/test/base.js b/test/base.js index 41b7ccf..8da980e 100644 --- a/test/base.js +++ b/test/base.js @@ -472,55 +472,10 @@ test('Should keep processing message when many medium sized messages are sent', }) }) -test('Should not set server if noServer option is set', (t) => { - t.plan(5) - +test('Should error server if the noServer option is set', (t) => { + t.plan(1) const fastify = Fastify() - t.tearDown(() => { - fastify.close() - }) - - const options = { - noServer: true - } - - fastify.register(fastifyWebsocket, { options }) - // this is all that's needed to create an echo server - fastify.get('/', { websocket: true }, (connection, request) => { - connection.pipe(connection) - t.tearDown(() => connection.destroy()) - }) - - // As the websocketserver is now completely detached, we have to - // handle the upgrade event. - fastify.ready((err) => { - t.error(err) - - t.assert(!fastify.websocketServer.server) - - fastify.server.on('upgrade', (request, socket, head) => { - fastify.websocketServer.handleUpgrade(request, socket, head, (ws) => { - fastify.websocketServer.emit('connection', ws, request) - }) - }) - }) - - fastify.listen(0, (err) => { - t.error(err) - - const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.tearDown(() => client.destroy()) - - client.setEncoding('utf8') - client.write('hello') - - client.once('data', (chunk) => { - t.equal(chunk, 'hello') - fastify.close(function (err) { - t.error(err) - }) - }) - }) + fastify.register(fastifyWebsocket, { options: { noServer: true } }) + t.rejects(fastify.ready()) }) diff --git a/test/hooks.js b/test/hooks.js index ad7e4f5..72ce050 100644 --- a/test/hooks.js +++ b/test/hooks.js @@ -179,3 +179,45 @@ test('Should not hijack reply for a normal http request in the internal onError }) }) }) + +test('Should run async hooks and still deliver quickly sent messages', (t) => { + t.plan(3) + const fastify = Fastify() + + t.tearDown(() => fastify.close()) + + fastify.register(fastifyWebsocket) + + fastify.addHook( + 'preValidation', + async () => await new Promise((resolve) => setTimeout(resolve, 25)) + ) + + fastify.get('/echo', { websocket: true }, (conn, request) => { + conn.setEncoding('utf8') + conn.write('hello client') + t.tearDown(conn.destroy.bind(conn)) + + conn.socket.on('message', (message) => { + t.equal(message.toString('utf-8'), 'hello server') + conn.end() + }) + }) + + fastify.listen(0, (err) => { + t.error(err) + const ws = new WebSocket( + 'ws://localhost:' + fastify.server.address().port + '/echo' + ) + const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) + t.tearDown(client.destroy.bind(client)) + + client.setEncoding('utf8') + client.write('hello server') + + client.once('data', (chunk) => { + t.equal(chunk, 'hello client') + client.end() + }) + }) +})