diff --git a/README.md b/README.md index c461662..6c47cf5 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,25 @@ fastify.listen(3000, err => { } }) ``` -**NB:** + +### Attaching event handlers + +It is important that websocket route handlers attach event handlers synchronously during handler execution to avoid accidentally dropping messages. If you want to do any async work in your websocket handler, say to authenticate a user or load data from a datastore, ensure you attach any `on('message')` handlers *before* you trigger this async work. Otherwise, messages might arrive while this async work is underway, and if there is no handler listening for this data, it will be silently dropped. + +Here's an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, and then attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing. + +```javascript +fastify.get('/*', { websocket: true }, (connection, request) => { + const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise + + connection.socket.on('message', async (message) => { + const session = await sessionPromise() + // do somthing with the message and session + }) +}) +``` + +**NB:** This plugin uses the same router as the fastify instance, this has a few implications to take into account: - Websocket route handlers follow the usual `fastify` request lifecycle. @@ -150,7 +168,7 @@ fastify.listen(3000, err => { ### Custom error handler: -You can optionally provide a custom errorHandler that will be used to handle any cleaning up: +You can optionally provide a custom errorHandler that will be used to handle any cleaning up: ```js 'use strict' @@ -189,7 +207,7 @@ fastify.listen(3000, err => { }) ``` -## Options : +## Options `fastify-websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) : @@ -200,7 +218,6 @@ fastify.listen(3000, err => { - `server` - A pre-created Node.js HTTP/S server. - `verifyClient` - A function which can be used to validate incoming connections. - `handleProtocols` - A function which can be used to handle the WebSocket subprotocols. -- `noServer` - Enable no server mode. - `clientTracking` - Specifies whether or not to track clients. - `perMessageDeflate` - Enable/disable permessage-deflate. - `maxPayload` - The maximum allowed message size in bytes. @@ -209,7 +226,9 @@ For more informations you can check [`ws` options documentation](https://github. _**NB:** By default if you do not provide a `server` option `fastify-websocket` will bind your websocket server instance to the scoped `fastify` instance._ -_**NB:** `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_ +_**NB:** the `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_ + +_**NB:** the `noServer` option from `ws` shouldn't be provided since the point of fastify-websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_ ## Acknowledgements diff --git a/index.js b/index.js index 703258f..8a0ba96 100644 --- a/index.js +++ b/index.js @@ -4,7 +4,8 @@ const { ServerResponse } = require('http') const fp = require('fastify-plugin') const WebSocket = require('ws') -const kWs = Symbol('ws') +const kWs = Symbol('ws-socket') +const kWsHead = Symbol('ws-head') function fastifyWebsocket (fastify, opts, next) { let errorHandler = defaultErrorHandler @@ -16,25 +17,48 @@ function fastifyWebsocket (fastify, opts, next) { errorHandler = opts.errorHandler } - const options = Object.assign({}, opts.options) - if (options.path) { - fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead') + if (opts.options && opts.options.noServer) { + return next(new Error("fastify-websocket doesn't support the ws noServer option. If you want to create a websocket server detatched from fastify, use the ws library directly.")) } - if (!options.server && !options.noServer) { - options.server = fastify.server + + const wssOptions = Object.assign({ noServer: true }, opts.options) + + if (wssOptions.path) { + fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead') } - const wss = new WebSocket.Server(options) - wss.on('connection', handleRouting) + // We always handle the upgrading manually so we can dispatch through the fastify stack before upgrading + const websocketListenServer = wssOptions.server || fastify.server + delete wssOptions.server + const wss = new WebSocket.Server(wssOptions) fastify.decorate('websocketServer', wss) + const handleUpgrade = (rawRequest, callback) => { + wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => { + wss.emit('connection', socket, rawRequest) + + const connection = WebSocket.createWebSocketStream(socket) + connection.socket = socket + + connection.socket.on('newListener', event => { + if (event === 'message') { + connection.resume() + } + }) + + callback(connection) + }) + } + fastify.addHook('onError', (request, reply, error, done) => { if (request.raw[kWs]) { // Hijack reply to prevent fastify from sending the error after onError hooks are done running reply.hijack() - // Handle the error - errorHandler.call(this, error, request.raw[kWs], request, reply) + handleUpgrade(request.raw, connection => { + // Handle the error + errorHandler.call(this, error, connection, request, reply) + }) } done() }) @@ -63,24 +87,44 @@ function fastifyWebsocket (fastify, opts, next) { } } + // we always override the route handler so we can close websocket connections to routes to handlers that don't support websocket connections routeOptions.handler = (request, reply) => { + // within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so if (request.raw[kWs]) { reply.hijack() - let result - if (isWebsocketRoute) { - result = wsHandler.call(fastify, request.raw[kWs], request) - } else { - result = noHandle.call(fastify, request.raw[kWs], request.raw) - } - if (result && typeof result.catch === 'function') { - result.catch(err => errorHandler.call(this, err, request.raw[kWs], request, reply)) - } + handleUpgrade(request.raw, connection => { + let result + if (isWebsocketRoute) { + result = wsHandler.call(fastify, connection, request) + } else { + result = noHandle.call(fastify, connection, request) + } + + if (result && typeof result.catch === 'function') { + result.catch(err => errorHandler.call(this, err, connection, request, reply)) + } + }) } else { return handler.call(fastify, request, reply) } } }) + // We add an event listener to the nodejs standard library `upgrade` event on the raw `http.Server` instance. We save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket. + websocketListenServer.on('upgrade', (rawRequest, socket, head) => { + rawRequest[kWs] = socket + rawRequest[kWsHead] = head + + if (closing) { + handleUpgrade(rawRequest, (connection) => { + connection.socket.close(1001) + }) + } else { + const rawResponse = new ServerResponse(rawRequest) + fastify.routing(rawRequest, rawResponse) + } + }) + fastify.addHook('onClose', close) let closing = false @@ -102,9 +146,9 @@ function fastifyWebsocket (fastify, opts, next) { } } - function noHandle (conn, req) { - this.log.info({ path: req.url }, 'closed incoming websocket connection') - req[kWs].socket.close() + function noHandle (connection, rawRequest) { + this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler') + connection.socket.close() } function defaultErrorHandler (error, conn, request, reply) { @@ -120,31 +164,14 @@ function fastifyWebsocket (fastify, opts, next) { const oldDefaultRoute = fastify.getDefaultRoute() fastify.setDefaultRoute(function (req, res) { if (req[kWs]) { - noHandle.call(fastify, req[kWs], req) + handleUpgrade(req, (connection) => { + noHandle.call(fastify, connection, req) + }) } else { return oldDefaultRoute(req, res) } }) - function handleRouting (connection, request) { - if (closing) { - connection.close(1001) - return - } - - const response = new ServerResponse(request) - request[kWs] = WebSocket.createWebSocketStream(connection) - request[kWs].socket = connection - - request[kWs].socket.on('newListener', event => { - if (event === 'message') { - request[kWs].resume() - } - }) - - fastify.routing(request, response) - } - next() } diff --git a/test/base.js b/test/base.js index 41b7ccf..8da980e 100644 --- a/test/base.js +++ b/test/base.js @@ -472,55 +472,10 @@ test('Should keep processing message when many medium sized messages are sent', }) }) -test('Should not set server if noServer option is set', (t) => { - t.plan(5) - +test('Should error server if the noServer option is set', (t) => { + t.plan(1) const fastify = Fastify() - t.tearDown(() => { - fastify.close() - }) - - const options = { - noServer: true - } - - fastify.register(fastifyWebsocket, { options }) - // this is all that's needed to create an echo server - fastify.get('/', { websocket: true }, (connection, request) => { - connection.pipe(connection) - t.tearDown(() => connection.destroy()) - }) - - // As the websocketserver is now completely detached, we have to - // handle the upgrade event. - fastify.ready((err) => { - t.error(err) - - t.assert(!fastify.websocketServer.server) - - fastify.server.on('upgrade', (request, socket, head) => { - fastify.websocketServer.handleUpgrade(request, socket, head, (ws) => { - fastify.websocketServer.emit('connection', ws, request) - }) - }) - }) - - fastify.listen(0, (err) => { - t.error(err) - - const ws = new WebSocket('ws://localhost:' + fastify.server.address().port) - const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) - t.tearDown(() => client.destroy()) - - client.setEncoding('utf8') - client.write('hello') - - client.once('data', (chunk) => { - t.equal(chunk, 'hello') - fastify.close(function (err) { - t.error(err) - }) - }) - }) + fastify.register(fastifyWebsocket, { options: { noServer: true } }) + t.rejects(fastify.ready()) }) diff --git a/test/hooks.js b/test/hooks.js index ad7e4f5..72ce050 100644 --- a/test/hooks.js +++ b/test/hooks.js @@ -179,3 +179,45 @@ test('Should not hijack reply for a normal http request in the internal onError }) }) }) + +test('Should run async hooks and still deliver quickly sent messages', (t) => { + t.plan(3) + const fastify = Fastify() + + t.tearDown(() => fastify.close()) + + fastify.register(fastifyWebsocket) + + fastify.addHook( + 'preValidation', + async () => await new Promise((resolve) => setTimeout(resolve, 25)) + ) + + fastify.get('/echo', { websocket: true }, (conn, request) => { + conn.setEncoding('utf8') + conn.write('hello client') + t.tearDown(conn.destroy.bind(conn)) + + conn.socket.on('message', (message) => { + t.equal(message.toString('utf-8'), 'hello server') + conn.end() + }) + }) + + fastify.listen(0, (err) => { + t.error(err) + const ws = new WebSocket( + 'ws://localhost:' + fastify.server.address().port + '/echo' + ) + const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' }) + t.tearDown(client.destroy.bind(client)) + + client.setEncoding('utf8') + client.write('hello server') + + client.once('data', (chunk) => { + t.equal(chunk, 'hello client') + client.end() + }) + }) +})