Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: registrar #471

Merged
merged 7 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions src/connection-manager/topology.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict'

const assert = require('assert')

class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor ({
min = 0,
max = Infinity,
multicodecs,
handlers
}) {
assert(multicodecs, 'one or more multicodec should be provided')
assert(handlers, 'the handlers should be provided')
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
'the \'onConnect\' handler must be provided')
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
'the \'onDisconnect\' handler must be provided')

this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
this.min = min
this.max = max

// Handlers
this._onConnect = handlers.onConnect
this._onDisconnect = handlers.onDisconnect

this.peers = new Map()
this._registrar = undefined

this._onProtocolChange = this._onProtocolChange.bind(this)
}

set registrar (registrar) {
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)

// Update topology peers
this._updatePeers(this._registrar.peerStore.peers.values())
}

/**
* Update topology.
* @param {Array<PeerInfo>} peerInfoIterable
* @returns {void}
*/
_updatePeers (peerInfoIterable) {
for (const peerInfo of peerInfoIterable) {
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
// Add the peer regardless of whether or not there is currently a connection
this.peers.set(peerInfo.id.toB58String(), peerInfo)
// If there is a connection, call _onConnect
const connection = this._registrar.getConnection(peerInfo)
connection && this._onConnect(peerInfo, connection)
} else {
// Remove any peers we might be tracking that are no longer of value to us
this.peers.delete(peerInfo.id.toB58String())
}
}
}

/**
* Notify protocol of peer disconnected.
* @param {PeerInfo} peerInfo
* @param {Error} [error]
* @returns {void}
*/
disconnect (peerInfo, error) {
this._onDisconnect(peerInfo, error)
}

/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerInfo} props.peerInfo
* @param {Array<string>} props.protocols
*/
_onProtocolChange ({ peerInfo, protocols }) {
const existingPeer = this.peers.get(peerInfo.id.toB58String())
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))

// Not supporting the protocol anymore?
if (existingPeer && hasProtocol.length === 0) {
this._onDisconnect({
peerInfo
})
}

// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
this._updatePeers([peerInfo])
return
}
}
}
}

module.exports = Topology
12 changes: 6 additions & 6 deletions src/get-peer-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ const errCode = require('err-code')

