Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't lose messages when hooks are asynchronous #106

Merged
merged 1 commit into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,25 @@ fastify.listen(3000, err => {
}
})
```
**NB:**

### Attaching event handlers

It is important that websocket route handlers attach event handlers synchronously during handler execution to avoid accidentally dropping messages. If you want to do any async work in your websocket handler, say to authenticate a user or load data from a datastore, ensure you attach any `on('message')` handlers *before* you trigger this async work. Otherwise, messages might arrive while this async work is underway, and if there is no handler listening for this data, it will be silently dropped.

Here's an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, and then attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing.

```javascript
fastify.get('/*', { websocket: true }, (connection, request) => {
const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise

connection.socket.on('message', async (message) => {
const session = await sessionPromise()
// do somthing with the message and session
})
})
```

**NB:**

This plugin uses the same router as the fastify instance, this has a few implications to take into account:
- Websocket route handlers follow the usual `fastify` request lifecycle.
Expand Down Expand Up @@ -150,7 +168,7 @@ fastify.listen(3000, err => {

### Custom error handler:

You can optionally provide a custom errorHandler that will be used to handle any cleaning up:
You can optionally provide a custom errorHandler that will be used to handle any cleaning up:

```js
'use strict'
Expand Down Expand Up @@ -189,7 +207,7 @@ fastify.listen(3000, err => {
})
```

## Options :
## Options

`fastify-websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) :

