From 9b438c0a17f69b48d493a7f0f23488888289315e Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 15 Oct 2019 12:54:57 +0100 Subject: [PATCH 1/6] chore: update to latest js-IPFS rc --- package.json | 2 +- test/concurrent-rooms.spec.js | 6 +++++- test/room.spec.js | 7 +++++-- test/same-node.spec.js | 4 ++-- test/utils/create-repo-node.js | 18 +++++++----------- 5 files changed, 20 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 71745af..5517943 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,7 @@ "chai": "^4.2.0", "dirty-chai": "^2.0.1", "aegir": "^18.0.3", - "ipfs": "~0.34.4" + "ipfs": "0.39.0-rc.0" }, "browser": { "./test/utils/create-repo-node.js": "./test/utils/create-repo-browser.js" diff --git a/test/concurrent-rooms.spec.js b/test/concurrent-rooms.spec.js index 0fc5da7..59ca937 100644 --- a/test/concurrent-rooms.spec.js +++ b/test/concurrent-rooms.spec.js @@ -69,7 +69,11 @@ describe('concurrent rooms', function () { }) }) - after((done) => each(repos, (repo, cb) => { repo.teardown(cb) }, done)) + after(() => { + return Promise.all( + repos.map(repo => repo.teardown()) + ) + }) it('can create a room, and they find each other', (done) => { room1A = Room(node1, topicA) diff --git a/test/room.spec.js b/test/room.spec.js index 84496eb..29e1370 100644 --- a/test/room.spec.js +++ b/test/room.spec.js @@ -7,7 +7,6 @@ chai.use(require('dirty-chai')) const expect = chai.expect const IPFS = require('ipfs') -const each = require('async/each') const clone = require('lodash.clonedeep') const Room = require('../') @@ -66,7 +65,11 @@ describe('room', function () { }) }) - after((done) => each(repos, (repo, cb) => { repo.teardown(cb) }, done)) + after(() => { + return Promise.all( + repos.map(repo => repo.teardown()) + ) + }) ;([1, 2].forEach((n) => { const topic = topicBase + '-' + n diff --git a/test/same-node.spec.js b/test/same-node.spec.js index 16ef7cb..ee44da3 100644 --- a/test/same-node.spec.js +++ b/test/same-node.spec.js @@ -52,9 +52,9 @@ describe('same node', function () { after(() => rooms.forEach((room) => room.leave())) - after((done) => node.stop(done)) + after(() => node.stop()) - after((done) => repo.teardown(done)) + after(() => repo.teardown()) it('mirrors broadcast', (done) => { rooms[0].once('message', (message) => { diff --git a/test/utils/create-repo-node.js b/test/utils/create-repo-node.js index 118a0a1..26e4471 100644 --- a/test/utils/create-repo-node.js +++ b/test/utils/create-repo-node.js @@ -2,7 +2,6 @@ const IPFSRepo = require('ipfs-repo') const clean = require('./clean') -const series = require('async/series') function createTempRepo () { const repoPath = '/tmp/ipfs-test-' + Math.random().toString().substring(2, 8) @@ -10,22 +9,19 @@ function createTempRepo () { const repo = new IPFSRepo(repoPath) - repo.teardown = (done) => { + repo.teardown = async () => { if (destroyed) { return } destroyed = true - series([ + try { + await repo.close() + } catch (err) { // ignore err, might have been closed already - (cb) => { - repo.close(() => cb()) - }, - (cb) => { - clean(repoPath) - cb() - } - ], done) + } + + clean(repoPath) } return repo From a25d36e0f314636e29f472f582df029cff1e7056 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 21 Jan 2020 11:20:41 +0000 Subject: [PATCH 2/6] refactor: updates to latest async iterator version of ipfs and libp2p Needs `ipfs.id` to return a `CID` before tests will pass. --- .aegir.js | 42 ++++++ .travis.yml | 40 ++++++ README.md | 35 ++--- circle.yml | 15 -- package.json | 17 +-- src/connection.js | 134 ++++++++++++------ src/direct-connection-handler.js | 42 +++--- src/index.js | 134 ++++++++---------- src/libp2p.js | 5 - src/peer-id.js | 4 +- test/concurrent-rooms.spec.js | 113 +++++++-------- test/room.spec.js | 119 +++++++--------- test/same-node.spec.js | 42 ++---- test/utils/create-ipfs-browser.js | 42 ++++++ test/utils/create-ipfs.js | 23 +++ .../{create-repo-node.js => create-repo.js} | 0 16 files changed, 450 insertions(+), 357 deletions(-) create mode 100644 .aegir.js create mode 100644 .travis.yml delete mode 100644 circle.yml delete mode 100644 src/libp2p.js create mode 100644 test/utils/create-ipfs-browser.js create mode 100644 test/utils/create-ipfs.js rename test/utils/{create-repo-node.js => create-repo.js} (100%) diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 0000000..c48527b --- /dev/null +++ b/.aegir.js @@ -0,0 +1,42 @@ +'use strict' + +const { createFactory } = require('ipfsd-ctl') +const df = createFactory({ + ipfsModule: { + path: require.resolve('ipfs'), + ref: require('ipfs') + } +}) + +let ipfsd + +module.exports = { + hooks: { + browser: { + pre: async () => { + ipfsd = await df.spawn({ + type: 'proc', + test: true, + ipfsOptions: { + relay: { + enabled: true, + hop: { + enabled: true + } + }, + config: { + Addresses: { + Swarm: [ + '/ip4/127.0.0.1/tcp/24642/ws' + ] + } + } + } + }) + }, + post: async () => { + await ipfsd.stop() + } + } + } +} diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..be3ad28 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,40 @@ +language: node_js +cache: npm +stages: + - check + - test + - cov + +node_js: + - '12' + +os: + - linux + - osx + - windows + +script: npx nyc -s npm run test:node -- --bail +after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov + +jobs: + include: + - stage: check + script: + - npx aegir commitlint --travis + - npx aegir dep-check + - npm run lint + + - stage: test + name: chrome + addons: + chrome: stable + script: npx aegir test -t browser -t webworker + + - stage: test + name: firefox + addons: + firefox: latest + script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless + +notifications: + email: false diff --git a/README.md b/README.md index 17f07c0..968febd 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,7 @@ $ npm install ipfs-pubsub-room ```js const Room = require('ipfs-pubsub-room') const IPFS = require('ipfs') -const ipfs = new IPFS({ - EXPERIMENTAL: { - pubsub: true - }, +const ipfs = await IPFS.create({ config: { Addresses: { Swarm: [ @@ -38,21 +35,19 @@ const ipfs = new IPFS({ }) // IPFS node is ready, so we can start using ipfs-pubsub-room -ipfs.on('ready', () => { - const room = Room(ipfs, 'room-name') +const room = Room(ipfs, 'room-name') - room.on('peer joined', (peer) => { - console.log('Peer joined the room', peer) - }) +room.on('peer joined', (peer) => { + console.log('Peer joined the room', peer) +}) - room.on('peer left', (peer) => { - console.log('Peer left...', peer) - }) +room.on('peer left', (peer) => { + console.log('Peer left...', peer) +}) - // now started to listen to room - room.on('subscribed', () => { - console.log('Now connected!') - }) +// now started to listen to room +room.on('subscribed', () => { + console.log('Now connected!') }) ``` @@ -73,7 +68,7 @@ const room = Room(ipfs, 'some-room-name') Broacasts message (string or buffer). -## room.sendTo(peer, message) +## room.sendTo(cid, message) Sends message (string or buffer) to peer. @@ -85,7 +80,7 @@ Leaves room, stopping everything. Returns an array of peer identifiers (strings). -## room.hasPeer(peer) +## room.hasPeer(cid) Returns a boolean indicating if the given peer is present in the room. @@ -96,11 +91,11 @@ Listens for messages. A `message` is an object containing the following properti * `from` (string): peer id * `data` (Buffer): message content -## room.on('peer joined', (peer) => {}) +## room.on('peer joined', (cid) => {}) Once a peer has joined the room. -## room.on('peer left', (peer) => {}) +## room.on('peer left', (cid) => {}) Once a peer has left the room. diff --git a/circle.yml b/circle.yml deleted file mode 100644 index 0009693..0000000 --- a/circle.yml +++ /dev/null @@ -1,15 +0,0 @@ -# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -machine: - node: - version: stable - -dependencies: - pre: - - google-chrome --version - - curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb - - sudo dpkg -i google-chrome.deb || true - - sudo apt-get update - - sudo apt-get install -f - - sudo apt-get install --only-upgrade lsb-base - - sudo dpkg -i google-chrome.deb - - google-chrome --version diff --git a/package.json b/package.json index 5517943..8700ace 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "lint": "aegir lint", "test": "aegir test", "test:node": "aegir test -t node", - "test:browser": "aegir test - browser" + "test:browser": "aegir test -t browser" }, "repository": { "type": "git", @@ -28,18 +28,19 @@ "homepage": "https://github.com/ipfs-shipyard/ipfs-pubsub-room#readme", "dependencies": { "hyperdiff": "^2.0.5", - "lodash.clonedeep": "^4.5.0", - "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.9" + "it-pipe": "^1.1.0", + "lodash.clonedeep": "^4.5.0" }, "devDependencies": { - "async": "^2.6.1", + "aegir": "^20.5.1", "chai": "^4.2.0", + "delay": "^4.3.0", "dirty-chai": "^2.0.1", - "aegir": "^18.0.3", - "ipfs": "0.39.0-rc.0" + "ipfs": "ipfs/js-ipfs#refactor/async-await-roundup", + "ipfsd-ctl": "^1.0.6" }, "browser": { - "./test/utils/create-repo-node.js": "./test/utils/create-repo-browser.js" + "./test/utils/create-repo.js": "./test/utils/create-repo-browser.js", + "./test/utils/create-ipfs.js": "./test/utils/create-ipfs-browser.js" } } diff --git a/src/connection.js b/src/connection.js index 62bb3d6..5dab830 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,13 +1,12 @@ 'use strict' const EventEmitter = require('events') -const pull = require('pull-stream') -const Pushable = require('pull-pushable') +const pipe = require('it-pipe') +const CID = require('cids') const PROTOCOL = require('./protocol') const encoding = require('./encoding') const getPeerId = require('./peer-id') -const libp2p = require('./libp2p') module.exports = class Connection extends EventEmitter { constructor (id, ipfs, room) { @@ -22,11 +21,16 @@ module.exports = class Connection extends EventEmitter { push (message) { if (this._connection) { this._connection.push(encoding(message)) - } else { - this.once('connect', () => this.push(message)) - if (!this._connecting) { - this._getConnection() - } + + return + } + + this.once('connect', () => { + this.push(message) + }) + + if (!this._connecting) { + this._connect() } } @@ -36,52 +40,92 @@ module.exports = class Connection extends EventEmitter { } } - _getConnection () { + async _connect () { this._connecting = true - this._getPeerAddresses(this._id, (err, peerAddresses) => { - if (err) { - this.emit('error', err) - return // early - } + const peerAddresses = await this._getPeerAddresses(this._id) - if (!peerAddresses.length) { - this.emit('disconnect') - return // early + if (!peerAddresses.length) { + this.emit('disconnect') + return // early + } + + const peerId = new CID(peerAddresses[0].multihash) + const peerInfo = this._ipfs.libp2p.peerStore.get(peerId.toString('base58btc')) + const { stream } = await this._ipfs.libp2p.dialProtocol(peerInfo, PROTOCOL) + this._connection = new FiFoMessageQueue() + + pipe(this._connection, stream, async (source) => { + this.emit('connect', this._connection) + + for await (const message of source) { + this.emit('message', message) } + }) + .then(() => { + this.emit('disconnect') + }, (err) => { + this.emit('error', err) + }) + } + + async _getPeerAddresses (peerId) { + const peersAddresses = await this._ipfs.swarm.peers() + + return peersAddresses + .filter((peerAddress) => getPeerId(peerAddress.peer) === peerId.toString()) + .map(peerAddress => peerAddress.peer) + } +} + +class FiFoMessageQueue { + constructor () { + this._queue = [] + } + + [Symbol.asyncIterator] () { + return this + } - libp2p(this._ipfs).dialProtocol(peerAddresses[0], PROTOCOL, (err, conn) => { - if (err) { - this.emit('disconnect') - return // early - } - this._connecting = false - const pushable = Pushable() - this._connection = pushable - pull( - pushable, - conn, - pull.onEnd(() => { - delete this._connection - this.emit('disconnect') - }) - ) - this.emit('connect', pushable) + push (message) { + if (this._ended) { + throw new Error('Message queue ended') + } + + if (this._resolve) { + return this._resolve({ + done: false, + value: message }) - }) + } + + this._queue.push(message) + } + + end () { + this._ended = true + if (this._resolve) { + this._resolve({ + done: true + }) + } } - _getPeerAddresses (peerId, callback) { - this._ipfs.swarm.peers((err, peersAddresses) => { - if (err) { - callback(err) - return // early + next () { + if (this._ended) { + return { + done: true } + } + + if (this._queue.length) { + return { + done: false, + value: this._queue.shift() + } + } - callback( - null, - peersAddresses - .filter((peerAddress) => getPeerId(peerAddress.peer) === peerId) - .map(peerAddress => peerAddress.peer)) + return new Promise((resolve) => { + this._resolve = resolve }) } } diff --git a/src/direct-connection-handler.js b/src/direct-connection-handler.js index e6633fd..08157de 100644 --- a/src/direct-connection-handler.js +++ b/src/direct-connection-handler.js @@ -1,39 +1,38 @@ 'use strict' -const pull = require('pull-stream') const EventEmitter = require('events') +const pipe = require('it-pipe') +const CID = require('cids') const emitter = new EventEmitter() -function handler (protocol, conn) { - conn.getPeerInfo((err, peerInfo) => { - if (err) { - console.log(err) - return - } - - const peerId = peerInfo.id.toB58String() +function handler ({ connection, stream }) { + const peerId = new CID(connection.remotePeer.toString()).toString() - pull( - conn, - pull.map((message) => { + pipe( + stream, + async function (source) { + for await (const message of source) { let msg + try { msg = JSON.parse(message.toString()) + msg.to = new CID(msg.to.version, msg.to.codec, Buffer.from(msg.to.hash.data)) + msg.from = new CID(msg.from.version, msg.from.codec, Buffer.from(msg.from.hash.data)) } catch (err) { emitter.emit('warning', err.message) - return // early + continue // early } - if (peerId !== msg.from) { + if (peerId !== msg.from.toString()) { emitter.emit('warning', 'no peerid match ' + msg.from) - return // early + continue // early } const topicIDs = msg.topicIDs if (!Array.isArray(topicIDs)) { emitter.emit('warning', 'no topic IDs') - return // early + continue // early } msg.data = Buffer.from(msg.data, 'hex') @@ -42,14 +41,9 @@ function handler (protocol, conn) { topicIDs.forEach((topic) => { emitter.emit(topic, msg) }) - - return msg - }), - pull.onEnd(() => { - // do nothing - }) - ) - }) + } + } + ) } exports = module.exports = { diff --git a/src/index.js b/src/index.js index 4ff9559..1ff7d88 100644 --- a/src/index.js +++ b/src/index.js @@ -2,22 +2,19 @@ const diff = require('hyperdiff') const EventEmitter = require('events') -const timers = require('timers') const clone = require('lodash.clonedeep') +const CID = require('cids') const PROTOCOL = require('./protocol') const Connection = require('./connection') const encoding = require('./encoding') const directConnection = require('./direct-connection-handler') -const libp2p = require('./libp2p') const DEFAULT_OPTIONS = { pollInterval: 1000 } -module.exports = (ipfs, topic, options) => { - return new PubSubRoom(ipfs, topic, options) -} +let index = 0 class PubSubRoom extends EventEmitter { constructor (ipfs, topic, options) { @@ -29,18 +26,27 @@ class PubSubRoom extends EventEmitter { this._connections = {} this._handleDirectMessage = this._handleDirectMessage.bind(this) + this._handleMessage = this._onMessage.bind(this) if (!this._ipfs.pubsub) { throw new Error('This IPFS node does not have pubsub.') } - if (this._ipfs.isOnline()) { - this._start() - } else { - this._ipfs.on('ready', this._start.bind(this)) + if (!this._ipfs.libp2p) { + throw new Error('This IPFS node does not have libp2p.') } - this._ipfs.on('stop', this.leave.bind(this)) + this._interval = setInterval( + this._pollPeers.bind(this), + this._options.pollInterval + ) + + this._ipfs.libp2p.handle(PROTOCOL, directConnection.handler) + directConnection.emitter.on(this._topic, this._handleDirectMessage) + + this._ipfs.pubsub.subscribe(this._topic, this._handleMessage) + + this._idx = index++ } getPeers () { @@ -48,32 +54,26 @@ class PubSubRoom extends EventEmitter { } hasPeer (peer) { - return this._peers.indexOf(peer) >= 0 + return Boolean(this._peers.find(p => p.toString() === peer.toString())) } - leave () { - return new Promise((resolve, reject) => { - timers.clearInterval(this._interval) - Object.keys(this._connections).forEach((peer) => { - this._connections[peer].stop() - }) - directConnection.emitter.removeListener(this._topic, this._handleDirectMessage) - this.once('stopped', () => resolve()) - this.emit('stopping') + async leave () { + clearInterval(this._interval) + Object.keys(this._connections).forEach((peer) => { + this._connections[peer].stop() }) + directConnection.emitter.removeListener(this._topic, this._handleDirectMessage) + this._ipfs.libp2p.unhandle(PROTOCOL, directConnection.handler) + await this._ipfs.pubsub.unsubscribe(this._topic, this._handleMessage) } - broadcast (_message) { - let message = encoding(_message) + async broadcast (_message) { + const message = encoding(_message) - this._ipfs.pubsub.publish(this._topic, message, (err) => { - if (err) { - this.emit('error', err) - } - }) + await this._ipfs.pubsub.publish(this._topic, message) } - sendTo (peer, message) { + async sendTo (peer, message) { let conn = this._connections[peer] if (!conn) { conn = new Connection(peer, this._ipfs, this) @@ -82,7 +82,7 @@ class PubSubRoom extends EventEmitter { conn.once('disconnect', () => { delete this._connections[peer] - this._peers = this._peers.filter((p) => p !== peer) + this._peers = this._peers.filter((p) => p.toString() !== peer.toString()) this.emit('peer left', peer) }) } @@ -97,78 +97,56 @@ class PubSubRoom extends EventEmitter { const msg = { to: peer, - from: this._ipfs._peerInfo.id.toB58String(), + from: await this._ourId(), data: Buffer.from(message).toString('hex'), seqno: seqno.toString('hex'), - topicIDs: [ this._topic ], - topicCIDs: [ this._topic ] + topicIDs: [this._topic], + topicCIDs: [this._topic] } conn.push(Buffer.from(JSON.stringify(msg))) } - _start () { - this._interval = timers.setInterval( - this._pollPeers.bind(this), - this._options.pollInterval) - - const listener = this._onMessage.bind(this) - this._ipfs.pubsub.subscribe(this._topic, listener, {}, (err) => { - if (err) { - this.emit('error', err) - } else { - this.emit('subscribed', this._topic) - } - }) - - this.once('stopping', () => { - this._ipfs.pubsub.unsubscribe(this._topic, listener, (err) => { - if (err) { - this.emit('error', err) - } else { - this.emit('stopped') - } - }) - }) - - libp2p(this._ipfs).handle(PROTOCOL, directConnection.handler) + async _pollPeers () { + const newPeers = (await this._ipfs.pubsub.peers(this._topic)) + .map(id => new CID(1, 'libp2p-key', new CID(id).buffer)) - directConnection.emitter.on(this._topic, this._handleDirectMessage) - } - - _pollPeers () { - this._ipfs.pubsub.peers(this._topic, (err, _newPeers) => { - if (err) { - this.emit('error', err) - return // early - } - - const newPeers = _newPeers.sort() - - if (this._emitChanges(newPeers)) { - this._peers = newPeers - } - }) + if (this._emitChanges(newPeers)) { + this._peers = newPeers + } } _emitChanges (newPeers) { const differences = diff(this._peers, newPeers) - differences.added.forEach((addedPeer) => this.emit('peer joined', addedPeer)) - differences.removed.forEach((removedPeer) => this.emit('peer left', removedPeer)) + differences.added.forEach((peer) => this.emit('peer joined', peer)) + differences.removed.forEach((peer) => this.emit('peer left', peer)) return differences.added.length > 0 || differences.removed.length > 0 } _onMessage (message) { - this.emit('message', message) + this.emit('message', { + ...message, + from: new CID(1, 'libp2p-key', new CID(message.from).buffer) + }) } - _handleDirectMessage (message) { - if (message.to === this._ipfs._peerInfo.id.toB58String()) { + async _handleDirectMessage (message) { + if (message.to.toString() === (await this._ourId()).toString()) { const m = Object.assign({}, message) delete m.to this.emit('message', m) } } + + async _ourId () { + if (!this._id) { + this._id = (await this._ipfs.id()).id + } + + return this._id + } } + +module.exports = PubSubRoom diff --git a/src/libp2p.js b/src/libp2p.js deleted file mode 100644 index 2391985..0000000 --- a/src/libp2p.js +++ /dev/null @@ -1,5 +0,0 @@ -'use strict' - -module.exports = (ipfs) => { - return ipfs._libp2pNode || ipfs.libp2p -} diff --git a/src/peer-id.js b/src/peer-id.js index 9b368be..8953c92 100644 --- a/src/peer-id.js +++ b/src/peer-id.js @@ -1,8 +1,8 @@ 'use strict' module.exports = (peer) => { - if (peer.id && (typeof peer.id.toB58String === 'function')) { + if (peer.id) { peer = peer.id } - return peer.toB58String() + return peer.toString() } diff --git a/test/concurrent-rooms.spec.js b/test/concurrent-rooms.spec.js index 59ca937..cf65b68 100644 --- a/test/concurrent-rooms.spec.js +++ b/test/concurrent-rooms.spec.js @@ -6,28 +6,14 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const IPFS = require('ipfs') -const each = require('async/each') -const clone = require('lodash.clonedeep') +const delay = require('delay') -const Room = require('../') -const createRepo = require('./utils/create-repo-node') +const PubSubRoom = require('../') +const createRepo = require('./utils/create-repo') +const createIpfs = require('./utils/create-ipfs') const topic = 'pubsub-room-concurrency-test-' + Date.now() + '-' + Math.random() -const ipfsOptions = { - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -} - describe('concurrent rooms', function () { this.timeout(30000) const repos = [] @@ -37,36 +23,34 @@ describe('concurrent rooms', function () { const topicA = topic + '-A' const topicB = topic + '-B' - before((done) => { + before(async () => { const repo = createRepo() repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node1 = new IPFS(options) - node1.once('ready', () => { - node1.id((err, info) => { - expect(err).to.not.exist() - id1 = info.id - done() - }) - }) + node1 = await createIpfs(repo) + id1 = (await node1.id()).id }) - before((done) => { + before(async () => { const repo = createRepo() repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node2 = new IPFS(options) - node2.once('ready', () => { - node2.id((err, info) => { - expect(err).to.not.exist() - id2 = info.id - done() - }) - }) + node2 = await createIpfs(repo, node1) + id2 = (await node2.id()).id + }) + + after(() => { + return Promise.all([ + room1A.leave(), + room1B.leave(), + room2A.leave(), + room2B.leave() + ]) + }) + + after(() => { + return Promise.all([ + node1.stop(), + node2.stop() + ]) }) after(() => { @@ -75,15 +59,11 @@ describe('concurrent rooms', function () { ) }) - it('can create a room, and they find each other', (done) => { - room1A = Room(node1, topicA) - room2A = Room(node2, topicA) - room1B = Room(node1, topicB) - room2B = Room(node2, topicB) - room1A.on('warning', console.log) - room2A.on('warning', console.log) - room1B.on('warning', console.log) - room2B.on('warning', console.log) + it('can create a room, and they find each other', async () => { + room1A = new PubSubRoom(node1, topicA) + room2A = new PubSubRoom(node2, topicA) + room1B = new PubSubRoom(node1, topicB) + room2B = new PubSubRoom(node2, topicB) const roomNodes = [ [room1A, id2], @@ -92,14 +72,19 @@ describe('concurrent rooms', function () { [room2A, id1] ] - each(roomNodes, (roomNode, cb) => { - const room = roomNode[0] - const waitingFor = roomNode[1] - room.once('peer joined', (id) => { - expect(id).to.equal(waitingFor) - cb() + await Promise.all( + roomNodes.map(async (roomNode) => { + const room = roomNode[0] + const waitingFor = roomNode[1] + + await new Promise((resolve) => { + room.once('peer joined', (peer) => { + expect(peer.toString()).to.equal(waitingFor.toString()) + resolve() + }) + }) }) - }, done) + ) }) it('has peer', (done) => { @@ -119,7 +104,7 @@ describe('concurrent rooms', function () { throw new Error('double message') } gotMessage = true - expect(message.from).to.equal(id2) + expect(message.from.toString()).to.equal(id2.toString()) expect(message.data.toString()).to.equal('message 1') room1B.removeListener('message', crash) @@ -133,7 +118,7 @@ describe('concurrent rooms', function () { room2B.on('message', crash) room2A.once('message', (message) => { - expect(message.from).to.equal(id1) + expect(message.from.toString()).to.equal(id1.toString()) expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString()) expect(message.topicIDs).to.deep.equal([topicA]) expect(message.topicCIDs).to.deep.equal([topicA]) @@ -146,17 +131,17 @@ describe('concurrent rooms', function () { it('can leave room', (done) => { room1A.once('peer left', (peer) => { - expect(peer).to.equal(id2) + expect(peer.toString()).to.equal(id2.toString()) done() }) room2A.leave() }) - it('after leaving, it does not receive more messages', (done) => { + it('after leaving, it does not receive more messages', async () => { room2A.on('message', Crash('should not receive this')) - room2A.leave() + await room2A.leave() room1A.broadcast('message 3') - setTimeout(done, 3000) + await delay(3000) }) }) diff --git a/test/room.spec.js b/test/room.spec.js index 29e1370..70a08f1 100644 --- a/test/room.spec.js +++ b/test/room.spec.js @@ -6,90 +6,62 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const IPFS = require('ipfs') -const clone = require('lodash.clonedeep') - -const Room = require('../') -const createRepo = require('./utils/create-repo-node') +const PubSubRoom = require('../') +const createRepo = require('./utils/create-repo') +const createIpfs = require('./utils/create-ipfs') const topicBase = 'pubsub-room-test-' + Date.now() + '-' + Math.random() -const ipfsOptions = { - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -} - describe('room', function () { this.timeout(30000) const repos = [] let node1, node2 let id1, id2 - before((done) => { + before(async () => { const repo = createRepo() repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node1 = new IPFS(options) - node1.once('ready', () => { - node1.id((err, info) => { - expect(err).to.not.exist() - id1 = info.id - done() - }) - }) + + node1 = await createIpfs(repo) + id1 = (await node1.id()).id }) - before((done) => { + before(async () => { const repo = createRepo() repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node2 = new IPFS(options) - node2.once('ready', () => { - node2.id((err, info) => { - expect(err).to.not.exist() - id2 = info.id - done() - }) - }) - }) - after(() => { - return Promise.all( - repos.map(repo => repo.teardown()) - ) + node2 = await createIpfs(repo, node1) + id2 = (await node2.id()).id }) + const rooms = [] + ;([1, 2].forEach((n) => { const topic = topicBase + '-' + n - let room1, room2 + + after('after topic ' + n, () => { + return Promise.all([ + rooms[n].a.leave(), + rooms[n].b.leave() + ]) + }) + describe('topic ' + n, () => { it('can create a room, and they find each other', (done) => { - room1 = Room(node1, topic) - room2 = Room(node2, topic) - room1.on('warning', console.log) - room2.on('warning', console.log) + rooms[n] = { + a: new PubSubRoom(node1, topic), + b: new PubSubRoom(node2, topic) + } let left = 2 - room1.once('peer joined', (id) => { - expect(id).to.equal(id2) + rooms[n].a.once('peer joined', (id) => { + expect(id).to.deep.equal(id2) if (--left === 0) { done() } }) - room2.once('peer joined', (id) => { - expect(id).to.equal(id1) + rooms[n].b.once('peer joined', (id) => { + expect(id).to.deep.equal(id1) if (--left === 0) { done() } @@ -97,50 +69,63 @@ describe('room', function () { }) it('has peer', (done) => { - expect(room1.getPeers()).to.deep.equal([id2]) - expect(room2.getPeers()).to.deep.equal([id1]) + expect(rooms[n].a.getPeers()).to.deep.equal([id2]) + expect(rooms[n].b.getPeers()).to.deep.equal([id1]) done() }) it('can broadcast', (done) => { let gotMessage = false - room1.on('message', (message) => { + rooms[n].a.on('message', (message) => { if (gotMessage) { throw new Error('double message:' + message.data.toString()) } gotMessage = true - expect(message.from).to.equal(id2) + expect(message.from).to.deep.equal(id2) expect(message.data.toString()).to.equal('message 1') done() }) - room2.broadcast('message 1') + rooms[n].b.broadcast('message 1') }) it('can send private message', (done) => { let gotMessage = false - room2.on('message', (message) => { + rooms[n].b.on('message', (message) => { if (gotMessage) { throw new Error('double message') } gotMessage = true - expect(message.from).to.equal(id1) + expect(message.from).to.deep.equal(id1) expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString()) expect(message.topicIDs).to.deep.equal([topic]) expect(message.topicCIDs).to.deep.equal([topic]) expect(message.data.toString()).to.equal('message 2') done() }) - room1.sendTo(id2, 'message 2') + rooms[n].a.sendTo(id2, 'message 2') }) it('can leave room', (done) => { - room1.once('peer left', (peer) => { - expect(peer).to.equal(id2) + rooms[n].a.once('peer left', (peer) => { + expect(peer).to.deep.equal(id2) done() }) - room2.leave() + rooms[n].b.leave() }) }) })) + + after(() => { + return Promise.all([ + node1.stop(), + node2.stop() + ]) + }) + + after(() => { + return Promise.all( + repos.map(repo => repo.teardown()) + ) + }) }) diff --git a/test/same-node.spec.js b/test/same-node.spec.js index ee44da3..c3cb5a7 100644 --- a/test/same-node.spec.js +++ b/test/same-node.spec.js @@ -6,51 +6,35 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const IPFS = require('ipfs') -const clone = require('lodash.clonedeep') - -const Room = require('../') -const createRepo = require('./utils/create-repo-node') +const PubSubRoom = require('../') +const createRepo = require('./utils/create-repo') +const createIpfs = require('./utils/create-ipfs') const topic = 'pubsub-same-node-test-' + Date.now() + '-' + Math.random() -const ipfsOptions = { - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -} - describe('same node', function () { this.timeout(30000) let repo let node - let rooms = [] + const rooms = [] - before((done) => { + before(async () => { repo = createRepo() - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node = new IPFS(options) - node.once('ready', () => { - done() - }) + + node = await createIpfs(repo) }) before(() => { for (let i = 0; i < 2; i++) { - rooms.push(Room(node, topic)) + rooms.push(new PubSubRoom(node, topic)) } }) - after(() => rooms.forEach((room) => room.leave())) + after(() => { + return Promise.all( + rooms.map(room => room.leave()) + ) + }) after(() => node.stop()) diff --git a/test/utils/create-ipfs-browser.js b/test/utils/create-ipfs-browser.js new file mode 100644 index 0000000..7d65331 --- /dev/null +++ b/test/utils/create-ipfs-browser.js @@ -0,0 +1,42 @@ +'use strict' + +const IPFS = require('ipfs') +const clone = require('lodash.clonedeep') +const RELAY_MULTIADDR = '/ip4/127.0.0.1/tcp/24642/ws' + +const ipfsOptions = { + relay: { + enabled: true, // enable relay dialer/listener (STOP) + hop: { + enabled: true // make this node a relay (HOP) + } + }, + config: { + Bootstrap: [ + '/ip4/127.0.0.1/tcp/24642/ws' + ] + } +} + +module.exports = async (repo, otherNode) => { + const options = Object.assign({}, clone(ipfsOptions), { + repo + }) + + const ipfs = await IPFS.create(options) + + // connect to relay peer + await ipfs.swarm.connect(RELAY_MULTIADDR) + + // both nodes created, get them to dial each other via the relay + if (otherNode) { + const peers = await ipfs.swarm.peers() + const nodeId = await ipfs.id() + const otherNodeId = await otherNode.id() + + await ipfs.swarm.connect(`${RELAY_MULTIADDR}/p2p/${peers[0].peer.toString()}/p2p-circuit/p2p/${otherNodeId.id.toString()}`) + await otherNode.swarm.connect(`${RELAY_MULTIADDR}/p2p/${peers[0].peer.toString()}/p2p-circuit/p2p/${nodeId.id.toString()}`) + } + + return ipfs +} diff --git a/test/utils/create-ipfs.js b/test/utils/create-ipfs.js new file mode 100644 index 0000000..016722f --- /dev/null +++ b/test/utils/create-ipfs.js @@ -0,0 +1,23 @@ +'use strict' + +const IPFS = require('ipfs') +const clone = require('lodash.clonedeep') + +const ipfsOptions = { + config: { + Addresses: { + Swarm: [ + '/ip4/0.0.0.0/tcp/0' + ] + }, + Bootstrap: [] + } +} + +module.exports = (repo) => { + const options = Object.assign({}, clone(ipfsOptions), { + repo + }) + + return IPFS.create(options) +} diff --git a/test/utils/create-repo-node.js b/test/utils/create-repo.js similarity index 100% rename from test/utils/create-repo-node.js rename to test/utils/create-repo.js From 619b505acd3aa6a96b46b50e34a82bc1b51d112e Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 21 Jan 2020 11:26:47 +0000 Subject: [PATCH 3/6] chore: only build master branch and prs --- .travis.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.travis.yml b/.travis.yml index be3ad28..5c40af9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,8 @@ language: node_js +branches: + only: + - master + - /^release\/.*$/ cache: npm stages: - check From 9d11615ac7fa4fbe4e94dee9e986ab06bb8db453 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 23 Jan 2020 10:25:43 +0000 Subject: [PATCH 4/6] refactor: ipfs will return peer ids as strings instead --- package.json | 4 +++- src/connection.js | 8 +++----- src/direct-connection-handler.js | 5 +---- src/index.js | 9 ++------- src/peer-id.js | 8 -------- test/concurrent-rooms.spec.js | 2 +- 6 files changed, 10 insertions(+), 26 deletions(-) delete mode 100644 src/peer-id.js diff --git a/package.json b/package.json index 8700ace..cc9c950 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,9 @@ "delay": "^4.3.0", "dirty-chai": "^2.0.1", "ipfs": "ipfs/js-ipfs#refactor/async-await-roundup", - "ipfsd-ctl": "^1.0.6" + "ipfs-repo": "^0.30.1", + "ipfsd-ctl": "^1.0.6", + "rimraf": "^3.0.0" }, "browser": { "./test/utils/create-repo.js": "./test/utils/create-repo-browser.js", diff --git a/src/connection.js b/src/connection.js index 5dab830..71d73f1 100644 --- a/src/connection.js +++ b/src/connection.js @@ -2,11 +2,9 @@ const EventEmitter = require('events') const pipe = require('it-pipe') -const CID = require('cids') const PROTOCOL = require('./protocol') const encoding = require('./encoding') -const getPeerId = require('./peer-id') module.exports = class Connection extends EventEmitter { constructor (id, ipfs, room) { @@ -49,8 +47,8 @@ module.exports = class Connection extends EventEmitter { return // early } - const peerId = new CID(peerAddresses[0].multihash) - const peerInfo = this._ipfs.libp2p.peerStore.get(peerId.toString('base58btc')) + const peerId = peerAddresses[0] + const peerInfo = this._ipfs.libp2p.peerStore.get(peerId) const { stream } = await this._ipfs.libp2p.dialProtocol(peerInfo, PROTOCOL) this._connection = new FiFoMessageQueue() @@ -72,7 +70,7 @@ module.exports = class Connection extends EventEmitter { const peersAddresses = await this._ipfs.swarm.peers() return peersAddresses - .filter((peerAddress) => getPeerId(peerAddress.peer) === peerId.toString()) + .filter((peerAddress) => peerAddress.peer === peerId) .map(peerAddress => peerAddress.peer) } } diff --git a/src/direct-connection-handler.js b/src/direct-connection-handler.js index 08157de..1e14b00 100644 --- a/src/direct-connection-handler.js +++ b/src/direct-connection-handler.js @@ -2,12 +2,11 @@ const EventEmitter = require('events') const pipe = require('it-pipe') -const CID = require('cids') const emitter = new EventEmitter() function handler ({ connection, stream }) { - const peerId = new CID(connection.remotePeer.toString()).toString() + const peerId = connection.remotePeer.toB58String() pipe( stream, @@ -17,8 +16,6 @@ function handler ({ connection, stream }) { try { msg = JSON.parse(message.toString()) - msg.to = new CID(msg.to.version, msg.to.codec, Buffer.from(msg.to.hash.data)) - msg.from = new CID(msg.from.version, msg.from.codec, Buffer.from(msg.from.hash.data)) } catch (err) { emitter.emit('warning', err.message) continue // early diff --git a/src/index.js b/src/index.js index 1ff7d88..8e20f04 100644 --- a/src/index.js +++ b/src/index.js @@ -3,7 +3,6 @@ const diff = require('hyperdiff') const EventEmitter = require('events') const clone = require('lodash.clonedeep') -const CID = require('cids') const PROTOCOL = require('./protocol') const Connection = require('./connection') @@ -108,8 +107,7 @@ class PubSubRoom extends EventEmitter { } async _pollPeers () { - const newPeers = (await this._ipfs.pubsub.peers(this._topic)) - .map(id => new CID(1, 'libp2p-key', new CID(id).buffer)) + const newPeers = (await this._ipfs.pubsub.peers(this._topic)).sort() if (this._emitChanges(newPeers)) { this._peers = newPeers @@ -126,10 +124,7 @@ class PubSubRoom extends EventEmitter { } _onMessage (message) { - this.emit('message', { - ...message, - from: new CID(1, 'libp2p-key', new CID(message.from).buffer) - }) + this.emit('message', message) } async _handleDirectMessage (message) { diff --git a/src/peer-id.js b/src/peer-id.js deleted file mode 100644 index 8953c92..0000000 --- a/src/peer-id.js +++ /dev/null @@ -1,8 +0,0 @@ -'use strict' - -module.exports = (peer) => { - if (peer.id) { - peer = peer.id - } - return peer.toString() -} diff --git a/test/concurrent-rooms.spec.js b/test/concurrent-rooms.spec.js index cf65b68..622c263 100644 --- a/test/concurrent-rooms.spec.js +++ b/test/concurrent-rooms.spec.js @@ -79,7 +79,7 @@ describe('concurrent rooms', function () { await new Promise((resolve) => { room.once('peer joined', (peer) => { - expect(peer.toString()).to.equal(waitingFor.toString()) + expect(peer).to.equal(waitingFor) resolve() }) }) From b86e70e7a91e694d982f4168cb57a3f346dcd846 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 24 Jan 2020 17:59:46 +0000 Subject: [PATCH 5/6] refactor: remove ipfs and just use libp2p --- .aegir.js | 57 ++++++++++++-------------- README.md | 44 +++++++++++--------- package.json | 13 +++--- src/connection.js | 28 ++++++------- src/index.js | 42 +++++++------------ test/concurrent-rooms.spec.js | 22 +++------- test/room.spec.js | 24 +++-------- test/same-node.spec.js | 10 +---- test/utils/create-ipfs-browser.js | 42 ------------------- test/utils/create-ipfs.js | 23 ----------- test/utils/create-libp2p.js | 68 +++++++++++++++++++++++++++++++ test/utils/create-repo-browser.js | 27 ------------ 12 files changed, 166 insertions(+), 234 deletions(-) delete mode 100644 test/utils/create-ipfs-browser.js delete mode 100644 test/utils/create-ipfs.js create mode 100644 test/utils/create-libp2p.js delete mode 100644 test/utils/create-repo-browser.js diff --git a/.aegir.js b/.aegir.js index c48527b..70e1748 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,42 +1,37 @@ 'use strict' -const { createFactory } = require('ipfsd-ctl') -const df = createFactory({ - ipfsModule: { - path: require.resolve('ipfs'), - ref: require('ipfs') - } -}) +const Libp2p = require('libp2p') +const PeerInfo = require('peer-info') +const { config } = require('./test/utils/create-libp2p') -let ipfsd +let relay module.exports = { hooks: { - browser: { - pre: async () => { - ipfsd = await df.spawn({ - type: 'proc', - test: true, - ipfsOptions: { - relay: { - enabled: true, - hop: { - enabled: true - } - }, - config: { - Addresses: { - Swarm: [ - '/ip4/127.0.0.1/tcp/24642/ws' - ] - } + pre: async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/24642/ws') + + const defaultConfig = await config() + + relay = new Libp2p({ + ...defaultConfig, + peerInfo, + config: { + ...defaultConfig.config, + relay: { + enabled: true, + hop: { + enabled: true } } - }) - }, - post: async () => { - await ipfsd.stop() - } + } + }) + + await relay.start() + }, + post: async () => { + await relay.stop() } } } diff --git a/README.md b/README.md index 968febd..2b97306 100644 --- a/README.md +++ b/README.md @@ -5,14 +5,10 @@ [![Build Status](https://travis-ci.org/ipfs-shipyard/ipfs-pubsub-room.svg?branch=master)](https://travis-ci.org/ipfs-shipyard/ipfs-pubsub-room) -> Creates a room based on an IPFS pub-sub channel. Emits membership events, listens for messages, broadcast and direct messeges to peers. +> Creates a room based on a LibP2P pub-sub channel. Emits membership events, listens for messages, broadcast and direct messeges to peers. ([Demo video](https://t.co/HNYQGE4D4P)) -## js-ipfs - -This package has been tested with js-ipfs version __0.32.0__. - ## Install ```bash @@ -21,22 +17,32 @@ $ npm install ipfs-pubsub-room ## Use +Creating a pubsub room from a LibP2P node + +```js +const Room = require('ipfs-pubsub-room') +const Libp2p = require('libp2p') + +const libp2p = new Libp2p({ ... }) +await libp2p.start() + +// libp2p node is ready, so we can start using ipfs-pubsub-room +const room = Room(libp2p, 'room-name') +``` + +Creating a pubsub room from an IPFS node + ```js const Room = require('ipfs-pubsub-room') const IPFS = require('ipfs') -const ipfs = await IPFS.create({ - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -}) -// IPFS node is ready, so we can start using ipfs-pubsub-room -const room = Room(ipfs, 'room-name') +const ipfs = await IPFS.create({ ... }) +const room = Room(ipfs.libp2p, 'room-name') +``` + +Once we have a room we can listen for messages +```js room.on('peer joined', (peer) => { console.log('Peer joined the room', peer) }) @@ -53,15 +59,15 @@ room.on('subscribed', () => { ## API -### Room (ipfs:IPFS, roomName:string, options:object) +### Room (libp2p:LibP2P, roomName:string, options:object) -* `ipfs`: IPFS object. Must have pubsub activated +* `libp2p`: LibP2P node. Must have pubsub activated * `roomName`: string, global identifier for the room * `options`: object: * `pollInterval`: interval for polling the pubsub peers, in ms. Defaults to 1000. ```js -const room = Room(ipfs, 'some-room-name') +const room = Room(libp2p, 'some-room-name') ``` ## room.broadcast(message) diff --git a/package.json b/package.json index cc9c950..f7595c6 100644 --- a/package.json +++ b/package.json @@ -36,13 +36,12 @@ "chai": "^4.2.0", "delay": "^4.3.0", "dirty-chai": "^2.0.1", - "ipfs": "ipfs/js-ipfs#refactor/async-await-roundup", - "ipfs-repo": "^0.30.1", - "ipfsd-ctl": "^1.0.6", + "libp2p": "0.27.0-rc.0", + "libp2p-gossipsub": "0.2.1", + "libp2p-mplex": "^0.9.3", + "libp2p-secio": "^0.12.2", + "libp2p-websockets": "^0.13.2", + "peer-info": "^0.17.1", "rimraf": "^3.0.0" - }, - "browser": { - "./test/utils/create-repo.js": "./test/utils/create-repo-browser.js", - "./test/utils/create-ipfs.js": "./test/utils/create-ipfs-browser.js" } } diff --git a/src/connection.js b/src/connection.js index 71d73f1..e20ec42 100644 --- a/src/connection.js +++ b/src/connection.js @@ -7,10 +7,10 @@ const PROTOCOL = require('./protocol') const encoding = require('./encoding') module.exports = class Connection extends EventEmitter { - constructor (id, ipfs, room) { + constructor (remoteId, libp2p, room) { super() - this._id = id - this._ipfs = ipfs + this._remoteId = remoteId + this._libp2p = libp2p this._room = room this._connection = null this._connecting = false @@ -40,19 +40,19 @@ module.exports = class Connection extends EventEmitter { async _connect () { this._connecting = true - const peerAddresses = await this._getPeerAddresses(this._id) - if (!peerAddresses.length) { + if (!this._isConnectedToRemote()) { this.emit('disconnect') + this._connecting = false return // early } - const peerId = peerAddresses[0] - const peerInfo = this._ipfs.libp2p.peerStore.get(peerId) - const { stream } = await this._ipfs.libp2p.dialProtocol(peerInfo, PROTOCOL) + const peerInfo = this._libp2p.peerStore.get(this._remoteId) + const { stream } = await this._libp2p.dialProtocol(peerInfo, PROTOCOL) this._connection = new FiFoMessageQueue() pipe(this._connection, stream, async (source) => { + this._connecting = false this.emit('connect', this._connection) for await (const message of source) { @@ -66,12 +66,12 @@ module.exports = class Connection extends EventEmitter { }) } - async _getPeerAddresses (peerId) { - const peersAddresses = await this._ipfs.swarm.peers() - - return peersAddresses - .filter((peerAddress) => peerAddress.peer === peerId) - .map(peerAddress => peerAddress.peer) + _isConnectedToRemote () { + for (const peerId of this._libp2p.connections.keys()) { + if (peerId === this._remoteId) { + return true + } + } } } diff --git a/src/index.js b/src/index.js index 8e20f04..bf31114 100644 --- a/src/index.js +++ b/src/index.js @@ -16,9 +16,9 @@ const DEFAULT_OPTIONS = { let index = 0 class PubSubRoom extends EventEmitter { - constructor (ipfs, topic, options) { + constructor (libp2p, topic, options) { super() - this._ipfs = ipfs + this._libp2p = libp2p this._topic = topic this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options)) this._peers = [] @@ -27,12 +27,8 @@ class PubSubRoom extends EventEmitter { this._handleDirectMessage = this._handleDirectMessage.bind(this) this._handleMessage = this._onMessage.bind(this) - if (!this._ipfs.pubsub) { - throw new Error('This IPFS node does not have pubsub.') - } - - if (!this._ipfs.libp2p) { - throw new Error('This IPFS node does not have libp2p.') + if (!this._libp2p.pubsub) { + throw new Error('pubsub has not been configured') } this._interval = setInterval( @@ -40,10 +36,10 @@ class PubSubRoom extends EventEmitter { this._options.pollInterval ) - this._ipfs.libp2p.handle(PROTOCOL, directConnection.handler) + this._libp2p.handle(PROTOCOL, directConnection.handler) directConnection.emitter.on(this._topic, this._handleDirectMessage) - this._ipfs.pubsub.subscribe(this._topic, this._handleMessage) + this._libp2p.pubsub.subscribe(this._topic, this._handleMessage) this._idx = index++ } @@ -62,20 +58,20 @@ class PubSubRoom extends EventEmitter { this._connections[peer].stop() }) directConnection.emitter.removeListener(this._topic, this._handleDirectMessage) - this._ipfs.libp2p.unhandle(PROTOCOL, directConnection.handler) - await this._ipfs.pubsub.unsubscribe(this._topic, this._handleMessage) + this._libp2p.unhandle(PROTOCOL, directConnection.handler) + await this._libp2p.pubsub.unsubscribe(this._topic, this._handleMessage) } async broadcast (_message) { const message = encoding(_message) - await this._ipfs.pubsub.publish(this._topic, message) + await this._libp2p.pubsub.publish(this._topic, message) } - async sendTo (peer, message) { + sendTo (peer, message) { let conn = this._connections[peer] if (!conn) { - conn = new Connection(peer, this._ipfs, this) + conn = new Connection(peer, this._libp2p, this) conn.on('error', (err) => this.emit('error', err)) this._connections[peer] = conn @@ -96,7 +92,7 @@ class PubSubRoom extends EventEmitter { const msg = { to: peer, - from: await this._ourId(), + from: this._libp2p.peerInfo.id.toB58String(), data: Buffer.from(message).toString('hex'), seqno: seqno.toString('hex'), topicIDs: [this._topic], @@ -107,7 +103,7 @@ class PubSubRoom extends EventEmitter { } async _pollPeers () { - const newPeers = (await this._ipfs.pubsub.peers(this._topic)).sort() + const newPeers = (await this._libp2p.pubsub.getSubscribers(this._topic)).sort() if (this._emitChanges(newPeers)) { this._peers = newPeers @@ -127,21 +123,13 @@ class PubSubRoom extends EventEmitter { this.emit('message', message) } - async _handleDirectMessage (message) { - if (message.to.toString() === (await this._ourId()).toString()) { + _handleDirectMessage (message) { + if (message.to.toString() === this._libp2p.peerInfo.id.toB58String()) { const m = Object.assign({}, message) delete m.to this.emit('message', m) } } - - async _ourId () { - if (!this._id) { - this._id = (await this._ipfs.id()).id - } - - return this._id - } } module.exports = PubSubRoom diff --git a/test/concurrent-rooms.spec.js b/test/concurrent-rooms.spec.js index 622c263..69c151b 100644 --- a/test/concurrent-rooms.spec.js +++ b/test/concurrent-rooms.spec.js @@ -9,14 +9,12 @@ const expect = chai.expect const delay = require('delay') const PubSubRoom = require('../') -const createRepo = require('./utils/create-repo') -const createIpfs = require('./utils/create-ipfs') +const createLibp2p = require('./utils/create-libp2p') const topic = 'pubsub-room-concurrency-test-' + Date.now() + '-' + Math.random() describe('concurrent rooms', function () { this.timeout(30000) - const repos = [] let node1, node2 let id1, id2 let room1A, room1B, room2A, room2B @@ -24,17 +22,13 @@ describe('concurrent rooms', function () { const topicB = topic + '-B' before(async () => { - const repo = createRepo() - repos.push(repo) - node1 = await createIpfs(repo) - id1 = (await node1.id()).id + node1 = await createLibp2p() + id1 = node1.peerInfo.id.toB58String() }) before(async () => { - const repo = createRepo() - repos.push(repo) - node2 = await createIpfs(repo, node1) - id2 = (await node2.id()).id + node2 = await createLibp2p(node1) + id2 = node2.peerInfo.id.toB58String() }) after(() => { @@ -53,12 +47,6 @@ describe('concurrent rooms', function () { ]) }) - after(() => { - return Promise.all( - repos.map(repo => repo.teardown()) - ) - }) - it('can create a room, and they find each other', async () => { room1A = new PubSubRoom(node1, topicA) room2A = new PubSubRoom(node2, topicA) diff --git a/test/room.spec.js b/test/room.spec.js index 70a08f1..600650e 100644 --- a/test/room.spec.js +++ b/test/room.spec.js @@ -7,31 +7,23 @@ chai.use(require('dirty-chai')) const expect = chai.expect const PubSubRoom = require('../') -const createRepo = require('./utils/create-repo') -const createIpfs = require('./utils/create-ipfs') +const createLibp2p = require('./utils/create-libp2p') const topicBase = 'pubsub-room-test-' + Date.now() + '-' + Math.random() describe('room', function () { this.timeout(30000) - const repos = [] let node1, node2 let id1, id2 before(async () => { - const repo = createRepo() - repos.push(repo) - - node1 = await createIpfs(repo) - id1 = (await node1.id()).id + node1 = await createLibp2p() + id1 = node1.peerInfo.id.toB58String() }) before(async () => { - const repo = createRepo() - repos.push(repo) - - node2 = await createIpfs(repo, node1) - id2 = (await node2.id()).id + node2 = await createLibp2p(node1) + id2 = node2.peerInfo.id.toB58String() }) const rooms = [] @@ -122,10 +114,4 @@ describe('room', function () { node2.stop() ]) }) - - after(() => { - return Promise.all( - repos.map(repo => repo.teardown()) - ) - }) }) diff --git a/test/same-node.spec.js b/test/same-node.spec.js index c3cb5a7..e51aba2 100644 --- a/test/same-node.spec.js +++ b/test/same-node.spec.js @@ -7,21 +7,17 @@ chai.use(require('dirty-chai')) const expect = chai.expect const PubSubRoom = require('../') -const createRepo = require('./utils/create-repo') -const createIpfs = require('./utils/create-ipfs') +const createLibp2p = require('./utils/create-libp2p') const topic = 'pubsub-same-node-test-' + Date.now() + '-' + Math.random() describe('same node', function () { this.timeout(30000) - let repo let node const rooms = [] before(async () => { - repo = createRepo() - - node = await createIpfs(repo) + node = await createLibp2p() }) before(() => { @@ -38,8 +34,6 @@ describe('same node', function () { after(() => node.stop()) - after(() => repo.teardown()) - it('mirrors broadcast', (done) => { rooms[0].once('message', (message) => { expect(message.data.toString()).to.equal('message 1') diff --git a/test/utils/create-ipfs-browser.js b/test/utils/create-ipfs-browser.js deleted file mode 100644 index 7d65331..0000000 --- a/test/utils/create-ipfs-browser.js +++ /dev/null @@ -1,42 +0,0 @@ -'use strict' - -const IPFS = require('ipfs') -const clone = require('lodash.clonedeep') -const RELAY_MULTIADDR = '/ip4/127.0.0.1/tcp/24642/ws' - -const ipfsOptions = { - relay: { - enabled: true, // enable relay dialer/listener (STOP) - hop: { - enabled: true // make this node a relay (HOP) - } - }, - config: { - Bootstrap: [ - '/ip4/127.0.0.1/tcp/24642/ws' - ] - } -} - -module.exports = async (repo, otherNode) => { - const options = Object.assign({}, clone(ipfsOptions), { - repo - }) - - const ipfs = await IPFS.create(options) - - // connect to relay peer - await ipfs.swarm.connect(RELAY_MULTIADDR) - - // both nodes created, get them to dial each other via the relay - if (otherNode) { - const peers = await ipfs.swarm.peers() - const nodeId = await ipfs.id() - const otherNodeId = await otherNode.id() - - await ipfs.swarm.connect(`${RELAY_MULTIADDR}/p2p/${peers[0].peer.toString()}/p2p-circuit/p2p/${otherNodeId.id.toString()}`) - await otherNode.swarm.connect(`${RELAY_MULTIADDR}/p2p/${peers[0].peer.toString()}/p2p-circuit/p2p/${nodeId.id.toString()}`) - } - - return ipfs -} diff --git a/test/utils/create-ipfs.js b/test/utils/create-ipfs.js deleted file mode 100644 index 016722f..0000000 --- a/test/utils/create-ipfs.js +++ /dev/null @@ -1,23 +0,0 @@ -'use strict' - -const IPFS = require('ipfs') -const clone = require('lodash.clonedeep') - -const ipfsOptions = { - config: { - Addresses: { - Swarm: [ - '/ip4/0.0.0.0/tcp/0' - ] - }, - Bootstrap: [] - } -} - -module.exports = (repo) => { - const options = Object.assign({}, clone(ipfsOptions), { - repo - }) - - return IPFS.create(options) -} diff --git a/test/utils/create-libp2p.js b/test/utils/create-libp2p.js new file mode 100644 index 0000000..2322164 --- /dev/null +++ b/test/utils/create-libp2p.js @@ -0,0 +1,68 @@ +'use strict' + +const Libp2p = require('libp2p') +const WS = require('libp2p-websockets') +const Multiplex = require('libp2p-mplex') +const SECIO = require('libp2p-secio') +const GossipSub = require('libp2p-gossipsub') +const PeerInfo = require('peer-info') + +const RELAY_MULTIADDR = '/ip4/127.0.0.1/tcp/24642/ws' + +const config = async () => { + return { + peerInfo: await PeerInfo.create(), + dialer: { + maxParallelDials: 150, // 150 total parallel multiaddr dials + maxDialsPerPeer: 4, // Allow 4 multiaddrs to be dialed per peer in parallel + dialTimeout: 10e3 // 10 second dial timeout per peer dial + }, + modules: { + transport: [ + WS + ], + streamMuxer: [ + Multiplex + ], + connEncryption: [ + SECIO + ], + pubsub: GossipSub + }, + config: { + peerDiscovery: { + autoDial: false, + bootstrap: { + enabled: false + } + }, + pubsub: { + enabled: true, + emitSelf: true + } + } + } +} + +module.exports = async (otherNode) => { + const node = new Libp2p(await config()) + + await node.start() + + // connect to relay peer + await node.dial(RELAY_MULTIADDR) + + // both nodes created, get them to dial each other via the relay + if (otherNode) { + const relayId = node.connections.keys().next().value + const otherNodeId = otherNode.peerInfo.id.toB58String() + const nodeId = node.peerInfo.id.toB58String() + + await node.dial(`${RELAY_MULTIADDR}/p2p/${relayId}/p2p-circuit/p2p/${otherNodeId}`) + await otherNode.dial(`${RELAY_MULTIADDR}/p2p/${relayId}/p2p-circuit/p2p/${nodeId}`) + } + + return node +} + +module.exports.config = config diff --git a/test/utils/create-repo-browser.js b/test/utils/create-repo-browser.js deleted file mode 100644 index e72fec1..0000000 --- a/test/utils/create-repo-browser.js +++ /dev/null @@ -1,27 +0,0 @@ -/* global self */ -'use strict' - -const IPFSRepo = require('ipfs-repo') - -const idb = self.indexedDB || - self.mozIndexedDB || - self.webkitIndexedDB || - self.msIndexedDB - -function createTempRepo (repoPath) { - repoPath = repoPath || '/tmp/ipfs-test-' + Math.random().toString().substring(2, 8) - - const repo = new IPFSRepo(repoPath) - - repo.teardown = (done) => { - repo.close(() => { - idb.deleteDatabase(repoPath) - idb.deleteDatabase(repoPath + '/blocks') - done() - }) - } - - return repo -} - -module.exports = createTempRepo From c610140101cb21831c39f5075addacfc3b934689 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 24 Jan 2020 18:03:19 +0000 Subject: [PATCH 6/6] fix: fix linting --- README.md | 2 +- test/utils/clean.js | 15 -------- test/utils/create-repo.js | 30 --------------- test/utils/testCommon.js | 79 --------------------------------------- 4 files changed, 1 insertion(+), 125 deletions(-) delete mode 100644 test/utils/clean.js delete mode 100644 test/utils/create-repo.js delete mode 100644 test/utils/testCommon.js diff --git a/README.md b/README.md index 2b97306..fa3d41c 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![made by Protocol Labs](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai) [![Freenode](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) -[![Build Status](https://travis-ci.org/ipfs-shipyard/ipfs-pubsub-room.svg?branch=master)](https://travis-ci.org/ipfs-shipyard/ipfs-pubsub-room) +[![Build Status](https://travis-ci.com/ipfs-shipyard/ipfs-pubsub-room.svg?branch=master)](https://travis-ci.com/ipfs-shipyard/ipfs-pubsub-room) > Creates a room based on a LibP2P pub-sub channel. Emits membership events, listens for messages, broadcast and direct messeges to peers. diff --git a/test/utils/clean.js b/test/utils/clean.js deleted file mode 100644 index 13752b5..0000000 --- a/test/utils/clean.js +++ /dev/null @@ -1,15 +0,0 @@ -'use strict' - -const rimraf = require('rimraf') -const fs = require('fs') - -module.exports = (dir) => { - try { - fs.accessSync(dir) - } catch (err) { - // Does not exist so all good - return - } - - rimraf.sync(dir) -} diff --git a/test/utils/create-repo.js b/test/utils/create-repo.js deleted file mode 100644 index 26e4471..0000000 --- a/test/utils/create-repo.js +++ /dev/null @@ -1,30 +0,0 @@ -'use strict' - -const IPFSRepo = require('ipfs-repo') -const clean = require('./clean') - -function createTempRepo () { - const repoPath = '/tmp/ipfs-test-' + Math.random().toString().substring(2, 8) - let destroyed = false - - const repo = new IPFSRepo(repoPath) - - repo.teardown = async () => { - if (destroyed) { - return - } - destroyed = true - - try { - await repo.close() - } catch (err) { - // ignore err, might have been closed already - } - - clean(repoPath) - } - - return repo -} - -module.exports = createTempRepo diff --git a/test/utils/testCommon.js b/test/utils/testCommon.js deleted file mode 100644 index 98c72dc..0000000 --- a/test/utils/testCommon.js +++ /dev/null @@ -1,79 +0,0 @@ -'use strict' - -const path = require('path') -const fs = !process.browser && require('fs') -const rimraf = !process.browser && require('rimraf') - -let dbidx = 0 - -function location () { - return path.join(__dirname, '_leveldown_test_db_' + dbidx++) -} - -function lastLocation () { - return path.join(__dirname, '_leveldown_test_db_' + dbidx) -} - -function cleanup (callback) { - if (process.browser) { return callback() } - - fs.readdir(__dirname, function (err, list) { - if (err) return callback(err) - - list = list.filter(function (f) { - return (/^_leveldown_test_db_/).test(f) - }) - - if (!list.length) { return callback() } - - let ret = 0 - - list.forEach(function (f) { - rimraf(path.join(__dirname, f), function (err) { - if (err) { - callback(err) - return // early - } - if (++ret === list.length) { - callback() - } - }) - }) - }) -} - -function setUp (t) { - cleanup(function (err) { - t.error(err, 'cleanup returned an error') - t.end() - }) -} - -function tearDown (t) { - setUp(t) // same cleanup! -} - -function collectEntries (iterator, callback) { - const data = [] - const next = function () { - iterator.next(function (err, key, value) { - if (err) return callback(err) - if (!arguments.length) { - callback(err, data) - } else { - data.push({ key: key, value: String(value) }) - setTimeout(next, 0) - } - }) - } - next() -} - -module.exports = { - location: location, - cleanup: cleanup, - lastLocation: lastLocation, - setUp: setUp, - tearDown: tearDown, - collectEntries: collectEntries -}