diff --git a/doc/API.md b/doc/API.md index 75c0af683f..800807047b 100644 --- a/doc/API.md +++ b/doc/API.md @@ -11,12 +11,12 @@ * [`handle`](#handle) * [`unhandle`](#unhandle) * [`ping`](#ping) - * [`peerRouting.findPeer`](#peerroutingfindpeer) * [`contentRouting.findProviders`](#contentroutingfindproviders) * [`contentRouting.provide`](#contentroutingprovide) * [`contentRouting.put`](#contentroutingput) * [`contentRouting.get`](#contentroutingget) * [`contentRouting.getMany`](#contentroutinggetmany) + * [`peerRouting.findPeer`](#peerroutingfindpeer) * [`peerStore.addressBook.add`](#peerstoreaddressbookadd) * [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete) * [`peerStore.addressBook.get`](#peerstoreaddressbookget) @@ -34,7 +34,9 @@ * [`pubsub.publish`](#pubsubpublish) * [`pubsub.subscribe`](#pubsubsubscribe) * [`pubsub.unsubscribe`](#pubsubunsubscribe) + * [`connectionManager.get`](#connectionmanagerget) * [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue) + * [`connectionManager.size`](#connectionmanagersize) * [`metrics.global`](#metricsglobal) * [`metrics.peers`](#metricspeers) * [`metrics.protocols`](#metricsprotocols) @@ -42,6 +44,7 @@ * [`metrics.forProtocol`](#metricsforprotocol) * [Events](#events) * [`libp2p`](#libp2p) + * [`libp2p.connectionManager`](#libp2pconnectionmanager) * [`libp2p.peerStore`](#libp2ppeerStore) * [Types](#types) * [`Stats`](#stats) @@ -999,6 +1002,28 @@ const handler = (msg) => { libp2p.pubsub.unsubscribe(topic, handler) ``` +### connectionManager.get + +Get a connection with a given peer, if it exists. + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peerId | [`PeerId`][peer-id] | The peer to find | + +#### Returns + +| Type | Description | +|------|-------------| +| [`Connection`][connection] | Connection with the given peer | + +#### Example + +```js +libp2p.connectionManager.get(peerId) +``` + ### connectionManager.setPeerValue Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits. @@ -1025,6 +1050,17 @@ libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1) libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0) ``` +### connectionManager.size + +Getter for obtaining the current number of open connections. + +#### Example + +```js +libp2p.connectionManager.size +// 10 +``` + ### metrics.global A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node. @@ -1126,21 +1162,23 @@ unless they are performing a specific action. See [peer discovery and auto dial] - `peer`: instance of [`PeerId`][peer-id] +### libp2p.connectionManager + #### A new connection to a peer has been opened This event will be triggered anytime a new Connection is established to another peer. -`libp2p.on('peer:connect', (peer) => {})` +`libp2p.connectionManager.on('peer:connect', (connection) => {})` -- `peer`: instance of [`PeerId`][peer-id] +- `connection`: instance of [`Connection`][connection] #### An existing connection to a peer has been closed This event will be triggered anytime we are disconnected from another peer, regardless of the circumstances of that disconnection. If we happen to have multiple connections to a peer, this event will **only** be triggered when the last connection is closed. -`libp2p.on('peer:disconnect', (peer) => {})` +`libp2p.connectionManager.on('peer:disconnect', (connection) => {})` -- `peer`: instance of [`PeerId`][peer-id] +- `connection`: instance of [`Connection`][connection] ### libp2p.peerStore diff --git a/src/circuit/circuit/hop.js b/src/circuit/circuit/hop.js index 0a5e71c8ba..c8ac0fddb0 100644 --- a/src/circuit/circuit/hop.js +++ b/src/circuit/circuit/hop.js @@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({ // Get the connection to the destination (stop) peer const destinationPeer = new PeerId(request.dstPeer.id) - const destinationConnection = circuit._registrar.getConnection(destinationPeer) + const destinationConnection = circuit._connectionManager.get(destinationPeer) if (!destinationConnection && !circuit._options.hop.active) { log('HOP request received but we are not connected to the destination peer') return streamHandler.end({ diff --git a/src/circuit/index.js b/src/circuit/index.js index be8326aaa7..c833f124ea 100644 --- a/src/circuit/index.js +++ b/src/circuit/index.js @@ -29,6 +29,7 @@ class Circuit { constructor ({ libp2p, upgrader }) { this._dialer = libp2p.dialer this._registrar = libp2p.registrar + this._connectionManager = libp2p.connectionManager this._upgrader = upgrader this._options = libp2p._config.relay this.addresses = libp2p.addresses @@ -107,7 +108,7 @@ class Circuit { const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId()) let disconnectOnFailure = false - let relayConnection = this._registrar.getConnection(relayPeer) + let relayConnection = this._connectionManager.get(relayPeer) if (!relayConnection) { relayConnection = await this._dialer.connectToPeer(relayAddr, options) disconnectOnFailure = true diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index 0868226f56..b6aad603b0 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -6,6 +6,10 @@ const LatencyMonitor = require('latency-monitor').default const debug = require('debug')('libp2p:connection-manager') const retimer = require('retimer') +const { EventEmitter } = require('events') + +const PeerId = require('peer-id') + const { ERR_INVALID_PARAMETERS } = require('../errors') @@ -22,7 +26,12 @@ const defaultOptions = { defaultPeerValue: 1 } -class ConnectionManager { +/** + * Responsible for managing known connections. + * @fires ConnectionManager#peer:connect Emitted when a new peer is connected. + * @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected. + */ +class ConnectionManager extends EventEmitter { /** * @constructor * @param {Libp2p} libp2p @@ -38,9 +47,11 @@ class ConnectionManager { * @param {Number} options.defaultPeerValue The value of the peer. Default=1 */ constructor (libp2p, options) { + super() + this._libp2p = libp2p - this._registrar = libp2p.registrar this._peerId = libp2p.peerId.toB58String() + this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options) if (this._options.maxConnections < this._options.minConnections) { throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS) @@ -48,20 +59,38 @@ class ConnectionManager { debug('options: %j', this._options) - this._metrics = libp2p.metrics + this._libp2p = libp2p + /** + * Map of peer identifiers to their peer value for pruning connections. + * @type {Map} + */ this._peerValues = new Map() - this._connections = new Map() + + /** + * Map of connections per peer + * @type {Map>} + */ + this.connections = new Map() + this._timer = null this._checkMetrics = this._checkMetrics.bind(this) } + /** + * Get current number of open connections. + */ + get size () { + return Array.from(this.connections.values()) + .reduce((accumulator, value) => accumulator + value.length, 0) + } + /** * Starts the Connection Manager. If Metrics are not enabled on libp2p * only event loop and connection limits will be monitored. */ start () { - if (this._metrics) { + if (this._libp2p.metrics) { this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval) } @@ -77,13 +106,33 @@ class ConnectionManager { /** * Stops the Connection Manager + * @async */ - stop () { + async stop () { this._timer && this._timer.clear() this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure) + + await this._close() debug('stopped') } + /** + * Cleans up the connections + * @async + */ + async _close () { + // Close all connections we're tracking + const tasks = [] + for (const connectionList of this.connections.values()) { + for (const connection of connectionList) { + tasks.push(connection.close()) + } + } + + await tasks + this.connections.clear() + } + /** * Sets the value of the given peer. Peers with lower values * will be disconnected first. @@ -106,7 +155,7 @@ class ConnectionManager { * @private */ _checkMetrics () { - const movingAverages = this._metrics.global.movingAverages + const movingAverages = this._libp2p.metrics.global.movingAverages const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage() this._checkLimit('maxReceivedData', received) const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage() @@ -123,11 +172,20 @@ class ConnectionManager { */ onConnect (connection) { const peerId = connection.remotePeer.toB58String() - this._connections.set(connection.id, connection) + const storedConn = this.connections.get(peerId) + + if (storedConn) { + storedConn.push(connection) + } else { + this.connections.set(peerId, [connection]) + this.emit('peer:connect', connection) + } + if (!this._peerValues.has(peerId)) { this._peerValues.set(peerId, this._options.defaultPeerValue) } - this._checkLimit('maxConnections', this._connections.size) + + this._checkLimit('maxConnections', this.size) } /** @@ -135,8 +193,37 @@ class ConnectionManager { * @param {Connection} connection */ onDisconnect (connection) { - this._connections.delete(connection.id) - this._peerValues.delete(connection.remotePeer.toB58String()) + const peerId = connection.remotePeer.toB58String() + let storedConn = this.connections.get(peerId) + + if (storedConn && storedConn.length > 1) { + storedConn = storedConn.filter((conn) => conn.id !== connection.id) + this.connections.set(peerId, storedConn) + } else if (storedConn) { + this.connections.delete(peerId) + this._peerValues.delete(connection.remotePeer.toB58String()) + this.emit('peer:disconnect', connection) + } + } + + /** + * Get a connection with a peer. + * @param {PeerId} peerId + * @returns {Connection} + */ + get (peerId) { + if (!PeerId.isPeerId(peerId)) { + throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) + } + + const id = peerId.toB58String() + const connections = this.connections.get(id) + + // Return the first, open connection + if (connections) { + return connections.find(connection => connection.stat.status === 'open') + } + return null } /** @@ -169,7 +256,7 @@ class ConnectionManager { * @private */ _maybeDisconnectOne () { - if (this._options.minConnections < this._connections.size) { + if (this._options.minConnections < this.connections.size) { const peerValues = Array.from(this._peerValues).sort(byPeerValue) debug('%s: sorted peer values: %j', this._peerId, peerValues) const disconnectPeer = peerValues[0] @@ -177,9 +264,9 @@ class ConnectionManager { const peerId = disconnectPeer[0] debug('%s: lowest value peer is %s', this._peerId, peerId) debug('%s: closing a connection to %j', this._peerId, peerId) - for (const connection of this._connections.values()) { - if (connection.remotePeer.toB58String() === peerId) { - connection.close() + for (const connections of this.connections.values()) { + if (connections[0].remotePeer.toB58String() === peerId) { + connections[0].close() break } } diff --git a/src/identify/index.js b/src/identify/index.js index 6fe8732fd6..d8ea46e857 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -45,16 +45,28 @@ class IdentifyService { /** * @constructor * @param {object} options - * @param {Registrar} options.registrar + * @param {PeerStore} options.peerStore + * @param {ConnectionManager} options.connectionManager * @param {Map} options.protocols A reference to the protocols we support * @param {PeerId} options.peerId The peer running the identify service * @param {{ listen: Array}} options.addresses The peer addresses */ constructor (options) { /** - * @property {Registrar} + * @property {PeerStore} */ - this.registrar = options.registrar + this.peerStore = options.peerStore + + /** + * @property {ConnectionManager} + */ + this.connectionManager = options.connectionManager + this.connectionManager.on('peer:connect', (connection) => { + const peerId = connection.remotePeer + + this.identify(connection, peerId).catch(log.error) + }) + /** * @property {PeerId} */ @@ -103,7 +115,7 @@ class IdentifyService { const connections = [] let connection for (const peer of peerStore.peers.values()) { - if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.registrar.getConnection(peer.id))) { + if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) { connections.push(connection) } } @@ -159,8 +171,8 @@ class IdentifyService { observedAddr = IdentifyService.getCleanMultiaddr(observedAddr) // Update peers data in PeerStore - this.registrar.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr))) - this.registrar.peerStore.protoBook.set(id, protocols) + this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr))) + this.peerStore.protoBook.set(id, protocols) // TODO: Track our observed address so that we can score it log('received observed address of %s', observedAddr) @@ -244,13 +256,13 @@ class IdentifyService { // Update peers data in PeerStore const id = connection.remotePeer try { - this.registrar.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr))) + this.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr))) } catch (err) { return log.error('received invalid listen addrs', err) } // Update the protocols - this.registrar.peerStore.protoBook.set(id, message.protocols) + this.peerStore.protoBook.set(id, message.protocols) } } diff --git a/src/index.js b/src/index.js index c0b7fded57..b32c2f7d0c 100644 --- a/src/index.js +++ b/src/index.js @@ -53,55 +53,40 @@ class Libp2p extends EventEmitter { this._transport = [] // Transport instances/references this._discovery = new Map() // Discovery service instances/references + // Create the Connection Manager + this.connectionManager = new ConnectionManager(this, this._options.connectionManager) + + // Create Metrics if (this._options.metrics.enabled) { - this.metrics = new Metrics(this._options.metrics) + this.metrics = new Metrics({ + ...this._options.metrics, + connectionManager: this.connectionManager + }) } // Setup the Upgrader this.upgrader = new Upgrader({ localPeer: this.peerId, metrics: this.metrics, - onConnection: (connection) => { - const peerId = connection.remotePeer - - this.registrar.onConnect(peerId, connection) - this.connectionManager.onConnect(connection) - this.emit('peer:connect', peerId) - - // Run identify for every connection - if (this.identifyService) { - this.identifyService.identify(connection, peerId) - .catch(log.error) - } - }, - onConnectionEnd: (connection) => { - const peerId = connection.remotePeer - - this.registrar.onDisconnect(peerId, connection) - this.connectionManager.onDisconnect(connection) - - // If there are no connections to the peer, disconnect - if (!this.registrar.getConnection(peerId)) { - this.emit('peer:disconnect', peerId) - this.metrics && this.metrics.onPeerDisconnected(peerId) - } - } + onConnection: (connection) => this.connectionManager.onConnect(connection), + onConnectionEnd: (connection) => this.connectionManager.onDisconnect(connection) }) - // Create the Registrar - this.registrar = new Registrar({ peerStore: this.peerStore }) - this.handle = this.handle.bind(this) - this.registrar.handle = this.handle - - // Create the Connection Manager - this.connectionManager = new ConnectionManager(this, this._options.connectionManager) - // Setup the transport manager this.transportManager = new TransportManager({ libp2p: this, upgrader: this.upgrader }) + // Create the Registrar + this.registrar = new Registrar({ + peerStore: this.peerStore, + connectionManager: this.connectionManager + }) + + this.handle = this.handle.bind(this) + this.registrar.handle = this.handle + // Attach crypto channels if (this._modules.connEncryption) { const cryptos = this._modules.connEncryption @@ -137,7 +122,8 @@ class Libp2p extends EventEmitter { // Add the identify service since we can multiplex this.identifyService = new IdentifyService({ - registrar: this.registrar, + peerStore: this.peerStore, + connectionManager: this.connectionManager, peerId: this.peerId, addresses: this.addresses, protocols: this.upgrader.protocols @@ -239,7 +225,6 @@ class Libp2p extends EventEmitter { ]) await this.transportManager.close() - await this.registrar.close() ping.unmount(this) this.dialer.destroy() @@ -291,7 +276,7 @@ class Libp2p extends EventEmitter { */ async dialProtocol (peer, protocols, options) { const { id, multiaddrs } = getPeer(peer, this.peerStore) - let connection = this.registrar.getConnection(id) + let connection = this.connectionManager.get(id) if (!connection) { connection = await this.dialer.connectToPeer(peer, options) @@ -316,7 +301,7 @@ class Libp2p extends EventEmitter { const { id } = getPeer(peer) return Promise.all( - this.registrar.connections.get(id.toB58String()).map(connection => { + this.connectionManager.connections.get(id.toB58String()).map(connection => { return connection.close() }) ) @@ -443,9 +428,9 @@ class Libp2p extends EventEmitter { */ async _maybeConnect (peerId) { // If auto dialing is on and we have no connection to the peer, check if we should dial - if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerId)) { + if (this._config.peerDiscovery.autoDial === true && !this.connectionManager.get(peerId)) { const minPeers = this._options.connectionManager.minPeers || 0 - if (minPeers > this.connectionManager._connections.size) { + if (minPeers > this.connectionManager.size) { log('connecting to discovered peer %s', peerId.toB58String()) try { await this.dialer.connectToPeer(peerId) diff --git a/src/metrics/index.js b/src/metrics/index.js index e687f87b4f..0a92c036c1 100644 --- a/src/metrics/index.js +++ b/src/metrics/index.js @@ -21,6 +21,7 @@ class Metrics { /** * * @param {object} options + * @param {ConnectionManager} options.connectionManager * @param {number} options.computeThrottleMaxQueueSize * @param {number} options.computeThrottleTimeout * @param {Array} options.movingAverageIntervals @@ -34,6 +35,10 @@ class Metrics { this._oldPeers = oldPeerLRU(this._options.maxOldPeersRetention) this._running = false this._onMessage = this._onMessage.bind(this) + this._connectionManager = options.connectionManager + this._connectionManager.on('peer:disconnect', (connection) => { + this.onPeerDisconnected(connection.remotePeer) + }) } /** diff --git a/src/registrar.js b/src/registrar.js index fbe7acb532..6aa601f003 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -5,13 +5,10 @@ const errcode = require('err-code') const log = debug('libp2p:peer-store') log.error = debug('libp2p:peer-store:error') -const PeerId = require('peer-id') - const { ERR_INVALID_PARAMETERS } = require('./errors') const Topology = require('libp2p-interfaces/src/topology') -const { Connection } = require('libp2p-interfaces/src/connection') /** * Responsible for notifying registered protocols of events in the network. @@ -20,18 +17,14 @@ class Registrar { /** * @param {Object} props * @param {PeerStore} props.peerStore + * @param {connectionManager} props.connectionManager * @constructor */ - constructor ({ peerStore }) { + constructor ({ peerStore, connectionManager }) { // Used on topology to listen for protocol changes this.peerStore = peerStore - /** - * Map of connections per peer - * TODO: this should be handled by connectionManager - * @type {Map>} - */ - this.connections = new Map() + this.connectionManager = connectionManager /** * Map of topologies @@ -41,6 +34,9 @@ class Registrar { this.topologies = new Map() this._handle = undefined + + this._onDisconnect = this._onDisconnect.bind(this) + this.connectionManager.on('peer:disconnect', this._onDisconnect) } get handle () { @@ -51,93 +47,13 @@ class Registrar { this._handle = handle } - /** - * Cleans up the registrar - * @async - */ - async close () { - // Close all connections we're tracking - const tasks = [] - for (const connectionList of this.connections.values()) { - for (const connection of connectionList) { - tasks.push(connection.close()) - } - } - - await tasks - this.connections.clear() - } - - /** - * Add a new connected peer to the record - * TODO: this should live in the ConnectionManager - * @param {PeerId} peerId - * @param {Connection} conn - * @returns {void} - */ - onConnect (peerId, conn) { - if (!PeerId.isPeerId(peerId)) { - throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) - } - - if (!Connection.isConnection(conn)) { - throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS) - } - - const id = peerId.toB58String() - const storedConn = this.connections.get(id) - - if (storedConn) { - storedConn.push(conn) - } else { - this.connections.set(id, [conn]) - } - } - - /** - * Remove a disconnected peer from the record - * TODO: this should live in the ConnectionManager - * @param {PeerId} peerId - * @param {Connection} connection - * @param {Error} [error] - * @returns {void} - */ - onDisconnect (peerId, connection, error) { - if (!PeerId.isPeerId(peerId)) { - throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) - } - - const id = peerId.toB58String() - let storedConn = this.connections.get(id) - - if (storedConn && storedConn.length > 1) { - storedConn = storedConn.filter((conn) => conn.id !== connection.id) - this.connections.set(id, storedConn) - } else if (storedConn) { - for (const [, topology] of this.topologies) { - topology.disconnect(peerId, error) - } - - this.connections.delete(id) - } - } - /** * Get a connection with a peer. * @param {PeerId} peerId * @returns {Connection} */ getConnection (peerId) { - if (!PeerId.isPeerId(peerId)) { - throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS) - } - - const connections = this.connections.get(peerId.toB58String()) - // Return the first, open connection - if (connections) { - return connections.find(connection => connection.stat.status === 'open') - } - return null + return this.connectionManager.get(peerId) } /** @@ -169,6 +85,18 @@ class Registrar { unregister (id) { return this.topologies.delete(id) } + + /** + * Remove a disconnected peer from the record + * @param {Connection} connection + * @param {Error} [error] + * @returns {void} + */ + _onDisconnect (connection, error) { + for (const [, topology] of this.topologies) { + topology.disconnect(connection.remotePeer, error) + } + } } module.exports = Registrar diff --git a/test/connection-manager/index.node.js b/test/connection-manager/index.node.js new file mode 100644 index 0000000000..bec446357f --- /dev/null +++ b/test/connection-manager/index.node.js @@ -0,0 +1,88 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) +const { expect } = chai +const sinon = require('sinon') + +const multiaddr = require('multiaddr') + +const peerUtils = require('../utils/creators/peer') +const mockConnection = require('../utils/mockConnection') +const baseOptions = require('../utils/base-options.browser') + +const listenMultiaddr = multiaddr('/ip4/127.0.0.1/tcp/15002/ws') + +describe('Connection Manager', () => { + let libp2p + + beforeEach(async () => { + [libp2p] = await peerUtils.createPeer({ + config: { + addresses: { + listen: [listenMultiaddr] + }, + modules: baseOptions.modules + } + }) + }) + + afterEach(() => libp2p.stop()) + + it('should filter connections on disconnect, removing the closed one', async () => { + const [localPeer, remotePeer] = await peerUtils.createPeerId({ number: 2 }) + + const conn1 = await mockConnection({ localPeer, remotePeer }) + const conn2 = await mockConnection({ localPeer, remotePeer }) + + const id = remotePeer.toB58String() + + // Add connection to the connectionManager + libp2p.connectionManager.onConnect(conn1) + libp2p.connectionManager.onConnect(conn2) + + expect(libp2p.connectionManager.connections.get(id).length).to.eql(2) + + conn2._stat.status = 'closed' + libp2p.connectionManager.onDisconnect(conn2) + + const peerConnections = libp2p.connectionManager.connections.get(id) + expect(peerConnections.length).to.eql(1) + expect(peerConnections[0]._stat.status).to.eql('open') + }) + + it('should add connection on dial and remove on node stop', async () => { + const [remoteLibp2p] = await peerUtils.createPeer({ + config: { + addresses: { + listen: [multiaddr('/ip4/127.0.0.1/tcp/15003/ws')] + }, + modules: baseOptions.modules + } + }) + + // Spy on emit for easy verification + sinon.spy(libp2p.connectionManager, 'emit') + sinon.spy(remoteLibp2p.connectionManager, 'emit') + + libp2p.peerStore.addressBook.set(remoteLibp2p.peerId, remoteLibp2p.addresses.listen) + await libp2p.dial(remoteLibp2p.peerId) + + // check connect event + expect(libp2p.connectionManager.emit.callCount).to.equal(1) + const [event, connection] = libp2p.connectionManager.emit.getCall(0).args + expect(event).to.equal('peer:connect') + expect(connection.remotePeer.isEqual(remoteLibp2p.peerId)).to.equal(true) + + const libp2pConn = libp2p.connectionManager.get(remoteLibp2p.peerId) + expect(libp2pConn).to.exist() + + const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId) + expect(remoteConn).to.exist() + + await remoteLibp2p.stop() + expect(remoteLibp2p.connectionManager.size).to.eql(0) + }) +}) diff --git a/test/connection-manager/index.spec.js b/test/connection-manager/index.spec.js index ce4c6312dc..8ffb0ee19c 100644 --- a/test/connection-manager/index.spec.js +++ b/test/connection-manager/index.spec.js @@ -7,7 +7,7 @@ chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') -const { createPeer } = require('../utils/creators/peer') +const peerUtils = require('../utils/creators/peer') const mockConnection = require('../utils/mockConnection') const baseOptions = require('../utils/base-options.browser') @@ -20,7 +20,7 @@ describe('Connection Manager', () => { }) it('should be able to create without metrics', async () => { - [libp2p] = await createPeer({ + [libp2p] = await peerUtils.createPeer({ config: { modules: baseOptions.modules }, @@ -35,7 +35,7 @@ describe('Connection Manager', () => { }) it('should be able to create with metrics', async () => { - [libp2p] = await createPeer({ + [libp2p] = await peerUtils.createPeer({ config: { modules: baseOptions.modules, metrics: { @@ -49,12 +49,12 @@ describe('Connection Manager', () => { await libp2p.start() expect(spy).to.have.property('callCount', 1) - expect(libp2p.connectionManager._metrics).to.exist() + expect(libp2p.connectionManager._libp2p.metrics).to.exist() }) it('should close lowest value peer connection when the maximum has been reached', async () => { const max = 5 - ;[libp2p] = await createPeer({ + ;[libp2p] = await peerUtils.createPeer({ config: { modules: baseOptions.modules, connectionManager: { @@ -92,7 +92,7 @@ describe('Connection Manager', () => { it('should close connection when the maximum has been reached even without peer values', async () => { const max = 5 - ;[libp2p] = await createPeer({ + ;[libp2p] = await peerUtils.createPeer({ config: { modules: baseOptions.modules, connectionManager: { @@ -110,7 +110,7 @@ describe('Connection Manager', () => { const spy = sinon.spy() await Promise.all([...new Array(max + 1)].map(async () => { const connection = await mockConnection() - sinon.stub(connection, 'close').callsFake(() => spy()) + sinon.stub(connection, 'close').callsFake(() => spy()) // eslint-disable-line libp2p.connectionManager.onConnect(connection) })) @@ -119,7 +119,7 @@ describe('Connection Manager', () => { }) it('should fail if the connection manager has mismatched connection limit options', async () => { - await expect(createPeer({ + await expect(peerUtils.createPeer({ config: { modules: baseOptions.modules, connectionManager: { diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index d7c9e6fa02..e3bd7d7be5 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -373,8 +373,8 @@ describe('Dialing (direct, TCP)', () => { } // 1 connection, because we know the peer in the multiaddr - expect(libp2p.connectionManager._connections.size).to.equal(1) - expect(remoteLibp2p.connectionManager._connections.size).to.equal(1) + expect(libp2p.connectionManager.size).to.equal(1) + expect(remoteLibp2p.connectionManager.size).to.equal(1) }) it('should coalesce parallel dials to the same error on failure', async () => { @@ -408,8 +408,8 @@ describe('Dialing (direct, TCP)', () => { } // 1 connection, because we know the peer in the multiaddr - expect(libp2p.connectionManager._connections.size).to.equal(0) - expect(remoteLibp2p.connectionManager._connections.size).to.equal(0) + expect(libp2p.connectionManager.size).to.equal(0) + expect(remoteLibp2p.connectionManager.size).to.equal(0) }) }) }) diff --git a/test/dialing/relay.node.js b/test/dialing/relay.node.js index 57a5e8ac88..4b706225ec 100644 --- a/test/dialing/relay.node.js +++ b/test/dialing/relay.node.js @@ -120,7 +120,7 @@ describe('Dialing (via relay, TCP)', () => { .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) // We should not be connected to the relay, because we weren't before the dial - const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerId) + const srcToRelayConn = srcLibp2p.connectionManager.get(relayLibp2p.peerId) expect(srcToRelayConn).to.not.exist() }) @@ -137,7 +137,7 @@ describe('Dialing (via relay, TCP)', () => { .to.eventually.be.rejectedWith(AggregateError) .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) - const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerId) + const srcToRelayConn = srcLibp2p.connectionManager.get(relayLibp2p.peerId) expect(srcToRelayConn).to.exist() expect(srcToRelayConn.stat.status).to.equal('open') }) @@ -163,7 +163,7 @@ describe('Dialing (via relay, TCP)', () => { .to.eventually.be.rejectedWith(AggregateError) .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) - const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerId) + const dstToRelayConn = dstLibp2p.connectionManager.get(relayLibp2p.peerId) expect(dstToRelayConn).to.exist() expect(dstToRelayConn.stat.status).to.equal('open') }) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 648000a147..c10bda7b87 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -7,6 +7,7 @@ chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') +const { EventEmitter } = require('events') const delay = require('delay') const PeerId = require('peer-id') const duplexPair = require('it-pair/duplex') @@ -48,14 +49,13 @@ describe('Identify', () => { listen: [] }, protocols, - registrar: { - peerStore: { - addressBook: { - set: () => { } - }, - protoBook: { - set: () => { } - } + connectionManager: new EventEmitter(), + peerStore: { + addressBook: { + set: () => { } + }, + protoBook: { + set: () => { } } } }) @@ -64,7 +64,8 @@ describe('Identify', () => { addresses: { listen: [] }, - protocols + protocols, + connectionManager: new EventEmitter() }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') @@ -74,8 +75,8 @@ describe('Identify', () => { const [local, remote] = duplexPair() sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) - sinon.spy(localIdentify.registrar.peerStore.addressBook, 'set') - sinon.spy(localIdentify.registrar.peerStore.protoBook, 'set') + sinon.spy(localIdentify.peerStore.addressBook, 'set') + sinon.spy(localIdentify.peerStore.protoBook, 'set') // Run identify await Promise.all([ @@ -87,10 +88,10 @@ describe('Identify', () => { }) ]) - expect(localIdentify.registrar.peerStore.addressBook.set.callCount).to.equal(1) - expect(localIdentify.registrar.peerStore.protoBook.set.callCount).to.equal(1) + expect(localIdentify.peerStore.addressBook.set.callCount).to.equal(1) + expect(localIdentify.peerStore.protoBook.set.callCount).to.equal(1) // Validate the remote peer gets updated in the peer store - const call = localIdentify.registrar.peerStore.addressBook.set.firstCall + const call = localIdentify.peerStore.addressBook.set.firstCall expect(call.args[0].id.bytes).to.equal(remotePeer.bytes) }) @@ -101,14 +102,13 @@ describe('Identify', () => { listen: [] }, protocols, - registrar: { - peerStore: { - addressBook: { - set: () => { } - }, - protoBook: { - set: () => { } - } + connectionManager: new EventEmitter(), + peerStore: { + addressBook: { + set: () => { } + }, + protoBook: { + set: () => { } } } }) @@ -117,7 +117,8 @@ describe('Identify', () => { addresses: { listen: [] }, - protocols + protocols, + connectionManager: new EventEmitter() }) const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') @@ -145,12 +146,15 @@ describe('Identify', () => { describe('push', () => { it('should be able to push identify updates to another peer', async () => { const listeningAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') + const connectionManager = new EventEmitter() + connectionManager.getConnection = () => {} + const localIdentify = new IdentifyService({ peerId: localPeer, addresses: { listen: [listeningAddr] }, - registrar: { getConnection: () => {} }, + connectionManager, protocols: new Map([ [multicodecs.IDENTIFY], [multicodecs.IDENTIFY_PUSH], @@ -162,14 +166,13 @@ describe('Identify', () => { addresses: { listen: [] }, - registrar: { - peerStore: { - addressBook: { - set: () => {} - }, - protoBook: { - set: () => { } - } + connectionManager, + peerStore: { + addressBook: { + set: () => { } + }, + protoBook: { + set: () => { } } } }) @@ -182,8 +185,8 @@ describe('Identify', () => { const [local, remote] = duplexPair() sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY_PUSH }) - sinon.spy(remoteIdentify.registrar.peerStore.addressBook, 'set') - sinon.spy(remoteIdentify.registrar.peerStore.protoBook, 'set') + sinon.spy(remoteIdentify.peerStore.addressBook, 'set') + sinon.spy(remoteIdentify.peerStore.protoBook, 'set') // Run identify await Promise.all([ @@ -195,12 +198,12 @@ describe('Identify', () => { }) ]) - expect(remoteIdentify.registrar.peerStore.addressBook.set.callCount).to.equal(1) - expect(remoteIdentify.registrar.peerStore.protoBook.set.callCount).to.equal(1) - const [peerId, multiaddrs] = remoteIdentify.registrar.peerStore.addressBook.set.firstCall.args + expect(remoteIdentify.peerStore.addressBook.set.callCount).to.equal(1) + expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) + const [peerId, multiaddrs] = remoteIdentify.peerStore.addressBook.set.firstCall.args expect(peerId.bytes).to.eql(localPeer.bytes) expect(multiaddrs).to.eql([listeningAddr]) - const [peerId2, protocols] = remoteIdentify.registrar.peerStore.protoBook.set.firstCall.args + const [peerId2, protocols] = remoteIdentify.peerStore.protoBook.set.firstCall.args expect(peerId2.bytes).to.eql(localPeer.bytes) expect(protocols).to.eql(Array.from(localProtocols)) }) diff --git a/test/metrics/index.spec.js b/test/metrics/index.spec.js index d84250ef4d..da0c10d555 100644 --- a/test/metrics/index.spec.js +++ b/test/metrics/index.spec.js @@ -7,6 +7,8 @@ chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') +const { EventEmitter } = require('events') + const { randomBytes } = require('libp2p-crypto') const duplexPair = require('it-pair/duplex') const pipe = require('it-pipe') @@ -35,7 +37,8 @@ describe('Metrics', () => { const [local, remote] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000] + movingAverageIntervals: [10, 100, 1000], + connectionManager: new EventEmitter() }) metrics.trackStream({ @@ -70,7 +73,8 @@ describe('Metrics', () => { const [local, remote] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000] + movingAverageIntervals: [10, 100, 1000], + connectionManager: new EventEmitter() }) metrics.trackStream({ @@ -118,7 +122,8 @@ describe('Metrics', () => { const [local2, remote2] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000] + movingAverageIntervals: [10, 100, 1000], + connectionManager: new EventEmitter() }) const protocol = '/echo/1.0.0' metrics.start() @@ -173,7 +178,8 @@ describe('Metrics', () => { const [local, remote] = duplexPair() const metrics = new Metrics({ computeThrottleMaxQueueSize: 1, // compute after every message - movingAverageIntervals: [10, 100, 1000] + movingAverageIntervals: [10, 100, 1000], + connectionManager: new EventEmitter() }) metrics.start() @@ -228,7 +234,8 @@ describe('Metrics', () => { })) const metrics = new Metrics({ - maxOldPeersRetention: 5 // Only keep track of 5 + maxOldPeersRetention: 5, // Only keep track of 5 + connectionManager: new EventEmitter() }) // Clone so trackedPeers isn't modified diff --git a/test/registrar/registrar.node.js b/test/registrar/registrar.node.js deleted file mode 100644 index f2829d918a..0000000000 --- a/test/registrar/registrar.node.js +++ /dev/null @@ -1,72 +0,0 @@ -'use strict' -/* eslint-env mocha */ - -const chai = require('chai') -chai.use(require('dirty-chai')) -const { expect } = chai -const sinon = require('sinon') - -const mergeOptions = require('merge-options') - -const multiaddr = require('multiaddr') -const Libp2p = require('../../src') - -const baseOptions = require('../utils/base-options') -const peerUtils = require('../utils/creators/peer') -const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') - -describe('registrar on dial', () => { - let peerId - let remotePeerId - let libp2p - let remoteLibp2p - let remoteAddr - - before(async () => { - [peerId, remotePeerId] = await peerUtils.createPeerId({ number: 2 }) - remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { - peerId: remotePeerId - })) - - await remoteLibp2p.transportManager.listen([listenAddr]) - remoteAddr = remoteLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`) - }) - - after(async () => { - sinon.restore() - await remoteLibp2p.stop() - libp2p && await libp2p.stop() - }) - - it('should inform registrar of a new connection', async () => { - libp2p = new Libp2p(mergeOptions(baseOptions, { - peerId - })) - - sinon.spy(remoteLibp2p.registrar, 'onConnect') - - await libp2p.dial(remoteAddr) - expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) - - const libp2pConn = libp2p.registrar.getConnection(remotePeerId) - expect(libp2pConn).to.exist() - - const remoteConn = remoteLibp2p.registrar.getConnection(peerId) - expect(remoteConn).to.exist() - }) - - it('should be closed on libp2p stop', async () => { - libp2p = new Libp2p(mergeOptions(baseOptions, { - peerId - })) - - await libp2p.dial(remoteAddr) - expect(libp2p.connections.size).to.equal(1) - - sinon.spy(libp2p.registrar, 'close') - - await libp2p.stop() - expect(libp2p.registrar.close.callCount).to.equal(1) - expect(libp2p.connections.size).to.equal(0) - }) -}) diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index 696531e98b..7fd57991ac 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -6,21 +6,26 @@ chai.use(require('dirty-chai')) const { expect } = chai const pDefer = require('p-defer') +const { EventEmitter } = require('events') + const Topology = require('libp2p-interfaces/src/topology/multicodec-topology') const PeerStore = require('../../src/peer-store') const Registrar = require('../../src/registrar') -const { createMockConnection } = require('./utils') + +const createMockConnection = require('../utils/mockConnection') const peerUtils = require('../utils/creators/peer') +const baseOptions = require('../utils/base-options.browser') const multicodec = '/test/1.0.0' describe('registrar', () => { - let peerStore, registrar + let peerStore + let registrar describe('errors', () => { beforeEach(() => { peerStore = new PeerStore() - registrar = new Registrar({ peerStore }) + registrar = new Registrar({ peerStore, connectionManager: new EventEmitter() }) }) it('should fail to register a protocol if no multicodec is provided', () => { @@ -36,11 +41,19 @@ describe('registrar', () => { }) describe('registration', () => { - beforeEach(() => { - peerStore = new PeerStore() - registrar = new Registrar({ peerStore }) + let libp2p + + beforeEach(async () => { + [libp2p] = await peerUtils.createPeer({ + config: { + modules: baseOptions.modules + }, + started: false + }) }) + afterEach(() => libp2p.stop()) + it('should be able to register a protocol', () => { const topologyProps = new Topology({ multicodecs: multicodec, @@ -50,7 +63,7 @@ describe('registrar', () => { } }) - const identifier = registrar.register(topologyProps) + const identifier = libp2p.registrar.register(topologyProps) expect(identifier).to.exist() }) @@ -64,14 +77,14 @@ describe('registrar', () => { } }) - const identifier = registrar.register(topologyProps) - const success = registrar.unregister(identifier) + const identifier = libp2p.registrar.register(topologyProps) + const success = libp2p.registrar.unregister(identifier) expect(success).to.eql(true) }) it('should fail to unregister if no register was made', () => { - const success = registrar.unregister('bad-identifier') + const success = libp2p.registrar.unregister('bad-identifier') expect(success).to.eql(false) }) @@ -85,10 +98,10 @@ describe('registrar', () => { const remotePeerId = conn.remotePeer // Add connected peer with protocol to peerStore and registrar - peerStore.protoBook.add(remotePeerId, [multicodec]) + libp2p.peerStore.protoBook.add(remotePeerId, [multicodec]) - registrar.onConnect(remotePeerId, conn) - expect(registrar.connections.size).to.eql(1) + libp2p.connectionManager.onConnect(conn) + expect(libp2p.connectionManager.size).to.eql(1) const topologyProps = new Topology({ multicodecs: multicodec, @@ -108,14 +121,16 @@ describe('registrar', () => { }) // Register protocol - const identifier = registrar.register(topologyProps) - const topology = registrar.topologies.get(identifier) + const identifier = libp2p.registrar.register(topologyProps) + const topology = libp2p.registrar.topologies.get(identifier) // Topology created expect(topology).to.exist() - registrar.onDisconnect(remotePeerId) - expect(registrar.connections.size).to.eql(0) + await conn.close() + + libp2p.connectionManager.onDisconnect(conn) + expect(libp2p.connectionManager.size).to.eql(0) // Wait for handlers to be called return Promise.all([ @@ -141,68 +156,30 @@ describe('registrar', () => { }) // Register protocol - const identifier = registrar.register(topologyProps) - const topology = registrar.topologies.get(identifier) + const identifier = libp2p.registrar.register(topologyProps) + const topology = libp2p.registrar.topologies.get(identifier) // Topology created expect(topology).to.exist() - expect(registrar.connections.size).to.eql(0) + expect(libp2p.connectionManager.size).to.eql(0) // Setup connections before registrar const conn = await createMockConnection() const remotePeerId = conn.remotePeer // Add connected peer to peerStore and registrar - peerStore.protoBook.set(remotePeerId, []) - registrar.onConnect(remotePeerId, conn) + libp2p.peerStore.protoBook.set(remotePeerId, []) + libp2p.connectionManager.onConnect(conn) // Add protocol to peer and update it - peerStore.protoBook.add(remotePeerId, [multicodec]) + libp2p.peerStore.protoBook.add(remotePeerId, [multicodec]) await onConnectDefer.promise // Remove protocol to peer and update it - peerStore.protoBook.set(remotePeerId, []) + libp2p.peerStore.protoBook.set(remotePeerId, []) await onDisconnectDefer.promise }) - - it('should filter connections on disconnect, removing the closed one', async () => { - const onDisconnectDefer = pDefer() - - const topologyProps = new Topology({ - multicodecs: multicodec, - handlers: { - onConnect: () => {}, - onDisconnect: () => { - onDisconnectDefer.resolve() - } - } - }) - - // Register protocol - registrar.register(topologyProps) - - // Setup connections before registrar - const [localPeer, remotePeer] = await peerUtils.createPeerId({ number: 2 }) - - const conn1 = await createMockConnection({ localPeer, remotePeer }) - const conn2 = await createMockConnection({ localPeer, remotePeer }) - - const id = remotePeer.toB58String() - - // Add connection to registrar - registrar.onConnect(remotePeer, conn1) - registrar.onConnect(remotePeer, conn2) - - expect(registrar.connections.get(id).length).to.eql(2) - - conn2._stat.status = 'closed' - registrar.onDisconnect(remotePeer, conn2) - - const peerConnections = registrar.connections.get(id) - expect(peerConnections.length).to.eql(1) - expect(peerConnections[0]._stat.status).to.eql('open') - }) }) }) diff --git a/test/registrar/utils.js b/test/registrar/utils.js deleted file mode 100644 index 727d99b195..0000000000 --- a/test/registrar/utils.js +++ /dev/null @@ -1,51 +0,0 @@ -'use strict' - -const { Connection } = require('libp2p-interfaces/src/connection') -const multiaddr = require('multiaddr') - -const pair = require('it-pair') - -const peerUtils = require('../utils/creators/peer') - -module.exports.createMockConnection = async (properties = {}) => { - const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080') - const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081') - - const [localPeer, remotePeer] = await peerUtils.createPeerId({ number: 2 }) - const openStreams = [] - let streamId = 0 - - return new Connection({ - localPeer: localPeer, - remotePeer: remotePeer, - localAddr, - remoteAddr, - stat: { - timeline: { - open: Date.now() - 10, - upgraded: Date.now() - }, - direction: 'outbound', - encryption: '/secio/1.0.0', - multiplexer: '/mplex/6.7.0', - status: 'open' - }, - newStream: (protocols) => { - const id = streamId++ - const stream = pair() - - stream.close = () => stream.sink([]) - stream.id = id - - openStreams.push(stream) - - return { - stream, - protocol: protocols[0] - } - }, - close: () => { }, - getStreams: () => openStreams, - ...properties - }) -} diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 6579bfe347..484470c3fb 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -420,24 +420,24 @@ describe('libp2p.upgrader', () => { const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) // Spy on emit for easy verification - sinon.spy(libp2p, 'emit') + sinon.spy(libp2p.connectionManager, 'emit') // Upgrade and check the connect event const connections = await Promise.all([ libp2p.upgrader.upgradeOutbound(outbound), remoteUpgrader.upgradeInbound(inbound) ]) - expect(libp2p.emit.callCount).to.equal(1) + expect(libp2p.connectionManager.emit.callCount).to.equal(1) - let [event, peerId] = libp2p.emit.getCall(0).args + let [event, connection] = libp2p.connectionManager.emit.getCall(0).args expect(event).to.equal('peer:connect') - expect(peerId.isEqual(remotePeer)).to.equal(true) + expect(connection.remotePeer.isEqual(remotePeer)).to.equal(true) // Close and check the disconnect event await Promise.all(connections.map(conn => conn.close())) - expect(libp2p.emit.callCount).to.equal(2) - ;([event, peerId] = libp2p.emit.getCall(1).args) + expect(libp2p.connectionManager.emit.callCount).to.equal(2) + ;([event, connection] = libp2p.connectionManager.emit.getCall(1).args) expect(event).to.equal('peer:disconnect') - expect(peerId.isEqual(remotePeer)).to.equal(true) + expect(connection.remotePeer.isEqual(remotePeer)).to.equal(true) }) })