Skip to content

Commit

Permalink
Don't lose messages when hooks are asynchronous
Browse files Browse the repository at this point in the history
Before this change, fastify-websocket processed the upgrade request from the client before dispatching the request from the router. This means it created the websocket object and the duplex stream wrapping it before invoking all the userland code that might want to do stuff in hooks like authentication, or loading a session, or whatever.

If those hooks that run after the creation of the websocket are asynchronous, that means that the websocket that has already been set up and connected can receive messages during the execution of those async hooks. Generally I think developers would attach onMessage handlers in their route handler functions. If they do that, those message handler functions wouldn't be attached until after all the pre-route handlers have been run, which would mean any messages that arrive during hook processing won't trigger any message handlers and get silently dropped!

Instead, we should buffer any incoming messages until the userland route handler is ready to work with the WebSocket, and give it the chance to synchronously attach event listeners. The `net.Socket` will buffer any messages, and then the `ws` library will dispatch them as messages on the next process tick after setting up the `WebSocket` instance. The handler will run in between there and get a chance to register listeners. Hooks won't have access to the websocket if we do this, but they already didn't, and I think this is a big enough gotcha that that's fine.

To implement this, we stop using the `connection` event of the `ws` server, and instead listen to the `upgrade` event of the `http.Server`, and still dispatch that request through fastify. Then, in the route handler, we use `wss.handleUpgradeRequest` to create the WebSocket. This means that the `ws` server runs without actually attaching to an `http.Server`, which I think makes sense since we want finer control over when the upgrade is actually handled.  Because this requires some special options to be passed to the `ws` server, I opted to remove support for the `noServer` option to the ws server. I think this is fine -- it seems kind of strange to use a http sever plugin to register a websocket system that you don't want listening using that http server, but it is a breaking change. We still support the custom `server` option that gets passed through to `ws`.
  • Loading branch information
airhorns committed Mar 6, 2021
1 parent a825ffc commit a4732ae
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 96 deletions.
29 changes: 24 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,25 @@ fastify.listen(3000, err => {
}
})
```
**NB:**

### Attaching event handlers

It is important that websocket route handlers attach event handlers synchronously during handler execution to avoid accidentally dropping messages. If you want to do any async work in your websocket handler, say to authenticate a user or load data from a datastore, ensure you attach any `on('message')` handlers *before* you trigger this async work. Otherwise, messages might arrive while this async work is underway, and if there is no handler listening for this data, it will be silently dropped.

Here's an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, and then attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing.

```javascript
fastify.get('/*', { websocket: true }, (connection, request) => {
const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise

connection.socket.on('message', async (message) => {
const session = await sessionPromise()
// do somthing with the message and session
})
})
```

**NB:**

This plugin uses the same router as the fastify instance, this has a few implications to take into account:
- Websocket route handlers follow the usual `fastify` request lifecycle.
Expand Down Expand Up @@ -150,7 +168,7 @@ fastify.listen(3000, err => {

### Custom error handler:

You can optionally provide a custom errorHandler that will be used to handle any cleaning up:
You can optionally provide a custom errorHandler that will be used to handle any cleaning up:

```js
'use strict'
Expand Down Expand Up @@ -189,7 +207,7 @@ fastify.listen(3000, err => {
})
```

## Options :
## Options

`fastify-websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) :

Expand All @@ -200,7 +218,6 @@ fastify.listen(3000, err => {
- `server` - A pre-created Node.js HTTP/S server.
- `verifyClient` - A function which can be used to validate incoming connections.
- `handleProtocols` - A function which can be used to handle the WebSocket subprotocols.
- `noServer` - Enable no server mode.
- `clientTracking` - Specifies whether or not to track clients.
- `perMessageDeflate` - Enable/disable permessage-deflate.
- `maxPayload` - The maximum allowed message size in bytes.
Expand All @@ -209,7 +226,9 @@ For more informations you can check [`ws` options documentation](https://github.

_**NB:** By default if you do not provide a `server` option `fastify-websocket` will bind your websocket server instance to the scoped `fastify` instance._

_**NB:** `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_
_**NB:** the `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_

_**NB:** the `noServer` option from `ws` shouldn't be provided since the point of fastify-websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_

## Acknowledgements

Expand Down
111 changes: 69 additions & 42 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ const { ServerResponse } = require('http')
const fp = require('fastify-plugin')
const WebSocket = require('ws')

const kWs = Symbol('ws')
const kWs = Symbol('ws-socket')
const kWsHead = Symbol('ws-head')

function fastifyWebsocket (fastify, opts, next) {
let errorHandler = defaultErrorHandler
Expand All @@ -16,25 +17,48 @@ function fastifyWebsocket (fastify, opts, next) {
errorHandler = opts.errorHandler
}

const options = Object.assign({}, opts.options)
if (options.path) {
fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead')
if (opts.options && opts.options.noServer) {
return next(new Error("fastify-websocket doesn't support the ws noServer option. If you want to create a websocket server detatched from fastify, use the ws library directly."))
}
if (!options.server && !options.noServer) {
options.server = fastify.server

const wssOptions = Object.assign({ noServer: true }, opts.options)

if (wssOptions.path) {
fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead')
}

const wss = new WebSocket.Server(options)
wss.on('connection', handleRouting)
// We always handle the upgrading manually so we can dispatch through the fastify stack before upgrading
const websocketListenServer = wssOptions.server || fastify.server
delete wssOptions.server

const wss = new WebSocket.Server(wssOptions)
fastify.decorate('websocketServer', wss)

const handleUpgrade = (rawRequest, callback) => {
wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => {
wss.emit('connection', socket, rawRequest)

const connection = WebSocket.createWebSocketStream(socket)
connection.socket = socket

connection.socket.on('newListener', event => {
if (event === 'message') {
connection.resume()
}
})

callback(connection)
})
}

fastify.addHook('onError', (request, reply, error, done) => {
if (request.raw[kWs]) {
// Hijack reply to prevent fastify from sending the error after onError hooks are done running
reply.hijack()
// Handle the error
errorHandler.call(this, error, request.raw[kWs], request, reply)
handleUpgrade(request.raw, connection => {
// Handle the error
errorHandler.call(this, error, connection, request, reply)
})
}
done()
})
Expand Down Expand Up @@ -63,24 +87,44 @@ function fastifyWebsocket (fastify, opts, next) {
}
}

// we always override the route handler so we can close websocket connections to routes to handlers that don't support websocket connections
routeOptions.handler = (request, reply) => {
// within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so
if (request.raw[kWs]) {
reply.hijack()
let result
if (isWebsocketRoute) {
result = wsHandler.call(fastify, request.raw[kWs], request)
} else {
result = noHandle.call(fastify, request.raw[kWs], request.raw)
}
if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, request.raw[kWs], request, reply))
}
handleUpgrade(request.raw, connection => {
let result
if (isWebsocketRoute) {
result = wsHandler.call(fastify, connection, request)
} else {
result = noHandle.call(fastify, connection, request)
}

if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, connection, request, reply))
}
})
} else {
return handler.call(fastify, request, reply)
}
}
})

