Skip to content

Commit

Permalink
feat: emit event when a remote peer's subscriptions change (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc authored and vasco-santos committed Nov 28, 2018
1 parent c9f8d39 commit 7611b2e
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 31 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ fsub.start((err) => {
})
```

## Events

Floodsub emits two kinds of events:
1. `<topic>` when a message is received for a particular topic
```Javascript
fsub.on('fruit', (data) => { ... })
```
- `data`: a Buffer containing the data that was published to the topic
2. `floodsub:subscription-change` when the local peer receives an update to the subscriptions of a remote peer.
```Javascript
fsub.on('floodsub:subscription-change', (peerInfo, topics, changes) => { ... })
```
- `peerInfo`: a [PeerInfo](https://github.com/libp2p/js-peer-info) object
- `topics`: the topics that the peer is now subscribed to
- `changes`: an array of `{ topicCID: <topic>, subscribe: <boolean> }`
eg `[ { topicCID: 'fruit', subscribe: true }, { topicCID: 'vegetables': false } ]`


## API

See https://libp2p.github.io/js-libp2p-floodsub
Expand Down
1 change: 1 addition & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class FloodSub extends BaseProtocol {
const peer = this.peers.get(idB58Str)
if (peer) {
peer.updateSubscriptions(subs)
this.emit('floodsub:subscription-change', peer.info, peer.topics, subs)
}
}
}
Expand Down
52 changes: 29 additions & 23 deletions test/2-nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,18 @@ describe('basics between 2 nodes', () => {

it('Subscribe to a topic:Z in nodeA', (done) => {
fsA.subscribe('Z')
setTimeout(() => {
fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => {
expectSet(fsA.subscriptions, ['Z'])
expect(fsB.peers.size).to.equal(1)
expectSet(first(fsB.peers).topics, ['Z'])
expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String())
expectSet(changedTopics, ['Z'])
expect(changedSubs).to.be.eql([{ topicCID: 'Z', subscribe: true }])
done()
}, 100)
})
})

it('Publish to a topic:Z in nodeA', (done) => {
fsB.once('Z', shouldNotHappen)

function shouldNotHappen (msg) { expect.fail() }

fsA.once('Z', (msg) => {
expect(msg.data.toString()).to.equal('hey')
fsB.removeListener('Z', shouldNotHappen)
Expand All @@ -102,8 +101,6 @@ describe('basics between 2 nodes', () => {
})

it('Publish to a topic:Z in nodeB', (done) => {
fsB.once('Z', shouldNotHappen)

fsA.once('Z', (msg) => {
fsA.once('Z', shouldNotHappen)
expect(msg.data.toString()).to.equal('banana')
Expand Down Expand Up @@ -135,6 +132,7 @@ describe('basics between 2 nodes', () => {

if (++counter === 10) {
fsA.removeListener('Z', receivedMsg)
fsB.removeListener('Z', shouldNotHappen)
done()
}
}
Expand All @@ -157,6 +155,7 @@ describe('basics between 2 nodes', () => {

if (++counter === 10) {
fsA.removeListener('Z', receivedMsg)
fsB.removeListener('Z', shouldNotHappen)
done()
}
}
Expand All @@ -170,11 +169,14 @@ describe('basics between 2 nodes', () => {
fsA.unsubscribe('Z')
expect(fsA.subscriptions.size).to.equal(0)

setTimeout(() => {
fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => {
expect(fsB.peers.size).to.equal(1)
expectSet(first(fsB.peers).topics, [])
expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String())
expectSet(changedTopics, [])
expect(changedSubs).to.be.eql([{ topicCID: 'Z', subscribe: false }])
done()
}, 100)
})
})

it('Publish to a topic:Z in nodeA nodeB', (done) => {
Expand Down Expand Up @@ -242,22 +244,26 @@ describe('basics between 2 nodes', () => {
})

it('existing subscriptions are sent upon peer connection', (done) => {
nodeA.dial(nodeB.peerInfo, (err) => {
expect(err).to.not.exist()
setTimeout(() => {
expect(fsA.peers.size).to.equal(1)
expect(fsB.peers.size).to.equal(1)
parallel([
cb => fsA.once('floodsub:subscription-change', () => cb()),
cb => fsB.once('floodsub:subscription-change', () => cb())
], () => {
expect(fsA.peers.size).to.equal(1)
expect(fsB.peers.size).to.equal(1)

expectSet(fsA.subscriptions, ['Za'])
expect(fsB.peers.size).to.equal(1)
expectSet(first(fsB.peers).topics, ['Za'])
expectSet(fsA.subscriptions, ['Za'])
expect(fsB.peers.size).to.equal(1)
expectSet(first(fsB.peers).topics, ['Za'])

expectSet(fsB.subscriptions, ['Zb'])
expect(fsA.peers.size).to.equal(1)
expectSet(first(fsA.peers).topics, ['Zb'])
expectSet(fsB.subscriptions, ['Zb'])
expect(fsA.peers.size).to.equal(1)
expectSet(first(fsA.peers).topics, ['Zb'])

done()
}, 1000)
done()
})

nodeA.dial(nodeB.peerInfo, (err) => {
expect(err).to.not.exist()
})
})

Expand Down
20 changes: 12 additions & 8 deletions test/multiple-nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,46 +59,50 @@ describe('multiple nodes (more than 2)', () => {
], (err) => {
expect(err).to.not.exist()
// wait for the pubsub pipes to be established
setTimeout(done, 200)
setTimeout(done, 1000)
})
})

it('subscribe to the topic on node a', (done) => {
a.ps.subscribe('Z')
expectSet(a.ps.subscriptions, ['Z'])

setTimeout(() => {
b.ps.once('floodsub:subscription-change', () => {
expect(b.ps.peers.size).to.equal(2)
const topics = Array.from(b.ps.peers.values())[1].topics
const aPeerId = a.libp2p.peerInfo.id.toB58String()
const topics = b.ps.peers.get(aPeerId).topics
expectSet(topics, ['Z'])

expect(c.ps.peers.size).to.equal(1)
expectSet(first(c.ps.peers).topics, [])

done()
}, 200)
})
})

it('subscribe to the topic on node b', (done) => {
b.ps.subscribe('Z')
expectSet(b.ps.subscriptions, ['Z'])

setTimeout(() => {
parallel([
cb => a.ps.once('floodsub:subscription-change', () => cb()),
cb => c.ps.once('floodsub:subscription-change', () => cb())
], () => {
expect(a.ps.peers.size).to.equal(1)
expectSet(first(a.ps.peers).topics, ['Z'])

expect(c.ps.peers.size).to.equal(1)
expectSet(first(c.ps.peers).topics, ['Z'])

done()
}, 200)
})
})

it('subscribe to the topic on node c', (done) => {
c.ps.subscribe('Z')
expectSet(c.ps.subscriptions, ['Z'])

setTimeout(() => {
b.ps.once('floodsub:subscription-change', () => {
expect(a.ps.peers.size).to.equal(1)
expectSet(first(a.ps.peers).topics, ['Z'])

Expand All @@ -108,7 +112,7 @@ describe('multiple nodes (more than 2)', () => {
})

done()
}, 200)
})
})

it('publish on node a', (done) => {
Expand Down

0 comments on commit 7611b2e

Please sign in to comment.