diff --git a/src/pubsub.js b/src/pubsub.js index fb9a805bb8..8e33761a01 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -8,27 +8,7 @@ module.exports = (node) => { const floodSub = new FloodSub(node) node._floodSub = floodSub - const _internalUnsubscribe = (input)=>{ - let {topic,handler,callback} = input - if (!node.isStarted() && !floodSub.started) { - throw new Error(NOT_STARTED_YET) - } - - if(handler){ - floodSub.removeListener(topic, handler) - }else{ - floodSub.removeAllListeners(topic) - } - - if (floodSub.listenerCount(topic) === 0) { - floodSub.unsubscribe(topic) - } - - if (typeof callback === 'function') { - setImmediate(() => callback()) - } - } return { subscribe: (topic, options, handler, callback) => { if (typeof options === 'function') { @@ -52,12 +32,26 @@ module.exports = (node) => { subscribe(callback) }, - unsubscribeAll : (topic,callback)=>{ - _internalUnsubscribe({topic : topic, callback: callback}); - }, + unsubscribe: (topic, handler, callback) => { - _internalUnsubscribe({topic : topic, handler : handler, callback: callback}); + if (!node.isStarted() && !floodSub.started) { + throw new Error(NOT_STARTED_YET) + } + if (!handler && !callback) { + floodSub.removeAllListeners(topic) + } else { + floodSub.removeListener(topic, handler) + } + + if (floodSub.listenerCount(topic) === 0) { + floodSub.unsubscribe(topic) + } + + if (typeof callback === 'function') { + setImmediate(() => callback()) + } }, + publish: (topic, data, callback) => { if (!node.isStarted() && !floodSub.started) { return setImmediate(() => callback(new Error(NOT_STARTED_YET))) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 78d4b641b3..aaa2ba928b 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -87,7 +87,7 @@ describe('.pubsub', () => { expect(err).to.not.exist().mark() }) }) - it('start two nodes and send one message, then unsubscribeAll', (done) => { + it('start two nodes and send one message, then unsubscribe without handler', (done) => { // Check the final series error, and the publish handler expect(3).checks(done) @@ -113,12 +113,16 @@ describe('.pubsub', () => { // Wait a moment before unsubscribing (cb) => setTimeout(cb, 500), // unsubscribe on the first - (cb) => nodes[0].pubsub.unsubscribeAll('pubsub', cb), + (cb) => { + nodes[0].pubsub.unsubscribe('pubsub') + // Wait a moment to make sure the ubsubscribe-from-all worked + setTimeout(cb, 500) + }, // Verify unsubscribed (cb) => { - nodes[0].pubsub.ls((err,topics)=>{ - expect(topics.length).to.eql(0).mark(); - cb(err); + nodes[0].pubsub.ls((err, topics) => { + expect(topics.length).to.eql(0).mark() + cb(err) }) }, // Stop both nodes