From 797d8f0cf1620dd374cc2ea1fbbbc6c8e0542ad7 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 6 Nov 2019 15:47:11 +0100 Subject: [PATCH] feat: registrar (#471) * feat: peer-store v0 * feat: registrar * chore: apply suggestions from code review Co-Authored-By: Jacob Heun * chore: address review * chore: support multiple conns * chore: address review * fix: no remote peer from topology on disconnect --- src/connection-manager/topology.js | 108 ++++++++++++++ src/get-peer-info.js | 12 +- src/index.js | 8 ++ src/registrar.js | 139 ++++++++++++++++++ test/registrar/registrar.node.js | 57 ++++++++ test/registrar/registrar.spec.js | 224 +++++++++++++++++++++++++++++ test/registrar/utils.js | 50 +++++++ test/utils/base-options.js | 4 +- 8 files changed, 594 insertions(+), 8 deletions(-) create mode 100644 src/connection-manager/topology.js create mode 100644 src/registrar.js create mode 100644 test/registrar/registrar.node.js create mode 100644 test/registrar/registrar.spec.js create mode 100644 test/registrar/utils.js diff --git a/src/connection-manager/topology.js b/src/connection-manager/topology.js new file mode 100644 index 0000000000..2c2a877919 --- /dev/null +++ b/src/connection-manager/topology.js @@ -0,0 +1,108 @@ +'use strict' + +const assert = require('assert') + +class Topology { + /** + * @param {Object} props + * @param {number} props.min minimum needed connections (default: 0) + * @param {number} props.max maximum needed connections (default: Infinity) + * @param {Array} props.multicodecs protocol multicodecs + * @param {Object} props.handlers + * @param {function} props.handlers.onConnect protocol "onConnect" handler + * @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler + * @constructor + */ + constructor ({ + min = 0, + max = Infinity, + multicodecs, + handlers + }) { + assert(multicodecs, 'one or more multicodec should be provided') + assert(handlers, 'the handlers should be provided') + assert(handlers.onConnect && typeof handlers.onConnect === 'function', + 'the \'onConnect\' handler must be provided') + assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function', + 'the \'onDisconnect\' handler must be provided') + + this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs] + this.min = min + this.max = max + + // Handlers + this._onConnect = handlers.onConnect + this._onDisconnect = handlers.onDisconnect + + this.peers = new Map() + this._registrar = undefined + + this._onProtocolChange = this._onProtocolChange.bind(this) + } + + set registrar (registrar) { + this._registrar = registrar + this._registrar.peerStore.on('change:protocols', this._onProtocolChange) + + // Update topology peers + this._updatePeers(this._registrar.peerStore.peers.values()) + } + + /** + * Update topology. + * @param {Array} peerInfoIterable + * @returns {void} + */ + _updatePeers (peerInfoIterable) { + for (const peerInfo of peerInfoIterable) { + if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) { + // Add the peer regardless of whether or not there is currently a connection + this.peers.set(peerInfo.id.toB58String(), peerInfo) + // If there is a connection, call _onConnect + const connection = this._registrar.getConnection(peerInfo) + connection && this._onConnect(peerInfo, connection) + } else { + // Remove any peers we might be tracking that are no longer of value to us + this.peers.delete(peerInfo.id.toB58String()) + } + } + } + + /** + * Notify protocol of peer disconnected. + * @param {PeerInfo} peerInfo + * @param {Error} [error] + * @returns {void} + */ + disconnect (peerInfo, error) { + this._onDisconnect(peerInfo, error) + } + + /** + * Check if a new peer support the multicodecs for this topology. + * @param {Object} props + * @param {PeerInfo} props.peerInfo + * @param {Array} props.protocols + */ + _onProtocolChange ({ peerInfo, protocols }) { + const existingPeer = this.peers.get(peerInfo.id.toB58String()) + const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) + + // Not supporting the protocol anymore? + if (existingPeer && hasProtocol.length === 0) { + this._onDisconnect({ + peerInfo + }) + } + + // New to protocol support + for (const protocol of protocols) { + if (this.multicodecs.includes(protocol)) { + this._updatePeers([peerInfo]) + return + } + } + } +} + +module.exports = Topology diff --git a/src/get-peer-info.js b/src/get-peer-info.js index b75757f5e6..b74caa3ec1 100644 --- a/src/get-peer-info.js +++ b/src/get-peer-info.js @@ -7,14 +7,14 @@ const errCode = require('err-code') /** * Converts the given `peer` to a `PeerInfo` instance. - * The `PeerBook` will be checked for the resulting peer, and - * the peer will be updated in the `PeerBook`. + * The `PeerStore` will be checked for the resulting peer, and + * the peer will be updated in the `PeerStore`. * * @param {PeerInfo|PeerId|Multiaddr|string} peer - * @param {PeerBook} peerBook + * @param {PeerStore} peerStore * @returns {PeerInfo} */ -function getPeerInfo (peer, peerBook) { +function getPeerInfo (peer, peerStore) { if (typeof peer === 'string') { peer = multiaddr(peer) } @@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) { addr && peer.multiaddrs.add(addr) - return peerBook ? peerBook.put(peer) : peer + return peerStore ? peerStore.put(peer) : peer } /** @@ -54,7 +54,7 @@ function getPeerInfoRemote (peer, libp2p) { let peerInfo try { - peerInfo = getPeerInfo(peer, libp2p.peerBook) + peerInfo = getPeerInfo(peer, libp2p.peerStore) } catch (err) { return Promise.reject(errCode( new Error(`${peer} is not a valid peer type`), diff --git a/src/index.js b/src/index.js index df830918ae..998d4a6bae 100644 --- a/src/index.js +++ b/src/index.js @@ -29,6 +29,7 @@ const Dialer = require('./dialer') const TransportManager = require('./transport-manager') const Upgrader = require('./upgrader') const PeerStore = require('./peer-store') +const Registrar = require('./registrar') const notStarted = (action, state) => { return errCode( @@ -71,10 +72,13 @@ class Libp2p extends EventEmitter { const peerInfo = getPeerInfo(connection.remotePeer) this.peerStore.put(peerInfo) + this.registrar.onConnect(peerInfo, connection) this.emit('peer:connect', peerInfo) }, onConnectionEnd: (connection) => { const peerInfo = getPeerInfo(connection.remotePeer) + + this.registrar.onDisconnect(peerInfo, connection) this.emit('peer:disconnect', peerInfo) } }) @@ -108,6 +112,10 @@ class Libp2p extends EventEmitter { transportManager: this.transportManager }) + this.registrar = new Registrar({ peerStore: this.peerStore }) + this.handle = this.handle.bind(this) + this.registrar.handle = this.handle + // Attach private network protector if (this._modules.connProtector) { this.upgrader.protector = this._modules.connProtector diff --git a/src/registrar.js b/src/registrar.js new file mode 100644 index 0000000000..c6e4439c00 --- /dev/null +++ b/src/registrar.js @@ -0,0 +1,139 @@ +'use strict' + +const assert = require('assert') +const debug = require('debug') +const log = debug('libp2p:peer-store') +log.error = debug('libp2p:peer-store:error') + +const { Connection } = require('libp2p-interfaces/src/connection') +const PeerInfo = require('peer-info') +const Toplogy = require('./connection-manager/topology') + +/** + * Responsible for notifying registered protocols of events in the network. + */ +class Registrar { + /** + * @param {Object} props + * @param {PeerStore} props.peerStore + * @constructor + */ + constructor ({ peerStore }) { + this.peerStore = peerStore + + /** + * Map of connections per peer + * TODO: this should be handled by connectionManager + * @type {Map>} + */ + this.connections = new Map() + + /** + * Map of topologies + * + * @type {Map} + */ + this.topologies = new Map() + + this._handle = undefined + } + + get handle () { + return this._handle + } + + set handle (handle) { + this._handle = handle + } + + /** + * Add a new connected peer to the record + * TODO: this should live in the ConnectionManager + * @param {PeerInfo} peerInfo + * @param {Connection} conn + * @returns {void} + */ + onConnect (peerInfo, conn) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection') + + const id = peerInfo.id.toB58String() + const storedConn = this.connections.get(id) + + if (storedConn) { + storedConn.push(conn) + } else { + this.connections.set(id, [conn]) + } + } + + /** + * Remove a disconnected peer from the record + * TODO: this should live in the ConnectionManager + * @param {PeerInfo} peerInfo + * @param {Connection} connection + * @param {Error} [error] + * @returns {void} + */ + onDisconnect (peerInfo, connection, error) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + + const id = peerInfo.id.toB58String() + let storedConn = this.connections.get(id) + + if (storedConn && storedConn.length > 1) { + storedConn = storedConn.filter((conn) => conn.id === connection.id) + } else if (storedConn) { + for (const [, topology] of this.topologies) { + topology.disconnect(peerInfo, error) + } + + this.connections.delete(peerInfo.id.toB58String()) + } + } + + /** + * Get a connection with a peer. + * @param {PeerInfo} peerInfo + * @returns {Connection} + */ + getConnection (peerInfo) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') + + // TODO: what should we return + return this.connections.get(peerInfo.id.toB58String())[0] + } + + /** + * Register handlers for a set of multicodecs given + * @param {Object} topologyProps properties for topology + * @param {Array|string} topologyProps.multicodecs + * @param {Object} topologyProps.handlers + * @param {function} topologyProps.handlers.onConnect + * @param {function} topologyProps.handlers.onDisconnect + * @return {string} registrar identifier + */ + register (topologyProps) { + // Create multicodec topology + const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() + const topology = new Toplogy(topologyProps) + + this.topologies.set(id, topology) + + // Set registrar + topology.registrar = this + + return id + } + + /** + * Unregister topology. + * @param {string} id registrar identifier + * @return {boolean} unregistered successfully + */ + unregister (id) { + return this.topologies.delete(id) + } +} + +module.exports = Registrar diff --git a/test/registrar/registrar.node.js b/test/registrar/registrar.node.js new file mode 100644 index 0000000000..b2045004c4 --- /dev/null +++ b/test/registrar/registrar.node.js @@ -0,0 +1,57 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const mergeOptions = require('merge-options') + +const multiaddr = require('multiaddr') +const Libp2p = require('../../src') + +const baseOptions = require('../utils/base-options') +const peerUtils = require('../utils/creators/peer') +const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') + +describe('registrar on dial', () => { + let peerInfo + let remotePeerInfo + let libp2p + let remoteLibp2p + let remoteAddr + + before(async () => { + [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) + remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo: remotePeerInfo + })) + + await remoteLibp2p.transportManager.listen([listenAddr]) + remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] + }) + + after(async () => { + sinon.restore() + await remoteLibp2p.stop() + libp2p && await libp2p.stop() + }) + + it('should inform registrar of a new connection', async () => { + libp2p = new Libp2p(mergeOptions(baseOptions, { + peerInfo + })) + + sinon.spy(remoteLibp2p.registrar, 'onConnect') + + await libp2p.dial(remoteAddr) + expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) + + const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo) + expect(libp2pConn).to.exist() + + const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo) + expect(remoteConn).to.exist() + }) +}) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js new file mode 100644 index 0000000000..ec7d1b6189 --- /dev/null +++ b/test/registrar/registrar.spec.js @@ -0,0 +1,224 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const pDefer = require('p-defer') + +const PeerInfo = require('peer-info') +const PeerStore = require('../../src/peer-store') +const Registrar = require('../../src/registrar') +const { createMockConnection } = require('./utils') + +const multicodec = '/test/1.0.0' + +describe('registrar', () => { + let peerStore, registrar + + describe('errors', () => { + beforeEach(() => { + peerStore = new PeerStore() + registrar = new Registrar({ peerStore }) + }) + + it('should fail to register a protocol if no multicodec is provided', () => { + try { + registrar.register() + } catch (err) { + expect(err).to.exist() + return + } + throw new Error('should fail to register a protocol if no multicodec is provided') + }) + + it('should fail to register a protocol if no handlers are provided', () => { + const topologyProps = { + multicodecs: multicodec + } + + try { + registrar.register(topologyProps) + } catch (err) { + expect(err).to.exist() + return + } + throw new Error('should fail to register a protocol if no handlers are provided') + }) + + it('should fail to register a protocol if the onConnect handler is not provided', () => { + const topologyProps = { + multicodecs: multicodec, + handlers: { + onDisconnect: () => { } + } + } + + try { + registrar.register(topologyProps) + } catch (err) { + expect(err).to.exist() + return + } + throw new Error('should fail to register a protocol if the onConnect handler is not provided') + }) + + it('should fail to register a protocol if the onDisconnect handler is not provided', () => { + const topologyProps = { + multicodecs: multicodec, + handlers: { + onConnect: () => { } + } + } + + try { + registrar.register(topologyProps) + } catch (err) { + expect(err).to.exist() + return + } + throw new Error('should fail to register a protocol if the onDisconnect handler is not provided') + }) + }) + + describe('registration', () => { + beforeEach(() => { + peerStore = new PeerStore() + registrar = new Registrar({ peerStore }) + }) + + it('should be able to register a protocol', () => { + const topologyProps = { + handlers: { + onConnect: () => { }, + onDisconnect: () => { } + }, + multicodecs: multicodec + } + + const identifier = registrar.register(topologyProps) + + expect(identifier).to.exist() + }) + + it('should be able to unregister a protocol', () => { + const topologyProps = { + handlers: { + onConnect: () => { }, + onDisconnect: () => { } + }, + multicodecs: multicodec + } + + const identifier = registrar.register(topologyProps) + const success = registrar.unregister(identifier) + + expect(success).to.eql(true) + }) + + it('should fail to unregister if no register was made', () => { + const success = registrar.unregister('bad-identifier') + + expect(success).to.eql(false) + }) + + it('should call onConnect handler for connected peers after register', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() + + // Setup connections before registrar + const conn = await createMockConnection() + const remotePeerInfo = await PeerInfo.create(conn.remotePeer) + + // Add protocol to peer + remotePeerInfo.protocols.add(multicodec) + + // Add connected peer to peerStore and registrar + peerStore.put(remotePeerInfo) + registrar.onConnect(remotePeerInfo, conn) + expect(registrar.connections.size).to.eql(1) + + const topologyProps = { + multicodecs: multicodec, + handlers: { + onConnect: (peerInfo, connection) => { + expect(peerInfo.id.toB58String()).to.eql(remotePeerInfo.id.toB58String()) + expect(connection.id).to.eql(conn.id) + + onConnectDefer.resolve() + }, + onDisconnect: (peerInfo) => { + expect(peerInfo.id.toB58String()).to.eql(remotePeerInfo.id.toB58String()) + + onDisconnectDefer.resolve() + } + } + } + + // Register protocol + const identifier = registrar.register(topologyProps) + const topology = registrar.topologies.get(identifier) + + // Topology created + expect(topology).to.exist() + expect(topology.peers.size).to.eql(1) + + registrar.onDisconnect(remotePeerInfo) + expect(registrar.connections.size).to.eql(0) + expect(topology.peers.size).to.eql(1) // topology should keep the peer + + // Wait for handlers to be called + return Promise.all([ + onConnectDefer.promise, + onDisconnectDefer.promise + ]) + }) + + it('should call onConnect handler after register, once a peer is connected and protocols are updated', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() + + const topologyProps = { + multicodecs: multicodec, + handlers: { + onConnect: () => { + onConnectDefer.resolve() + }, + onDisconnect: () => { + onDisconnectDefer.resolve() + } + } + } + + // Register protocol + const identifier = registrar.register(topologyProps) + const topology = registrar.topologies.get(identifier) + + // Topology created + expect(topology).to.exist() + expect(topology.peers.size).to.eql(0) + expect(registrar.connections.size).to.eql(0) + + // Setup connections before registrar + const conn = await createMockConnection() + const peerInfo = await PeerInfo.create(conn.remotePeer) + + // Add connected peer to peerStore and registrar + peerStore.put(peerInfo) + registrar.onConnect(peerInfo, conn) + + // Add protocol to peer and update it + peerInfo.protocols.add(multicodec) + peerStore.put(peerInfo) + + await onConnectDefer.promise + expect(topology.peers.size).to.eql(1) + + // Remove protocol to peer and update it + peerInfo.protocols.delete(multicodec) + peerStore.put(peerInfo) + + await onDisconnectDefer.promise + }) + }) +}) diff --git a/test/registrar/utils.js b/test/registrar/utils.js new file mode 100644 index 0000000000..4b4e04839f --- /dev/null +++ b/test/registrar/utils.js @@ -0,0 +1,50 @@ +'use strict' + +const { Connection } = require('libp2p-interfaces/src/connection') +const multiaddr = require('multiaddr') + +const pair = require('it-pair') + +const peerUtils = require('../utils/creators/peer') + +module.exports.createMockConnection = async (properties = {}) => { + const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080') + const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081') + + const [localPeer, remotePeer] = await peerUtils.createPeerInfoFromFixture(2) + const openStreams = [] + let streamId = 0 + + return new Connection({ + localPeer: localPeer.id, + remotePeer: remotePeer.id, + localAddr, + remoteAddr, + stat: { + timeline: { + open: Date.now() - 10, + upgraded: Date.now() + }, + direction: 'outbound', + encryption: '/secio/1.0.0', + multiplexer: '/mplex/6.7.0' + }, + newStream: (protocols) => { + const id = streamId++ + const stream = pair() + + stream.close = () => stream.sink([]) + stream.id = id + + openStreams.push(stream) + + return { + stream, + protocol: protocols[0] + } + }, + close: () => { }, + getStreams: () => openStreams, + ...properties + }) +} diff --git a/test/utils/base-options.js b/test/utils/base-options.js index 038df59219..68163eeed5 100644 --- a/test/utils/base-options.js +++ b/test/utils/base-options.js @@ -2,12 +2,12 @@ const Transport = require('libp2p-tcp') const Muxer = require('libp2p-mplex') -const mockCrypto = require('../utils/mockCrypto') +const Crypto = require('../../src/insecure/plaintext') module.exports = { modules: { transport: [Transport], streamMuxer: [Muxer], - connEncryption: [mockCrypto] + connEncryption: [Crypto] } }