diff --git a/aedes.js b/aedes.js index e7de15fa..b9c1d024 100644 --- a/aedes.js +++ b/aedes.js @@ -115,27 +115,25 @@ function Aedes (opts) { } function checkAndPublish (will, done) { - const needsPublishing = - !that.brokers[will.brokerId] || - that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) < - Date.now() - - if (needsPublishing) { - // randomize this, so that multiple brokers - // do not publish the same wills at the same time - that.publish(will, function publishWill (err) { - if (err) { - return done(err) - } + const notPublish = + that.brokers[will.brokerId] !== undefined && that.brokers[will.brokerId] + (3 * opts.heartbeatInterval) >= Date.now() + + if (notPublish) return done() + // randomize this, so that multiple brokers + // do not publish the same wills at the same time + this.authorizePublish(that.clients[will.clientId] || null, will, function (err) { + if (err) { return doneWill() } + that.publish(will, doneWill) + + function doneWill (err) { + if (err) { return done(err) } that.persistence.delWill({ id: will.clientId, brokerId: will.brokerId }, done) - }) - } else { - done() - } + } + }) } this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) { diff --git a/docs/Aedes.md b/docs/Aedes.md index 8e7f41ae..67f15e7c 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -291,7 +291,7 @@ Please refer to [Connect Return Code](http://docs.oasis-open.org/mqtt/mqtt/v3.1. ## Handler: authorizePublish (client, packet, callback) -- client: [``](./Client.md) +- client: [``](./Client.md) | `null` - packet: `` [`PUBLISH`][PUBLISH] - callback: `` `(error) => void` - error `` | `null` @@ -301,6 +301,8 @@ Invoked when 1. publish LWT to all online clients 2. incoming client publish +`client` is `null` when aedes publishes obsolete LWT without connected clients + If invoked `callback` with no errors, server authorizes the packet otherwise emits `clientError` with `error`. If an `error` occurs the client connection will be closed, but no error is returned to the client (MQTT-3.3.5-2) ```js diff --git a/test/types/aedes.test-d.ts b/test/types/aedes.test-d.ts index 448ee621..a4a813f7 100644 --- a/test/types/aedes.test-d.ts +++ b/test/types/aedes.test-d.ts @@ -38,7 +38,7 @@ broker = new Aedes({ callback(error, false) } }, - authorizePublish: (client: Client, packet: PublishPacket, callback) => { + authorizePublish: (client: Client | null, packet: PublishPacket, callback) => { if (packet.topic === 'aaaa') { return callback(new Error('wrong topic')) } diff --git a/test/will.js b/test/will.js index 640249e8..d2893d28 100644 --- a/test/will.js +++ b/test/will.js @@ -72,7 +72,7 @@ test('calling close two times should not deliver two wills', function (t) { }) test('delivers old will in case of a crash', function (t) { - t.plan(6) + t.plan(8) const persistence = memory() const will = { @@ -91,10 +91,16 @@ test('delivers old will in case of a crash', function (t) { }, will, function (err) { t.error(err, 'no error') + let authorized = false const interval = 10 // ms, so that the will check happens fast! const broker = aedes({ persistence, - heartbeatInterval: interval + heartbeatInterval: interval, + authorizePublish: function (client, packet, callback) { + t.strictSame(client, null, 'client must be null') + authorized = true + callback(null) + } }) t.teardown(broker.close.bind(broker)) @@ -113,6 +119,48 @@ test('delivers old will in case of a crash', function (t) { t.same(packet.payload, will.payload, 'payload matches') t.equal(packet.qos, will.qos, 'qos matches') t.equal(packet.retain, will.retain, 'retain matches') + t.equal(authorized, true, 'authorization called') + cb() + } + }) +}) + +test('deliver old will without authorization in case of a crash', function (t) { + t.plan(2) + + const persistence = memory() + const will = { + topic: 'mywill', + payload: Buffer.from('last will'), + qos: 0, + retain: false + } + + persistence.broker = { + id: 'anotherBroker' + } + + persistence.putWill({ + id: 'myClientId42' + }, will, function (err) { + t.error(err, 'no error') + + const interval = 10 // ms, so that the will check happens fast! + const broker = aedes({ + persistence, + heartbeatInterval: interval, + authorizePublish: function (client, packet, callback) { + t.strictSame(client, null, 'client must be null') + callback(new Error()) + } + }) + + t.teardown(broker.close.bind(broker)) + + broker.mq.on('mywill', check) + + function check (packet, cb) { + t.fail('received will without authorization') cb() } }) diff --git a/types/instance.d.ts b/types/instance.d.ts index 06ff02f0..dc22b355 100644 --- a/types/instance.d.ts +++ b/types/instance.d.ts @@ -33,7 +33,7 @@ declare module 'aedes' { done: (error: AuthenticateError | null, success: boolean | null) => void ) => void - type AuthorizePublishHandler = (client: Client, packet: PublishPacket, callback: (error?: Error | null) => void) => void + type AuthorizePublishHandler = (client: Client | null, packet: PublishPacket, callback: (error?: Error | null) => void) => void type AuthorizeSubscribeHandler = (client: Client, subscription: Subscription, callback: (error: Error | null, subscription?: Subscription | null) => void) => void