Skip to content

Commit

Permalink
feat: nolocal flag support for bridges (#737)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Lando <daniel.sorridi@gmail.com>
  • Loading branch information
oldrich-s and robertsLando authored May 12, 2022
1 parent 00549ee commit f82b8f8
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 33 deletions.
1 change: 1 addition & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ function storeRetained (packet, done) {
}

function emitPacket (packet, done) {
if (this.client) packet.clientId = this.client.id
this.broker.mq.emit(packet, done)
}

Expand Down
17 changes: 14 additions & 3 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,20 @@ function Client (broker, conn, req) {
conn.on('end', this.close.bind(this))
this._eos = eos(this.conn, this.close.bind(this))

this.deliver0 = function deliverQoS0 (_packet, cb) {
const getToForwardPacket = (_packet) => {
// Mqttv5 3.8.3.1: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html#_Toc3901169
// prevent to forward messages sent by the same client when no-local flag is set
if (_packet.clientId === that.id && _packet.nl) return

const toForward = dedupe(that, _packet) &&
that.broker.authorizeForward(that, _packet)

return toForward
}

this.deliver0 = function deliverQoS0 (_packet, cb) {
const toForward = getToForwardPacket(_packet)

if (toForward) {
// Give nodejs some time to clear stacks, or we will see
// "Maximum call stack size exceeded" in a very high load
Expand All @@ -114,8 +125,8 @@ function Client (broker, conn, req) {
that.deliver0(_packet, cb)
return
}
const toForward = dedupe(that, _packet) &&
that.broker.authorizeForward(that, _packet)
const toForward = getToForwardPacket(_packet)

if (toForward) {
setImmediate(() => {
const packet = new QoSPacket(toForward, that)
Expand Down
13 changes: 6 additions & 7 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,12 @@ function addSubs (sub, done) {
const nl = this.nl
let func = qos > 0 ? client.deliverQoS : client.deliver0

if (!rap) {
const deliverFunc = func
func = function handlePacketSubscription (_packet, cb) {
_packet = new Packet(_packet, broker)
_packet.retain = false
deliverFunc(_packet, cb)
}
const deliverFunc = func
func = function handlePacketSubscription (_packet, cb) {
_packet = new Packet(_packet, broker)
_packet.nl = nl
if (!rap) _packet.retain = false
deliverFunc(_packet, cb)
}

// [MQTT-4.7.2-1]
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@
"websocket-stream": "^5.5.2"
},
"dependencies": {
"aedes-packet": "^2.3.1",
"aedes-persistence": "^9.0.1",
"aedes-packet": "^3.0.0",
"aedes-persistence": "^9.1.1",
"end-of-stream": "^1.4.4",
"fastfall": "^1.5.1",
"fastparallel": "^2.4.1",
Expand Down
38 changes: 28 additions & 10 deletions test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ test('authentication error when non numeric return code is passed', function (t)
test('authorize publish', function (t) {
t.plan(4)

const s = connect(setup())
const s = connect(setup(), { clientId: 'my-client-xyz' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -445,6 +445,7 @@ test('authorize publish', function (t) {
t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0')
expected.brokerId = s.broker.id
expected.brokerCounter = s.broker.counter
expected.clientId = 'my-client-xyz'
delete expected.length
t.same(packet, expected, 'packet matches')
cb()
Expand All @@ -460,7 +461,7 @@ test('authorize publish', function (t) {
test('authorize waits for authenticate', function (t) {
t.plan(6)

const s = setup()
const s = setup(aedes({ clientId: 'my-client-xyz-2' }))
t.teardown(s.broker.close.bind(s.broker))

s.broker.authenticate = function (client, username, password, cb) {
Expand All @@ -485,7 +486,8 @@ test('authorize waits for authenticate', function (t) {
qos: 0,
retain: false,
length: 12,
dup: false
dup: false,
clientId: 'my-client'
}

s.broker.mq.on('hello', function (packet, cb) {
Expand Down Expand Up @@ -519,12 +521,13 @@ test('authorize publish from configOptions', function (t) {
t.plan(4)

const s = connect(setup(aedes({
clientId: 'my-client-xyz-3',
authorizePublish: function (client, packet, cb) {
t.ok(client, 'client exists')
t.same(packet, expected, 'packet matches')
cb()
}
})))
})), { clientId: 'my-client-xyz-3' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -541,6 +544,7 @@ test('authorize publish from configOptions', function (t) {
t.notOk(Object.prototype.hasOwnProperty.call(packet, 'messageId'), 'should not contain messageId in QoS 0')
expected.brokerId = s.broker.id
expected.brokerCounter = s.broker.counter
expected.clientId = 'my-client-xyz-3'
delete expected.length
t.same(packet, expected, 'packet matches')
cb()
Expand Down Expand Up @@ -589,7 +593,7 @@ test('do not authorize publish', function (t) {
test('modify qos out of range in authorize publish ', function (t) {
t.plan(2)

const s = connect(setup())
const s = connect(setup(), { clientId: 'my-client-xyz-4' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -599,7 +603,8 @@ test('modify qos out of range in authorize publish ', function (t) {
qos: 0,
retain: false,
length: 12,
dup: false
dup: false,
clientId: 'my-client-xyz-4'
}

s.broker.authorizePublish = function (client, packet, cb) {
Expand Down Expand Up @@ -805,12 +810,19 @@ test('negate multiple subscriptions', function (t) {
test('negate subscription with correct persistence', function (t) {
t.plan(6)

// rh, rap, nl are undefined because mqtt.parser is set to MQTT 3.1.1 and will thus erase these props from s.inStream.write
const expected = [{
topic: 'hello',
qos: 0
qos: 0,
rh: undefined,
rap: undefined,
nl: undefined
}, {
topic: 'world',
qos: 0
qos: 0,
rh: undefined,
rap: undefined,
nl: undefined
}]

const broker = aedes()
Expand Down Expand Up @@ -839,10 +851,16 @@ test('negate subscription with correct persistence', function (t) {
messageId: 24,
subscriptions: [{
topic: 'hello',
qos: 0
qos: 0,
rh: 0,
rap: true,
nl: false
}, {
topic: 'world',
qos: 0
qos: 0,
rh: 0,
rap: true,
nl: false
}]
})
})
Expand Down
12 changes: 8 additions & 4 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test('test aedes.Server', function (t) {
test('publish QoS 0', function (t) {
t.plan(2)

const s = connect(setup())
const s = connect(setup(), { clientId: 'my-client-xyz-5' })
t.teardown(s.broker.close.bind(s.broker))

const expected = {
Expand All @@ -29,7 +29,8 @@ test('publish QoS 0', function (t) {
payload: Buffer.from('world'),
qos: 0,
retain: false,
dup: false
dup: false,
clientId: 'my-client-xyz-5'
}

s.broker.mq.on('hello', function (packet, cb) {
Expand Down Expand Up @@ -128,7 +129,7 @@ test('publish to $SYS topic throws error', function (t) {
qos: 0,
retain: false
}
const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos }]
const expectedSubs = ele.clean ? null : [{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }]

subscribe(t, s, 'hello', ele.qos, function () {
s.outStream.once('data', function (packet) {
Expand Down Expand Up @@ -188,7 +189,10 @@ test('return write errors to callback', function (t) {
qos: 0,
retain: false
}
const subs = [{ topic: 'hello', qos: ele.qos }, { topic: 'world', qos: ele.qos }]
const subs = [
{ topic: 'hello', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined },
{ topic: 'world', qos: ele.qos, rh: undefined, rap: undefined, nl: undefined }
]
const expectedSubs = ele.clean ? null : subs

subscribeMultiple(t, s, subs, [ele.qos, ele.qos], function () {
Expand Down
57 changes: 57 additions & 0 deletions test/bridge.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict'

const { test } = require('tap')
const { setup, connect, subscribe } = require('./helper')

for (const qos of [0, 1, 2]) {
const packet = {
qos,
cmd: 'publish',
topic: 'hello',
payload: 'world'
}

if (qos > 0) packet.messageId = 42

test('normal client sends a publish message and shall receive it back, qos = ' + qos, function (t) {
const s = connect(setup())
t.teardown(s.broker.close.bind(s.broker))

const handle = setTimeout(() => {
t.fail('did not receive packet back')
t.end()
}, 1000)

subscribe(t, s, 'hello', qos, function () {
s.outStream.on('data', (packet) => {
if (packet.cmd === 'publish') {
clearTimeout(handle)
t.end()
} else if (packet.cmd === 'pubrec') {
s.inStream.write({ cmd: 'pubrel', messageId: 42 })
}
})

s.inStream.write(packet)
})
})

test('bridge client sends a publish message but shall not receive it back, qos = ' + qos, function (t) {
// protocolVersion 128 + 4 means mqtt 3.1.1 with bridgeMode enabled
// https://github.com/mqttjs/mqtt-packet/blob/7f7c2ed8bcb4b2c582851d120a94e0b4a731f661/parser.js#L171
const s = connect(setup(), { clientId: 'my-client-bridge-1', protocolVersion: 128 + 4 })
t.teardown(s.broker.close.bind(s.broker))

const handle = setTimeout(() => t.end(), 1000)

subscribe(t, s, 'hello', qos, function () {
s.outStream.on('data', function () {
clearTimeout(handle)
t.fail('should not receive packet back')
t.end()
})

s.inStream.write(packet)
})
})
}
12 changes: 7 additions & 5 deletions test/client-pub-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,10 @@ test('should not receive a message on negated subscription', function (t) {
test('programmatically add custom subscribe', function (t) {
t.plan(6)

const broker = aedes()
const broker = aedes({ clientId: 'my-client-xyz-7' })
t.teardown(broker.close.bind(broker))

const s = connect(setup(broker))
const s = connect(setup(broker), { clientId: 'my-client-xyz-7' })
const expected = {
cmd: 'publish',
topic: 'hello',
Expand All @@ -924,7 +924,8 @@ test('programmatically add custom subscribe', function (t) {
payload: Buffer.from('world'),
qos: 0,
retain: false,
dup: false
dup: false,
clientId: 'my-client-xyz-7'
}
subscribe(t, s, 'hello', 0, function () {
broker.subscribe('hello', deliver, function () {
Expand Down Expand Up @@ -963,9 +964,10 @@ test('custom function in broker.subscribe', function (t) {
qos: 1,
retain: false,
dup: false,
messageId: undefined
messageId: undefined,
clientId: 'my-client-xyz-6'
}
connect(s, {}, function () {
connect(s, { clientId: 'my-client-xyz-6' }, function () {
broker.subscribe('hello', deliver, function () {
t.pass('subscribed')
})
Expand Down
6 changes: 4 additions & 2 deletions test/qos2.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ test('call published method with client with QoS 2', function (t) {
t.teardown(broker.close.bind(broker))

const opts = { clean: cleanSession }
const publisher = connect(setup(broker))
const publisher = connect(setup(broker), { clientId: 'my-client-xyz-8' })
const subscriber = connect(setup(broker), { ...opts, clientId: 'abcde' })
const forwarded = {
cmd: 'publish',
Expand All @@ -248,7 +248,8 @@ test('call published method with client with QoS 2', function (t) {
qos: 2,
retain: false,
dup: false,
messageId: undefined
messageId: undefined,
clientId: 'my-client-xyz-8'
}
const expected = {
cmd: 'publish',
Expand All @@ -262,6 +263,7 @@ test('call published method with client with QoS 2', function (t) {
broker.authorizeForward = function (client, packet) {
forwarded.brokerId = broker.id
forwarded.brokerCounter = broker.counter
delete packet.nl
t.same(packet, forwarded, 'forwarded packet must match')
return packet
}
Expand Down

0 comments on commit f82b8f8

Please sign in to comment.