Skip to content

feat: add context to hooks #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,15 @@ See the example in [examples/reconnection](examples/reconnection).

## wsHooks

On websocket events, the following hooks are available, note **the hooks are all synchronous**.
On websocket events, the following hooks are available, note **the hooks are all synchronous**.
The `context` object is passed to all hooks and contains the `log` property.

- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage(source, target, { data, binary })` (default: `undefined`).
- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage(source, target, { data, binary })` (default: `undefined`).
- `onIncomingMessage`: A hook function that is called when the request is received from the client `onIncomingMessage(context, source, target, { data, binary })` (default: `undefined`).
- `onOutgoingMessage`: A hook function that is called when the response is received from the target `onOutgoingMessage(context, source, target, { data, binary })` (default: `undefined`).
- `onConnect`: A hook function that is called when the connection is established `onConnect(source, target)` (default: `undefined`).
- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(source)` (default: `undefined`).
- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
- `onPong`: A hook function that is called when the target responds to the ping `onPong(source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
- `onDisconnect`: A hook function that is called when the connection is closed `onDisconnect(context, source)` (default: `undefined`).
- `onReconnect`: A hook function that is called when the connection is reconnected `onReconnect(context, source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.
- `onPong`: A hook function that is called when the target responds to the ping `onPong(context, source, target)` (default: `undefined`). The function is called if reconnection feature is enabled.

## Benchmarks

Expand Down
4 changes: 2 additions & 2 deletions examples/reconnection/proxy/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async function main () {
// clean backup from the last ping
backup = backup.filter(message => message.timestamp > lastPong)
},
onIncomingMessage: (source, target, message) => {
onIncomingMessage: (context, source, target, message) => {
const m = message.data.toString()
console.log('onIncomingMessage backup', m)
backup.push({ message: m, timestamp: Date.now() })
Expand All @@ -49,7 +49,7 @@ async function main () {
console.log('onDisconnect')
backup.length = 0
},
onReconnect: (source, target) => {
onReconnect: (context, source, target) => {
console.log('onReconnect')
resendMessages(target)
},
Expand Down
26 changes: 16 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,17 @@ function isExternalUrl (url) {

function noop () { }

function createContext (logger) {
return { log: logger }
}

function proxyWebSockets (logger, source, target, hooks) {
const context = createContext(logger)
function close (code, reason) {
if (hooks.onDisconnect) {
waitConnection(target, () => {
try {
hooks.onDisconnect(source)
hooks.onDisconnect(context, source)
} catch (err) {
logger.error({ err }, 'proxy ws error from onDisconnect hook')
}
Expand All @@ -100,7 +105,7 @@ function proxyWebSockets (logger, source, target, hooks) {
source.on('message', (data, binary) => {
if (hooks.onIncomingMessage) {
try {
hooks.onIncomingMessage(source, target, { data, binary })
hooks.onIncomingMessage(context, source, target, { data, binary })
} catch (err) {
logger.error({ err }, 'proxy ws error from onIncomingMessage hook')
}
Expand All @@ -121,7 +126,7 @@ function proxyWebSockets (logger, source, target, hooks) {
target.on('message', (data, binary) => {
if (hooks.onOutgoingMessage) {
try {
hooks.onOutgoingMessage(source, target, { data, binary })
hooks.onOutgoingMessage(context, source, target, { data, binary })
} catch (err) {
logger.error({ err }, 'proxy ws error from onOutgoingMessage hook')
}
Expand All @@ -141,7 +146,7 @@ function proxyWebSockets (logger, source, target, hooks) {
if (hooks.onConnect) {
waitConnection(target, () => {
try {
hooks.onConnect(source, target)
hooks.onConnect(context, source, target)
} catch (err) {
logger.error({ err }, 'proxy ws error from onConnect hook')
}
Expand Down Expand Up @@ -189,6 +194,7 @@ async function reconnect (logger, source, reconnectOptions, hooks, targetParams)
}

function proxyWebSocketsWithReconnection (logger, source, target, options, hooks, targetParams, isReconnecting = false) {
const context = createContext(logger)
function close (code, reason) {
target.pingTimer && clearInterval(target.pingTimer)
target.pingTimer = undefined
Expand All @@ -206,7 +212,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks

if (hooks.onDisconnect) {
try {
hooks.onDisconnect(source)
hooks.onDisconnect(context, source)
} catch (err) {
options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onDisconnect hook')
}
Expand All @@ -231,7 +237,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks
source.isAlive = true
if (hooks.onIncomingMessage) {
try {
hooks.onIncomingMessage(source, target, { data, binary })
hooks.onIncomingMessage(context, source, target, { data, binary })
} catch (err) {
logger.error({ target: targetParams.url, err }, 'proxy ws error from onIncomingMessage hook')
}
Expand Down Expand Up @@ -281,7 +287,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks
target.isAlive = true
if (hooks.onOutgoingMessage) {
try {
hooks.onOutgoingMessage(source, target, { data, binary })
hooks.onOutgoingMessage(context, source, target, { data, binary })
} catch (err) {
logger.error({ target: targetParams.url, err }, 'proxy ws error from onOutgoingMessage hook')
}
Expand All @@ -296,7 +302,7 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks
target.isAlive = true
if (hooks.onPong) {
try {
hooks.onPong(source, target)
hooks.onPong(context, source, target)
} catch (err) {
logger.error({ target: targetParams.url, err }, 'proxy ws error from onPong hook')
}
Expand Down Expand Up @@ -336,13 +342,13 @@ function proxyWebSocketsWithReconnection (logger, source, target, options, hooks
// call onConnect and onReconnect callbacks after the events are bound
if (isReconnecting && hooks.onReconnect) {
try {
hooks.onReconnect(source, target)
hooks.onReconnect(context, source, target)
} catch (err) {
options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onReconnect hook')
}
} else if (hooks.onConnect) {
try {
hooks.onConnect(source, target)
hooks.onConnect(context, source, target)
} catch (err) {
options.logs && logger.error({ target: targetParams.url, err }, 'proxy ws error from onConnect hook')
}
Expand Down
26 changes: 13 additions & 13 deletions test/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -716,18 +716,18 @@ test('multiple websocket upstreams with distinct server options', async (t) => {
test('should call onIncomingMessage and onOutgoingMessage hooks', async (t) => {
const request = 'query () { ... }'
const response = 'data ...'
const onIncomingMessage = (source, target, { data, binary }) => {
const onIncomingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), request)
assert.strictEqual(binary, false)
logger.info('onIncomingMessage called')
context.log.info('onIncomingMessage called')
}
const onOutgoingMessage = (source, target, { data, binary }) => {
const onOutgoingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), response)
assert.strictEqual(binary, false)
logger.info('onOutgoingMessage called')
context.log.info('onOutgoingMessage called')
}

const { target, loggerSpy, logger, client } = await createServices({ t, wsHooks: { onIncomingMessage, onOutgoingMessage } })
const { target, loggerSpy, client } = await createServices({ t, wsHooks: { onIncomingMessage, onOutgoingMessage } })

target.ws.on('connection', async (socket) => {
socket.on('message', async (data, binary) => {
Expand All @@ -744,12 +744,12 @@ test('should call onIncomingMessage and onOutgoingMessage hooks', async (t) => {
test('should handle throwing an error in onIncomingMessage and onOutgoingMessage hooks', async (t) => {
const request = 'query () { ... }'
const response = 'data ...'
const onIncomingMessage = (source, target, { data, binary }) => {
const onIncomingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), request)
assert.strictEqual(binary, false)
throw new Error('onIncomingMessage error')
}
const onOutgoingMessage = (source, target, { data, binary }) => {
const onOutgoingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), response)
assert.strictEqual(binary, false)
throw new Error('onOutgoingMessage error')
Expand All @@ -770,11 +770,11 @@ test('should handle throwing an error in onIncomingMessage and onOutgoingMessage
})

test('should call onConnect hook', async (t) => {
const onConnect = () => {
logger.info('onConnect called')
const onConnect = (context) => {
context.log.info('onConnect called')
}

const { loggerSpy, logger } = await createServices({ t, wsHooks: { onConnect } })
const { loggerSpy } = await createServices({ t, wsHooks: { onConnect } })

await waitForLogMessage(loggerSpy, 'onConnect called')
})
Expand All @@ -790,11 +790,11 @@ test('should handle throwing an error in onConnect hook', async (t) => {
})

test('should call onDisconnect hook', async (t) => {
const onDisconnect = () => {
logger.info('onDisconnect called')
const onDisconnect = (context) => {
context.log.info('onDisconnect called')
}

const { loggerSpy, logger, client } = await createServices({ t, wsHooks: { onDisconnect } })
const { loggerSpy, client } = await createServices({ t, wsHooks: { onDisconnect } })
client.close()

await waitForLogMessage(loggerSpy, 'onDisconnect called')
Expand Down
34 changes: 17 additions & 17 deletions test/ws-reconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ test('should reconnect when the target connection is closed gracefully and recon
})

test('should call onReconnect hook when the connection is reconnected', async (t) => {
const onReconnect = (source, target) => {
logger.info('onReconnect called')
const onReconnect = (context, source, target) => {
context.log.info('onReconnect called')
}
const wsReconnectOptions = {
pingInterval: 100,
Expand All @@ -156,7 +156,7 @@ test('should call onReconnect hook when the connection is reconnected', async (t
logs: true,
}

const { target, loggerSpy, logger } = await createServices({ t, wsReconnectOptions, wsHooks: { onReconnect } })
const { target, loggerSpy } = await createServices({ t, wsReconnectOptions, wsHooks: { onReconnect } })

target.ws.on('connection', async (socket) => {
socket.on('ping', async () => {
Expand All @@ -173,7 +173,7 @@ test('should call onReconnect hook when the connection is reconnected', async (t
})

test('should handle throwing an error in onReconnect hook', async (t) => {
const onReconnect = (source, target) => {
const onReconnect = () => {
throw new Error('onReconnect error')
}
const wsReconnectOptions = {
Expand Down Expand Up @@ -203,15 +203,15 @@ test('should handle throwing an error in onReconnect hook', async (t) => {
test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnection', async (t) => {
const request = 'query () { ... }'
const response = 'data ...'
const onIncomingMessage = (source, target, { data, binary }) => {
const onIncomingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), request)
assert.strictEqual(binary, false)
logger.info('onIncomingMessage called')
context.log.info('onIncomingMessage called')
}
const onOutgoingMessage = (source, target, { data, binary }) => {
const onOutgoingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), response)
assert.strictEqual(binary, false)
logger.info('onOutgoingMessage called')
context.log.info('onOutgoingMessage called')
}
const wsReconnectOptions = {
pingInterval: 100,
Expand All @@ -220,7 +220,7 @@ test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnecti
logs: true,
}

const { target, loggerSpy, logger, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onIncomingMessage, onOutgoingMessage } })
const { target, loggerSpy, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onIncomingMessage, onOutgoingMessage } })

target.ws.on('connection', async (socket) => {
socket.on('message', async (data, binary) => {
Expand All @@ -237,12 +237,12 @@ test('should call onIncomingMessage and onOutgoingMessage hooks, with reconnecti
test('should handle throwing an error in onIncomingMessage and onOutgoingMessage hooks, with reconnection', async (t) => {
const request = 'query () { ... }'
const response = 'data ...'
const onIncomingMessage = ({ data, binary }) => {
const onIncomingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), request)
assert.strictEqual(binary, false)
throw new Error('onIncomingMessage error')
}
const onOutgoingMessage = ({ data, binary }) => {
const onOutgoingMessage = (context, source, target, { data, binary }) => {
assert.strictEqual(data.toString(), response)
assert.strictEqual(binary, false)
throw new Error('onOutgoingMessage error')
Expand All @@ -269,15 +269,15 @@ test('should handle throwing an error in onIncomingMessage and onOutgoingMessage
})

test('should call onConnect hook', async (t) => {
const onConnect = () => {
logger.info('onConnect called')
const onConnect = (context) => {
context.log.info('onConnect called')
}

const wsReconnectOptions = {
logs: true,
}

const { loggerSpy, logger } = await createServices({ t, wsReconnectOptions, wsHooks: { onConnect } })
const { loggerSpy } = await createServices({ t, wsReconnectOptions, wsHooks: { onConnect } })

await waitForLogMessage(loggerSpy, 'onConnect called')
})
Expand All @@ -297,15 +297,15 @@ test('should handle throwing an error in onConnect hook', async (t) => {
})

test('should call onDisconnect hook', async (t) => {
const onDisconnect = () => {
logger.info('onDisconnect called')
const onDisconnect = (context) => {
context.log.info('onDisconnect called')
}

const wsReconnectOptions = {
logs: true,
}

const { loggerSpy, logger, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onDisconnect } })
const { loggerSpy, client } = await createServices({ t, wsReconnectOptions, wsHooks: { onDisconnect } })
client.close()

await waitForLogMessage(loggerSpy, 'onDisconnect called')
Expand Down
Loading