Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add #injectWS #276

Merged
merged 11 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,78 @@ fastify.register(require('@fastify/websocket'), {
})
```

### Testing

Testing the ws handler can be quite tricky, luckily `fastify-websocket` decorates fastify instance with `injectWS`.
It allows to test easily a websocket endpoint.

The signature of injectWS is the following: `([path], [upgradeContext])`.

#### App.js

```js
'use strict'

const Fastify = require('fastify')
const FastifyWebSocket = require('@fastify/websocket')

const App = Fastify()

App.register(FastifyWebSocket);

App.register(async function(fastify) {
fastify.addHook('preValidation', async (request, reply) => {
if (request.headers['api-key'] !== 'some-random-key') {
return reply.code(401).send()
}
})

fastify.get('/', { websocket: true }, (connection) => {
connection.socket.on('message', message => {
connection.socket.send('hi from server')
})
})
})

module.exports = App
```

#### App.test.js

```js
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const App = require('./app.js')

test('connect to /', async (t) => {
t.plan(1)

const fastify = Fastify()
fastify.register(App)
t.teardown(fastify.close.bind(fastify))

const ws = await fastify.injectWS('/', {headers: { "api-key" : "some-random-key" }})
let resolve;
const promise = new Promise(r => { resolve = r })

ws.on('message', (data) => {
resolve(data.toString());
})
ws.send('hi from client')

t.assert(await promise, 'hi from server')
// Remember to close the ws at the end
ws.terminate()
})
```

#### Things to know
- Websocket need to be closed manually at the end of each test.
- `fastify.ready()` needs to be awaited to ensure that fastify has been decorated.
- You need to register the event listener before sending the message if you need to process server response.

## Options

`@fastify/websocket` accept these options for [`ws`](https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketserveroptions-callback) :
Expand Down
60 changes: 59 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict'

const { ServerResponse } = require('node:http')
const { PassThrough } = require('node:stream')
const { randomBytes } = require('node:crypto')
const fp = require('fastify-plugin')
const WebSocket = require('ws')
const Duplexify = require('duplexify')

const kWs = Symbol('ws-socket')
const kWsHead = Symbol('ws-head')
Expand Down Expand Up @@ -47,6 +50,60 @@ function fastifyWebsocket (fastify, opts, next) {
const wss = new WebSocket.Server(wssOptions)
fastify.decorate('websocketServer', wss)

async function injectWS (path = '/', upgradeContext = {}) {
const server2Client = new PassThrough()
const client2Server = new PassThrough()

const serverStream = new Duplexify(server2Client, client2Server)
const clientStream = new Duplexify(client2Server, server2Client)
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved

const ws = new WebSocket(null, undefined, { isServer: false })
const head = Buffer.from([])

let resolve, reject
const promise = new Promise((_resolve, _reject) => { resolve = _resolve; reject = _reject })

ws.on('open', () => {
clientStream.removeListener('data', onData)
resolve(ws)
})

const onData = (chunk) => {
if (chunk.toString().includes('HTTP/1.1 101 Switching Protocols')) {
ws._isServer = false
ws.setSocket(clientStream, head, { maxPayload: 0 })
} else {
clientStream.removeListener('data', onData)
const statusCode = Number(chunk.toString().match(/HTTP\/1.1 (\d+)/)[1])
reject(new Error('Unexpected server response: ' + statusCode))
}
}

clientStream.on('data', onData)

const req = {
...upgradeContext,
method: 'GET',
headers: {
...upgradeContext.headers,
connection: 'upgrade',
upgrade: 'websocket',
'sec-websocket-version': 13,
'sec-websocket-key': randomBytes(16).toString('base64')
},
httpVersion: '1.1',
url: path,
[kWs]: serverStream,
[kWsHead]: head
}

websocketListenServer.emit('upgrade', req, req[kWs], req[kWsHead])
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved

return promise
}

fastify.decorate('injectWS', injectWS)

function onUpgrade (rawRequest, socket, head) {
// Save a reference to the socket and then dispatch the request through the normal fastify router so that it will invoke hooks and then eventually a route handler that might upgrade the socket.
rawRequest[kWs] = socket
Expand Down Expand Up @@ -164,6 +221,7 @@ function fastifyWebsocket (fastify, opts, next) {
client.close()
}
}

fastify.server.removeListener('upgrade', onUpgrade)

server.close(done)
Expand All @@ -181,7 +239,7 @@ function fastifyWebsocket (fastify, opts, next) {
// Since we already handled the error, adding this listener prevents the ws
// library from emitting the error and causing an uncaughtException
// Reference: https://github.com/websockets/ws/blob/master/lib/stream.js#L35
conn.on('error', _ => {})
conn.on('error', _ => { })
request.log.error(error)
conn.destroy(error)
}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"tsd": "^0.30.1"
},
"dependencies": {
"duplexify": "^4.1.2",
"fastify-plugin": "^4.0.0",
"ws": "^8.0.0"
},
Expand Down
134 changes: 134 additions & 0 deletions test/inject.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
'use strict'

const { test } = require('tap')
const Fastify = require('fastify')
const fastifyWebsocket = require('..')

function buildFastify (t) {
const fastify = Fastify()
t.teardown(() => { fastify.close() })
fastify.register(fastifyWebsocket)
return fastify
}

test('routes correctly the message', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.get('/ws', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/ws')
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('redirect on / if no path specified', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.get('/', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS()
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('routes correctly the message between two routes', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
let _reject
const promise = new Promise((resolve, reject) => { _resolve = resolve; _reject = reject })

fastify.register(
async function (instance) {
instance.get('/ws', { websocket: true }, function (conn) {
conn.once('data', () => {
_reject('wrong-route')
})
})

instance.get('/ws-2', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/ws-2')
ws.send(message)
t.same(await promise, message)
ws.terminate()
})

test('use the upgrade context to upgrade if there is some hook', async (t) => {
const fastify = buildFastify(t)
const message = 'hi from client'

let _resolve
const promise = new Promise((resolve) => { _resolve = resolve })

fastify.register(
async function (instance) {
instance.addHook('preValidation', async (request, reply) => {
if (request.headers['api-key'] !== 'some-random-key') {
return reply.code(401).send()
}
})

instance.get('/', { websocket: true }, function (conn) {
conn.once('data', chunk => {
_resolve(chunk.toString())
})
})
})

await fastify.ready()
const ws = await fastify.injectWS('/', { headers: { 'api-key': 'some-random-key' } })
ws.send(message)
t.same(await promise, message)
ws.terminate()
})
DanieleFedeli marked this conversation as resolved.
Show resolved Hide resolved

test('rejects if the websocket is not upgraded', async (t) => {
const fastify = buildFastify(t)

fastify.register(
async function (instance) {
instance.addHook('preValidation', async (request, reply) => {
return reply.code(401).send()
})

instance.get('/', { websocket: true }, function (conn) {
})
})

await fastify.ready()
t.rejects(fastify.injectWS('/'), 'Unexpected server response: 401')
})
9 changes: 4 additions & 5 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ declare module 'fastify' {
websocket?: boolean;
}

type InjectWSFn<RawRequest> =
((path?: string, upgradeContext?: Partial<RawRequest>) => Promise<WebSocket>)

interface FastifyInstance<RawServer, RawRequest, RawReply, Logger, TypeProvider> {
get: RouteShorthandMethod<RawServer, RawRequest, RawReply, TypeProvider, Logger>,
websocketServer: WebSocket.Server,
injectWS: InjectWSFn<RawRequest>
}

interface FastifyRequest {
Expand Down Expand Up @@ -67,7 +70,6 @@ type FastifyWebsocket = FastifyPluginCallback<fastifyWebsocket.WebsocketPluginOp
declare namespace fastifyWebsocket {

interface WebSocketServerOptions extends Omit<WebSocket.ServerOptions, "path"> { }

export type WebsocketHandler<
RawServer extends RawServerBase = RawServerDefault,
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
Expand All @@ -81,18 +83,15 @@ declare namespace fastifyWebsocket {
connection: SocketStream,
request: FastifyRequest<RequestGeneric, RawServer, RawRequest, SchemaCompiler, TypeProvider, ContextConfig, Logger>
) => void | Promise<any>;

export interface SocketStream extends Duplex {
socket: WebSocket;
}

export interface WebsocketPluginOptions {
errorHandler?: (this: FastifyInstance, error: Error, connection: SocketStream, request: FastifyRequest, reply: FastifyReply) => void;
options?: WebSocketServerOptions;
connectionOptions?: DuplexOptions;
preClose?: preCloseHookHandler | preCloseAsyncHookHandler;
}

export interface RouteOptions<
RawServer extends RawServerBase = RawServerDefault,
RawRequest extends RawRequestDefaultExpression<RawServer> = RawRequestDefaultExpression<RawServer>,
Expand Down
Loading