Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

fix(pubsub): dynamic topics to avoid race conditions #151

Merged
merged 1 commit into from
Aug 29, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ module.exports = (common) => {
describe('.pubsub', function () {
this.timeout(20 * 1000)

const topic = 'pubsub-tests'
const getTopic = () => 'pubsub-tests-' + Math.random()

describe('callback API', () => {
let ipfs1
Expand Down Expand Up @@ -98,20 +98,23 @@ module.exports = (common) => {
describe('single node', () => {
describe('.publish', () => {
it('errors on string messags', (done) => {
const topic = getTopic()
ipfs1.pubsub.publish(topic, 'hello friend', (err) => {
expect(err).to.exist()
done()
})
})

it('message from buffer', (done) => {
const topic = getTopic()
ipfs1.pubsub.publish(topic, Buffer.from('hello friend'), done)
})
})

describe('.subscribe', () => {
it('to one topic', (done) => {
const check = makeCheck(2, done)
const topic = getTopic()

const handler = (msg) => {
expect(msg.data.toString()).to.equal('hi')
Expand All @@ -136,6 +139,8 @@ module.exports = (common) => {
})

it('attaches multiple event listeners', (done) => {
const topic = getTopic()

const check = makeCheck(3, done)
const handler1 = (msg) => {
expect(msg.data.toString()).to.eql('hello')
Expand Down Expand Up @@ -176,6 +181,7 @@ module.exports = (common) => {

it('discover options', (done) => {
const check = makeCheck(2, done)
const topic = getTopic()

const handler = (msg) => {
expect(msg.data.toString()).to.be.eql('hi')
Expand Down Expand Up @@ -210,6 +216,7 @@ module.exports = (common) => {

describe('.peers', () => {
it('does not error when not subscribed to a topic', (done) => {
const topic = getTopic()
ipfs1.pubsub.peers(topic, (err, peers) => {
expect(err).to.not.exist()
// Should be empty() but as mentioned below go-ipfs returns more than it should
Expand All @@ -226,6 +233,7 @@ module.exports = (common) => {
const sub2 = (msg) => {}
const sub3 = (msg) => {}

const topic = getTopic()
const topicOther = topic + 'different topic'

series([
Expand Down Expand Up @@ -254,6 +262,7 @@ module.exports = (common) => {
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
const topic = getTopic()

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
Expand All @@ -274,6 +283,7 @@ module.exports = (common) => {
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
const topic = getTopic()

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
Expand Down Expand Up @@ -305,6 +315,7 @@ module.exports = (common) => {

it('list with 1 subscribed topic', (done) => {
const sub1 = (msg) => {}
const topic = getTopic()

ipfs1.pubsub.subscribe(topic, sub1, (err) => {
expect(err).to.not.exist()
Expand Down Expand Up @@ -358,6 +369,7 @@ module.exports = (common) => {
it('receive messages from different node', (done) => {
const check = makeCheck(3, done)
const expectedString = 'hello from the other side'
const topic = getTopic()

const sub1 = (msg) => {
expect(msg.data.toString()).to.be.eql(expectedString)
Expand Down Expand Up @@ -388,6 +400,7 @@ module.exports = (common) => {
const check = makeCheck(3, done)
const expectedHex = 'a36161636179656162830103056164a16466666666f4'
const buffer = Buffer.from(expectedHex, 'hex')
const topic = getTopic()

const sub1 = (msg) => {
try {
Expand Down Expand Up @@ -428,6 +441,7 @@ module.exports = (common) => {
const inbox1 = []
const inbox2 = []
const outbox = ['hello', 'world', 'this', 'is', 'pubsub']
const topic = getTopic()

const check = makeCheck(outbox.length * 3, (err) => {
ipfs1.pubsub.unsubscribe(topic, sub1)
Expand Down Expand Up @@ -479,6 +493,7 @@ module.exports = (common) => {
it('call publish 1k times', (done) => {
const count = 1000
let sendCount = 0
const topic = getTopic()

whilst(
() => sendCount < count,
Expand All @@ -499,6 +514,7 @@ module.exports = (common) => {
let receivedCount = 0
let startTime
let counter = 0
const topic = getTopic()

const sub1 = (msg) => {
// go-ipfs can't send messages in order when there are
Expand Down Expand Up @@ -555,7 +571,7 @@ module.exports = (common) => {
let sendCount = 0
const handlers = []

const someTopic = 'some-other-topic'
const someTopic = getTopic()

whilst(
() => sendCount < count,
Expand Down Expand Up @@ -608,6 +624,8 @@ module.exports = (common) => {
})

it('.subscribe and .publish', () => {
const topic = getTopic()

const sub = (msg) => {
expect(msg.data.toString()).to.be.eql('hi')
ipfs1.pubsub.unsubscribe(topic, sub)
Expand All @@ -619,6 +637,7 @@ module.exports = (common) => {

it('.peers', () => {
const sub = (msg) => {}
const topic = getTopic()

return ipfs1.pubsub.subscribe(topic, sub)
.then(() => ipfs1.pubsub.peers(topic))
Expand Down