Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add #injectWS #276

Merged
merged 11 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,69 @@ fastify.register(require('@fastify/websocket'), {
})
```

### Testing

Testing the ws handler can be quite tricky, luckily `fastify-websocket` decorates fastify instance with `injectWS`.
It allows to test easily a websocket endpoint.

#### App.js

```js
'use strict'

const Fastify = require('fastify')
const FastifyWebSocket = require('@fastify/websocket')

const App = Fastify()

App.register(FastifyWebSocket);
App.register(async function(fastify) {
fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server')
})
})
})

module.exports = App
```

#### App.test.js

```js
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const App = require('./app.js')

test('connect to /', async (t) => {
t.plan(1)

const fastify = Fastify()
fastify.register(App)
t.teardown(fastify.close.bind(fastify))

const ws = await fastify.injectWS('/')
let resolve;
const promise = new Promise(r => { resolve = r })

ws.on('message', (data) => {
resolve(data.toString());
})
ws.send('hi from client')

t.assert(await promise, 'hi from server')
// Remember to close the ws at the end
ws.terminate()
})
```

#### Things to know
- Websocket need to be closed manually at the end of each test.
- `fastify.ready()` needs to be awaited to ensure that fastify has been decorated.
- You need to register the event listener before sending the message if you need to process server response.

## Options

`@fastify/websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) :
Expand Down
66 changes: 65 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
'use strict'

const { ServerResponse } = require('node:http')
const { PassThrough } = require('node:stream')
const { randomBytes } = require('node:crypto')
const fp = require('fastify-plugin')
const WebSocket = require('ws')
const Duplexify = require('duplexify')

const kWs = Symbol('ws-socket')
const kWsHead = Symbol('ws-head')

function fastifyWebsocket (fastify, opts, next) {
fastify.decorateRequest('ws', null)
const injectedWs = []

let errorHandler = defaultErrorHandler
if (opts.errorHandler) {
Expand Down Expand Up @@ -47,6 +51,57 @@ function fastifyWebsocket (fastify, opts, next) {
const wss = new WebSocket.Server(wssOptions)
fastify.decorate('websocketServer', wss)

async function injectWS (path = '/') {
const server2Client = new PassThrough()
const client2Server = new PassThrough()

const serverStream = new Duplexify(server2Client, client2Server)
const clientStream = new Duplexify(client2Server, server2Client)
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved

const ws = new WebSocket(null, { isServer: false })
const head = Buffer.from([])

let resolve
const promise = new Promise(_resolve => { resolve = _resolve })

ws.on('open', () => {
clientStream.removeListener('data', onData)
resolve(ws)
})

const onData = (chunk) => {
// Assign the socket only if the upgrade was successful and the socket is open
// istanbul ignore next
if (chunk.toString().includes('HTTP/1.1 101 Switching Protocols')) {
ws._isServer = false
ws.setSocket(clientStream, head, { maxPayload: 0 })
injectedWs.push(ws)
}
}

clientStream.on('data', onData)

const req = {
method: 'GET',
headers: {
connection: 'upgrade',
upgrade: 'websocket',
'sec-websocket-version': 13,
'sec-websocket-key': randomBytes(16).toString('base64')
},
httpVersion: '1.1',
url: path,
[kWs]: serverStream,
[kWsHead]: head
}

websocketListenServer.emit('upgrade', req, req[kWs], req[kWsHead])
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved

return promise
}

fastify.decorate('injectWS', injectWS)

function onUpgrade (rawRequest, socket, head) {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
Expand Down Expand Up @@ -164,6 +219,15 @@ function fastifyWebsocket (fastify, opts, next) {
client.close()
}
}

// Close all the mocked socket used by injectWS
// This is needed to avoid test hanging
if (injectedWs.length) {
for (const ws of injectedWs) {
ws.terminate()
}
}
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved

fastify.server.removeListener('upgrade', onUpgrade)

server.close(done)
Expand All @@ -181,7 +245,7 @@ function fastifyWebsocket (fastify, opts, next) {
// Since we already handled the error, adding this listener prevents the ws
// library from emitting the error and causing an uncaughtException
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
conn.on('error', _ => {})
conn.on('error', _ => { })
request.log.error(error)
conn.destroy(error)
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"tsd": "^0.29.0"
},
"dependencies": {
"duplexify": "^4.1.2",
"fastify-plugin": "^4.0.0",
"ws": "^8.0.0"
},
Expand Down
85 changes: 85 additions & 0 deletions test/inject.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const fastifyWebsocket = require('..')

function buildFastify (t) {
const fastify = Fastify()
t.teardown(() => { fastify.close() })
fastify.register(fastifyWebsocket)
return fastify
}

test('routes correctly the message', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.get('/ws', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/ws')
ws.send(message)
t.same(await promise, message)
})

test('redirect on / if no path specified', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.get('/', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS()
ws.send(message)
t.same(await promise, message)
})

test('routes correctly the message between two routes', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
let _reject
const promise = new Promise((resolve, reject) => { _resolve = resolve; _reject = reject })

fastify.register(
async function (instance) {
instance.get('/ws', { websocket: true }, function (conn) {
conn.once('data', () => {
_reject('wrong-route')
})
})

instance.get('/ws-2', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/ws-2')
ws.send(message)
t.same(await promise, message)
})
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 2 additions & 5 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ declare module 'fastify' {
interface FastifyInstance<RawServer, RawRequest, RawReply, Logger, TypeProvider> {
get: RouteShorthandMethod<RawServer, RawRequest, RawReply, TypeProvider>,
websocketServer: WebSocket.Server,
injectWS: (path?: string) => Promise<WebSocket>
}

interface FastifyRequest {
Expand Down Expand Up @@ -66,7 +67,6 @@ type FastifyWebsocket = FastifyPluginCallback<fastifyWebsocket.WebsocketPluginOp
declare namespace fastifyWebsocket {

interface WebSocketServerOptions extends Omit<WebSocket.ServerOptions, "path"> { }

export type WebsocketHandler<
RawServer extends RawServerBase = RawServerDefault,
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
Expand All @@ -80,18 +80,15 @@ declare namespace fastifyWebsocket {
connection: SocketStream,
request: FastifyRequest<RequestGeneric, RawServer, RawRequest, SchemaCompiler, TypeProvider, ContextConfig, Logger>
) => void | Promise<any>;

export interface SocketStream extends Duplex {
socket: WebSocket;
}

export interface WebsocketPluginOptions {
errorHandler?: (this: FastifyInstance, error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply) => void;
options?: WebSocketServerOptions;
connectionOptions?: DuplexOptions;
preClose?: preCloseHookHandler | preCloseAsyncHookHandler;
}

export interface RouteOptions<
RawServer extends RawServerBase = RawServerDefault,
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
Expand All @@ -108,4 +105,4 @@ declare namespace fastifyWebsocket {
}

declare function fastifyWebsocket(...params: Parameters<FastifyWebsocket>): ReturnType<FastifyWebsocket>
export = fastifyWebsocket
export = fastifyWebsocket