Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: provide websocket instead of stream to avoid potential backpressure issues (#289) #290

Merged
merged 4 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 32 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ After registering this plugin, you can choose on which routes the WS server will
const fastify = require('fastify')()
fastify.register(require('@fastify/websocket'))
fastify.register(async function (fastify) {
fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})
})
Expand All @@ -63,17 +63,17 @@ fastify.register(require('@fastify/websocket'), {
})

fastify.register(async function (fastify) {
fastify.get('/*', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/*', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from wildcard route')
socket.send('hi from wildcard route')
})
})

fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})
})
Expand All @@ -93,10 +93,10 @@ It is important that websocket route handlers attach event handlers synchronousl
Here is an example of how to attach message handlers synchronously while still accessing asynchronous resources. We store a promise for the async thing in a local variable, attach the message handler synchronously, and then make the message handler itself asynchronous to grab the async data and do some processing:

```javascript
fastify.get('/*', { websocket: true }, (connection, request) => {
fastify.get('/*', { websocket: true }, (socket, request) => {
const sessionPromise = request.getSession() // example async session getter, called synchronously to return a promise

connection.socket.on('message', async (message) => {
socket.on('message', async (message) => {
const session = await sessionPromise()
// do something with the message and session
})
Expand All @@ -113,9 +113,9 @@ fastify.addHook('preValidation', async (request, reply) => {
await reply.code(401).send("not authenticated");
}
})
fastify.get('/', { websocket: true }, (connection, req) => {
fastify.get('/', { websocket: true }, (socket, req) => {
// the connection will only be opened for authenticated incoming requests
connection.socket.on('message', message => {
socket.on('message', message => {
// ...
})
})
Expand All @@ -134,13 +134,13 @@ import websocket from '@fastify/websocket'
const fastify = Fastify()
await fastify.register(websocket)

fastify.get('/', { websocket: true }, function wsHandler (connection, req) {
fastify.get('/', { websocket: true }, function wsHandler (socket, req) {
// bound to fastify server
this.myDecoration.someFunc()

connection.socket.on('message', message => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})

Expand All @@ -154,8 +154,8 @@ If you need to handle both HTTP requests and incoming socket connections on the

const fastify = require('fastify')()

function handle (conn, req) {
conn.pipe(conn) // creates an echo server
function handle (socket, req) {
socket.on('message', (data) => socket.send(data)) // creates an echo server
}

fastify.register(require('@fastify/websocket'), {
Expand All @@ -171,13 +171,12 @@ fastify.register(async function () {
// this will handle http requests
reply.send({ hello: 'world' })
},
wsHandler: (conn, req) => {
wsHandler: (socket, req) => {
// this will handle websockets connections
conn.setEncoding('utf8')
conn.write('hello client')
socket.send('hello client')

conn.once('data', chunk => {
conn.end()
socket.once('message', chunk => {
socket.close()
})
}
})
Expand All @@ -201,10 +200,10 @@ Neither the `errorHandler` passed to this plugin or fastify's `onError` hook wil
const fastify = require('fastify')()

fastify.register(require('@fastify/websocket'), {
errorHandler: function (error, conn /* SocketStream */, req /* FastifyRequest */, reply /* FastifyReply */) {
errorHandler: function (error, socket /* WebSocket */, req /* FastifyRequest */, reply /* FastifyReply */) {
// Do stuff
// destroy/close connection
conn.destroy(error)
socket.terminate()
},
options: {
maxPayload: 1048576, // we set the maximum allowed messages size to 1 MiB (1024 bytes * 1024 bytes)
Expand All @@ -217,10 +216,10 @@ fastify.register(require('@fastify/websocket'), {
}
})

fastify.get('/', { websocket: true }, (connection /* SocketStream */, req /* FastifyRequest */) => {
connection.socket.on('message', message => {
fastify.get('/', { websocket: true }, (socket /* WebSocket */, req /* FastifyRequest */) => {
socket.on('message', message => {
// message.toString() === 'hi from client'
connection.socket.send('hi from server')
socket.send('hi from server')
})
})

Expand All @@ -247,8 +246,8 @@ fastify.register(require('@fastify/websocket'), {
preClose: (done) => { // Note: can also use async style, without done-callback
const server = this.websocketServer

for (const connection of server.clients) {
connection.close(1001, 'WS server is going offline in custom manner, sending a code + message')
for (const socket of server.clients) {
socket.close(1001, 'WS server is going offline in custom manner, sending a code + message')
}

server.close(done)
Expand Down Expand Up @@ -282,9 +281,9 @@ App.register(async function(fastify) {
}
})

fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server')
fastify.get('/', { websocket: true }, (socket) => {
socket.on('message', message => {
socket.send('hi from server')
})
})
})
Expand Down Expand Up @@ -350,15 +349,6 @@ _**NB** The `path` option from `ws` should not be provided since the routing is

_**NB** The `noServer` option from `ws` should not be provided since the point of @fastify/websocket is to listen on the fastify server. If you want a custom server, you can use the `server` option, and if you want more control, you can use the `ws` library directly_

You can also pass the following as `connectionOptions` for [createWebSocketStream](https://github.com/websockets/ws/blob/master/doc/ws.md#createwebsocketstreamwebsocket-options).

- `allowHalfOpen` <boolean> If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.
- `readable` <boolean> Sets whether the Duplex should be readable. Default: true.
- `writable` <boolean> Sets whether the Duplex should be writable. Default: true.
- `readableObjectMode` <boolean> Sets objectMode for readable side of the stream. Has no effect if objectMode is true. Default: false.
- `readableHighWaterMark` <number> Sets highWaterMark for the readable side of the stream.
- `writableHighWaterMark` <number> Sets highWaterMark for the writable side of the stream.

[ws](https://github.com/websockets/ws) does not allow you to set `objectMode` or `writableObjectMode` to true
## Acknowledgements

Expand Down
36 changes: 11 additions & 25 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,11 @@ function fastifyWebsocket (fastify, opts, next) {
wss.handleUpgrade(rawRequest, rawRequest[kWs], rawRequest[kWsHead], (socket) => {
wss.emit('connection', socket, rawRequest)

const connection = WebSocket.createWebSocketStream(socket, opts.connectionOptions)
connection.socket = socket

connection.on('error', (error) => {
socket.on('error', (error) => {
fastify.log.error(error)
})

connection.socket.on('newListener', event => {
if (event === 'message') {
connection.resume()
}
})

callback(connection)
callback(socket)
})
}

Expand Down Expand Up @@ -187,20 +178,20 @@ function fastifyWebsocket (fastify, opts, next) {
// within the route handler, we check if there has been a connection upgrade by looking at request.raw[kWs]. we need to dispatch the normal HTTP handler if not, and hijack to dispatch the websocket handler if so
if (request.raw[kWs]) {
reply.hijack()
handleUpgrade(request.raw, connection => {
handleUpgrade(request.raw, socket => {
let result
try {
if (isWebsocketRoute) {
result = wsHandler.call(this, connection, request)
result = wsHandler.call(this, socket, request)
} else {
result = noHandle.call(this, connection, request)
result = noHandle.call(this, socket, request)
}
} catch (err) {
return errorHandler.call(this, err, connection, request, reply)
return errorHandler.call(this, err, socket, request, reply)
}

if (result && typeof result.catch === 'function') {
result.catch(err => errorHandler.call(this, err, connection, request, reply))
result.catch(err => errorHandler.call(this, err, socket, request, reply))
}
})
} else {
Expand Down Expand Up @@ -229,19 +220,14 @@ function fastifyWebsocket (fastify, opts, next) {
done()
}

function noHandle (connection, rawRequest) {
function noHandle (socket, rawRequest) {
this.log.info({ path: rawRequest.url }, 'closed incoming websocket connection for path with no websocket handler')
connection.socket.close()
socket.close()
}

function defaultErrorHandler (error, conn, request) {
// Before destroying the connection, we attach an error listener.
// Since we already handled the error, adding this listener prevents the ws
// library from emitting the error and causing an uncaughtException
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
conn.on('error', _ => { })
function defaultErrorHandler (error, socket, request) {
request.log.error(error)
conn.destroy(error)
socket.terminate()
}

next()
Expand Down
Loading
Loading