Skip to content

Commit

Permalink
fix: prevent packets leaks in cluster env (#777)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando authored Sep 15, 2022
1 parent c80a8df commit 4fad708
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 2 deletions.
22 changes: 22 additions & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,19 @@ function Aedes (opts) {
this.brokers = {}

const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat'
const birthTopic = $SYS_PREFIX + that.id + '/birth'

this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval)

const bufId = Buffer.from(that.id, 'utf8')

// in a cluster env this is used to warn other broker instances
// that this broker is alive
that.publish({
topic: birthTopic,
payload: bufId
}, noop)

function heartbeat () {
that.publish({
topic: heartbeatTopic,
Expand Down Expand Up @@ -141,6 +150,19 @@ function Aedes (opts) {
done()
})

this.mq.on($SYS_PREFIX + '+/birth', function brokerBorn (packet, done) {
const brokerId = packet.payload.toString()

// reset duplicates counter
if (brokerId !== that.id) {
for (const clientId in that.clients) {
delete that.clients[clientId].duplicates[brokerId]
}
}

done()
})

this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) {
const serverId = packet.topic.split('/')[1]
const clientId = packet.payload.toString()
Expand Down
41 changes: 41 additions & 0 deletions test/events.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const { test } = require('tap')
const mqemitter = require('mqemitter')
const { setup, connect, subscribe } = require('./helper')
const aedes = require('../')

Expand All @@ -16,9 +17,49 @@ test('publishes an hearbeat', function (t) {
const id = message.topic.match(/\$SYS\/([^/]+)\/heartbeat/)[1]
t.equal(id, broker.id, 'broker id matches')
t.same(message.payload.toString(), id, 'message has id as the payload')
cb()
})
})

test('publishes birth', function (t) {
t.plan(4)

const mq = mqemitter()
const brokerId = 'test-broker'
const fakeBroker = 'fake-broker'
const clientId = 'test-client'

mq.on(`$SYS/${brokerId}/birth`, (message, cb) => {
t.pass('broker birth received')
t.same(message.payload.toString(), brokerId, 'message has id as the payload')
cb()
})

const broker = aedes({
id: brokerId,
mq
})

broker.on('client', (client) => {
t.equal(client.id, clientId, 'client connected')
// set a fake counter on a fake broker
process.nextTick(() => {
broker.clients[clientId].duplicates[fakeBroker] = 42
mq.emit({ topic: `$SYS/${fakeBroker}/birth`, payload: Buffer.from(fakeBroker) })
})
})

mq.on(`$SYS/${fakeBroker}/birth`, (message, cb) => {
process.nextTick(() => {
t.equal(!!broker.clients[clientId].duplicates[fakeBroker], false, 'client duplicates has been resetted')
cb()
})
})

const s = connect(setup(broker), { clientId })
t.teardown(s.broker.close.bind(s.broker))
})

;['$mcollina', '$SYS'].forEach(function (topic) {
test('does not forward $ prefixed topics to # subscription - ' + topic, function (t) {
t.plan(4)
Expand Down
4 changes: 2 additions & 2 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ test('new QoS 0 subscribers receive QoS 0 retained messages when clean', functio
})

clock.setTimeout(() => {
t.equal(broker.counter, 8)
t.equal(broker.counter, 9)
}, 200)
})

Expand Down Expand Up @@ -315,7 +315,7 @@ test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clea
})
})
broker.on('closed', function () {
t.equal(broker.counter, 7)
t.equal(broker.counter, 8)
})
})

Expand Down

0 comments on commit 4fad708

Please sign in to comment.