Expand All @@ -200,7 +218,6 @@ fastify.listen(3000, err => {
- `server` - A pre-created Node.js HTTP/S server.
- `verifyClient` - A function which can be used to validate incoming connections.
- `handleProtocols` - A function which can be used to handle the WebSocket subprotocols.
- `noServer` - Enable no server mode.
- `clientTracking` - Specifies whether or not to track clients.
- `perMessageDeflate` - Enable/disable permessage-deflate.
- `maxPayload` - The maximum allowed message size in bytes.
Expand All @@ -209,7 +226,9 @@ For more informations you can check [`ws` options documentation](https://github.

_**NB:** By default if you do not provide a `server` option `fastify-websocket` will bind your websocket server instance to the scoped `fastify` instance._

_**NB:** `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_
_**NB:** the `path` option from `ws` shouldn't be provided since the routing is handled by fastify itself_

_**NB:** the `noServer` option from `ws` shouldn't be provided since the point of fastify-websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_

## Acknowledgements

Expand Down
113 changes: 71 additions & 42 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ const { ServerResponse } = require('http')
const fp = require('fastify-plugin')
const WebSocket = require('ws')

const kWs = Symbol('ws')
const kWs = Symbol('ws-socket')
const kWsHead = Symbol('ws-head')

function fastifyWebsocket (fastify, opts, next) {
let errorHandler = defaultErrorHandler
Expand All @@ -16,25 +17,65 @@ function fastifyWebsocket (fastify, opts, next) {
errorHandler = opts.errorHandler
}

const options = Object.assign({}, opts.options)
if (options.path) {
fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead')
if (opts.options && opts.options.noServer) {
return next(new Error("fastify-websocket doesn't support the ws noServer option. If you want to create a websocket server detatched from fastify, use the ws library directly."))
}
if (!options.server && !options.noServer) {
options.server = fastify.server

const wssOptions = Object.assign({ noServer: true }, opts.options)

if (wssOptions.path) {
fastify.log.warn('ws server path option shouldn\'t be provided, use a route instead')
}

const wss = new WebSocket.Server(options)
wss.on('connection', handleRouting)
// We always handle upgrading ourselves in this library so that we can dispatch through the fastify stack before actually upgrading
// For this reason, we run the WebSocket.Server in noServer mode, and prevent the user from passing in a http.Server instance for it to attach to.
// Usually, we listen to the upgrade event of the `fastify.server`, but we do still support this server option by just listening to upgrades on it if passed.
const websocketListenServer = wssOptions.server || fastify.server
delete wssOptions.server
mcollina marked this conversation as resolved.
Show resolved Hide resolved

const wss = new WebSocket.Server(wssOptions)
fastify.decorate('websocketServer', wss)

websocketListenServer.on('upgrade', (rawRequest, socket, head) => {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
rawRequest[kWsHead] = head

if (closing) {
handleUpgrade(rawRequest, (connection) => {
connection.socket.close(1001)
})
} else {
const rawResponse = new ServerResponse(rawRequest)
fastify.routing(rawRequest, rawResponse)
}
})

const handleUpgrade = (rawRequest, callback) => {
wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => {
wss.emit('connection', socket, rawRequest)

const connection = WebSocket.createWebSocketStream(socket)
connection.socket = socket

connection.socket.on('newListener', event => {
if (event === 'message') {
connection.resume()
}
})

callback(connection)
})
}

fastify.addHook('onError', (request, reply, error, done) => {
if (request.raw[kWs]) {
// Hijack reply to prevent fastify from sending the error after onError hooks are done running
reply.hijack()
// Handle the error
errorHandler.call(this, error, request.raw[kWs], request, reply)
handleUpgrade(request.raw, connection => {
// Handle the error
errorHandler.call(this, error, connection, request, reply)
})
}
done()
})
Expand Down Expand Up @@ -63,18 +104,23 @@ function fastifyWebsocket (fastify, opts, next) {
}
}

// we always override the route handler so we can close websocket connections to routes to handlers that don't support websocket connections
routeOptions.handler = (request, reply) => {
// within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so
if (request.raw[kWs]) {
reply.hijack()
let result
if (isWebsocketRoute) {
result = wsHandler.call(fastify, request.raw[kWs], request)
} else {
result = noHandle.call(fastify, request.raw[kWs], request.raw)
}
if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, request.raw[kWs], request, reply))
}
handleUpgrade(request.raw, connection => {
let result
if (isWebsocketRoute) {
result = wsHandler.call(fastify, connection, request)
} else {
result = noHandle.call(fastify, connection, request)
}

if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, connection, request, reply))
}
})
} else {
return handler.call(fastify, request, reply)
}
Expand Down Expand Up @@ -102,9 +148,9 @@ function fastifyWebsocket (fastify, opts, next) {
}
}

function noHandle (conn, req) {
this.log.info({ path: req.url }, 'closed incoming websocket connection')
req[kWs].socket.close()
function noHandle (connection, rawRequest) {
this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler')
connection.socket.close()
}

function defaultErrorHandler (error, conn, request, reply) {
Expand All @@ -120,31 +166,14 @@ function fastifyWebsocket (fastify, opts, next) {
const oldDefaultRoute = fastify.getDefaultRoute()
fastify.setDefaultRoute(function (req, res) {
if (req[kWs]) {
noHandle.call(fastify, req[kWs], req)
handleUpgrade(req, (connection) => {
noHandle.call(fastify, connection, req)
})
} else {
return oldDefaultRoute(req, res)
}
})

function handleRouting (connection, request) {
if (closing) {
connection.close(1001)
return
}

const response = new ServerResponse(request)
request[kWs] = WebSocket.createWebSocketStream(connection)
request[kWs].socket = connection

request[kWs].socket.on('newListener', event => {
if (event === 'message') {
request[kWs].resume()
}
})

fastify.routing(request, response)
}

next()
}

Expand Down
53 changes: 4 additions & 49 deletions test/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -472,55 +472,10 @@ test('Should keep processing message when many medium sized messages are sent',
})
})

test('Should not set server if noServer option is set', (t) => {
t.plan(5)

test('Should error server if the noServer option is set', (t) => {
t.plan(1)
const fastify = Fastify()
t.tearDown(() => {
fastify.close()
})

const options = {
noServer: true
}

fastify.register(fastifyWebsocket, { options })

// this is all that's needed to create an echo server
fastify.get('/', { websocket: true }, (connection, request) => {
connection.pipe(connection)
t.tearDown(() => connection.destroy())
})

// As the websocketserver is now completely detached, we have to
// handle the upgrade event.
fastify.ready((err) => {
t.error(err)

t.assert(!fastify.websocketServer.server)

fastify.server.on('upgrade', (request, socket, head) => {
fastify.websocketServer.handleUpgrade(request, socket, head, (ws) => {
fastify.websocketServer.emit('connection', ws, request)
})
})
})

fastify.listen(0, (err) => {
t.error(err)

const ws = new WebSocket('ws://localhost:' + fastify.server.address().port)
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })
t.tearDown(() => client.destroy())

client.setEncoding('utf8')
client.write('hello')

client.once('data', (chunk) => {
t.equal(chunk, 'hello')
fastify.close(function (err) {
t.error(err)
})
})
})
fastify.register(fastifyWebsocket, { options: { noServer: true } })
t.rejects(fastify.ready())
})
42 changes: 42 additions & 0 deletions test/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,45 @@ test('Should not hijack reply for a normal http request in the internal onError
})
})
})

test('Should run async hooks and still deliver quickly sent messages', (t) => {
t.plan(3)
const fastify = Fastify()

t.tearDown(() => fastify.close())

fastify.register(fastifyWebsocket)

fastify.addHook(
'preValidation',
async () => await new Promise((resolve) => setTimeout(resolve, 25))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tests that messages that arrive while async hooks are processing are still seen in the route handler

)

fastify.get('/echo', { websocket: true }, (conn, request) => {
conn.setEncoding('utf8')
conn.write('hello client')
t.tearDown(conn.destroy.bind(conn))

conn.socket.on('message', (message) => {
t.equal(message.toString('utf-8'), 'hello server')
conn.end()
})
})

fastify.listen(0, (err) => {
t.error(err)
const ws = new WebSocket(
'ws://localhost:' + fastify.server.address().port + '/echo'
)
const client = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })
t.tearDown(client.destroy.bind(client))

client.setEncoding('utf8')
client.write('hello server')

client.once('data', (chunk) => {
t.equal(chunk, 'hello client')
client.end()
})
})
})