diff --git a/LICENSE b/LICENSE index 02845407..08205b16 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2015 Matteo Collina, http://matteocollina.com +Copyright (c) 2015-2017 Matteo Collina, http://matteocollina.com Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation diff --git a/README.md b/README.md index 48b23f1e..6f4d820d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Aedes  [![Build Status](https://travis-ci.org/mcollina/aedes.svg)](https://travis-ci.org/mcollina/aedes) [![Coverage Status](https://coveralls.io/repos/mcollina/aedes/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes?branch=master) +# Aedes  [![Build Status](https://travis-ci.org/mcollina/aedes.svg?branch=master)](https://travis-ci.org/mcollina/aedes) [![Coverage Status](https://coveralls.io/repos/mcollina/aedes/badge.svg?branch=master&service=github)](https://coveralls.io/github/mcollina/aedes?branch=master) Barebone MQTT server that can run on any stream server. @@ -392,32 +392,6 @@ You can subscribe on the following `$SYS` topics to get client presence: - `$SYS/+/disconnect/clients` - will inform about client disconnections. The payload will contain the `clientId` of the connected/disconnected client - - -## Todo - -* [x] QoS 0 support -* [x] Retain messages support -* [x] QoS 1 support -* [x] QoS 2 support -* [x] clean=false support -* [x] Keep alive support -* [x] Will messages must survive crash -* [x] Authentication -* [x] Events -* [x] Wait a CONNECT packet only for X seconds -* [x] Support a CONNECT packet without a clientId -* [x] Disconnect other clients with the same client.id -* [x] Write docs -* [x] Support counting the number of offline clients and subscriptions -* [x] Performance optimizations for QoS 1 and Qos 2 -* [x] Add `client#publish()` and `client#subscribe()` -* [x] move the persistence in a separate module -* [x] mongo persistence ([external module](http://npm.im/aedes-persistence-mongodb)) -* [x] redis persistence ([external module](http://npm.im/aedes-persistence-redis)) -* [x] leveldb persistence ([external module](http://npm.im/aedes-persistence-level)) -* [ ] cluster support (external module) - ## Acknowledgements This library is born after a lot of discussion with all diff --git a/aedes.js b/aedes.js index 4e6bf1a1..364f1d4a 100644 --- a/aedes.js +++ b/aedes.js @@ -191,21 +191,18 @@ function DoEnqueues () { that.complete = null that.topic = null - broker._parallel( - status, - doEnqueue, subs, complete) + broker.persistence.outgoingEnqueueCombi(subs, status.packet, complete) broker._enqueuers.release(that) } } } +// + is 43 +// # is 35 function removeSharp (sub) { - return sub.topic !== '#' -} - -function doEnqueue (sub, done) { - this.broker.persistence.outgoingEnqueue(sub, this.packet, done) + var code = sub.topic.charCodeAt(0) + return code !== 43 && code !== 35 } function callPublished (_, done) { diff --git a/lib/client.js b/lib/client.js index 3abfd86f..e41bbf02 100644 --- a/lib/client.js +++ b/lib/client.js @@ -251,6 +251,10 @@ Client.prototype.close = function (done) { var list = (state.getBuffer && state.getBuffer()) || state.buffer list.forEach(drainRequest) + // clear up the drain event listeners + that.conn.emit('drain') + that.conn.removeAllListeners('drain') + if (conn.destroySoon) { conn.destroySoon() } if (conn.destroy) { diff --git a/lib/handlers/publish.js b/lib/handlers/publish.js index 10ea62fc..f3c8cd87 100644 --- a/lib/handlers/publish.js +++ b/lib/handlers/publish.js @@ -17,6 +17,22 @@ var publishActions = [ enqueuePublish ] function handlePublish (client, packet, done) { + var topic = packet.topic + var err + if (topic.length === 0) { + err = new Error('empty topic not allowed in PUBLISH') + return done(err) + } + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + err = new Error('# is not allowed in PUBLISH') + return done(err) + case 43: + err = new Error('+ is not allowed in PUBLISH') + return done(err) + } + } client.broker._series(client, publishActions, packet, done) } diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 1796f3ce..86d24b5c 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -4,6 +4,7 @@ var write = require('../write') var fastfall = require('fastfall') var Packet = require('aedes-packet') var through = require('through2') +var validateTopic = require('./validations').validateTopic var topicActions = fastfall([ authorize, storeSubscriptions, @@ -22,7 +23,7 @@ function handleSubscribe (client, packet, done) { var subs = packet.subscriptions var granted = [] - broker._parallel( + broker._series( new SubscribeState(client, packet, done, granted), doSubscribe, subs, @@ -36,6 +37,10 @@ function doSubscribe (sub, done) { function authorize (sub, done) { var client = this.client + var err = validateTopic(sub.topic, 'SUBSCRIBE') + if (err) { + return done(err) + } client.broker.authorizeSubscribe(client, sub, done) } @@ -95,7 +100,7 @@ function subTopic (sub, done) { break } - if (sub.topic === '#') { + if (isWildcardThatMatchesSys(sub.topic)) { func = blockSys(func) } @@ -113,6 +118,13 @@ function subTopic (sub, done) { } } +// + is 43 +// # is 35 +function isWildcardThatMatchesSys (topic) { + var code = topic.charCodeAt(0) + return code === 43 || code === 35 +} + function completeSubscribe (err) { var packet = this.packet var client = this.client diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 927eda52..620b5256 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -1,6 +1,7 @@ 'use strict' var write = require('../write') +var validateTopic = require('./validations').validateTopic function UnsubscribeState (client, packet, finish, granted) { this.client = client @@ -11,9 +12,18 @@ function UnsubscribeState (client, packet, finish, granted) { function handleUnsubscribe (client, packet, done) { var broker = client.broker + var unsubscriptions = packet.unsubscriptions + var err + + for (var i = 0; i < unsubscriptions.length; i++) { + err = validateTopic(unsubscriptions[i], 'UNSUBSCRIBE') + if (err) { + return done(err) + } + } if (packet.messageId) { - broker.persistence.removeSubscriptions(client, packet.unsubscriptions, function (err) { + broker.persistence.removeSubscriptions(client, unsubscriptions, function (err) { if (err) { return done(err) } @@ -27,7 +37,7 @@ function handleUnsubscribe (client, packet, done) { function actualUnsubscribe (client, packet, done) { var broker = client.broker - broker._parallel( + broker._series( new UnsubscribeState(client, packet, done, null), doUnsubscribe, packet.unsubscriptions, diff --git a/lib/handlers/validations.js b/lib/handlers/validations.js new file mode 100644 index 00000000..16d9922b --- /dev/null +++ b/lib/handlers/validations.js @@ -0,0 +1,29 @@ +'use strict' + +function validateTopic (topic, message) { + var end = topic.length - 1 + var endMinus = end - 1 + var slashInPreEnd = endMinus > 0 && topic.charCodeAt(endMinus) !== 47 + if (topic.length === 0) { + return new Error('impossible to ' + message + ' to an empty topic') + } + for (var i = 0; i < topic.length; i++) { + switch (topic.charCodeAt(i)) { + case 35: + var notAtTheEnd = i !== end + if (notAtTheEnd || slashInPreEnd) { + return new Error('# is only allowed in ' + message + ' in the last position') + } + break + case 43: + var pastChar = i < end - 1 && topic.charCodeAt(i + 1) !== 47 + var preChar = i > 1 && topic.charCodeAt(i - 1) !== 47 + if (pastChar || preChar) { + return new Error('+ is only allowed in ' + message + ' between /') + } + break + } + } +} + +module.exports.validateTopic = validateTopic diff --git a/package.json b/package.json index 21aa6804..20904e75 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aedes", - "version": "0.28.0", + "version": "0.29.3", "description": "Stream-based MQTT broker", "main": "aedes.js", "scripts": { @@ -36,19 +36,19 @@ "concat-stream": "^1.4.7", "convert-hrtime": "^2.0.0", "coveralls": "^2.11.6", - "duplexify": "^3.4.1", + "duplexify": "^3.5.1", "faucet": "0.0.1", "istanbul": "^0.4.1", - "mqtt": "^2.9.1", + "mqtt": "^2.11.0", "mqtt-connection": "^3.0.0", "pre-commit": "^1.0.10", - "standard": "^10.0.0", - "tape": "^4.7.0", - "websocket-stream": "^5.0.0" + "standard": "^10.0.3", + "tape": "^4.8.0", + "websocket-stream": "^5.0.1" }, "dependencies": { "aedes-packet": "^1.0.0", - "aedes-persistence": "^4.0.0", + "aedes-persistence": "^5.0.2", "bulk-write-stream": "^1.0.0", "end-of-stream": "^1.1.0", "fastfall": "^1.0.0", @@ -58,9 +58,8 @@ "mqemitter": "^2.1.0", "mqtt-packet": "^5.4.0", "pump": "^1.0.0", - "qlobber": "^0.8.0", "retimer": "^1.0.0", - "reusify": "^1.0.0", + "reusify": "^1.0.2", "safe-buffer": "^5.1.1", "shortid": "^2.1.3", "through2": "^2.0.0", diff --git a/test/auth.js b/test/auth.js index d41fc521..0077d86e 100644 --- a/test/auth.js +++ b/test/auth.js @@ -479,6 +479,70 @@ test('negate subscription', function (t) { }) }) +test('negate multiple subscriptions', function (t) { + t.plan(5) + + var s = connect(setup()) + + s.broker.authorizeSubscribe = function (client, sub, cb) { + t.ok(client, 'client exists') + cb(null, null) + } + + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) + + s.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'suback') + t.deepEqual(packet.granted, [128, 128]) + t.equal(packet.messageId, 24) + }) +}) + +test('negate multiple subscriptions random times', function (t) { + t.plan(5) + + var s = connect(setup()) + + s.broker.authorizeSubscribe = function (client, sub, cb) { + t.ok(client, 'client exists') + if (sub.topic === 'hello') { + setTimeout(function () { + cb(null, sub) + }, 100) + } else { + cb(null, null) + } + } + + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] + }) + + s.outStream.once('data', function (packet) { + t.equal(packet.cmd, 'suback') + t.deepEqual(packet.granted, [0, 128]) + t.equal(packet.messageId, 24) + }) +}) + test('failed authentication does not disconnect other client with same clientId', function (t) { t.plan(3) diff --git a/test/basic.js b/test/basic.js index f8a6e835..00b1dbe0 100644 --- a/test/basic.js +++ b/test/basic.js @@ -486,3 +486,122 @@ test('avoid wrong deduping of retain messages', function (t) { publisher.inStream.write(expected) }) + +test('publish empty topic', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: '', + payload: 'world' + }) + }) + + eos(s.conn, function () { + t.equal(s.broker.connectedClients, 0, 'no connected clients') + t.end() + }) +}) + +test('publish invalid topic with #', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + t.end() + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/#', + payload: 'world' + }) + }) + + s.broker.on('clientError', function () { + t.end() + }) +}) + +test('publish invalid topic with +', function (t) { + var s = connect(setup()) + + subscribe(t, s, '#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet') + }) + + s.inStream.write({ + cmd: 'publish', + topic: 'hello/+/eee', + payload: 'world' + }) + }) + + s.broker.on('clientError', function () { + t.end() + }) +}) + +;['base/#/sub', 'base/#sub', 'base/sub#', 'base/xyz+/sub', 'base/+xyz/sub'].forEach(function (topic) { + test('subscribe to invalid topic with "' + topic + '"', function (t) { + var s = connect(setup()) + + s.broker.on('clientError', function () { + t.end() + }) + + s.inStream.write({ + cmd: 'subscribe', + messageId: 24, + subscriptions: [{ + topic: topic, + qos: 0 + }] + }) + }) + + test('unsubscribe to invalid topic with "' + topic + '"', function (t) { + var s = connect(setup()) + + s.broker.on('clientError', function () { + t.end() + }) + + s.inStream.write({ + cmd: 'unsubscribe', + messageId: 24, + unsubscriptions: [topic] + }) + }) +}) + +test('clear drain', function (t) { + t.plan(4) + + var s = connect(setup()) + + subscribe(t, s, 'hello', 0, function () { + // fake a busy socket + s.conn.write = function (chunk, enc, cb) { + return false + } + + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }, function () { + t.pass('callback called') + }) + + s.conn.destroy() + }) +}) diff --git a/test/events.js b/test/events.js index 0c4286d8..25df8ea3 100644 --- a/test/events.js +++ b/test/events.js @@ -40,6 +40,25 @@ test('does not forward $SYS topics to # subscription', function (t) { }) }) +test('does not forward $SYS topics to +/# subscription', function (t) { + t.plan(4) + var s = connect(setup()) + + subscribe(t, s, '+/#', 0, function () { + s.outStream.once('data', function (packet) { + t.fail('no packet should be received') + }) + + s.broker.mq.emit({ + cmd: 'publish', + topic: '$SYS/hello', + payload: 'world' + }, function () { + t.pass('nothing happened') + }) + }) +}) + test('does not store $SYS topics to QoS 1 # subscription', function (t) { t.plan(3)