// We add an event listener to the nodejs standard library `upgrade` event on the raw `http.Server` instance. We save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
websocketListenServer.on('upgrade', (rawRequest, socket, head) => {
rawRequest[kWs] = socket
rawRequest[kWsHead] = head

if (closing) {
handleUpgrade(rawRequest, (connection) => {
connection.socket.close(1001)
})
} else {
const rawResponse = new ServerResponse(rawRequest)
fastify.routing(rawRequest, rawResponse)
}
})

fastify.addHook('onClose', close)

let closing = false
Expand All @@ -102,9 +146,9 @@ function fastifyWebsocket (fastify, opts, next) {
}
}

function noHandle (conn, req) {
this.log.info({ path: req.url }, 'closed incoming websocket connection')
req[kWs].socket.close()
function noHandle (connection, rawRequest) {
this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler')
connection.socket.close()
}

function defaultErrorHandler (error, conn, request, reply) {
Expand All @@ -120,31 +164,14 @@ function fastifyWebsocket (fastify, opts, next) {
const oldDefaultRoute = fastify.getDefaultRoute()
fastify.setDefaultRoute(function (req, res) {
if (req[kWs]) {
noHandle.call(fastify, req[kWs], req)
handleUpgrade(req, (connection) => {
noHandle.call(fastify, connection, req)
})
} else {
return oldDefaultRoute(req, res)
}
})

function handleRouting (connection, request) {
if (closing) {
connection.close(1001)
return
}

const response = new ServerResponse(request)
request[kWs] = WebSocket.createWebSocketStream(connection)
request[kWs].socket = connection

request[kWs].socket.on('newListener', event => {
if (event === 'message') {
request[kWs].resume()
}
})

fastify.routing(request, response)
}

next()
}

Expand Down
53 changes: 4 additions & 49 deletions test/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -472,55 +472,10 @@ test('Should keep processing message when many medium sized messages are sent',
})
})

test('Should not set server if noServer option is set', (t) => {
t.plan(5)

test('Should error server if the noServer option is set', (t) => {
t.plan(1)
const fastify = Fastify()
t.tearDown(() => {
fastify.close()
})

const options = {
noServer: true
}

fastify.register(fastifyWebsocket, { options })

// this is all that's needed to create an echo server
fastify.get('/', { websocket: true }, (connection, request) => {
connection.pipe(connection)
t.tearDown(() => connection.destroy())
})

// As the websocketserver is now completely detached, we have to
// handle the upgrade event.
fastify.ready((err) => {
t.error(err)

t.assert(!fastify.websocketServer.server)

fastify.server.on('upgrade', (request, socket, head) => {
fastify.websocketServer.handleUpgrade(request, socket, head, (ws) => {
fastify.websocketServer.emit('connection', ws, request)
})
})
})

fastify.listen(0, (err) => {
t.error(err)

const ws = new WebSocket('ws://localhost:' + fastify.server.address().port)
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })
t.tearDown(() => client.destroy())

client.setEncoding('utf8')
client.write('hello')

client.once('data', (chunk) => {
t.equal(chunk, 'hello')
fastify.close(function (err) {
t.error(err)
})
})
})
fastify.register(fastifyWebsocket, { options: { noServer: true } })
t.rejects(fastify.ready())
})
42 changes: 42 additions & 0 deletions test/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,45 @@ test('Should not hijack reply for a normal http request in the internal onError
})
})
})

test('Should run async hooks and still deliver quickly sent messages', (t) => {
t.plan(3)
const fastify = Fastify()

t.tearDown(() => fastify.close())

fastify.register(fastifyWebsocket)

fastify.addHook(
'preValidation',
async () => await new Promise((resolve) => setTimeout(resolve, 25))
)

fastify.get('/echo', { websocket: true }, (conn, request) => {
conn.setEncoding('utf8')
conn.write('hello client')
t.tearDown(conn.destroy.bind(conn))

conn.socket.on('message', (message) => {
t.equal(message.toString('utf-8'), 'hello server')
conn.end()
})
})

fastify.listen(0, (err) => {
t.error(err)
const ws = new WebSocket(
'ws://localhost:' + fastify.server.address().port + '/echo'
)
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })
t.tearDown(client.destroy.bind(client))

client.setEncoding('utf8')
client.write('hello server')

client.once('data', (chunk) => {
t.equal(chunk, 'hello client')
client.end()
})
})
})

0 comments on commit a4732ae

Please sign in to comment.