diff --git a/.travis.yml b/.travis.yml index d879ff8..22dc60e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,8 @@ before_install: - sudo make install - sudo /sbin/ldconfig - cd .. +before_script: + - (cd node_modules/mosca/node_modules && rm -rf ascoltatori && ln -s ../../.. ascoltatori) language: node_js node_js: - 0.8 diff --git a/lib/amqp_ascoltatore.js b/lib/amqp_ascoltatore.js index ad61170..1ddb017 100644 --- a/lib/amqp_ascoltatore.js +++ b/lib/amqp_ascoltatore.js @@ -96,7 +96,7 @@ AMQPAscoltatore.prototype._startConn = function() { }, function(message, headers, deliveryInfo) { that._queue.shift(); - var topic = deliveryInfo.routingKey.replace(".", "/"); + var topic = deliveryInfo.routingKey.replace(/\./g, "/"); debug("new message received from queue on topic " + topic); that._ascoltatore.publish(topic, message.data.toString()); @@ -134,7 +134,7 @@ AMQPAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) }, 5); }); - this._queue.bind(this._exchange, topic.replace("/", ".").replace("*", "#")); + this._queue.bind(this._exchange, topic.replace(/\//g, ".").replace(/\*/g, "#").replace(/\+/g, '*')); } else { util.defer(done); } @@ -149,7 +149,7 @@ AMQPAscoltatore.prototype.publish = function publish(topic, message, done) { debug("new message published to " + topic); - this._exchange.publish(topic.replace("/", "."), String(message)); + this._exchange.publish(topic.replace(/\//g, "."), String(message)); util.defer(done); }; @@ -164,7 +164,7 @@ AMQPAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do debug("queue unbound to topic " + topic); util.defer(done); }); - this._queue.unbind(this._exchange, topic.replace("/", ".").replace("*", "#")); + this._queue.unbind(this._exchange, topic.replace(/\//g, ".").replace(/\*/g, "#").replace(/\+/g, '*')); } else { util.defer(done); } diff --git a/lib/ascoltatori.js b/lib/ascoltatori.js index 4611617..26f1259 100644 --- a/lib/ascoltatori.js +++ b/lib/ascoltatori.js @@ -66,7 +66,7 @@ module.exports.build = function build(opts, done) { result = null; Klass = (typeof opts.type === 'function') ? opts.type : - (classes[opts.type] || module.exports.MemoryAscoltatore); + (classes[opts.type] || module.exports.TrieAscoltatore); result = new Klass(opts, module.exports); @@ -112,11 +112,11 @@ module.exports.use = function use(ascoltatore) { }; /** - * The default global Ascoltatore is a MemoryAscoltatore. + * The default global Ascoltatore is a TrieAscoltatore. * * @api public */ -module.exports.use(new module.exports.MemoryAscoltatore()); +module.exports.use(new module.exports.TrieAscoltatore()); /** * These are just utilities diff --git a/lib/behave_like_an_ascoltatore.js b/lib/behave_like_an_ascoltatore.js index 52987d1..9ea75d7 100644 --- a/lib/behave_like_an_ascoltatore.js +++ b/lib/behave_like_an_ascoltatore.js @@ -71,13 +71,91 @@ module.exports = function() { this.instance.pub("hello", "world", done); }); - it("should support wildcards", function(done) { + it("should support multi-level wildcard at start of topic", function(done) { + var that = this; + that.instance.sub("*/hello", wrap(done), function() { + that.instance.pub("42/there/hello"); + }); + }); + + it("should support multi-level wildcard in middle of topic", function(done) { + var that = this; + that.instance.sub("hello/*/end", wrap(done), function() { + that.instance.pub("hello/there/42/end"); + }); + }); + + it("should support multi-level wildcard at end of topic", function(done) { var that = this; that.instance.sub("hello/*", wrap(done), function() { + that.instance.pub("hello/there/42"); + }); + }); + + it("should support single-level wildcard at start of topic", function(done) { + var that = this; + that.instance.sub("+/hello", wrap(done), function() { + that.instance.pub("42/hello"); + }); + }); + + it("should support single-level wildcard in middle of topic", function(done) { + var that = this; + that.instance.sub("hello/+/end", wrap(done), function() { + that.instance.pub("hello/42/end"); + }); + }); + + it("should support single-level wildcard at end of topic", function(done) { + var that = this; + that.instance.sub("hello/+", wrap(done), function() { that.instance.pub("hello/42"); }); }); + it("should support both wildcards in topic", function(done) { + var that = this; + that.instance.sub("hello/+/there/*/end", wrap(done), function() { + that.instance.pub("hello/foo/there/bar/42/end"); + }); + }); + + it("should not match multiple levels with single wildcard", function(done) { + var that = this, + callback = null; + + callback = function(topic) { + expect(topic).to.equal("hello/42/there"); + done(); + }; + + that.instance.sub("hello/+/there", callback, function () { + that.instance.pub("hello/42/43/there"); + that.instance.pub("hello/42/there"); + }); + }); + + it("should unsubscribe from wildcard topics independently", function(done) { + var that = this, + callback1 = null, + callback2 = null; + + callback1 = function(topic) { + expect(topic).to.equal("hello/42/there"); + done(); + }; + + callback2 = function () { }; + + that.instance.sub("hello/*/there", callback2, function () { + that.instance.sub("hello/+/there", callback1, function () { + that.instance.unsub("hello/*/there", callback2, function () { + that.instance.pub("hello/42/there"); + }); + }); + }); + }); + it("should call each matching callback", function(done) { var that = this, callback = null, diff --git a/lib/memory_ascoltatore.js b/lib/memory_ascoltatore.js index d6f3cf3..0cb2d26 100644 --- a/lib/memory_ascoltatore.js +++ b/lib/memory_ascoltatore.js @@ -35,14 +35,14 @@ function MemoryAscoltatore() { MemoryAscoltatore.prototype = Object.create(AbstractAscoltatore.prototype); function containsWildcard(topic) { - return topic.indexOf("*") >= 0; + return (topic.indexOf("*") >= 0) || (topic.indexOf("+") >= 0); } MemoryAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) { this._raiseIfClosed(); debug("registered new subscriber for topic " + topic); if (containsWildcard(topic)) { - var regexp = new RegExp(topic.replace("*", ".+")), + var regexp = new RegExp(topic.replace(/\*/g, ".*?").replace(/\+/g, "[^/]+?")), that = this, handler = null; @@ -89,7 +89,7 @@ MemoryAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, regexp = null; if (containsWildcard(topic)) { this.removeListener("newTopic", callback._ascoltatori_global_handler); - regexp = new RegExp(topic.replace("*", ".+")); + regexp = new RegExp(topic.replace(/\*/g, ".*?").replace(/\+/g, "[^/]+?")); this._set.forEach(function(e) { if (e.match(regexp)) { that.unsub(e, callback); diff --git a/lib/mqtt_ascoltatore.js b/lib/mqtt_ascoltatore.js index 01cb978..e27fbea 100644 --- a/lib/mqtt_ascoltatore.js +++ b/lib/mqtt_ascoltatore.js @@ -93,7 +93,7 @@ MQTTAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) var opts = { qos: 1 }; - this._client.subscribe(topic.replace("*", "#"), opts, function() { + this._client.subscribe(topic.replace(/\*/g, "#"), opts, function() { debug("registered new subscriber for topic " + topic); util.defer(done); }); @@ -135,7 +135,7 @@ MQTTAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, do } debug("deregistering subscriber for topic " + topic); - this._client.unsubscribe(topic.replace("*", "#"), newDone); + this._client.unsubscribe(topic.replace(/\*/g, "#"), newDone); }; MQTTAscoltatore.prototype.close = function close(done) { diff --git a/lib/redis_ascoltatore.js b/lib/redis_ascoltatore.js index 3db03cc..d17eca2 100644 --- a/lib/redis_ascoltatore.js +++ b/lib/redis_ascoltatore.js @@ -134,7 +134,7 @@ RedisAscoltatore.prototype._updateReady = function updateReady(key) { }; function containsWildcard(topic) { - return topic.indexOf("*") >= 0; + return (topic.indexOf("*") >= 0) || (topic.indexOf("+") >= 0); } RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) { @@ -145,13 +145,17 @@ RedisAscoltatore.prototype.subscribe = function subscribe(topic, callback, done) util.defer(done); }; + var subTopic = topic; + if (containsWildcard(topic)) { - this._sub.psubscribe(topic, newDone); + subTopic = topic.replace(/\+/g, "*"); + this._sub.psubscribe(subTopic, newDone); } else { - this._sub.subscribe(topic, newDone); + this._sub.subscribe(subTopic, newDone); } - this._subs_counter.add(topic); + this._subs_counter.add(subTopic); + this._ascoltatore.subscribe(topic, callback); }; @@ -170,7 +174,15 @@ RedisAscoltatore.prototype.publish = function publish(topic, message, done) { RedisAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, done) { this._raiseIfClosed(); - this._subs_counter.remove(topic); + + var isWildcard = containsWildcard(topic), + subTopic = topic; + + if (isWildcard) { + subTopic = topic.replace(/\+/g, "*"); + } + + this._subs_counter.remove(subTopic); this._ascoltatore.unsubscribe(topic, callback); var newDone = function() { @@ -178,16 +190,15 @@ RedisAscoltatore.prototype.unsubscribe = function unsubscribe(topic, callback, d util.defer(done); }; - - if (this._subs_counter.include(topic)) { + if (this._subs_counter.include(subTopic)) { newDone(); return this; } - if (containsWildcard(topic)) { - this._sub.punsubscribe(topic, newDone); + if (isWildcard) { + this._sub.punsubscribe(subTopic, newDone); } else { - this._sub.unsubscribe(topic, newDone); + this._sub.unsubscribe(subTopic, newDone); } return this; diff --git a/lib/trie_ascoltatore.js b/lib/trie_ascoltatore.js index 9b2356a..2a7b151 100644 --- a/lib/trie_ascoltatore.js +++ b/lib/trie_ascoltatore.js @@ -17,7 +17,7 @@ function TrieAscoltatore(settings) { this._matcher = new Qlobber({ separator: '/', - wildcard_one: '?', + wildcard_one: '+', wildcard_some: '*' }); diff --git a/test/ascoltatori_spec.js b/test/ascoltatori_spec.js index 69f7918..61f9194 100644 --- a/test/ascoltatori_spec.js +++ b/test/ascoltatori_spec.js @@ -114,7 +114,7 @@ describe("ascoltatori", function() { json: false }); toClose.push(a); - expect(a).to.be.instanceOf(ascoltatori.MemoryAscoltatore); + expect(a).to.be.instanceOf(ascoltatori.TrieAscoltatore); }); it("should create a new AbstractAscoltatore using function", function() {