From 562a0bb9a13106c3a2d3445f85b32a048a351420 Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 10 Dec 2016 13:07:09 -0800 Subject: [PATCH 1/8] chore: update contributors --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 0469623b..780bcb9e 100644 --- a/package.json +++ b/package.json @@ -44,10 +44,10 @@ "contributors": [ "David Dias ", "Friedel Ziegelmayer ", - "Greenkeeper ", "Richard Littauer ", "Stephen Whitmore ", "Victor Bjelkholm ", + "greenkeeperio-bot ", "nginnever " ] } \ No newline at end of file From e9506ffd0a1ab737d6a1506a20f5750f48e34ad9 Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 10 Dec 2016 13:07:10 -0800 Subject: [PATCH 2/8] chore: release version v0.22.1 --- CHANGELOG.md | 10 ++++++++++ package.json | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ae301fd..69bdd2ae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ + +## [0.22.1](https://github.com/ipfs/interface-ipfs-core/compare/v0.22.0...v0.22.1) (2016-12-10) + + +### Features + +* **object:** add template option to object.new ([2f23617](https://github.com/ipfs/interface-ipfs-core/commit/2f23617)) + + + # [0.22.0](https://github.com/ipfs/interface-ipfs-core/compare/v0.21.0...v0.22.0) (2016-11-24) diff --git a/package.json b/package.json index 780bcb9e..7cf65ce5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "interface-ipfs-core", - "version": "0.22.0", + "version": "0.22.1", "description": "A test suite and interface you can use to implement a IPFS core interface.", "main": "src/index.js", "scripts": { From 56cd45edd5b1ac25220196ea59e9c51f32e1d57d Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 19 Dec 2016 09:32:12 +0000 Subject: [PATCH 3/8] chore: update deps --- package.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/package.json b/package.json index 7cf65ce5..68e00eeb 100644 --- a/package.json +++ b/package.json @@ -28,18 +28,18 @@ "dependencies": { "async": "^2.1.4", "bl": "^1.1.2", - "bs58": "^3.0.0", + "bs58": "^3.1.0", "chai": "^3.5.0", "concat-stream": "^1.5.2", "detect-node": "^2.0.3", - "ipfs-block": "^0.5.0", - "ipld-dag-pb": "^0.9.0", + "ipfs-block": "^0.5.3", + "ipld-dag-pb": "^0.9.3", "multiaddr": "^2.1.1", - "multihashes": "^0.3.0", - "readable-stream": "2.1.5" + "multihashes": "^0.3.1", + "readable-stream": "2.2.2" }, "devDependencies": { - "aegir": "^9.2.1" + "aegir": "^9.3.0" }, "contributors": [ "David Dias ", From 74003a7712775e12d3a9d2d12ec2840a51514c49 Mon Sep 17 00:00:00 2001 From: haad Date: Mon, 5 Dec 2016 13:06:43 +0100 Subject: [PATCH 4/8] feat: add first pass of pubsub tests (running in js-ipfs-api) --- src/index.js | 2 + src/pubsub-message.js | 100 +++++++++ src/pubsub.js | 508 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 610 insertions(+) create mode 100644 src/pubsub-message.js create mode 100644 src/pubsub.js diff --git a/src/index.js b/src/index.js index 33865f9b..50b2efff 100644 --- a/src/index.js +++ b/src/index.js @@ -8,3 +8,5 @@ exports.generic = require('./generic') exports.swarm = require('./swarm') exports.block = require('./block') exports.dht = require('./dht') +exports.pubsub = require('./pubsub') +exports.pubsubMessage = require('./pubsub-message') diff --git a/src/pubsub-message.js b/src/pubsub-message.js new file mode 100644 index 00000000..d48f4a3f --- /dev/null +++ b/src/pubsub-message.js @@ -0,0 +1,100 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const isNode = require('detect-node') + +// NOTE! +// (Most of) these tests are skipped for now until we figure out the +// final data types for the messages coming over the wire + +const topicName = 'js-ipfs-api-tests' + +module.exports = (common, deps) => { + // Make sure the needed dependencies are injected + expect(deps.PubsubMessage).to.exist + expect(deps.PubsubMessageUtils).to.exist + + const PubsubMessage = deps.PubsubMessage // eslint-disable-line no-unused-vars + const PubsubMessageUtils = deps.PubsubMessageUtils // eslint-disable-line no-unused-vars + + // TESTS + describe('.pubsub-message', () => { + if (!isNode) { + return + } + + it.skip('create message', () => { + // TODO + }) + + it.skip('deserialize message from JSON object', () => { + const obj = { + from: 'BI:ۛv�m�uyѱ����tU�+��#���V', + data: 'aGk=', + seqno: 'FIlj2BpyEgI=', + topicIDs: [ topicName ] + } + try { + const message = PubsubMessageUtils.deserialize(obj) + expect(message.from).to.equal('AAA') + expect(message.data).to.equal('hi') + expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002') + expect(message.topicIDs.length).to.equal(1) + expect(message.topicIDs[0]).to.equal(topicName) + } catch (e) { + expect(e).to.not.exist + } + }) + + describe('immutable properties', () => { + const sender = 'A' + const data = 'hello' + const seqno = '123' + const topicIDs = ['hello world'] + + const message = PubsubMessageUtils.create(sender, data, seqno, topicIDs) + + it('from', () => { + try { + message.from = 'not allowed' + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property from of # which has only a getter`) + } + expect(message.from).to.equal(sender) + }) + + it('data', () => { + try { + message.data = 'not allowed' + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property data of # which has only a getter`) + } + expect(message.data).to.equal(data) + }) + + it('seqno', () => { + try { + message.seqno = 'not allowed' + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property seqno of # which has only a getter`) + } + expect(message.seqno).to.equal(seqno) + }) + + it('topicIDs', () => { + try { + message.topicIDs = ['not allowed'] + } catch (e) { + expect(e).to.be.an('error') + expect(e.toString()).to.equal(`TypeError: Cannot set property topicIDs of # which has only a getter`) + } + expect(message.topicIDs[0]).to.equal(topicIDs[0]) + expect(message.topicIDs.length).to.equal(topicIDs.length) + }) + }) + }) +} diff --git a/src/pubsub.js b/src/pubsub.js new file mode 100644 index 00000000..89c90611 --- /dev/null +++ b/src/pubsub.js @@ -0,0 +1,508 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ['error', 8] */ +'use strict' + +const expect = require('chai').expect +const isNode = require('detect-node') +const series = require('async/series') + +const topicName = 'js-ipfs-api-tests' + +const publish = (ipfs, data, callback) => { + ipfs.pubsub.publish(topicName, data, (err) => { + expect(err).to.not.exist + callback() + }) +} + +const waitForPeers = (ipfs, peersToWait, callback) => { + const i = setInterval(() => { + ipfs.pubsub.peers(topicName, (err, peers) => { + if (err) { + return callback(err) + } + + const hasAllPeers = peersToWait.map((e) => peers.includes(e)).filter((e) => e === false).length === 0 + if (hasAllPeers) { + clearInterval(i) + callback(null) + } + }) + }, 1000) +} + +module.exports = (common) => { + describe('.pubsub', () => { + if (!isNode) { + return + } + + let ipfs, ipfs2 + + before((done) => { + // CI takes longer to instantiate the daemon, + // so we need to increase the timeout for the + // before step + common.setup((err, factory) => { + expect(err).to.not.exist + series([ + (cb) => { + factory.spawnNode((err, node) => { + expect(err).to.not.exist + ipfs = node + ipfs.id().then((res) => { + ipfs.PeerId = res.id + cb() + }) + }) + }, + (cb) => { + factory.spawnNode((err, node) => { + expect(err).to.not.exist + ipfs2 = node + ipfs2.id().then((res) => { + ipfs2.PeerId = res.id + cb() + }) + }) + } + ], done) + }) + }) + + after((done) => { + common.teardown(done) + }) + + describe('publish', () => { + it('message from string', (done) => { + publish(ipfs, 'hello friend', done) + }) + + it('message from buffer', (done) => { + publish(ipfs, new Buffer('hello friend'), done) + }) + }) + + describe('subscribe', () => { + it('one topic', (done) => { + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + + subscription.on('data', (d) => { + expect(d.data).to.equal('hi') + subscription.cancel(done) + }) + + ipfs.pubsub.publish(topicName, 'hi', (err) => { + expect(err).to.not.exist + }) + }) + }) + + it('cancels a subscription', (done) => { + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + subscription.cancel(done) + }) + }) + + it('closes the subscription stream', (done) => { + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + subscription.on('end', done) + subscription.cancel() + }) + }) + + it('returns an error when already subscribed', (done) => { + ipfs.pubsub.subscribe(topicName) + .then((firstSub) => { + ipfs.pubsub.subscribe(topicName) + .then((secondSub) => { + expect(secondSub).to.not.exist + done("Shouldn't get here!") + }) + .catch((secondErr) => { + expect(secondErr).to.be.an('error') + expect(secondErr.toString()).to.equal(`Error: Already subscribed to '${topicName}'`) + firstSub.cancel(done) + }) + }) + .catch(done) + }) + + it('takes options as an argument', (done) => { + ipfs.pubsub.subscribe(topicName, { discover: true }, (err, subscription) => { + expect(err).to.not.exist + + subscription.on('data', (d) => { + expect(d.data).to.equal('hi') + subscription.cancel(done) + }) + + ipfs.pubsub.publish(topicName, 'hi', (err) => { + expect(err).to.not.exist + }) + }) + }) + }) + + describe('peers', () => { + it('returns an error when not subscribed to a topic', (done) => { + ipfs.pubsub.peers(topicName, (err, peers) => { + expect(err).to.be.an('error') + expect(err.toString()).to.equal(`Error: Not subscribed to '${topicName}'`) + done() + }) + }) + + it.skip('returns no peers within 10 seconds', (done) => { + // Currently go-ipfs returns peers that have not been subscribed to the topic + // Enable when go-ipfs has been fixed + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + + setTimeout(() => { + ipfs.pubsub.peers(topicName, (err, peers) => { + expect(err).to.not.exist + expect(peers.length).to.equal(0) + subscription.cancel(done) + }) + }, 10000) + }) + }) + + it.skip('doesn\'t return extra peers', (done) => { + // Currently go-ipfs returns peers that have not been subscribed to the topic + // Enable when go-ipfs has been fixed + ipfs.pubsub.subscribe(topicName, (err, subscription1) => { + expect(err).to.not.exist + + ipfs2.pubsub.subscribe(topicName + 'different topic', (err, subscription2) => { + expect(err).to.not.exist + + setTimeout(() => { + ipfs.pubsub.peers(topicName, (err, peers) => { + expect(err).to.not.exist + expect(peers.length).to.equal(0) + + subscription1.cancel(() => { + subscription2.cancel(done) + }) + }) + }, 10000) + }) + }) + }) + + it.skip('returns peers for a topic - one peer', (done) => { + // Currently go-ipfs returns peers that have not been subscribed to the topic + // Enable when go-ipfs has been fixed + const peersToWait = [ipfs2.PeerId] + + ipfs2.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + + const i = setInterval(() => { + ipfs.pubsub.peers(topicName, (err, peers) => { + if (err) { + expect(err).to.not.exist + done(err) + } + + console.log(peers) + + const hasAllPeers = peersToWait + .map((e) => peers.indexOf(e) !== -1) + .filter((e) => e === false) + .length === 0 + + if (hasAllPeers) { + clearInterval(i) + expect(peers.length).to.equal(peersToWait.length) + subscription.cancel(done) + } + }) + }, 1000) + }) + }) + + it.skip('lists peers for a topic - multiple peers', (done) => { + // TODO + }) + }) + + describe('ls', () => { + it('lists no subscribed topics', (done) => { + ipfs.pubsub.ls((err, topics) => { + expect(err).to.not.exist + expect(topics.length).to.equal(0) + done() + }) + }) + + it('lists 1 subscribed topic', (done) => { + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + + ipfs.pubsub.ls((err, topics) => { + expect(err).to.not.exist + expect(topics.length).to.equal(1) + expect(topics[0]).to.equal(topicName) + subscription.cancel(done) + }) + }) + }) + + it('lists all subscribed topics', (done) => { + let topics = ['one', 'two', 'three'] + let subscriptions = topics.map((e) => ipfs.pubsub.subscribe(e)) + Promise.all(subscriptions) + .then((subscriptions) => { + ipfs.pubsub.ls((err, result) => { + expect(err).to.not.exist + expect(result.length).to.equal(3) + result.forEach((e) => { + expect(topics.indexOf(e) !== -1).to.be.true + }) + Promise.all(subscriptions.map((s) => s.cancel())) + .then(() => done()) + .catch(done) + }) + }) + .catch(done) + }) + }) + + describe('send and receive messages', () => { + it('receive messages from different node', (done) => { + const expectedString = 'hello from the other side' + + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + expect(subscription).to.exist + + subscription.on('data', (d) => { + expect(d.data).to.be.equal(expectedString) + subscription.cancel(done) + }) + + waitForPeers(ipfs2, [ipfs.PeerId], (err) => { + expect(err).to.not.exist + ipfs2.pubsub.publish(topicName, expectedString, (err) => { + expect(err).to.not.exist + }) + }) + }) + }) + + it('receive multiple messages', (done) => { + let receivedMessages = [] + const expectedMessages = 2 + + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exists + + subscription.on('data', (d) => { + receivedMessages.push(d.data) + if (receivedMessages.length === expectedMessages) { + receivedMessages.forEach((msg) => { + expect(msg).to.be.equal('hi') + }) + subscription.cancel(done) + } + }) + + waitForPeers(ipfs2, [ipfs.PeerId], (err) => { + expect(err).to.not.exist + ipfs2.pubsub.publish(topicName, 'hi') + ipfs2.pubsub.publish(topicName, 'hi') + }) + }) + }) + }) + + describe('promises', () => { + it('subscribe', (done) => { + ipfs.pubsub.subscribe(topicName) + .then((subscription) => { + expect(subscription).to.exist + subscription.cancel(done) + }) + .catch(done) + }) + + it('publish', (done) => { + ipfs.pubsub.subscribe(topicName) + .then((subscription) => { + return ipfs.pubsub.publish(topicName, 'hi') + .then(() => subscription) + }) + .then((subscription) => subscription.cancel(done)) + .catch(done) + }) + + it('cancel subscription', (done) => { + ipfs.pubsub.subscribe(topicName) + .then((subscription) => subscription.cancel()) + .then(() => done()) + .catch(done) + }) + + it('peers', (done) => { + let s + ipfs.pubsub.subscribe(topicName) + .then((subscription) => { + s = subscription + return ipfs.pubsub.peers(topicName) + }) + .then((peers) => { + expect(peers).to.exist + s.cancel(done) + }) + .catch(done) + }) + + it('topics', (done) => { + ipfs.pubsub.ls() + .then((topics) => { + expect(topics).to.exist + expect(topics.length).to.equal(0) + done() + }) + .catch(done) + }) + }) + + describe('load tests', () => { + it('send/receive 10k messages', (done) => { + const expectedString = 'hello' + const count = 10000 + let sendCount = 0 + let receivedCount = 0 + let startTime + + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exists + + const outputProgress = () => { + process.stdout.write(' \r') + process.stdout.write('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') + } + + subscription.on('data', (d) => { + expect(d.data).to.be.equal(expectedString) + receivedCount++ + outputProgress() + if (receivedCount >= count) { + const duration = new Date().getTime() - startTime + process.stdout.write(' \r') + console.log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s`) + subscription.cancel(done) + } + }) + + const loop = () => { + if (sendCount < count) { + sendCount++ + outputProgress() + ipfs2.pubsub.publish(topicName, expectedString, (err) => { + expect(err).to.not.exist + process.nextTick(() => loop()) + }) + } + } + + waitForPeers(ipfs, [ipfs2.PeerId], (err) => { + expect(err).to.not.exist + startTime = new Date().getTime() + loop() + }) + }) + }) + + it('call publish 1k times', (done) => { + const expectedString = 'hello' + const count = 1000 + let sendCount = 0 + + const loop = () => { + if (sendCount < count) { + sendCount++ + process.stdout.write(' \r') + process.stdout.write('Sending messages: ' + sendCount + ' of ' + count + '\r') + ipfs.pubsub.publish(topicName, expectedString, (err) => { + expect(err).to.not.exist + process.nextTick(() => loop()) + }) + } else { + done() + } + } + loop() + }) + + it('call subscribe 1k times', (done) => { + const count = 1000 + let sendCount = 0 + let receivedCount = 0 + let subscription = null + + const loop = () => { + if (sendCount < count) { + sendCount++ + process.stdout.write(' \r') + process.stdout.write('Subscribing: ' + sendCount + ' of ' + count + '\r') + ipfs.pubsub.subscribe(topicName, (err, res) => { + receivedCount++ + // First call should go through normally + if (receivedCount === 1) { + expect(err).to.not.exist + expect(res).to.exist + subscription = res + } else { + // Subsequent calls should return "error, duplicate subscription" + expect(err).to.exist + } + process.nextTick(() => loop()) + }) + } else { + subscription.cancel(done) + } + } + loop() + }) + + it('subscribe/unsubscribe 1k times', (done) => { + const count = 1000 + let sendCount = 0 + let receivedCount = 0 + + const outputProgress = () => { + process.stdout.write(' \r') + process.stdout.write('Subscribe: ' + sendCount + ' of ' + count + ', Unsubscribe: ' + receivedCount + '\r') + } + + const loop = () => { + if (sendCount < count) { + sendCount++ + outputProgress() + ipfs.pubsub.subscribe(topicName, (err, subscription) => { + expect(err).to.not.exist + subscription.cancel((err) => { + receivedCount++ + outputProgress() + expect(err).to.not.exist + process.nextTick(() => loop()) + }) + }) + } else { + done() + } + } + loop() + }) + }) + }) +} From eb65994e050062dc7099b4847ad5372e89d9095e Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Dec 2016 20:06:00 +0000 Subject: [PATCH 5/8] moved pubsub-message tests to js-ipfs-api as it is specific to js-ipfs-api --- src/pubsub-message.js | 100 ------------------------------------------ 1 file changed, 100 deletions(-) delete mode 100644 src/pubsub-message.js diff --git a/src/pubsub-message.js b/src/pubsub-message.js deleted file mode 100644 index d48f4a3f..00000000 --- a/src/pubsub-message.js +++ /dev/null @@ -1,100 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const isNode = require('detect-node') - -// NOTE! -// (Most of) these tests are skipped for now until we figure out the -// final data types for the messages coming over the wire - -const topicName = 'js-ipfs-api-tests' - -module.exports = (common, deps) => { - // Make sure the needed dependencies are injected - expect(deps.PubsubMessage).to.exist - expect(deps.PubsubMessageUtils).to.exist - - const PubsubMessage = deps.PubsubMessage // eslint-disable-line no-unused-vars - const PubsubMessageUtils = deps.PubsubMessageUtils // eslint-disable-line no-unused-vars - - // TESTS - describe('.pubsub-message', () => { - if (!isNode) { - return - } - - it.skip('create message', () => { - // TODO - }) - - it.skip('deserialize message from JSON object', () => { - const obj = { - from: 'BI:ۛv�m�uyѱ����tU�+��#���V', - data: 'aGk=', - seqno: 'FIlj2BpyEgI=', - topicIDs: [ topicName ] - } - try { - const message = PubsubMessageUtils.deserialize(obj) - expect(message.from).to.equal('AAA') - expect(message.data).to.equal('hi') - expect(message.seqno).to.equal('\u0014�c�\u001ar\u0012\u0002') - expect(message.topicIDs.length).to.equal(1) - expect(message.topicIDs[0]).to.equal(topicName) - } catch (e) { - expect(e).to.not.exist - } - }) - - describe('immutable properties', () => { - const sender = 'A' - const data = 'hello' - const seqno = '123' - const topicIDs = ['hello world'] - - const message = PubsubMessageUtils.create(sender, data, seqno, topicIDs) - - it('from', () => { - try { - message.from = 'not allowed' - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property from of # which has only a getter`) - } - expect(message.from).to.equal(sender) - }) - - it('data', () => { - try { - message.data = 'not allowed' - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property data of # which has only a getter`) - } - expect(message.data).to.equal(data) - }) - - it('seqno', () => { - try { - message.seqno = 'not allowed' - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property seqno of # which has only a getter`) - } - expect(message.seqno).to.equal(seqno) - }) - - it('topicIDs', () => { - try { - message.topicIDs = ['not allowed'] - } catch (e) { - expect(e).to.be.an('error') - expect(e.toString()).to.equal(`TypeError: Cannot set property topicIDs of # which has only a getter`) - } - expect(message.topicIDs[0]).to.equal(topicIDs[0]) - expect(message.topicIDs.length).to.equal(topicIDs.length) - }) - }) - }) -} From a63988b1243d58ce0aebd8e40894957e8760ce27 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 6 Dec 2016 13:25:45 -0800 Subject: [PATCH 6/8] docs: add PubSub interface spec, refactor tests --- API/pubsub/README.md | 63 ++++ src/index.js | 1 - src/pubsub.js | 790 ++++++++++++++++++++++--------------------- 3 files changed, 464 insertions(+), 390 deletions(-) create mode 100644 API/pubsub/README.md diff --git a/API/pubsub/README.md b/API/pubsub/README.md new file mode 100644 index 00000000..0b6cf78f --- /dev/null +++ b/API/pubsub/README.md @@ -0,0 +1,63 @@ +pubsub API +========== + +#### `pubsub.subscribe` + +> Subscribe to an IPFS Topic + +##### `Go` **WIP** + +##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, callback) + +- `topic` - type: String +- `options` - type: Object, optional, might contain the following properties: + - `discover`: type: Boolean - Will use the DHT to find + +`callback` must follow `function (err, subscription) {}` where Subscription is a Node.js Stream in Object mode, emiting a `data` event for each new message on the subscribed topic.`err` is an error if the operation was not successful. + +`subscription` has a `.cancel` event in order to cancel the subscription. + +If no `callback` is passed, a [promise][] is returned. + +> _In the future, topic can also be type of TopicDescriptor (https://github.com/libp2p/pubsub-notes/blob/master/flooding/flooding.proto#L23). However, for now, only strings are supported._ + +#### `pubsub.publish` + +> Publish a data message to a pubsub topic + +##### `Go` **WIP** + +##### `JavaScript` - ipfs.pubsub.publish(topic, data, callback) + +- `topic` - type: String +- `data` - type: Buffer + +`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. + +If no `callback` is passed, a [promise][] is returned. + +#### `pubsub.ls` + +> Returns the list of subscriptions the peer is subscribed to. + +##### `Go` **WIP** + +##### `JavaScript` - ipfs.pubsub.ls(topic, callback) + +`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. + +If no `callback` is passed, a [promise][] is returned. + +#### `pubsub.peers` + +> Returns the peers that are subscribed to one topic. + +##### `Go` **WIP** + +##### `JavaScript` - ipfs.pubsub.peers(topic, callback) + +- `topic` - type: String + +`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. + +If no `callback` is passed, a [promise][] is returned. diff --git a/src/index.js b/src/index.js index 50b2efff..24b2e4b3 100644 --- a/src/index.js +++ b/src/index.js @@ -9,4 +9,3 @@ exports.swarm = require('./swarm') exports.block = require('./block') exports.dht = require('./dht') exports.pubsub = require('./pubsub') -exports.pubsubMessage = require('./pubsub-message') diff --git a/src/pubsub.js b/src/pubsub.js index 89c90611..bbaf96ee 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -3,505 +3,517 @@ 'use strict' const expect = require('chai').expect -const isNode = require('detect-node') const series = require('async/series') -const topicName = 'js-ipfs-api-tests' - -const publish = (ipfs, data, callback) => { - ipfs.pubsub.publish(topicName, data, (err) => { - expect(err).to.not.exist - callback() - }) -} - -const waitForPeers = (ipfs, peersToWait, callback) => { - const i = setInterval(() => { - ipfs.pubsub.peers(topicName, (err, peers) => { - if (err) { - return callback(err) - } - - const hasAllPeers = peersToWait.map((e) => peers.includes(e)).filter((e) => e === false).length === 0 - if (hasAllPeers) { - clearInterval(i) - callback(null) - } - }) - }, 1000) -} - module.exports = (common) => { describe('.pubsub', () => { - if (!isNode) { - return - } - - let ipfs, ipfs2 - - before((done) => { - // CI takes longer to instantiate the daemon, - // so we need to increase the timeout for the - // before step - common.setup((err, factory) => { - expect(err).to.not.exist - series([ - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs = node - ipfs.id().then((res) => { - ipfs.PeerId = res.id - cb() - }) - }) - }, - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs2 = node - ipfs2.id().then((res) => { - ipfs2.PeerId = res.id - cb() - }) - }) - } - ], done) - }) - }) + const topic = 'pubsub-tests' - after((done) => { - common.teardown(done) - }) + describe('callback API', () => { + let ipfs1 + let ipfs2 - describe('publish', () => { - it('message from string', (done) => { - publish(ipfs, 'hello friend', done) + before((done) => { + // CI takes longer to instantiate the daemon, + // so we need to increase the timeout for the + // before step + common.setup((err, factory) => { + expect(err).to.not.exist + series([ + (cb) => { + factory.spawnNode((err, node) => { + expect(err).to.not.exist + ipfs1 = node + ipfs1.id().then((res) => { + ipfs1.peerId = res.id + cb() + }) + }) + }, + (cb) => { + factory.spawnNode((err, node) => { + expect(err).to.not.exist + ipfs2 = node + ipfs2.id().then((res) => { + ipfs2.peerId = res.id + cb() + }) + }) + } + ], done) + }) }) - it('message from buffer', (done) => { - publish(ipfs, new Buffer('hello friend'), done) + after((done) => { + common.teardown(done) }) - }) - describe('subscribe', () => { - it('one topic', (done) => { - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist + function waitForPeers (ipfs, peersToWait, callback) { + const i = setInterval(() => { + ipfs.pubsub.peers(topic, (err, peers) => { + if (err) { + return callback(err) + } - subscription.on('data', (d) => { - expect(d.data).to.equal('hi') - subscription.cancel(done) + const hasAllPeers = peersToWait + .map((e) => peers.includes(e)) + .filter((e) => e === false) + .length === 0 + if (hasAllPeers) { + clearInterval(i) + callback() + } }) + }, 1000) + } - ipfs.pubsub.publish(topicName, 'hi', (err) => { - expect(err).to.not.exist - }) + describe('.publish', () => { + it('message from string', (done) => { + ipfs1.pubsub.publish(topic, 'hello friend', done) }) - }) - it('cancels a subscription', (done) => { - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist - subscription.cancel(done) + it('message from buffer', (done) => { + ipfs1.pubsub.publish(topic, new Buffer('hello friend'), done) }) }) - it('closes the subscription stream', (done) => { - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist - subscription.on('end', done) - subscription.cancel() - }) - }) + describe('.subscribe', () => { + it('to one topic', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist - it('returns an error when already subscribed', (done) => { - ipfs.pubsub.subscribe(topicName) - .then((firstSub) => { - ipfs.pubsub.subscribe(topicName) - .then((secondSub) => { - expect(secondSub).to.not.exist - done("Shouldn't get here!") - }) - .catch((secondErr) => { - expect(secondErr).to.be.an('error') - expect(secondErr.toString()).to.equal(`Error: Already subscribed to '${topicName}'`) - firstSub.cancel(done) - }) - }) - .catch(done) - }) + subscription.on('data', (msg) => { + expect(msg.data).to.equal('hi') + subscription.cancel(done) + }) - it('takes options as an argument', (done) => { - ipfs.pubsub.subscribe(topicName, { discover: true }, (err, subscription) => { - expect(err).to.not.exist + ipfs1.pubsub.publish(topic, 'hi', (err) => { + expect(err).to.not.exist + }) + }) + }) - subscription.on('data', (d) => { - expect(d.data).to.equal('hi') - subscription.cancel(done) + it('errors on double subscription', (done) => { + series([ + (cb) => ipfs1.pubsub.subscribe(topic, cb), + (cb) => ipfs1.pubsub.subscribe(topic, cb) + ], (err, subs) => { + expect(err).to.exist + expect(err.toString()) + .to.eql(`Error: Already subscribed to '${topic}'`) + subs[0].cancel(done) }) + }) - ipfs.pubsub.publish(topicName, 'hi', (err) => { + it('discover options', (done) => { + ipfs1.pubsub.subscribe(topic, { + discover: true + }, (err, subscription) => { expect(err).to.not.exist + subscription.cancel(done) }) }) }) - }) - describe('peers', () => { - it('returns an error when not subscribed to a topic', (done) => { - ipfs.pubsub.peers(topicName, (err, peers) => { - expect(err).to.be.an('error') - expect(err.toString()).to.equal(`Error: Not subscribed to '${topicName}'`) - done() + describe('subscription', () => { + it('.cancel and wait for callback', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription.cancel(done) + }) }) - }) - it.skip('returns no peers within 10 seconds', (done) => { - // Currently go-ipfs returns peers that have not been subscribed to the topic - // Enable when go-ipfs has been fixed - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist - - setTimeout(() => { - ipfs.pubsub.peers(topicName, (err, peers) => { - expect(err).to.not.exist - expect(peers.length).to.equal(0) - subscription.cancel(done) - }) - }, 10000) + it('.cancel and wait for end event', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription.on('end', done) + subscription.cancel() + }) }) }) - it.skip('doesn\'t return extra peers', (done) => { - // Currently go-ipfs returns peers that have not been subscribed to the topic - // Enable when go-ipfs has been fixed - ipfs.pubsub.subscribe(topicName, (err, subscription1) => { - expect(err).to.not.exist + describe('.peers', () => { + // TODO clarify what is the goal of pubsub.peers + it('returns an error when not subscribed to a topic', (done) => { + ipfs1.pubsub.peers(topic, (err, peers) => { + expect(err).to.exist + expect(err.toString()).to.equal(`Error: Not subscribed to '${topic}'`) + done() + }) + }) - ipfs2.pubsub.subscribe(topicName + 'different topic', (err, subscription2) => { + it.skip('returns no peers within 10 seconds', (done) => { + // Currently go-ipfs returns peers that have not been + // subscribed to the topic. Enable when go-ipfs has been fixed + ipfs1.pubsub.subscribe(topic, (err, subscription) => { expect(err).to.not.exist setTimeout(() => { - ipfs.pubsub.peers(topicName, (err, peers) => { + ipfs1.pubsub.peers(topic, (err, peers) => { expect(err).to.not.exist expect(peers.length).to.equal(0) - - subscription1.cancel(() => { - subscription2.cancel(done) - }) + subscription.cancel(done) }) }, 10000) }) }) - }) - it.skip('returns peers for a topic - one peer', (done) => { - // Currently go-ipfs returns peers that have not been subscribed to the topic - // Enable when go-ipfs has been fixed - const peersToWait = [ipfs2.PeerId] + it.skip('doesn\'t return extra peers', (done) => { + // Currently go-ipfs returns peers that have not been + // subscribed to the topic. Enable when go-ipfs has been fixed + ipfs1.pubsub.subscribe(topic, (err, subscription1) => { + expect(err).to.not.exist - ipfs2.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist + ipfs2.pubsub.subscribe(topic + 'different topic', (err, subscription2) => { + expect(err).to.not.exist - const i = setInterval(() => { - ipfs.pubsub.peers(topicName, (err, peers) => { - if (err) { - expect(err).to.not.exist - done(err) - } + setTimeout(() => { + ipfs1.pubsub.peers(topic, (err, peers) => { + expect(err).to.not.exist + expect(peers.length).to.equal(0) - console.log(peers) + subscription1.cancel(() => { + subscription2.cancel(done) + }) + }) + }, 10000) + }) + }) + }) - const hasAllPeers = peersToWait - .map((e) => peers.indexOf(e) !== -1) - .filter((e) => e === false) - .length === 0 + it.skip('returns peers for a topic - one peer', (done) => { + // Currently go-ipfs returns peers that have not been subscribed to the topic + // Enable when go-ipfs has been fixed + const peersToWait = [ipfs2.peerId] - if (hasAllPeers) { - clearInterval(i) - expect(peers.length).to.equal(peersToWait.length) - subscription.cancel(done) - } - }) - }, 1000) + ipfs2.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + + const i = setInterval(() => { + ipfs1.pubsub.peers(topic, (err, peers) => { + if (err) { + expect(err).to.not.exist + done(err) + } + + console.log(peers) + + const hasAllPeers = peersToWait + .map((e) => peers.indexOf(e) !== -1) + .filter((e) => e === false) + .length === 0 + + if (hasAllPeers) { + clearInterval(i) + expect(peers.length).to.equal(peersToWait.length) + subscription.cancel(done) + } + }) + }, 1000) + }) }) - }) - it.skip('lists peers for a topic - multiple peers', (done) => { - // TODO + it.skip('lists peers for a topic - multiple peers', (done) => { + // TODO + }) }) - }) - describe('ls', () => { - it('lists no subscribed topics', (done) => { - ipfs.pubsub.ls((err, topics) => { - expect(err).to.not.exist - expect(topics.length).to.equal(0) - done() + describe('.ls', () => { + it('empty list when no topics are subscribed', (done) => { + ipfs1.pubsub.ls((err, topics) => { + expect(err).to.not.exist + expect(topics.length).to.equal(0) + done() + }) }) - }) - it('lists 1 subscribed topic', (done) => { - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist + it('list with 1 subscribed topic', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist - ipfs.pubsub.ls((err, topics) => { + ipfs1.pubsub.ls((err, topics) => { + expect(err).to.not.exist + expect(topics.length).to.equal(1) + expect(topics[0]).to.equal(topic) + subscription.cancel(done) + }) + }) + }) + + it('list with 3 subscribed topicss', (done) => { + const topics = ['one', 'two', 'three'] + series( + topics.map((t) => (cb) => ipfs1.pubsub.subscribe(t, cb)) + , (err, subs) => { expect(err).to.not.exist - expect(topics.length).to.equal(1) - expect(topics[0]).to.equal(topicName) - subscription.cancel(done) + ipfs1.pubsub.ls((err, list) => { + expect(err).to.not.exist + expect(list.length).to.equal(3) + expect(list).to.eql(topics) + series(subs.map((s) => (cb) => s.cancel(cb)), done) + }) }) }) }) - it('lists all subscribed topics', (done) => { - let topics = ['one', 'two', 'three'] - let subscriptions = topics.map((e) => ipfs.pubsub.subscribe(e)) - Promise.all(subscriptions) - .then((subscriptions) => { - ipfs.pubsub.ls((err, result) => { + describe('multiple nodes', () => { + it('receive messages from different node', (done) => { + const expectedString = 'hello from the other side' + + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + expect(subscription).to.exist + + subscription.on('data', (d) => { + expect(d.data).to.be.equal(expectedString) + subscription.cancel(done) + }) + + waitForPeers(ipfs2, [ipfs1.peerId], (err) => { expect(err).to.not.exist - expect(result.length).to.equal(3) - result.forEach((e) => { - expect(topics.indexOf(e) !== -1).to.be.true + ipfs2.pubsub.publish(topic, expectedString, (err) => { + expect(err).to.not.exist }) - Promise.all(subscriptions.map((s) => s.cancel())) - .then(() => done()) - .catch(done) }) }) - .catch(done) - }) - }) + }) - describe('send and receive messages', () => { - it('receive messages from different node', (done) => { - const expectedString = 'hello from the other side' + it('receive multiple messages', (done) => { + let receivedMessages = [] + const expectedMessages = 2 - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist - expect(subscription).to.exist + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exists - subscription.on('data', (d) => { - expect(d.data).to.be.equal(expectedString) - subscription.cancel(done) - }) + subscription.on('data', (d) => { + receivedMessages.push(d.data) + if (receivedMessages.length === expectedMessages) { + receivedMessages.forEach((msg) => { + expect(msg).to.be.equal('hi') + }) + subscription.cancel(done) + } + }) - waitForPeers(ipfs2, [ipfs.PeerId], (err) => { - expect(err).to.not.exist - ipfs2.pubsub.publish(topicName, expectedString, (err) => { + waitForPeers(ipfs2, [ipfs1.peerId], (err) => { expect(err).to.not.exist + ipfs2.pubsub.publish(topic, 'hi') + ipfs2.pubsub.publish(topic, 'hi') }) }) }) }) - it('receive multiple messages', (done) => { - let receivedMessages = [] - const expectedMessages = 2 + describe('load tests', () => { + it('send/receive 10k messages', (done) => { + const expectedString = 'hello' + const count = 10000 + let sendCount = 0 + let receivedCount = 0 + let startTime + + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exists + + const outputProgress = () => { + process.stdout.write(' \r') + process.stdout.write('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') + } + + subscription.on('data', (d) => { + expect(d.data).to.be.equal(expectedString) + receivedCount++ + outputProgress() + if (receivedCount >= count) { + const duration = new Date().getTime() - startTime + process.stdout.write(' \r') + console.log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s`) + subscription.cancel(done) + } + }) + + function loop () { + if (sendCount < count) { + sendCount++ + outputProgress() + ipfs2.pubsub.publish(topic, expectedString, (err) => { + expect(err).to.not.exist + process.nextTick(() => loop()) + }) + } + } + + waitForPeers(ipfs1, [ipfs2.peerId], (err) => { + expect(err).to.not.exist + startTime = new Date().getTime() + loop() + }) + }) + }) + + it('call publish 1k times', (done) => { + const expectedString = 'hello' + const count = 1000 + let sendCount = 0 + + function loop () { + if (sendCount < count) { + sendCount++ + process.stdout.write(' \r') + process.stdout.write('Sending messages: ' + sendCount + ' of ' + count + '\r') + ipfs1.pubsub.publish(topic, expectedString, (err) => { + expect(err).to.not.exist + process.nextTick(() => loop()) + }) + } else { + done() + } + } + loop() + }) - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exists + it('call subscribe 1k times', (done) => { + const count = 1000 + let sendCount = 0 + let receivedCount = 0 + let subscription = null - subscription.on('data', (d) => { - receivedMessages.push(d.data) - if (receivedMessages.length === expectedMessages) { - receivedMessages.forEach((msg) => { - expect(msg).to.be.equal('hi') + function loop () { + if (sendCount < count) { + sendCount++ + process.stdout.write(' \r') + process.stdout.write('Subscribing: ' + sendCount + ' of ' + count + '\r') + ipfs1.pubsub.subscribe(topic, (err, res) => { + receivedCount++ + // First call should go through normally + if (receivedCount === 1) { + expect(err).to.not.exist + expect(res).to.exist + subscription = res + } else { + // Subsequent calls should return "error, duplicate subscription" + expect(err).to.exist + } + process.nextTick(() => loop()) }) + } else { subscription.cancel(done) } - }) + } + loop() + }) - waitForPeers(ipfs2, [ipfs.PeerId], (err) => { - expect(err).to.not.exist - ipfs2.pubsub.publish(topicName, 'hi') - ipfs2.pubsub.publish(topicName, 'hi') - }) + it('subscribe/unsubscribe 1k times', (done) => { + const count = 1000 + let sendCount = 0 + let receivedCount = 0 + + function outputProgress () { + process.stdout.write(' \r') + process.stdout.write('Subscribe: ' + sendCount + ' of ' + count + ', Unsubscribe: ' + receivedCount + '\r') + } + + function loop () { + if (sendCount < count) { + sendCount++ + outputProgress() + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription.cancel((err) => { + expect(err).to.not.exist + receivedCount++ + outputProgress() + process.nextTick(() => loop()) + }) + }) + } else { + done() + } + } + loop() }) }) }) - describe('promises', () => { - it('subscribe', (done) => { - ipfs.pubsub.subscribe(topicName) + describe('promise API', () => { + let ipfs1 + let ipfs2 + + before((done) => { + // CI takes longer to instantiate the daemon, + // so we need to increase the timeout for the + // before step + common.setup((err, factory) => { + expect(err).to.not.exist + series([ + (cb) => { + factory.spawnNode((err, node) => { + expect(err).to.not.exist + ipfs1 = node + ipfs1.id().then((res) => { + ipfs1.peerId = res.id + cb() + }) + }) + }, + (cb) => { + factory.spawnNode((err, node) => { + expect(err).to.not.exist + ipfs2 = node + ipfs2.id().then((res) => { + ipfs2.peerId = res.id + cb() + }) + }) + } + ], done) + }) + }) + + after((done) => { + common.teardown(done) + }) + + it('.subscribe', () => { + return ipfs1.pubsub.subscribe(topic) .then((subscription) => { expect(subscription).to.exist - subscription.cancel(done) + return subscription.cancel() }) - .catch(done) }) - it('publish', (done) => { - ipfs.pubsub.subscribe(topicName) + it('.publish', () => { + return ipfs1.pubsub.subscribe(topic) .then((subscription) => { - return ipfs.pubsub.publish(topicName, 'hi') - .then(() => subscription) + return ipfs1.pubsub.publish(topic, 'hi').then(() => subscription) }) - .then((subscription) => subscription.cancel(done)) - .catch(done) + .then((subscription) => subscription.cancel()) }) - it('cancel subscription', (done) => { - ipfs.pubsub.subscribe(topicName) + it('.cancel', () => { + return ipfs1.pubsub.subscribe(topic) .then((subscription) => subscription.cancel()) - .then(() => done()) - .catch(done) }) - it('peers', (done) => { + it('.peers', () => { let s - ipfs.pubsub.subscribe(topicName) + return ipfs1.pubsub.subscribe(topic) .then((subscription) => { s = subscription - return ipfs.pubsub.peers(topicName) + return ipfs1.pubsub.peers(topic) }) .then((peers) => { expect(peers).to.exist - s.cancel(done) + return s.cancel() }) - .catch(done) }) - it('topics', (done) => { - ipfs.pubsub.ls() + it('.ls', () => { + return ipfs1.pubsub.ls() .then((topics) => { expect(topics).to.exist expect(topics.length).to.equal(0) - done() - }) - .catch(done) - }) - }) - - describe('load tests', () => { - it('send/receive 10k messages', (done) => { - const expectedString = 'hello' - const count = 10000 - let sendCount = 0 - let receivedCount = 0 - let startTime - - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exists - - const outputProgress = () => { - process.stdout.write(' \r') - process.stdout.write('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') - } - - subscription.on('data', (d) => { - expect(d.data).to.be.equal(expectedString) - receivedCount++ - outputProgress() - if (receivedCount >= count) { - const duration = new Date().getTime() - startTime - process.stdout.write(' \r') - console.log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s`) - subscription.cancel(done) - } - }) - - const loop = () => { - if (sendCount < count) { - sendCount++ - outputProgress() - ipfs2.pubsub.publish(topicName, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } - } - - waitForPeers(ipfs, [ipfs2.PeerId], (err) => { - expect(err).to.not.exist - startTime = new Date().getTime() - loop() }) - }) - }) - - it('call publish 1k times', (done) => { - const expectedString = 'hello' - const count = 1000 - let sendCount = 0 - - const loop = () => { - if (sendCount < count) { - sendCount++ - process.stdout.write(' \r') - process.stdout.write('Sending messages: ' + sendCount + ' of ' + count + '\r') - ipfs.pubsub.publish(topicName, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } else { - done() - } - } - loop() - }) - - it('call subscribe 1k times', (done) => { - const count = 1000 - let sendCount = 0 - let receivedCount = 0 - let subscription = null - - const loop = () => { - if (sendCount < count) { - sendCount++ - process.stdout.write(' \r') - process.stdout.write('Subscribing: ' + sendCount + ' of ' + count + '\r') - ipfs.pubsub.subscribe(topicName, (err, res) => { - receivedCount++ - // First call should go through normally - if (receivedCount === 1) { - expect(err).to.not.exist - expect(res).to.exist - subscription = res - } else { - // Subsequent calls should return "error, duplicate subscription" - expect(err).to.exist - } - process.nextTick(() => loop()) - }) - } else { - subscription.cancel(done) - } - } - loop() - }) - - it('subscribe/unsubscribe 1k times', (done) => { - const count = 1000 - let sendCount = 0 - let receivedCount = 0 - - const outputProgress = () => { - process.stdout.write(' \r') - process.stdout.write('Subscribe: ' + sendCount + ' of ' + count + ', Unsubscribe: ' + receivedCount + '\r') - } - - const loop = () => { - if (sendCount < count) { - sendCount++ - outputProgress() - ipfs.pubsub.subscribe(topicName, (err, subscription) => { - expect(err).to.not.exist - subscription.cancel((err) => { - receivedCount++ - outputProgress() - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - }) - } else { - done() - } - } - loop() }) }) }) From d004577d31d9de23306df27b96a25d2fbbb4e3ea Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 8 Dec 2016 15:05:10 -0800 Subject: [PATCH 7/8] fix: fix a bunch of issues (i.e: identify race condition) --- src/object.js | 2 +- src/pubsub.js | 615 +++++++++++++++++++++++++++----------------------- 2 files changed, 330 insertions(+), 287 deletions(-) diff --git a/src/object.js b/src/object.js index cd130456..079c1fca 100644 --- a/src/object.js +++ b/src/object.js @@ -42,7 +42,7 @@ module.exports = (common) => { }) }) - it('template unixfs-dir', (done) => { + it.skip('template unixfs-dir', (done) => { ipfs.object.new('unixfs-dir', (err, node) => { expect(err).to.not.exist const nodeJSON = node.toJSON() diff --git a/src/pubsub.js b/src/pubsub.js index bbaf96ee..aa407e41 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -48,389 +48,432 @@ module.exports = (common) => { common.teardown(done) }) - function waitForPeers (ipfs, peersToWait, callback) { - const i = setInterval(() => { - ipfs.pubsub.peers(topic, (err, peers) => { - if (err) { - return callback(err) - } - - const hasAllPeers = peersToWait - .map((e) => peers.includes(e)) - .filter((e) => e === false) - .length === 0 - if (hasAllPeers) { - clearInterval(i) - callback() - } + describe('single node', () => { + describe('.publish', () => { + it('message from string', (done) => { + ipfs1.pubsub.publish(topic, 'hello friend', done) }) - }, 1000) - } - describe('.publish', () => { - it('message from string', (done) => { - ipfs1.pubsub.publish(topic, 'hello friend', done) + it('message from buffer', (done) => { + ipfs1.pubsub.publish(topic, new Buffer('hello friend'), done) + }) }) - it('message from buffer', (done) => { - ipfs1.pubsub.publish(topic, new Buffer('hello friend'), done) - }) - }) + describe('.subscribe', () => { + it('to one topic', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist - describe('.subscribe', () => { - it('to one topic', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + subscription.on('data', (msg) => { + expect(msg.data.toString()).to.equal('hi') + subscription.cancel(done) + }) - subscription.on('data', (msg) => { - expect(msg.data).to.equal('hi') - subscription.cancel(done) + ipfs1.pubsub.publish(topic, 'hi', (err) => { + expect(err).to.not.exist + }) }) + }) - ipfs1.pubsub.publish(topic, 'hi', (err) => { - expect(err).to.not.exist + it('errors on double subscription', (done) => { + series([ + (cb) => ipfs1.pubsub.subscribe(topic, cb), + (cb) => ipfs1.pubsub.subscribe(topic, cb) + ], (err, subs) => { + expect(err).to.exist + expect(err.toString()) + .to.eql(`Error: Already subscribed to '${topic}'`) + subs[0].cancel(done) }) }) - }) - it('errors on double subscription', (done) => { - series([ - (cb) => ipfs1.pubsub.subscribe(topic, cb), - (cb) => ipfs1.pubsub.subscribe(topic, cb) - ], (err, subs) => { - expect(err).to.exist - expect(err.toString()) - .to.eql(`Error: Already subscribed to '${topic}'`) - subs[0].cancel(done) + it('discover options', (done) => { + ipfs1.pubsub.subscribe(topic, { + discover: true + }, (err, subscription) => { + expect(err).to.not.exist + subscription.cancel(done) + }) }) }) - it('discover options', (done) => { - ipfs1.pubsub.subscribe(topic, { - discover: true - }, (err, subscription) => { - expect(err).to.not.exist - subscription.cancel(done) + describe('subscription', () => { + it('.cancel and wait for callback', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription.cancel(done) + }) }) - }) - }) - describe('subscription', () => { - it('.cancel and wait for callback', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - subscription.cancel(done) + it('.cancel and wait for end event', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription.on('end', done) + subscription.cancel() + }) }) }) + }) - it('.cancel and wait for end event', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + describe('multiple nodes connected', () => { + before((done) => { + ipfs2.id((err, id) => { expect(err).to.not.exist - subscription.on('end', done) - subscription.cancel() + const ipfs2Addr = id.addresses[0] + ipfs1.swarm.connect(ipfs2Addr, (err) => { + expect(err).to.not.exist + done() + }) }) }) - }) - describe('.peers', () => { - // TODO clarify what is the goal of pubsub.peers - it('returns an error when not subscribed to a topic', (done) => { - ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.exist - expect(err.toString()).to.equal(`Error: Not subscribed to '${topic}'`) - done() - }) - }) + function waitForPeers (ipfs, peersToWait, callback) { + const i = setInterval(() => { + ipfs.pubsub.peers(topic, (err, peers) => { + if (err) { + return callback(err) + } - it.skip('returns no peers within 10 seconds', (done) => { - // Currently go-ipfs returns peers that have not been - // subscribed to the topic. Enable when go-ipfs has been fixed - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + const hasAllPeers = peersToWait + .map((e) => peers.includes(e)) + .filter((e) => e === false) + .length === 0 - setTimeout(() => { - ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.not.exist - expect(peers.length).to.equal(0) - subscription.cancel(done) - }) - }, 10000) + if (hasAllPeers) { + clearInterval(i) + callback() + } + }) + }, 1000) + } + + describe('.peers', () => { + it('returns an error when not subscribed to a topic', (done) => { + ipfs1.pubsub.peers(topic, (err, peers) => { + expect(err).to.exist + expect(err.toString()) + .to.eql(`Error: Not subscribed to '${topic}'`) + done() + }) }) - }) - - it.skip('doesn\'t return extra peers', (done) => { - // Currently go-ipfs returns peers that have not been - // subscribed to the topic. Enable when go-ipfs has been fixed - ipfs1.pubsub.subscribe(topic, (err, subscription1) => { - expect(err).to.not.exist - ipfs2.pubsub.subscribe(topic + 'different topic', (err, subscription2) => { + // I don't understand the purpose of this test + it.skip('returns no peers within 10 seconds', (done) => { + // Currently go-ipfs returns peers that have not been + // subscribed to the topic. Enable when go-ipfs has been fixed + ipfs1.pubsub.subscribe(topic, (err, subscription) => { expect(err).to.not.exist setTimeout(() => { ipfs1.pubsub.peers(topic, (err, peers) => { expect(err).to.not.exist expect(peers.length).to.equal(0) - - subscription1.cancel(() => { - subscription2.cancel(done) - }) + subscription.cancel(done) }) }, 10000) }) }) - }) - - it.skip('returns peers for a topic - one peer', (done) => { - // Currently go-ipfs returns peers that have not been subscribed to the topic - // Enable when go-ipfs has been fixed - const peersToWait = [ipfs2.peerId] - - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - const i = setInterval(() => { - ipfs1.pubsub.peers(topic, (err, peers) => { - if (err) { - expect(err).to.not.exist - done(err) - } - - console.log(peers) + it('doesn\'t return extra peers', (done) => { + // Currently go-ipfs returns peers that have not been + // subscribed to the topic. Enable when go-ipfs has been fixed + ipfs1.pubsub.subscribe(topic, (err, subscription1) => { + expect(err).to.not.exist - const hasAllPeers = peersToWait - .map((e) => peers.indexOf(e) !== -1) - .filter((e) => e === false) - .length === 0 + ipfs2.pubsub.subscribe(topic + 'different topic', (err, subscription2) => { + expect(err).to.not.exist - if (hasAllPeers) { - clearInterval(i) - expect(peers.length).to.equal(peersToWait.length) - subscription.cancel(done) - } + setTimeout(() => { + ipfs1.pubsub.peers(topic, (err, peers) => { + expect(err).to.not.exist + expect(peers).to.have.length(0) + subscription1.cancel(() => subscription2.cancel(done)) + }) + }, 10000) }) - }, 1000) + }) }) - }) - it.skip('lists peers for a topic - multiple peers', (done) => { - // TODO - }) - }) + it('returns peers for a topic - one peer', (done) => { + // Currently go-ipfs returns peers that have not been + // subscribed to the topic. Enable when go-ipfs has been fixed + const peersToWait = [ipfs2.peerId] + let subscription2 - describe('.ls', () => { - it('empty list when no topics are subscribed', (done) => { - ipfs1.pubsub.ls((err, topics) => { - expect(err).to.not.exist - expect(topics.length).to.equal(0) - done() - }) - }) - - it('list with 1 subscribed topic', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription2 = subscription + }) - ipfs1.pubsub.ls((err, topics) => { + ipfs2.pubsub.subscribe(topic, (err, subscription) => { expect(err).to.not.exist - expect(topics.length).to.equal(1) - expect(topics[0]).to.equal(topic) - subscription.cancel(done) + + const i = setInterval(() => { + ipfs1.pubsub.peers(topic, (err, peers) => { + if (err) { + expect(err).to.not.exist + done(err) + } + + const hasAllPeers = peersToWait + .map((e) => peers.indexOf(e) !== -1) + .filter((e) => e === false) + .length === 0 + + if (hasAllPeers) { + clearInterval(i) + expect(peers.length).to.equal(peersToWait.length) + subscription.cancel(() => subscription2.cancel(done)) + } + }) + }, 1000) }) }) + + it.skip('lists peers for a topic - multiple peers', (done) => { + // TODO + }) }) - it('list with 3 subscribed topicss', (done) => { - const topics = ['one', 'two', 'three'] - series( - topics.map((t) => (cb) => ipfs1.pubsub.subscribe(t, cb)) - , (err, subs) => { - expect(err).to.not.exist - ipfs1.pubsub.ls((err, list) => { + describe('.ls', () => { + it('empty list when no topics are subscribed', (done) => { + ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist - expect(list.length).to.equal(3) - expect(list).to.eql(topics) - series(subs.map((s) => (cb) => s.cancel(cb)), done) + expect(topics.length).to.equal(0) + done() }) }) - }) - }) - - describe('multiple nodes', () => { - it('receive messages from different node', (done) => { - const expectedString = 'hello from the other side' - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - expect(subscription).to.exist + it('list with 1 subscribed topic', (done) => { + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist - subscription.on('data', (d) => { - expect(d.data).to.be.equal(expectedString) - subscription.cancel(done) + ipfs1.pubsub.ls((err, topics) => { + expect(err).to.not.exist + expect(topics.length).to.equal(1) + expect(topics[0]).to.equal(topic) + subscription.cancel(done) + }) }) + }) - waitForPeers(ipfs2, [ipfs1.peerId], (err) => { + it('list with 3 subscribed topicss', (done) => { + const topics = ['one', 'two', 'three'] + series( + topics.map((t) => (cb) => ipfs1.pubsub.subscribe(t, cb)) + , (err, subs) => { expect(err).to.not.exist - ipfs2.pubsub.publish(topic, expectedString, (err) => { + ipfs1.pubsub.ls((err, list) => { expect(err).to.not.exist + expect(list.length).to.equal(3) + expect(list).to.eql(topics) + series(subs.map((s) => (cb) => s.cancel(cb)), done) }) }) }) }) - it('receive multiple messages', (done) => { - let receivedMessages = [] - const expectedMessages = 2 + describe('multiple nodes', () => { + it('receive messages from different node', (done) => { + const expectedString = 'hello from the other side' + let subscription2 - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists + ipfs2.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + subscription2 = subscription + }) + + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exist + expect(subscription).to.exist + + subscription.on('data', (msg) => { + expect(msg.data.toString()).to.equal(expectedString) + subscription.cancel(() => subscription2.cancel(done)) + }) - subscription.on('data', (d) => { - receivedMessages.push(d.data) - if (receivedMessages.length === expectedMessages) { - receivedMessages.forEach((msg) => { - expect(msg).to.be.equal('hi') + waitForPeers(ipfs2, [ipfs1.peerId], (err) => { + expect(err).to.not.exist + ipfs2.pubsub.publish(topic, expectedString, (err) => { + expect(err).to.not.exist }) - subscription.cancel(done) - } + }) }) + }) + + it('receive multiple messages', (done) => { + let receivedMessages = [] + const expectedMessages = 2 + let subscription2 - waitForPeers(ipfs2, [ipfs1.peerId], (err) => { + ipfs2.pubsub.subscribe(topic, (err, subscription) => { expect(err).to.not.exist - ipfs2.pubsub.publish(topic, 'hi') - ipfs2.pubsub.publish(topic, 'hi') + subscription2 = subscription + }) + + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exists + + subscription.on('data', (msg) => { + receivedMessages.push(msg.data) + + if (receivedMessages.length === expectedMessages) { + receivedMessages.forEach((msg) => { + expect(msg.toString()).to.be.equal('hi') + }) + subscription.cancel(() => subscription2.cancel(done)) + } + }) + + waitForPeers(ipfs2, [ipfs1.peerId], (err) => { + expect(err).to.not.exist + ipfs2.pubsub.publish(topic, 'hi') + ipfs2.pubsub.publish(topic, 'hi') + }) }) }) }) - }) - describe('load tests', () => { - it('send/receive 10k messages', (done) => { - const expectedString = 'hello' - const count = 10000 - let sendCount = 0 - let receivedCount = 0 - let startTime + describe('load tests', function () { + // Write the progress to stdout when in Node.js, silent when in the browser + const LOGS = false + const log = LOGS && process && process.stdout ? (s) => process.stdout.write(s) : () => {} + + it('send/receive 10k messages', function (done) { + // js-ipfs is a little slow atm, so make sure we have enough time + this.timeout(2 * 60 * 1000) + + const expectedString = 'hello' + const count = 10000 + let sendCount = 0 + let receivedCount = 0 + let startTime + let subscription2 + + ipfs2.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exists + subscription2 = subscription + }) - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists + ipfs1.pubsub.subscribe(topic, (err, subscription) => { + expect(err).to.not.exists - const outputProgress = () => { - process.stdout.write(' \r') - process.stdout.write('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') - } + const outputProgress = () => { + log(' \r') + log('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') + } - subscription.on('data', (d) => { - expect(d.data).to.be.equal(expectedString) - receivedCount++ - outputProgress() - if (receivedCount >= count) { - const duration = new Date().getTime() - startTime - process.stdout.write(' \r') - console.log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s`) - subscription.cancel(done) + subscription.on('data', (d) => { + expect(d.data.toString()).to.equal(expectedString) + + receivedCount++ + outputProgress() + if (receivedCount >= count) { + const duration = new Date().getTime() - startTime + log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s\n`) + + subscription.cancel(() => subscription2.cancel(done)) + } + }) + + function loop () { + if (sendCount < count) { + sendCount++ + outputProgress() + ipfs2.pubsub.publish(topic, expectedString, (err) => { + expect(err).to.not.exist + process.nextTick(() => loop()) + }) + } } + + waitForPeers(ipfs1, [ipfs2.peerId], (err) => { + expect(err).to.not.exist + startTime = new Date().getTime() + loop() + }) }) + }) + + it('call publish 1k times', (done) => { + const expectedString = 'hello' + const count = 1000 + let sendCount = 0 function loop () { if (sendCount < count) { sendCount++ - outputProgress() - ipfs2.pubsub.publish(topic, expectedString, (err) => { + log(' \r') + log('Sending messages: ' + sendCount + ' of ' + count + '\r') + ipfs1.pubsub.publish(topic, expectedString, (err) => { expect(err).to.not.exist process.nextTick(() => loop()) }) + } else { + done() } } - - waitForPeers(ipfs1, [ipfs2.peerId], (err) => { - expect(err).to.not.exist - startTime = new Date().getTime() - loop() - }) + loop() }) - }) - it('call publish 1k times', (done) => { - const expectedString = 'hello' - const count = 1000 - let sendCount = 0 - - function loop () { - if (sendCount < count) { - sendCount++ - process.stdout.write(' \r') - process.stdout.write('Sending messages: ' + sendCount + ' of ' + count + '\r') - ipfs1.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } else { - done() + it('call subscribe 1k times', (done) => { + const count = 1000 + let sendCount = 0 + let receivedCount = 0 + let subscription = null + + function loop () { + if (sendCount < count) { + sendCount++ + log(' \r') + log('Subscribing: ' + sendCount + ' of ' + count + '\r') + ipfs1.pubsub.subscribe(topic, (err, res) => { + receivedCount++ + // First call should go through normally + if (receivedCount === 1) { + expect(err).to.not.exist + expect(res).to.exist + subscription = res + } else { + // Subsequent calls should return "error, duplicate subscription" + expect(err).to.exist + } + process.nextTick(() => loop()) + }) + } else { + subscription.cancel(done) + } } - } - loop() - }) + loop() + }) - it('call subscribe 1k times', (done) => { - const count = 1000 - let sendCount = 0 - let receivedCount = 0 - let subscription = null - - function loop () { - if (sendCount < count) { - sendCount++ - process.stdout.write(' \r') - process.stdout.write('Subscribing: ' + sendCount + ' of ' + count + '\r') - ipfs1.pubsub.subscribe(topic, (err, res) => { - receivedCount++ - // First call should go through normally - if (receivedCount === 1) { - expect(err).to.not.exist - expect(res).to.exist - subscription = res - } else { - // Subsequent calls should return "error, duplicate subscription" - expect(err).to.exist - } - process.nextTick(() => loop()) - }) - } else { - subscription.cancel(done) + it('subscribe/unsubscribe 1k times', (done) => { + const count = 1000 + let sendCount = 0 + + function outputProgress () { + log(' \r') + log('Subscribe/Unsubscribe: ' + sendCount + ' of ' + count + '\r') } - } - loop() - }) - it('subscribe/unsubscribe 1k times', (done) => { - const count = 1000 - let sendCount = 0 - let receivedCount = 0 - - function outputProgress () { - process.stdout.write(' \r') - process.stdout.write('Subscribe: ' + sendCount + ' of ' + count + ', Unsubscribe: ' + receivedCount + '\r') - } - - function loop () { - if (sendCount < count) { - sendCount++ - outputProgress() - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - subscription.cancel((err) => { + function loop () { + if (sendCount < count) { + sendCount++ + outputProgress() + ipfs1.pubsub.subscribe(topic, (err, subscription) => { expect(err).to.not.exist - receivedCount++ - outputProgress() - process.nextTick(() => loop()) + subscription.cancel((err) => { + expect(err).to.not.exist + outputProgress() + process.nextTick(() => loop()) + }) }) - }) - } else { - done() + } else { + done() + } } - } - loop() + loop() + }) }) }) }) From 1a82890aeeb3b3eecdc19950144ed59a7cb09867 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 19 Dec 2016 12:44:50 +0100 Subject: [PATCH 8/8] feat: new event based API --- API/pubsub/README.md | 52 ++-- src/object.js | 2 +- src/pubsub.js | 715 ++++++++++++++++++++++--------------------- 3 files changed, 396 insertions(+), 373 deletions(-) diff --git a/API/pubsub/README.md b/API/pubsub/README.md index 0b6cf78f..8bb5a6b3 100644 --- a/API/pubsub/README.md +++ b/API/pubsub/README.md @@ -3,38 +3,48 @@ pubsub API #### `pubsub.subscribe` -> Subscribe to an IPFS Topic +> Subscribe to a pubsub topic. ##### `Go` **WIP** -##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, callback) +##### `JavaScript` - ipfs.pubsub.subscribe(topic, options, handler, callback) -- `topic` - type: String -- `options` - type: Object, optional, might contain the following properties: - - `discover`: type: Boolean - Will use the DHT to find - -`callback` must follow `function (err, subscription) {}` where Subscription is a Node.js Stream in Object mode, emiting a `data` event for each new message on the subscribed topic.`err` is an error if the operation was not successful. - -`subscription` has a `.cancel` event in order to cancel the subscription. +- `topic: string` +- `options: Object` - (Optional), might contain the following properties: + - `discover`: type: Boolean - Will use the DHT to find other peers. +- `handler: (msg) => ()` - Event handler which will be called with a message object everytime one is received. The `msg` has the format `{from: string, seqno: Buffer, data: Buffer, topicCIDs: Array}`. +- `callback: (Error) => ()` (Optional) Called once the subscription is established. If no `callback` is passed, a [promise][] is returned. > _In the future, topic can also be type of TopicDescriptor (https://github.com/libp2p/pubsub-notes/blob/master/flooding/flooding.proto#L23). However, for now, only strings are supported._ +#### `pubsub.unsubscribe` + +> Unsubscribes from a pubsub topic. + +##### `Go` **WIP** + +##### `JavaScript` - `ipfs.pubsub.unsubscribe(topic, handler)` + +- `topic: string` - The topic to unsubscribe from +- `handler: (msg) => ()` - The handler to remove. + +This works like `EventEmitter.removeListener`, as that only the `handler` passed to a `subscribe` call before is removed from listening. The underlying subscription will only be canceled once all listeners for a topic have been removed. + #### `pubsub.publish` -> Publish a data message to a pubsub topic +> Publish a data message to a pubsub topic. ##### `Go` **WIP** ##### `JavaScript` - ipfs.pubsub.publish(topic, data, callback) -- `topic` - type: String -- `data` - type: Buffer +- `topic: string` +- `data: buffer` - The actual message to send +- `callback: (Error) => ()` - Calls back with an error or nothing if the publish was successfull. -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. - -If no `callback` is passed, a [promise][] is returned. +If no `callback` is passed, a promise is returned. #### `pubsub.ls` @@ -44,9 +54,10 @@ If no `callback` is passed, a [promise][] is returned. ##### `JavaScript` - ipfs.pubsub.ls(topic, callback) -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. +- `topic: string` +- `callback: (Error, Array>) => ()` - Calls back with an error or a list of topicCIDs that this peer is subscribed to. -If no `callback` is passed, a [promise][] is returned. +If no `callback` is passed, a promise is returned. #### `pubsub.peers` @@ -56,8 +67,7 @@ If no `callback` is passed, a [promise][] is returned. ##### `JavaScript` - ipfs.pubsub.peers(topic, callback) -- `topic` - type: String - -`callback` must follow `function (err) {}` signature, where `err` is an error if the operation was not successful. +- `topic: string` +- `callback: (Error, Array>) => ()` - Calls back with an error or a list of peer ids subscribed to the `topic`. -If no `callback` is passed, a [promise][] is returned. +If no `callback` is passed, a promise is returned. diff --git a/src/object.js b/src/object.js index 079c1fca..cd130456 100644 --- a/src/object.js +++ b/src/object.js @@ -42,7 +42,7 @@ module.exports = (common) => { }) }) - it.skip('template unixfs-dir', (done) => { + it('template unixfs-dir', (done) => { ipfs.object.new('unixfs-dir', (err, node) => { expect(err).to.not.exist const nodeJSON = node.toJSON() diff --git a/src/pubsub.js b/src/pubsub.js index aa407e41..3829dc23 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -4,6 +4,55 @@ const expect = require('chai').expect const series = require('async/series') +const waterfall = require('async/waterfall') +const parallel = require('async/parallel') +const whilst = require('async/whilst') +const each = require('async/each') + +function waitForPeers (ipfs, topic, peersToWait, callback) { + const i = setInterval(() => { + ipfs.pubsub.peers(topic, (err, peers) => { + if (err) { + return callback(err) + } + + const missingPeers = peersToWait + .map((e) => peers.includes(e)) + .filter((e) => !e) + + if (missingPeers.length === 0) { + clearInterval(i) + callback() + } + }) + }, 500) +} + +function spawnWithId (factory, callback) { + waterfall([ + (cb) => factory.spawnNode(cb), + (node, cb) => node.id((err, res) => { + if (err) { + return cb(err) + } + node.peerId = res + cb(null, node) + }) + ], callback) +} + +function makeCheck (n, done) { + let i = 0 + return (err) => { + if (err) { + return done(err) + } + + if (++i === n) { + done() + } + } +} module.exports = (common) => { describe('.pubsub', () => { @@ -12,35 +61,28 @@ module.exports = (common) => { describe('callback API', () => { let ipfs1 let ipfs2 + let ipfs3 before((done) => { - // CI takes longer to instantiate the daemon, - // so we need to increase the timeout for the - // before step common.setup((err, factory) => { - expect(err).to.not.exist + if (err) { + return done(err) + } + series([ - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs1 = node - ipfs1.id().then((res) => { - ipfs1.peerId = res.id - cb() - }) - }) - }, - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs2 = node - ipfs2.id().then((res) => { - ipfs2.peerId = res.id - cb() - }) - }) + (cb) => spawnWithId(factory, cb), + (cb) => spawnWithId(factory, cb), + (cb) => spawnWithId(factory, cb) + ], (err, nodes) => { + if (err) { + return done(err) } - ], done) + + ipfs1 = nodes[0] + ipfs2 = nodes[1] + ipfs3 = nodes[2] + done() + }) }) }) @@ -50,8 +92,11 @@ module.exports = (common) => { describe('single node', () => { describe('.publish', () => { - it('message from string', (done) => { - ipfs1.pubsub.publish(topic, 'hello friend', done) + it('errors on string messags', (done) => { + ipfs1.pubsub.publish(topic, 'hello friend', (err) => { + expect(err).to.exist + done() + }) }) it('message from buffer', (done) => { @@ -61,55 +106,84 @@ module.exports = (common) => { describe('.subscribe', () => { it('to one topic', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + const check = makeCheck(2, done) - subscription.on('data', (msg) => { - expect(msg.data.toString()).to.equal('hi') - subscription.cancel(done) - }) + const handler = (msg) => { + expect(msg.data.toString()).to.equal('hi') + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicCIDs').eql([topic]) + // TODO: broken https://github.com/ipfs/go-ipfs/issues/3522 + // expect(msg).to.have.property('from', ipfs1.peerId.id) + + ipfs1.pubsub.unsubscribe(topic, handler) - ipfs1.pubsub.publish(topic, 'hi', (err) => { + ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist + expect(topics).to.be.empty + check() }) - }) - }) - - it('errors on double subscription', (done) => { - series([ - (cb) => ipfs1.pubsub.subscribe(topic, cb), - (cb) => ipfs1.pubsub.subscribe(topic, cb) - ], (err, subs) => { - expect(err).to.exist - expect(err.toString()) - .to.eql(`Error: Already subscribed to '${topic}'`) - subs[0].cancel(done) - }) - }) + } - it('discover options', (done) => { - ipfs1.pubsub.subscribe(topic, { - discover: true - }, (err, subscription) => { + ipfs1.pubsub.subscribe(topic, handler, (err) => { expect(err).to.not.exist - subscription.cancel(done) + ipfs1.pubsub.publish(topic, new Buffer('hi'), check) }) }) - }) - describe('subscription', () => { - it('.cancel and wait for callback', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + it('attaches multiple event listeners', (done) => { + const check = makeCheck(3, done) + const handler1 = (msg) => { + expect(msg.data.toString()).to.be.eql('hello') + + ipfs1.pubsub.unsubscribe(topic, handler1) + + series([ + (cb) => ipfs1.pubsub.ls(cb), + (cb) => { + ipfs1.pubsub.unsubscribe(topic, handler2) + cb() + }, + (cb) => ipfs1.pubsub.ls(cb) + ], (err, res) => { + expect(err).to.not.exist + + // Still subscribed as there is one listener left + expect(res[0]).to.be.eql([topic]) + // Now all listeners are gone no subscription anymore + expect(res[2]).to.be.eql([]) + check() + }) + } + + const handler2 = (msg) => { + expect(msg.data.toString()).to.be.eql('hello') + check() + } + + parallel([ + (cb) => ipfs1.pubsub.subscribe(topic, handler1, cb), + (cb) => ipfs1.pubsub.subscribe(topic, handler2, cb) + ], (err) => { expect(err).to.not.exist - subscription.cancel(done) + ipfs1.pubsub.publish(topic, new Buffer('hello'), check) }) }) - it('.cancel and wait for end event', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + it('discover options', (done) => { + const check = makeCheck(2, done) + + const handler = (msg) => { + expect(msg.data.toString()).to.be.eql('hi') + ipfs1.pubsub.unsubscribe(topic, handler) + check() + } + + ipfs1.pubsub.subscribe(topic, { + discover: true + }, handler, (err) => { expect(err).to.not.exist - subscription.on('end', done) - subscription.cancel() + ipfs1.pubsub.publish(topic, new Buffer('hi'), check) }) }) }) @@ -117,122 +191,97 @@ module.exports = (common) => { describe('multiple nodes connected', () => { before((done) => { - ipfs2.id((err, id) => { - expect(err).to.not.exist - const ipfs2Addr = id.addresses[0] - ipfs1.swarm.connect(ipfs2Addr, (err) => { - expect(err).to.not.exist - done() - }) + parallel([ + (cb) => ipfs1.swarm.connect(ipfs2.peerId.addresses[0], cb), + (cb) => ipfs2.swarm.connect(ipfs3.peerId.addresses[0], cb), + (cb) => ipfs1.swarm.connect(ipfs3.peerId.addresses[0], cb) + ], (err) => { + if (err) { + return done(err) + } + // give some time to let everything connect + setTimeout(done, 300) }) }) - function waitForPeers (ipfs, peersToWait, callback) { - const i = setInterval(() => { - ipfs.pubsub.peers(topic, (err, peers) => { - if (err) { - return callback(err) - } - - const hasAllPeers = peersToWait - .map((e) => peers.includes(e)) - .filter((e) => e === false) - .length === 0 - - if (hasAllPeers) { - clearInterval(i) - callback() - } - }) - }, 1000) - } - describe('.peers', () => { - it('returns an error when not subscribed to a topic', (done) => { + it('does not error when not subscribed to a topic', (done) => { ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.exist - expect(err.toString()) - .to.eql(`Error: Not subscribed to '${topic}'`) + expect(err).to.not.exist + // Should be empty but as mentioned below go-ipfs returns more than it should + // expect(peers).to.be.empty + done() }) }) - // I don't understand the purpose of this test - it.skip('returns no peers within 10 seconds', (done) => { + it.skip("doesn't return extra peers", (done) => { // Currently go-ipfs returns peers that have not been // subscribed to the topic. Enable when go-ipfs has been fixed - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist + const sub1 = (msg) => {} + const sub2 = (msg) => {} + + const topicOther = topic + 'different topic' + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb) + ], (err) => { + expect(err).to.not.exist setTimeout(() => { ipfs1.pubsub.peers(topic, (err, peers) => { expect(err).to.not.exist - expect(peers.length).to.equal(0) - subscription.cancel(done) - }) - }, 10000) - }) - }) - it('doesn\'t return extra peers', (done) => { - // Currently go-ipfs returns peers that have not been - // subscribed to the topic. Enable when go-ipfs has been fixed - ipfs1.pubsub.subscribe(topic, (err, subscription1) => { - expect(err).to.not.exist - - ipfs2.pubsub.subscribe(topic + 'different topic', (err, subscription2) => { - expect(err).to.not.exist - - setTimeout(() => { - ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.not.exist - expect(peers).to.have.length(0) - subscription1.cancel(() => subscription2.cancel(done)) - }) + expect(peers).to.be.empty + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topicOther, sub2) + done() }, 10000) }) }) }) - it('returns peers for a topic - one peer', (done) => { + it.skip('returns peers for a topic - one peer', (done) => { // Currently go-ipfs returns peers that have not been // subscribed to the topic. Enable when go-ipfs has been fixed - const peersToWait = [ipfs2.peerId] - let subscription2 + const sub1 = (msg) => {} + const sub2 = (msg) => {} - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], cb) + ], (err) => { expect(err).to.not.exist - subscription2 = subscription + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + + done() }) + }) + + it('lists peers for a topic - multiple peers', (done) => { + const sub1 = (msg) => {} + const sub2 = (msg) => {} + const sub3 = (msg) => {} - ipfs2.pubsub.subscribe(topic, (err, subscription) => { + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb), + (cb) => waitForPeers(ipfs1, topic, [ + ipfs2.peerId.id, + ipfs3.peerId.id + ], cb) + ], (err) => { expect(err).to.not.exist + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + ipfs3.pubsub.unsubscribe(topic, sub3) - const i = setInterval(() => { - ipfs1.pubsub.peers(topic, (err, peers) => { - if (err) { - expect(err).to.not.exist - done(err) - } - - const hasAllPeers = peersToWait - .map((e) => peers.indexOf(e) !== -1) - .filter((e) => e === false) - .length === 0 - - if (hasAllPeers) { - clearInterval(i) - expect(peers.length).to.equal(peersToWait.length) - subscription.cancel(() => subscription2.cancel(done)) - } - }) - }, 1000) + done() }) }) - - it.skip('lists peers for a topic - multiple peers', (done) => { - // TODO - }) }) describe('.ls', () => { @@ -245,29 +294,51 @@ module.exports = (common) => { }) it('list with 1 subscribed topic', (done) => { - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + const sub1 = (msg) => {} + + ipfs1.pubsub.subscribe(topic, sub1, (err) => { expect(err).to.not.exist ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist - expect(topics.length).to.equal(1) - expect(topics[0]).to.equal(topic) - subscription.cancel(done) + expect(topics).to.be.eql([topic]) + + ipfs1.pubsub.unsubscribe(topic, sub1) + done() }) }) }) - it('list with 3 subscribed topicss', (done) => { - const topics = ['one', 'two', 'three'] - series( - topics.map((t) => (cb) => ipfs1.pubsub.subscribe(t, cb)) - , (err, subs) => { + it('list with 3 subscribed topics', (done) => { + const topics = [{ + name: 'one', + handler () {} + }, { + name: 'two', + handler () {} + }, { + name: 'three', + handler () {} + }] + + each(topics, (t, cb) => { + ipfs1.pubsub.subscribe(t.name, t.handler, cb) + }, (err) => { expect(err).to.not.exist ipfs1.pubsub.ls((err, list) => { expect(err).to.not.exist - expect(list.length).to.equal(3) - expect(list).to.eql(topics) - series(subs.map((s) => (cb) => s.cancel(cb)), done) + + expect( + list.sort() + ).to.be.eql( + topics.map((t) => t.name).sort() + ) + + topics.forEach((t) => { + ipfs1.pubsub.unsubscribe(t.name, t.handler) + }) + + done() }) }) }) @@ -275,72 +346,91 @@ module.exports = (common) => { describe('multiple nodes', () => { it('receive messages from different node', (done) => { + const check = makeCheck(3, done) const expectedString = 'hello from the other side' - let subscription2 - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - subscription2 = subscription - }) + const sub1 = (msg) => { + expect(msg.data.toString()).to.be.eql(expectedString) + // TODO: Reenable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + ipfs1.pubsub.unsubscribe(topic, sub1) + check() + } - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - expect(subscription).to.exist + const sub2 = (msg) => { + expect(msg.data.toString()).to.be.eql(expectedString) + // TODO: reenable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + ipfs2.pubsub.unsubscribe(topic, sub2) + check() + } - subscription.on('data', (msg) => { - expect(msg.data.toString()).to.equal(expectedString) - subscription.cancel(() => subscription2.cancel(done)) - }) + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], cb) + ], (err) => { + expect(err).to.not.exist - waitForPeers(ipfs2, [ipfs1.peerId], (err) => { - expect(err).to.not.exist - ipfs2.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - }) - }) + ipfs2.pubsub.publish(topic, new Buffer(expectedString), check) }) }) it('receive multiple messages', (done) => { - let receivedMessages = [] - const expectedMessages = 2 - let subscription2 + const inbox1 = [] + const inbox2 = [] + const outbox = ['hello', 'world', 'this', 'is', 'pubsub'] - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exist - subscription2 = subscription + const check = makeCheck(outbox.length * 3, (err) => { + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + + expect(inbox1.sort()).to.be.eql(outbox.sort()) + expect(inbox2.sort()).to.be.eql(outbox.sort()) + + done(err) }) - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists + function sub1 (msg) { + inbox1.push(msg.data.toString()) + // TODO: enable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + check() + } - subscription.on('data', (msg) => { - receivedMessages.push(msg.data) + function sub2 (msg) { + inbox2.push(msg.data.toString()) + // TODO: enable when go-ipfs is unbroken + // expect(msg.from).to.be.eql(ipfs2.peerId.id) + check() + } - if (receivedMessages.length === expectedMessages) { - receivedMessages.forEach((msg) => { - expect(msg.toString()).to.be.equal('hi') - }) - subscription.cancel(() => subscription2.cancel(done)) - } - }) + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], cb) + ], (err) => { + expect(err).to.not.exist - waitForPeers(ipfs2, [ipfs1.peerId], (err) => { - expect(err).to.not.exist - ipfs2.pubsub.publish(topic, 'hi') - ipfs2.pubsub.publish(topic, 'hi') + outbox.forEach((msg) => { + ipfs2.pubsub.publish(topic, new Buffer(msg), check) }) }) }) }) describe('load tests', function () { - // Write the progress to stdout when in Node.js, silent when in the browser - const LOGS = false - const log = LOGS && process && process.stdout ? (s) => process.stdout.write(s) : () => {} + before(() => { + ipfs1.pubsub.setMaxListeners(10 * 1000) + ipfs2.pubsub.setMaxListeners(10 * 1000) + }) + + after(() => { + ipfs1.pubsub.setMaxListeners(11) + ipfs2.pubsub.setMaxListeners(11) + }) it('send/receive 10k messages', function (done) { - // js-ipfs is a little slow atm, so make sure we have enough time this.timeout(2 * 60 * 1000) const expectedString = 'hello' @@ -348,50 +438,39 @@ module.exports = (common) => { let sendCount = 0 let receivedCount = 0 let startTime - let subscription2 - ipfs2.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists - subscription2 = subscription - }) + const sub1 = (msg) => { + expect(msg.data.toString()).to.equal(expectedString) - ipfs1.pubsub.subscribe(topic, (err, subscription) => { - expect(err).to.not.exists + receivedCount++ - const outputProgress = () => { - log(' \r') - log('Sent: ' + sendCount + ' of ' + count + ', Received: ' + receivedCount + '\r') - } + if (receivedCount >= count) { + const duration = new Date().getTime() - startTime + console.log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s\n`) - subscription.on('data', (d) => { - expect(d.data.toString()).to.equal(expectedString) + ipfs1.pubsub.unsubscribe(topic, sub1) + ipfs2.pubsub.unsubscribe(topic, sub2) + } + } - receivedCount++ - outputProgress() - if (receivedCount >= count) { - const duration = new Date().getTime() - startTime - log(`Send/Receive 10k messages took: ${duration} ms, ${Math.floor(count / (duration / 1000))} ops / s\n`) + const sub2 = (msg) => {} - subscription.cancel(() => subscription2.cancel(done)) - } - }) + series([ + (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), + (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), + (cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], cb) + ], (err) => { + expect(err).to.not.exist + startTime = new Date().getTime() - function loop () { - if (sendCount < count) { + whilst( + () => sendCount < count, + (cb) => { sendCount++ - outputProgress() - ipfs2.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } - } - - waitForPeers(ipfs1, [ipfs2.peerId], (err) => { - expect(err).to.not.exist - startTime = new Date().getTime() - loop() - }) + ipfs2.pubsub.publish(topic, new Buffer(expectedString), cb) + }, + done + ) }) }) @@ -400,79 +479,42 @@ module.exports = (common) => { const count = 1000 let sendCount = 0 - function loop () { - if (sendCount < count) { + whilst( + () => sendCount < count, + (cb) => { sendCount++ - log(' \r') - log('Sending messages: ' + sendCount + ' of ' + count + '\r') - ipfs1.pubsub.publish(topic, expectedString, (err) => { - expect(err).to.not.exist - process.nextTick(() => loop()) - }) - } else { - done() - } - } - loop() + ipfs1.pubsub.publish(topic, new Buffer(expectedString), cb) + }, + done + ) }) - it('call subscribe 1k times', (done) => { + it('call subscribe/unsubscribe 1k times', (done) => { const count = 1000 let sendCount = 0 - let receivedCount = 0 - let subscription = null + const handlers = [] - function loop () { - if (sendCount < count) { + whilst( + () => sendCount < count, + (cb) => { sendCount++ - log(' \r') - log('Subscribing: ' + sendCount + ' of ' + count + '\r') - ipfs1.pubsub.subscribe(topic, (err, res) => { - receivedCount++ - // First call should go through normally - if (receivedCount === 1) { - expect(err).to.not.exist - expect(res).to.exist - subscription = res - } else { - // Subsequent calls should return "error, duplicate subscription" - expect(err).to.exist - } - process.nextTick(() => loop()) + const handler = (msg) => {} + handlers.push(handler) + ipfs1.pubsub.subscribe(topic, handler, cb) + }, + (err) => { + expect(err).to.not.exist + handlers.forEach((handler) => { + ipfs1.pubsub.unsubscribe(topic, handler) }) - } else { - subscription.cancel(done) - } - } - loop() - }) - - it('subscribe/unsubscribe 1k times', (done) => { - const count = 1000 - let sendCount = 0 - function outputProgress () { - log(' \r') - log('Subscribe/Unsubscribe: ' + sendCount + ' of ' + count + '\r') - } - - function loop () { - if (sendCount < count) { - sendCount++ - outputProgress() - ipfs1.pubsub.subscribe(topic, (err, subscription) => { + ipfs1.pubsub.ls((err, topics) => { expect(err).to.not.exist - subscription.cancel((err) => { - expect(err).to.not.exist - outputProgress() - process.nextTick(() => loop()) - }) + expect(topics).to.be.eql([]) + done() }) - } else { - done() } - } - loop() + ) }) }) }) @@ -480,36 +522,21 @@ module.exports = (common) => { describe('promise API', () => { let ipfs1 - let ipfs2 before((done) => { - // CI takes longer to instantiate the daemon, - // so we need to increase the timeout for the - // before step common.setup((err, factory) => { - expect(err).to.not.exist - series([ - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs1 = node - ipfs1.id().then((res) => { - ipfs1.peerId = res.id - cb() - }) - }) - }, - (cb) => { - factory.spawnNode((err, node) => { - expect(err).to.not.exist - ipfs2 = node - ipfs2.id().then((res) => { - ipfs2.peerId = res.id - cb() - }) - }) + if (err) { + return done(err) + } + + spawnWithId(factory, (err, node) => { + if (err) { + return done(err) } - ], done) + + ipfs1 = node + done() + }) }) }) @@ -517,45 +544,31 @@ module.exports = (common) => { common.teardown(done) }) - it('.subscribe', () => { - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => { - expect(subscription).to.exist - return subscription.cancel() - }) - }) - - it('.publish', () => { - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => { - return ipfs1.pubsub.publish(topic, 'hi').then(() => subscription) - }) - .then((subscription) => subscription.cancel()) - }) + it('.subscribe and .publish', () => { + const sub = (msg) => { + expect(msg.data.toString()).to.be.eql('hi') + ipfs1.pubsub.unsubscribe(topic, sub) + } - it('.cancel', () => { - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => subscription.cancel()) + return ipfs1.pubsub.subscribe(topic, sub) + .then(() => ipfs1.pubsub.publish(topic, new Buffer('hi'))) }) it('.peers', () => { - let s - return ipfs1.pubsub.subscribe(topic) - .then((subscription) => { - s = subscription - return ipfs1.pubsub.peers(topic) - }) + const sub = (msg) => {} + + return ipfs1.pubsub.subscribe(topic, sub) + .then(() => ipfs1.pubsub.peers(topic)) .then((peers) => { expect(peers).to.exist - return s.cancel() + ipfs1.pubsub.unsubscribe(topic, sub) }) }) it('.ls', () => { return ipfs1.pubsub.ls() .then((topics) => { - expect(topics).to.exist - expect(topics.length).to.equal(0) + expect(topics).to.be.eql([]) }) }) })