
Commit 5e0a694

Update readme and add example to show how one might manage auto reconnecting (#189)

feature: add autoreconnect example and update logic to support it

1 parent: b7f8f16

13 files changed: +704 −178 lines
README.md — 47 additions & 7 deletions

```diff
@@ -5,23 +5,25 @@
 ## Table of Contents
 
 - [RabbitMQ client for the stream protocol for Node.JS](#rabbitmq-client-for-the-stream-protocol-for-nodejs)
+- [Table of Contents](#table-of-contents)
 - [Overview](#overview)
 - [Installing via NPM](#installing-via-npm)
 - [Getting started](#getting-started)
 - [Usage](#usage)
 - [Connect](#connect)
-- [Connect through TLS/SSL](#connect-through-tls-ssl)
+- [Connect through TLS/SSL](#connect-through-tlsssl)
 - [Basic Publish](#basic-publish)
 - [Sub Batch Entry Publishing](#sub-batch-entry-publishing)
 - [Basic Consuming](#basic-consuming)
 - [Single Active Consumer](#single-active-consumer)
 - [Clustering](#clustering)
-- [Load Balancer](#loadbalancer)
+- [Load Balancer](#load-balancer)
+- [Super Stream](#super-stream)
+- [Filtering](#filtering)
+- [Mitigating connection issues](#mitigating-connection-issues)
 - [Running Examples](#running-examples)
 - [Build from source](#build-from-source)
 - [MISC](#misc)
-- [Super Stream](#super-stream)
-- [Filtering](#filtering)
 
 ## Overview
```
```diff
@@ -210,9 +212,12 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
 
 ### Clustering
 
-Every time we create a new producer or a new consumer, a new connection is created.
-In particular for the producer the connection is created on the node leader.
-For more running the tests in a cluster follow the readme under the folder /cluster
+Every time we create a new producer or a new consumer, a new connection object is created. The underlying TCP connections can be shared among different producers and different consumers. Note, however, that:
+
+- each `Client` instance has a unique connection, which is never shared.
+- for producers, the connection is created on the leader node.
+- consumers and producers do not share connections.
+For more about running the tests in a cluster, follow the readme under the folder /cluster
 
 ### Load Balancer
```
````diff
@@ -320,6 +325,41 @@ await sleep(2000)
 await client.close()
 ```
 
+### Mitigating connection issues
+
+The library exposes some utility functions and properties that can help in building a more robust client application. One simple use case, addressed in one of the examples (`example/autoreconnect_example.js`), shows how to build a client application that can handle simple network issues like a temporary disconnection. In this scenario we are _not_ dealing with a complex broker-side service disruption or a cluster reorganization; in particular, we assume that the stream topology and the node host names do not change.
+
+The approach can be summed up as: register a `connection_closed` listener when instantiating a `Client` object, and call `client.restart().then(...)` in its body.
+
+```typescript
+const connectionClosedCallback = () => {
+  logger.info(`In connection closed event...`)
+  client
+    .restart()
+    .then(() => {
+      logger.info(`Connections restarted!`)
+    })
+    .catch((reason) => {
+      logger.warn(`Could not reconnect to Rabbit! ${reason}`)
+    })
+}
+
+client = await rabbit.connect({
+  //...
+  listeners: { connection_closed: connectionClosedCallback },
+  //...
+})
+```
+
+There are various considerations to keep in mind when building a client application around these features:
+
+- the `connection_closed` callback is registered on the event of the TCP socket used by the `Client` object instance. This callback cannot be an `async` function, so we need to use `then(...).catch(...)` to deal with the outcome of the restart attempt.
+- clients, producers and consumers expose a utility function `getConnectionInfo()` that returns information about the state of the underlying TCP socket and the logical connection to the broker. In particular, the `ready` field indicates whether the handshake with the broker completed correctly.
+- consider using the outbox pattern when sending messages to the broker.
+- some form of deduplication should be implemented in the client application when receiving messages: using offsets when defining consumer instances does not rule out receiving messages multiple times.
+
+See the comments and the implementation in `example/autoreconnect_example.js` for a more in-depth view.
+
 ## Running Examples
 
 the folder /example contains a project that shows some examples on how to use the lib, to run it follow this steps
````
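Since the `connection_closed` callback cannot be `async`, and a single immediate retry may fail while the broker is still coming back, a common pattern is to wrap the restart attempt in a retry helper with backoff. Below is a minimal, library-independent sketch; `retryWithBackoff` and its parameters are illustrative names, not part of the client API.

```javascript
// Sketch: retry an async operation with exponential backoff.
// `operation` is any function returning a promise, e.g. () => client.restart().
// Names and defaults are illustrative, not part of rabbitmq-stream-js-client.
async function retryWithBackoff(operation, { retries = 5, baseDelayMs = 100 } = {}) {
  for (let attempt = 0; attempt < retries; attempt++) {
    try {
      return await operation()
    } catch (err) {
      // out of attempts: surface the last error to the caller
      if (attempt === retries - 1) throw err
      // wait 100ms, 200ms, 400ms, ... before the next attempt
      await new Promise((r) => setTimeout(r, baseDelayMs * 2 ** attempt))
    }
  }
}
```

Inside the `connection_closed` listener one could then call `retryWithBackoff(() => client.restart()).catch(...)`, keeping the listener itself synchronous.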

example/package-lock.json — 1 addition & 1 deletion

Some generated files are not rendered by default.
example/autoreconnect_example.js — 252 additions & 27 deletions
```diff
@@ -1,49 +1,274 @@
+/*
+
+Auto-reconnect example: mitigate simple network disconnection issues
+
+In this example we assume that:
+- the stream topology does not change (i.e. leader/replicas nodes do not change)
+- hostnames/ip addresses do not change
+- the connection_closed event is triggered on the TCP connection used by the Client instance
+
+The example is composed of:
+- a message generation part (mimics the behavior of a client application)
+- a toy outbox pattern implementation (in-memory structure, no persistence of data, etc.)
+- a client instance with a registered callback on the connection_closed event
+- scheduled delivery of messages through a producer
+- very simple publish_confirm handling
+- one consumer
+- a scheduled reachability interruption (in this case obtained by launching `docker-compose restart`)
+- a scheduled process that logs the state of the application (connections, message counters)
+*/
 const rabbit = require("rabbitmq-stream-js-client")
 const { randomUUID } = require("crypto")
+const { exec } = require("child_process")
+const { promisify, inspect } = require("util")
+const promiseExec = promisify(exec)
+const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
 
 const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
 const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"
+let client = undefined
+let publisher = undefined
+let consumer = undefined
+let consumerOffset = rabbit.Offset.first()
+let streamName = `example-${randomUUID()}`
+const publisherOutbox = { messageIds: [], messages: new Map(), publishingIds: new Map(), offset: 0 }
+const received = new Set()
+let publishingId = 1n
+let restartCount = 0
+let published = 0
+let confirmed = 0
+let callbackCalled = 0
+const logger = {
+  debug: (msg) => { return },
+  info: (msg) => console.log(`[info]\t[${new Date().toISOString()}] ${msg}`),
+  error: (msg) => console.log(`[error]\t[${new Date().toISOString()}] ${msg}`),
+  warn: (msg) => console.log(`[warn]\t[${new Date().toISOString()}] ${msg}`),
+}
+
+function getNodesFromEnv() {
+  const envValue = process.env.RABBIT_MQ_TEST_NODES ?? "localhost:5552"
+  const nodes = envValue.split(";")
+  return nodes.map((n) => {
+    const [host, port] = n.split(":")
+    // parseInt returns NaN (never null) on bad input, so check explicitly
+    const parsedPort = parseInt(port, 10)
+    return { host: host ?? "localhost", port: Number.isNaN(parsedPort) ? 5552 : parsedPort }
+  })
+}
+
```
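One detail in the node-list parsing above is worth a note: `parseInt` returns `NaN`, never `null` or `undefined`, so a plain `?? 5552` fallback would never fire and an explicit `NaN` check is needed. A standalone sketch of the same parsing (`parseNodes` is an illustrative name, not part of the example's API):

```javascript
// Sketch: parse a node list of the form "host1:5552;host2:5553".
// Mirrors the getNodesFromEnv logic above, with an explicit NaN check.
function parseNodes(envValue) {
  return envValue.split(";").map((n) => {
    const [host, port] = n.split(":")
    const parsed = parseInt(port, 10) // NaN when the port is missing or malformed
    return { host: host || "localhost", port: Number.isNaN(parsed) ? 5552 : parsed }
  })
}
```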
```diff
+async function triggerConnectionIssue() {
+  await promiseExec("cd .. && docker-compose restart")
+  return true
+}
+
+/*
+very simple message generation function
+*/
+function generateMessage() {
+  const payload = Buffer.from(`${randomUUID()}`)
+  const messageId = `${randomUUID()}`
+  return { payload, messageId }
+}
+
+/*
+at each iteration, a new message is put in the outbox.
+This mimics a client application that generates messages to be sent.
+*/
+function scheduleMessageProduction() {
+  setInterval(() => {
+    const { payload, messageId } = generateMessage()
+    publisherOutbox.messageIds.push(messageId)
+    publisherOutbox.messages.set(messageId, payload)
+  }, 50)
+}
+
+/*
+at each iteration, a new message is read from the outbox and sent using the publisher.
+Note that the operation is executed only if there is a new message to be sent
+and if the publisher connection is at least established.
+If the publisher is not `ready`, then the message will be cached internally.
+*/
+async function scheduleMessageDelivery() {
+  setInterval(async () => {
+    // keep track of the last message sent (but not yet confirmed)
+    const messageOffset = publisherOutbox.offset
+    const oldestMessageId = publisherOutbox.messageIds[messageOffset]
+    // is the publisher socket open?
+    const { writable } = publisher?.getConnectionInfo() ?? { writable: false }
+    if (publisher && writable && oldestMessageId !== undefined) {
+      const message = publisherOutbox.messages.get(oldestMessageId)
+      const res = await publisher.send(message, { messageProperties: { messageId: `${oldestMessageId}` } })
+      published++
+      publisherOutbox.offset++
+      if (res.publishingId !== undefined) {
+        // keep track of the messageId, by mapping it to the protocol-generated publishingId
+        publisherOutbox.publishingIds.set(res.publishingId, oldestMessageId)
+      }
+    }
+  }, 10)
+}
+
+/*
+at each interval, the state of the outbox, the message counters and the state of the client connections are logged.
+*/
+function scheduleLogInfo() {
+  setInterval(() => {
+    logger.info(`outbox queue length: ${publisherOutbox.messageIds.length} offset ${publisherOutbox.offset}`)
+    logger.info(`${inspect({ published, confirmed, received: received.size })}`)
+    logger.info(`client local port: ${inspect(client && client.getConnectionInfo().localPort)} consumer local port: ${inspect(consumer && consumer.getConnectionInfo().localPort)} publisher local port: ${inspect(publisher && publisher.getConnectionInfo().localPort)}`)
+  }, 3000)
+}
```
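The `publisherOutbox` logic above (append a message, hand the next one to the publisher, drop confirmed ones) can be isolated into a small testable structure. A minimal in-memory sketch, with no persistence; `createOutbox` and its method names are illustrative, not part of the example's API:

```javascript
// Sketch: minimal in-memory outbox mirroring the publisherOutbox structure above.
// `offset` points at the next message not yet handed to the publisher.
function createOutbox() {
  const messageIds = []
  const messages = new Map()
  let offset = 0
  return {
    enqueue(id, payload) {
      messageIds.push(id)
      messages.set(id, payload)
    },
    // next message to send, or undefined when everything was handed out
    next() {
      const id = messageIds[offset]
      if (id === undefined) return undefined
      offset++
      return { id, payload: messages.get(id) }
    },
    // a confirmed message leaves the outbox; the offset rewinds accordingly
    confirm(id) {
      if (messages.delete(id)) {
        const idx = messageIds.indexOf(id)
        if (idx !== -1) messageIds.splice(idx, 1)
        offset--
      }
    },
    size: () => messageIds.length,
  }
}
```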
```diff
 
-async function main() {
-  const streamName = `example-${randomUUID()}`
-  console.log(`Creating stream ${streamName}`)
 
-  let client = undefined
+/*
+at each interval, trigger a connection problem.
+*/
+async function triggerConnectionIssues() {
+  return new Promise((res, rej) => {
+    setInterval(async () => {
+      logger.info("Closing!")
+      restartCount++
+      await triggerConnectionIssue()
+      if (restartCount >= 1000) {
+        try {
+          logger.info("Terminating...")
+          if (client) await client.close()
+          res(true)
+          return
+        } catch (e) {
+          rej(e)
+          return
+        }
+      }
+      // after this message is logged, the client connections should reopen
+      logger.info("\nNow it should reopen!\n")
+    }, 60000)
+  })
+}
+
+/*
+when setting up the publisher, we register a callback on the `publish_confirm` event that
+informs us that the broker has correctly received the sent message. This triggers an update on
+the outbox state (the message is considered as sent)
+*/
+async function setupPublisher(client) {
+  const publisherRef = `publisher - ${randomUUID()}`
+  const publisherConfig = { stream: streamName, publisherRef: publisherRef, connectionClosedListener: (err) => { return } }
+  /*
+  confirmedIds contains the list of publishingIds linked to messages correctly published in the stream.
+  These ids are not the messageIds that have been set in the message properties
+  */
+  const publisherConfirmCallback = (err, confirmedIds) => {
+    if (err) {
+      logger.info(`Publish confirm error ${inspect(err)}`)
+      return
+    }
+    confirmed = confirmed + confirmedIds.length
+    const confirmedMessageIds = confirmedIds.map((publishingId) => {
+      const messageId = publisherOutbox.publishingIds.get(publishingId)
+      publisherOutbox.publishingIds.delete(publishingId)
+      return messageId
+    })
 
-  const connectToRabbit = async () => {
+    publisherOutbox.messageIds = publisherOutbox.messageIds.filter((id) => {
+      return !confirmedMessageIds.includes(id)
+    })
+    confirmedMessageIds.forEach((id) => {
+      if (publisherOutbox.messages.delete(id)) {
+        publisherOutbox.offset = publisherOutbox.offset - 1
+      }
+    })
+  }
+  const publisher = await client.declarePublisher(publisherConfig)
+  publisher.on("publish_confirm", publisherConfirmCallback)
+  publisherOutbox.offset = 0
+  return publisher
+}
+
```
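The confirm callback above translates protocol-level `publishingId`s back into application-level `messageId`s via the `publishingIds` map, consuming each mapping entry as confirms arrive. That bookkeeping can be factored out and tested in isolation; the function name below is illustrative, not part of the example's API:

```javascript
// Sketch: resolve broker publishingIds to the application messageIds
// recorded at send time, deleting each mapping entry as it is consumed.
function resolveConfirmedMessageIds(publishingIdMap, confirmedIds) {
  return confirmedIds
    .map((publishingId) => {
      const messageId = publishingIdMap.get(publishingId)
      publishingIdMap.delete(publishingId)
      return messageId
    })
    // confirms for unknown publishingIds yield undefined and are dropped
    .filter((id) => id !== undefined)
}
```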
```diff
+/*
+in the consumer we can use the `messageId` property to make sure each message is "handled" once.
+*/
+async function setupConsumer(client) {
+  const consumerConfig = { stream: streamName, offset: rabbit.Offset.timestamp(new Date()), connectionClosedListener: (err) => { return } }
+  const receiveCallback = (msg) => {
+    const msgId = msg.messageProperties.messageId
+    if (received.has(msgId)) {
+      /* On restart, the consumer automatically sets its offset to the latest handled message index.
+      For sanity, some sort of deduplication is still needed.
+      @see https://blog.rabbitmq.com/posts/2021/09/rabbitmq-streams-offset-tracking/
+      and Consumer.storeOffset and Consumer.queryOffset for a more complete approach
+      */
+      logger.info(`dedup: ${msgId}`)
+    }
+    received.add(msgId)
+    consumerOffset = msg.offset
+    return
+  }
+  return client.declareConsumer(consumerConfig, receiveCallback)
+}
```
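The duplicate check in the consumer callback above boils down to tracking seen `messageId`s in a `Set`; a real application would bound the set's size or rely on offset tracking instead. A small standalone sketch (names are illustrative, not part of the example's API):

```javascript
// Sketch: wrap a message handler so it runs at most once per messageId.
// Returns true when the handler ran, false when the message was a duplicate.
function deduplicated(onMessage) {
  const seen = new Set()
  return (msgId, payload) => {
    if (seen.has(msgId)) return false // duplicate, skipped
    seen.add(msgId)
    onMessage(msgId, payload)
    return true
  }
}
```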
```diff
+
+
+/*
+setup of a client instance, a producer and a consumer.
+The core of the example is the implementation of the
+`connection_closed` callback, in which the `client.restart()` method is called.
+This triggers the reset of all the TCP sockets involved, for all producers and consumers,
+as well as of the TCP socket used by the client itself.
+*/
+async function setup() {
+  try {
+    const connectionClosedCallback = () => {
+      logger.info(`In connection closed event...`)
+      callbackCalled++
+      if (callbackCalled > 10) {
+        process.exit(0)
+      }
+      client.restart().then(() => {
+        logger.info(`Connections restarted!`)
+      }).catch((reason) => {
+        logger.warn(`Could not reconnect to Rabbit! ${reason}`)
+      })
+    }
+    const firstNode = getNodesFromEnv()[0]
+    logger.info(`Now invoking rabbit.connect on ${inspect(firstNode)}`)
     client = await rabbit.connect({
-      hostname: "localhost",
-      port: 5553,
+      hostname: firstNode.host,
+      port: firstNode.port,
       username: rabbitUser,
       password: rabbitPassword,
-      listeners: {
-        connection_closed: async () => {
-          await sleep(Math.random() * 3000)
-          connectToRabbit()
-            .then(() => console.log("Successfully re-connected to rabbit!"))
-            .catch((e) => console.error("Error while reconnecting to Rabbit!", e))
-        },
-      },
+      listeners: { connection_closed: connectionClosedCallback },
       vhost: "/",
       heartbeat: 0,
-    })
+    }, logger)
+    await client.createStream({ stream: streamName, arguments: {} })
+    publisher = await setupPublisher(client)
+    consumer = await setupConsumer(client)
+    return { client, publisher, consumer }
   }
+  catch (err) {
+    logger.warn(`Setup-wide error: ${inspect(err)}`)
+  }
+}
 
-  await connectToRabbit()
-
-  await sleep(2000)
-
-  console.log("Closing!")
-  await client.close()
-  console.log("Now it should reopen!")
 
-  await sleep(10000)
+async function main() {
+  // instantiate the client, the producer and the consumer
+  await setup()
+  // schedule the task that inserts new messages in the outbox
+  scheduleMessageProduction()
+  // schedule the task that attempts to send a message to the broker, taking it from the outbox
+  await scheduleMessageDelivery()
+  // schedule the task that logs connection info and message counters
+  scheduleLogInfo()
+  // schedule the task that triggers a (more or less simulated) network issue
+  await triggerConnectionIssues()
 }
 
 main()
-  .then(() => console.log("done!"))
+  .then(() => logger.info("done!"))
   .catch((res) => {
-    console.log("ERROR ", res)
+    logger.info(`ERROR ${inspect(res)}`)
     process.exit(-1)
   })
-const sleep = (ms) => new Promise((r) => setTimeout(r, ms))
+
```