/**
* Converts the given `peer` to a `PeerInfo` instance.
* The `PeerBook` will be checked for the resulting peer, and
* the peer will be updated in the `PeerBook`.
* The `PeerStore` will be checked for the resulting peer, and
* the peer will be updated in the `PeerStore`.
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer
* @param {PeerBook} peerBook
* @param {PeerStore} peerStore
* @returns {PeerInfo}
*/
function getPeerInfo (peer, peerBook) {
function getPeerInfo (peer, peerStore) {
if (typeof peer === 'string') {
peer = multiaddr(peer)
}
Expand All @@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) {

addr && peer.multiaddrs.add(addr)

return peerBook ? peerBook.put(peer) : peer
return peerStore ? peerStore.put(peer) : peer
}

/**
Expand All @@ -54,7 +54,7 @@ function getPeerInfoRemote (peer, libp2p) {
let peerInfo

try {
peerInfo = getPeerInfo(peer, libp2p.peerBook)
peerInfo = getPeerInfo(peer, libp2p.peerStore)
} catch (err) {
return Promise.reject(errCode(
new Error(`${peer} is not a valid peer type`),
Expand Down
8 changes: 8 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
const Registrar = require('./registrar')

const notStarted = (action, state) => {
return errCode(
Expand Down Expand Up @@ -71,10 +72,13 @@ class Libp2p extends EventEmitter {
const peerInfo = getPeerInfo(connection.remotePeer)

this.peerStore.put(peerInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The put here doesn't do anything. connection.remotePeer is a PeerId, so getPeerInfo is just going to check the peerStore anyway. I would delete the getPeerInfo usage and just do:

const peerInfo = this.peerStore.get(connection.remotePeer.toB58String())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a peerB dials peerA the peerStore for peerA does not have the peerB on it. That way, we would have peerInfo = undefined
getPeerInfo puts in the peerStore if it is provided. I changed for the following:

const peerInfo = getPeerInfo(connection.remotePeer, this.peerStore)

this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, if I do that getPeerInfo tries to put invalid PeerInfo to the store in some scenarios from other tests.

this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved

this.registrar.onDisconnect(peerInfo, connection)
this.emit('peer:disconnect', peerInfo)
}
})
Expand Down Expand Up @@ -108,6 +112,10 @@ class Libp2p extends EventEmitter {
transportManager: this.transportManager
})

this.registrar = new Registrar({ peerStore: this.peerStore })
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle

// Attach private network protector
if (this._modules.connProtector) {
this.upgrader.protector = this._modules.connProtector
Expand Down
139 changes: 139 additions & 0 deletions src/registrar.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
'use strict'

const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')

const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info')
const Toplogy = require('./connection-manager/topology')

/**
* Responsible for notifying registered protocols of events in the network.
*/
class Registrar {
/**
* @param {Object} props
* @param {PeerStore} props.peerStore
* @constructor
*/
constructor ({ peerStore }) {
this.peerStore = peerStore

/**
* Map of connections per peer
* TODO: this should be handled by connectionManager
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()

/**
* Map of topologies
*
* @type {Map<string, object>}
*/
this.topologies = new Map()

this._handle = undefined
}

get handle () {
return this._handle
}

set handle (handle) {
this._handle = handle
}

/**
* Add a new connected peer to the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} conn
* @returns {void}
*/
onConnect (peerInfo, conn) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about changing the connection tracking to just use the connection id, since we have those now? onConnect could just take the connection, since we can determine the PeerInfo using its remotePeer and the PeerStore. This would also simplify having to track multiple connections per peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we move in that direction, when we want to get a connection from a peer, we will need to iterate the collection and get the remotePeer from the connection and then check if it matches. This simplifies the multiple connections tracking but makes the getConnection more complex. I am not sure if we should change it or not, but we can do it if you feel this is a more viable approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good point, I think we can leave this for now. We could potentially create an id dictionary for quicker lookups both ways, but that might be overkill at the moment.

assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection')

const id = peerInfo.id.toB58String()
const storedConn = this.connections.get(id)

if (storedConn) {
storedConn.push(conn)
} else {
this.connections.set(id, [conn])
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topologies are never informed of new connections here as they are with onDisconnect.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a new connection happens, I was expecting that the peer-store will trigger the protocol-change after identify and we do not need to inform the topologies with the connection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can leave this for now. There may be Topologies in the future that need to known when connections are created but that don't care about the multicodecs, but I don't think we have any of those planned right now. A static peer set Topology, such as a Bootstrap or Priority Node topology won't care about the multicodecs, but they also won't care about connections, just expressing their desire to be connected to certain peers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense yes. We iterate on this with the new versions of connMgr + peer-store

}

/**
* Remove a disconnected peer from the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
onDisconnect (peerInfo, connection, error) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

const id = peerInfo.id.toB58String()
let storedConn = this.connections.get(id)

if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id === connection.id)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerInfo, error)
}

this.connections.delete(peerInfo.id.toB58String())
}
}

/**
* Get a connection with a peer.
* @param {PeerInfo} peerInfo
* @returns {Connection}
*/
getConnection (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

// TODO: what should we return
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
return this.connections.get(peerInfo.id.toB58String())[0]
}

/**
* Register handlers for a set of multicodecs given
* @param {Object} topologyProps properties for topology
* @param {Array<string>|string} topologyProps.multicodecs
* @param {Object} topologyProps.handlers
* @param {function} topologyProps.handlers.onConnect
* @param {function} topologyProps.handlers.onDisconnect
* @return {string} registrar identifier
*/
register (topologyProps) {
// Create multicodec topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
const topology = new Toplogy(topologyProps)

this.topologies.set(id, topology)

// Set registrar
topology.registrar = this

return id
}

/**
* Unregister topology.
* @param {string} id registrar identifier
* @return {boolean} unregistered successfully
*/
unregister (id) {
return this.topologies.delete(id)
}
}

module.exports = Registrar
57 changes: 57 additions & 0 deletions test/registrar/registrar.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')

const mergeOptions = require('merge-options')

const multiaddr = require('multiaddr')
const Libp2p = require('../../src')

const baseOptions = require('../utils/base-options')
const peerUtils = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')

describe('registrar on dial', () => {
let peerInfo
let remotePeerInfo
let libp2p
let remoteLibp2p
let remoteAddr

before(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo: remotePeerInfo
}))

await remoteLibp2p.transportManager.listen([listenAddr])
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})

after(async () => {
sinon.restore()
await remoteLibp2p.stop()
libp2p && await libp2p.stop()
})

it('should inform registrar of a new connection', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo
}))

sinon.spy(remoteLibp2p.registrar, 'onConnect')

await libp2p.dial(remoteAddr)
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1)

const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo)
expect(libp2pConn).to.exist()

const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo)
expect(remoteConn).to.exist()
})
})
Loading