Skip to content

Commit

Permalink
chore: added unsubscribe 1 param
Browse files Browse the repository at this point in the history
  • Loading branch information
Isan-Rivkin committed Feb 20, 2019
1 parent dded92e commit b453ba0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 29 deletions.
42 changes: 18 additions & 24 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,7 @@ module.exports = (node) => {
const floodSub = new FloodSub(node)

node._floodSub = floodSub
const _internalUnsubscribe = (input)=>{
let {topic,handler,callback} = input

if (!node.isStarted() && !floodSub.started) {
throw new Error(NOT_STARTED_YET)
}

if(handler){
floodSub.removeListener(topic, handler)
}else{
floodSub.removeAllListeners(topic)
}

if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
}

if (typeof callback === 'function') {
setImmediate(() => callback())
}
}
return {
subscribe: (topic, options, handler, callback) => {
if (typeof options === 'function') {
Expand All @@ -52,12 +32,26 @@ module.exports = (node) => {

subscribe(callback)
},
unsubscribeAll : (topic,callback)=>{
_internalUnsubscribe({topic : topic, callback: callback});
},

unsubscribe: (topic, handler, callback) => {
_internalUnsubscribe({topic : topic, handler : handler, callback: callback});
if (!node.isStarted() && !floodSub.started) {
throw new Error(NOT_STARTED_YET)
}
if (!handler && !callback) {
floodSub.removeAllListeners(topic)
} else {
floodSub.removeListener(topic, handler)
}

if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
}

if (typeof callback === 'function') {
setImmediate(() => callback())
}
},

publish: (topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
Expand Down
14 changes: 9 additions & 5 deletions test/pubsub.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('.pubsub', () => {
expect(err).to.not.exist().mark()
})
})
it('start two nodes and send one message, then unsubscribeAll', (done) => {
it('start two nodes and send one message, then unsubscribe without handler', (done) => {
// Check the final series error, and the publish handler
expect(3).checks(done)

Expand All @@ -113,12 +113,16 @@ describe('.pubsub', () => {
// Wait a moment before unsubscribing
(cb) => setTimeout(cb, 500),
// unsubscribe on the first
(cb) => nodes[0].pubsub.unsubscribeAll('pubsub', cb),
(cb) => {
nodes[0].pubsub.unsubscribe('pubsub')
// Wait a moment to make sure the ubsubscribe-from-all worked
setTimeout(cb, 500)
},
// Verify unsubscribed
(cb) => {
nodes[0].pubsub.ls((err,topics)=>{
expect(topics.length).to.eql(0).mark();
cb(err);
nodes[0].pubsub.ls((err, topics) => {
expect(topics.length).to.eql(0).mark()
cb(err)
})
},
// Stop both nodes
Expand Down

0 comments on commit b453ba0

Please sign in to comment.