Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor conn mgr and registrar #611

Merged
merged 2 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
* [`handle`](#handle)
* [`unhandle`](#unhandle)
* [`ping`](#ping)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`contentRouting.findProviders`](#contentroutingfindproviders)
* [`contentRouting.provide`](#contentroutingprovide)
* [`contentRouting.put`](#contentroutingput)
* [`contentRouting.get`](#contentroutingget)
* [`contentRouting.getMany`](#contentroutinggetmany)
* [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
* [`peerStore.addressBook.get`](#peerstoreaddressbookget)
Expand All @@ -34,14 +34,17 @@
* [`pubsub.publish`](#pubsubpublish)
* [`pubsub.subscribe`](#pubsubsubscribe)
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
* [`connectionManager.get`](#connectionmanagerget)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`connectionManager.size`](#connectionmanagersize)
* [`metrics.global`](#metricsglobal)
* [`metrics.peers`](#metricspeers)
* [`metrics.protocols`](#metricsprotocols)
* [`metrics.forPeer`](#metricsforpeer)
* [`metrics.forProtocol`](#metricsforprotocol)
* [Events](#events)
* [`libp2p`](#libp2p)
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
* [`libp2p.peerStore`](#libp2ppeerStore)
* [Types](#types)
* [`Stats`](#stats)
Expand Down Expand Up @@ -999,6 +1002,28 @@ const handler = (msg) => {
libp2p.pubsub.unsubscribe(topic, handler)
```

### connectionManager.get

Get a connection with a given peer, if it exists.

#### Parameters

| Name | Type | Description |
|------|------|-------------|
| peerId | [`PeerId`][peer-id] | The peer to find |

#### Returns

| Type | Description |
|------|-------------|
| [`Connection`][connection] | Connection with the given peer |

#### Example

```js
libp2p.connectionManager.get(peerId)
```

### connectionManager.setPeerValue

Enables users to change the value of certain peers in a range of 0 to 1. Peers with the lowest values will have their Connections pruned first, if any Connection Manager limits are exceeded. See [./CONFIGURATION.md#configuring-connection-manager](./CONFIGURATION.md#configuring-connection-manager) for details on how to configure these limits.
Expand All @@ -1025,6 +1050,17 @@ libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
```

### connectionManager.size

Getter for obtaining the current number of open connections.

#### Example

```js
libp2p.connectionManager.size
// 10
```

### metrics.global

A [`Stats`](#stats) object of tracking the global bandwidth of the libp2p node.
Expand Down Expand Up @@ -1126,21 +1162,23 @@ unless they are performing a specific action. See [peer discovery and auto dial]

- `peer`: instance of [`PeerId`][peer-id]

### libp2p.connectionManager

#### A new connection to a peer has been opened

This event will be triggered anytime a new Connection is established to another peer.

`libp2p.on('peer:connect', (peer) => {})`
`libp2p.on('peer:connect', (connection) => {})`
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved

- `peer`: instance of [`PeerId`][peer-id]
- `connection`: instance of [`Connection`][connection]

#### An existing connection to a peer has been closed

This event will be triggered anytime we are disconnected from another peer, regardless of the circumstances of that disconnection. If we happen to have multiple connections to a peer, this event will **only** be triggered when the last connection is closed.

`libp2p.on('peer:disconnect', (peer) => {})`
`libp2p.on('peer:disconnect', (connection) => {})`
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved

- `peer`: instance of [`PeerId`][peer-id]
- `connection`: instance of [`Connection`][connection]

### libp2p.peerStore

Expand Down
2 changes: 1 addition & 1 deletion src/circuit/circuit/hop.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module.exports.handleHop = async function handleHop ({
// Get the connection to the destination (stop) peer
const destinationPeer = new PeerId(request.dstPeer.id)

const destinationConnection = circuit._registrar.getConnection(destinationPeer)
const destinationConnection = circuit._connectionManager.get(destinationPeer)
if (!destinationConnection && !circuit._options.hop.active) {
log('HOP request received but we are not connected to the destination peer')
return streamHandler.end({
Expand Down
3 changes: 2 additions & 1 deletion src/circuit/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Circuit {
constructor ({ libp2p, upgrader }) {
this._dialer = libp2p.dialer
this._registrar = libp2p.registrar
this._connectionManager = libp2p.connectionManager
this._upgrader = upgrader
this._options = libp2p._config.relay
this.addresses = libp2p.addresses
Expand Down Expand Up @@ -107,7 +108,7 @@ class Circuit {
const destinationPeer = PeerId.createFromCID(destinationAddr.getPeerId())

let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(relayPeer)
let relayConnection = this._connectionManager.get(relayPeer)
if (!relayConnection) {
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true
Expand Down
122 changes: 107 additions & 15 deletions src/connection-manager/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ const LatencyMonitor = require('latency-monitor').default
const debug = require('debug')('libp2p:connection-manager')
const retimer = require('retimer')

const { EventEmitter } = require('events')

const PeerId = require('peer-id')
const { Connection } = require('libp2p-interfaces/src/connection')

const {
ERR_INVALID_PARAMETERS
} = require('../errors')
Expand All @@ -22,7 +27,12 @@ const defaultOptions = {
defaultPeerValue: 1
}

class ConnectionManager {
/**
* Responsible for managing known connections.
* @fires ConnectionManager#peer:connect Emitted when a new peer is connected.
* @fires ConnectionManager#peer:disconnect Emitted when a peer is disconnected.
*/
class ConnectionManager extends EventEmitter {
/**
* @constructor
* @param {Libp2p} libp2p
Expand All @@ -38,30 +48,50 @@ class ConnectionManager {
* @param {Number} options.defaultPeerValue The value of the peer. Default=1
*/
constructor (libp2p, options) {
super()

this._libp2p = libp2p
this._registrar = libp2p.registrar
this._peerId = libp2p.peerId.toB58String()

this._options = mergeOptions.call({ ignoreUndefined: true }, defaultOptions, options)
if (this._options.maxConnections < this._options.minConnections) {
throw errcode(new Error('Connection Manager maxConnections must be greater than minConnections'), ERR_INVALID_PARAMETERS)
}

debug('options: %j', this._options)

this._metrics = libp2p.metrics
this._libp2p = libp2p

/**
* Map of peer identifiers to their peer value for pruning connections.
* @type {Map<string, number>}
*/
this._peerValues = new Map()
this._connections = new Map()

/**
* Map of connections per peer
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()

this._timer = null
this._checkMetrics = this._checkMetrics.bind(this)
}

/**
* Get current number of open connections.
*/
get size () {
return Array.from(this.connections.values())
.reduce((accumulator, value) => accumulator + value.length, 0)
}

/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
*/
start () {
if (this._metrics) {
if (this._libp2p.metrics) {
this._timer = this._timer || retimer(this._checkMetrics, this._options.pollInterval)
}

Expand All @@ -77,13 +107,33 @@ class ConnectionManager {

/**
* Stops the Connection Manager
* @async
*/
stop () {
async stop () {
this._timer && this._timer.clear()
this._latencyMonitor && this._latencyMonitor.removeListener('data', this._onLatencyMeasure)

await this._close()
debug('stopped')
}

/**
* Cleans up the connections
* @async
*/
async _close () {
// Close all connections we're tracking
const tasks = []
for (const connectionList of this.connections.values()) {
for (const connection of connectionList) {
tasks.push(connection.close())
}
}

await tasks
this.connections.clear()
}

/**
* Sets the value of the given peer. Peers with lower values
* will be disconnected first.
Expand All @@ -106,7 +156,7 @@ class ConnectionManager {
* @private
*/
_checkMetrics () {
const movingAverages = this._metrics.global.movingAverages
const movingAverages = this._libp2p.metrics.global.movingAverages
const received = movingAverages.dataReceived[this._options.movingAverageInterval].movingAverage()
this._checkLimit('maxReceivedData', received)
const sent = movingAverages.dataSent[this._options.movingAverageInterval].movingAverage()
Expand All @@ -122,21 +172,63 @@ class ConnectionManager {
* @param {Connection} connection
*/
onConnect (connection) {
if (!Connection.isConnection(connection)) {
throw errcode(new Error('conn must be an instance of interface-connection'), ERR_INVALID_PARAMETERS)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onConnect is an internal handler, we should avoid throwing errors as this is going to be fatal. Log and return instead.


const peerId = connection.remotePeer.toB58String()
this._connections.set(connection.id, connection)
const storedConn = this.connections.get(peerId)

if (storedConn) {
storedConn.push(connection)
} else {
this.connections.set(peerId, [connection])
this.emit('peer:connect', connection)
}

if (!this._peerValues.has(peerId)) {
this._peerValues.set(peerId, this._options.defaultPeerValue)
}
this._checkLimit('maxConnections', this._connections.size)

this._checkLimit('maxConnections', this.size)
}

/**
* Removes the connection from tracking
* @param {Connection} connection
*/
onDisconnect (connection) {
this._connections.delete(connection.id)
this._peerValues.delete(connection.remotePeer.toB58String())
const peerId = connection.remotePeer.toB58String()
let storedConn = this.connections.get(peerId)

if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn) {
this.connections.delete(peerId)
this._peerValues.delete(connection.remotePeer.toB58String())
this.emit('peer:disconnect', connection)
}
}

/**
* Get a connection with a peer.
* @param {PeerId} peerId
* @returns {Connection}
*/
get (peerId) {
if (!PeerId.isPeerId(peerId)) {
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
}

const id = peerId.toB58String()
const connections = this.connections.get(id)

// Return the first, open connection
if (connections) {
return connections.find(connection => connection.stat.status === 'open')
}
return null
}

/**
Expand Down Expand Up @@ -169,17 +261,17 @@ class ConnectionManager {
* @private
*/
_maybeDisconnectOne () {
if (this._options.minConnections < this._connections.size) {
if (this._options.minConnections < this.connections.size) {
const peerValues = Array.from(this._peerValues).sort(byPeerValue)
debug('%s: sorted peer values: %j', this._peerId, peerValues)
const disconnectPeer = peerValues[0]
if (disconnectPeer) {
const peerId = disconnectPeer[0]
debug('%s: lowest value peer is %s', this._peerId, peerId)
debug('%s: closing a connection to %j', this._peerId, peerId)
for (const connection of this._connections.values()) {
if (connection.remotePeer.toB58String() === peerId) {
connection.close()
for (const connections of this.connections.values()) {
if (connections[0].remotePeer.toB58String() === peerId) {
connections[0].close()
break
}
}
Expand Down
Loading