From bbc18b7db6b03827dad77471bc58490e913b0875 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 28 Oct 2019 09:55:24 +0100 Subject: [PATCH] chore: apply suggestions from code review Co-Authored-By: Jacob Heun --- README.md | 8 +++---- src/index.js | 56 ++++++++++++++++++++++++++++++++++---------- src/peer.js | 5 ++++ test/pubsub.spec.js | 57 +++++++++++++++++++++++++++------------------ test/utils.spec.js | 2 +- test/utils/index.js | 49 +++++++++++++++++++++++++++++++++++--- 6 files changed, 134 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 8ac7de9..d017240 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ js-libp2p-pubsub [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) [![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub) -> libp2p-pubsub consists on the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol in libp2p, as well as all the logic regarding pubsub connections with other peers. +> libp2p-pubsub is the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol with libp2p, as well as managing the logic regarding pubsub connections with other peers. ## Lead Maintainer @@ -34,11 +34,11 @@ js-libp2p-pubsub ## Usage -`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algortithm, instead of also needing to create the setup for it. +`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algorithm, instead of also needing to create the setup for it. A pubsub implementation **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions. -Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for registering the pubsub protocol onto the libp2p node, while the `stop` function is responsible for unregistering the pubsub protocol and shutting down every connection +Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding `start` and `stop` **MUST** call `super`. The `start` function is responsible for registering the pubsub protocol with libp2p, while the `stop` function is responsible for unregistering the pubsub protocol and closing pubsub connections. All the remaining functions **MUST NOT** be overwritten. @@ -183,7 +183,7 @@ Get a list of the peer-ids that are subscribed to one topic. | Type | Description | |------|-------------| -| `Array` | Array of base-58 peer id's | +| `Array` | Array of base-58 PeerId's | ### Validate diff --git a/src/index.js b/src/index.js index 54188ab..b971187 100644 --- a/src/index.js +++ b/src/index.js @@ -25,6 +25,7 @@ class PubsubBaseProtocol extends EventEmitter { * @param {Array|string} props.multicodecs protocol identificers to connect * @param {PeerInfo} props.peerInfo peer's peerInfo * @param {Object} props.registrar registrar for libp2p protocols + * @param {function} props.registrar.handle * @param {function} props.registrar.register * @param {function} props.registrar.unregister * @param {boolean} [props.signMessages] if messages should be signed, defaults to true @@ -44,7 +45,8 @@ class PubsubBaseProtocol extends EventEmitter { assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') // registrar handling - assert(registrar && typeof registrar === 'object', 'a registrar object is required') // TODO: isRegistrar when it's implemented + assert(registrar && typeof registrar === 'object', 'a registrar object is required') + assert(typeof registrar.handle === 'function', 'a handle function must be provided in registrar') assert(typeof registrar.register === 'function', 'a register function must be provided in registrar') assert(typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') @@ -84,13 +86,15 @@ class PubsubBaseProtocol extends EventEmitter { */ this.strictSigning = strictSigning + this._registrarId = undefined + this._onIncomingStream = this._onIncomingStream.bind(this) this._onPeerConnected = this._onPeerConnected.bind(this) this._onPeerDisconnected = this._onPeerDisconnected.bind(this) } /** * Register the pubsub protocol onto the libp2p node. - * @returns {Promise} + * @returns {Promise} */ async start () { if (this.started) { @@ -98,8 +102,11 @@ class PubsubBaseProtocol extends EventEmitter { } this.log('starting') + // Incoming streams + this.registrar.handle(this.multicodecs, this._onIncomingStream) + // register protocol with multicodec and handlers - await this.registrar.register(this.multicodecs, { + this._registrarId = await this.registrar.register(this.multicodecs, { onConnect: this._onPeerConnected, onDisconnect: this._onPeerDisconnected }) @@ -118,7 +125,7 @@ class PubsubBaseProtocol extends EventEmitter { } // unregister protocol and handlers - await this.registrar.unregister(this.multicodecs) + await this.registrar.unregister(this._registrarId) this.log('stopping') this.peers.forEach((peer) => peer.close()) @@ -128,20 +135,38 @@ class PubsubBaseProtocol extends EventEmitter { this.log('stopped') } + /** + * On an incoming stream event. + * @private + * @param {Object} props + * @param {DuplexStream} props.strean + * @param {PeerId} props.remotePeer remote peer-id + */ + async _onIncomingStream ({ stream, remotePeer }) { + const peerInfo = await PeerInfo.create(remotePeer) + const idB58Str = peerInfo.id.toB58String() + + const peer = this._addPeer(new Peer(peerInfo)) + + peer.attachConnection(stream) + this._processMessages(idB58Str, stream, peer) + } + /** * Registrar notifies a connection successfully with pubsub protocol. * @private * @param {PeerInfo} peerInfo remote peer info * @param {Connection} conn connection to the peer */ - _onPeerConnected (peerInfo, conn) { + async _onPeerConnected (peerInfo, conn) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) const peer = this._addPeer(new Peer(peerInfo)) - peer.attachConnection(conn) + const { stream } = await conn.newStream(this.multicodecs) - this._processMessages(idB58Str, conn, peer) + peer.attachConnection(stream) + this._processMessages(idB58Str, stream, peer) } /** @@ -166,8 +191,8 @@ class PubsubBaseProtocol extends EventEmitter { */ _addPeer (peer) { const id = peer.info.id.toB58String() - let existing = this.peers.get(id) + if (!existing) { this.log('new peer', id) this.peers.set(id, peer) @@ -175,6 +200,7 @@ class PubsubBaseProtocol extends EventEmitter { peer.once('close', () => this._removePeer(peer)) } + ++existing._references return existing } @@ -188,8 +214,14 @@ class PubsubBaseProtocol extends EventEmitter { _removePeer (peer) { const id = peer.info.id.toB58String() - this.log('delete peer', id) - this.peers.delete(id) + this.log('remove', id, peer._references) + + // Only delete when no one else is referencing this peer. + if (--peer._references === 0) { + this.log('delete peer', id) + this.peers.delete(id) + } + return peer } @@ -202,14 +234,14 @@ class PubsubBaseProtocol extends EventEmitter { // If strict signing is on and we have no signature, abort if (this.strictSigning && !message.signature) { this.log('Signing required and no signature was present, dropping message:', message) - return Promise.resolve(false) + return false } // Check the message signature if present if (message.signature) { return verifySignature(message) } else { - return Promise.resolve(true) + return true } } diff --git a/src/peer.js b/src/peer.js index 0939f93..efaa067 100644 --- a/src/peer.js +++ b/src/peer.js @@ -34,6 +34,8 @@ class Peer extends EventEmitter { * @type {Pushable} */ this.stream = null + + this._references = 0 } /** @@ -164,6 +166,9 @@ class Peer extends EventEmitter { * @returns {void} */ close () { + // Force removal of peer + this._references = 1 + // End the pushable if (this.stream) { this.stream.end() diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 1e68d02..00d12a6 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,11 +6,16 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect const sinon = require('sinon') -const DuplexPair = require('it-pair/duplex') const PubsubBaseProtocol = require('../src') const { randomSeqno } = require('../src/utils') -const { createPeerInfo, mockRegistrar, PubsubImplementation } = require('./utils') +const { + createPeerInfo, + createMockRegistrar, + mockRegistrar, + PubsubImplementation, + ConnectionPair +} = require('./utils') describe('pubsub base protocol', () => { describe('should start and stop properly', () => { @@ -20,6 +25,7 @@ describe('pubsub base protocol', () => { beforeEach(async () => { const peerInfo = await createPeerInfo() sinonMockRegistrar = { + handle: sinon.stub(), register: sinon.stub(), unregister: sinon.stub() } @@ -40,6 +46,7 @@ describe('pubsub base protocol', () => { it('should be able to start and stop', async () => { await pubsub.start() + expect(sinonMockRegistrar.handle.calledOnce).to.be.true() expect(sinonMockRegistrar.register.calledOnce).to.be.true() await pubsub.stop() @@ -49,6 +56,7 @@ describe('pubsub base protocol', () => { it('should not throw to start if already started', async () => { await pubsub.start() await pubsub.start() + expect(sinonMockRegistrar.handle.calledOnce).to.be.true() expect(sinonMockRegistrar.register.calledOnce).to.be.true() await pubsub.stop() @@ -131,22 +139,13 @@ describe('pubsub base protocol', () => { const registrarRecordA = {} const registrarRecordB = {} - const registrar = (registrarRecord) => ({ - register: (multicodecs, handlers) => { - registrarRecord[multicodecs[0]] = handlers - }, - unregister: (multicodecs) => { - delete registrarRecord[multicodecs[0]] - } - }) - // mount pubsub beforeEach(async () => { peerInfoA = await createPeerInfo() peerInfoB = await createPeerInfo() - pubsubA = new PubsubImplementation(protocol, peerInfoA, registrar(registrarRecordA)) - pubsubB = new PubsubImplementation(protocol, peerInfoB, registrar(registrarRecordB)) + pubsubA = new PubsubImplementation(protocol, peerInfoA, createMockRegistrar(registrarRecordA)) + pubsubB = new PubsubImplementation(protocol, peerInfoB, createMockRegistrar(registrarRecordB)) }) // start pubsub @@ -169,29 +168,41 @@ describe('pubsub base protocol', () => { ]) }) - it('should handle onConnect as expected', () => { + it('should handle onConnect as expected', async () => { const onConnectA = registrarRecordA[protocol].onConnect - const onConnectB = registrarRecordB[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [c0, c1] = ConnectionPair() + + await onConnectA(peerInfoB, c0) + await handlerB({ + protocol, + stream: c1.stream, + remotePeer: peerInfoA.id + }) expect(pubsubA.peers.size).to.be.eql(1) expect(pubsubB.peers.size).to.be.eql(1) }) - it('should handle onDisconnect as expected', () => { + it('should handle onDisconnect as expected', async () => { const onConnectA = registrarRecordA[protocol].onConnect const onDisconnectA = registrarRecordA[protocol].onDisconnect - const onConnectB = registrarRecordB[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler const onDisconnectB = registrarRecordB[protocol].onDisconnect // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [c0, c1] = ConnectionPair() + + await onConnectA(peerInfoB, c0) + await handlerB({ + protocol, + stream: c1.stream, + remotePeer: peerInfoA.id + }) + + // Notice peers of disconnect onDisconnectA(peerInfoB) onDisconnectB(peerInfoA) diff --git a/test/utils.spec.js b/test/utils.spec.js index 22499ea..d1e00b0 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -1,7 +1,7 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const { expect } = require('chai') const utils = require('../src/utils') diff --git a/test/utils/index.js b/test/utils/index.js index 90cc9f0..15c32e3 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -2,6 +2,7 @@ const lp = require('it-length-prefixed') const pipe = require('it-pipe') +const DuplexPair = require('it-pair/duplex') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -41,7 +42,7 @@ class PubsubImplementation extends PubsubBaseProtocol { pipe( conn, lp.decode(), - async function collect (source) { + async function (source) { for await (const val of source) { const rpc = message.rpc.RPC.decode(val) @@ -55,10 +56,52 @@ class PubsubImplementation extends PubsubBaseProtocol { exports.PubsubImplementation = PubsubImplementation exports.mockRegistrar = { - register: (multicodec, handlers) => { + handle: (multicodecs, handlers) => { }, - unregister: (multicodec) => { + register: (multicodecs, handlers) => { + }, + unregister: (id) => { + + } +} + +exports.createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } + }, + register: (multicodecs, handlers) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + ...handlers + } + + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] } +}) + +exports.ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] }