-
Notifications
You must be signed in to change notification settings - Fork 446
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: registrar #471
feat: registrar #471
Changes from all commits
8385fd3
1bcfdf2
15efccb
a5dc7d1
6f72c08
2345976
750601a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
|
||
class Topology { | ||
/** | ||
* @param {Object} props | ||
* @param {number} props.min minimum needed connections (default: 0) | ||
* @param {number} props.max maximum needed connections (default: Infinity) | ||
* @param {Array<string>} props.multicodecs protocol multicodecs | ||
* @param {Object} props.handlers | ||
* @param {function} props.handlers.onConnect protocol "onConnect" handler | ||
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler | ||
* @constructor | ||
*/ | ||
constructor ({ | ||
min = 0, | ||
max = Infinity, | ||
multicodecs, | ||
handlers | ||
}) { | ||
assert(multicodecs, 'one or more multicodec should be provided') | ||
assert(handlers, 'the handlers should be provided') | ||
assert(handlers.onConnect && typeof handlers.onConnect === 'function', | ||
'the \'onConnect\' handler must be provided') | ||
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function', | ||
'the \'onDisconnect\' handler must be provided') | ||
|
||
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs] | ||
this.min = min | ||
this.max = max | ||
|
||
// Handlers | ||
this._onConnect = handlers.onConnect | ||
this._onDisconnect = handlers.onDisconnect | ||
|
||
this.peers = new Map() | ||
this._registrar = undefined | ||
|
||
this._onProtocolChange = this._onProtocolChange.bind(this) | ||
} | ||
|
||
set registrar (registrar) { | ||
this._registrar = registrar | ||
this._registrar.peerStore.on('change:protocols', this._onProtocolChange) | ||
|
||
// Update topology peers | ||
this._updatePeers(this._registrar.peerStore.peers.values()) | ||
} | ||
|
||
/** | ||
* Update topology. | ||
* @param {Array<PeerInfo>} peerInfoIterable | ||
* @returns {void} | ||
*/ | ||
_updatePeers (peerInfoIterable) { | ||
for (const peerInfo of peerInfoIterable) { | ||
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) { | ||
// Add the peer regardless of whether or not there is currently a connection | ||
this.peers.set(peerInfo.id.toB58String(), peerInfo) | ||
// If there is a connection, call _onConnect | ||
const connection = this._registrar.getConnection(peerInfo) | ||
connection && this._onConnect(peerInfo, connection) | ||
} else { | ||
// Remove any peers we might be tracking that are no longer of value to us | ||
this.peers.delete(peerInfo.id.toB58String()) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Notify protocol of peer disconnected. | ||
* @param {PeerInfo} peerInfo | ||
* @param {Error} [error] | ||
* @returns {void} | ||
*/ | ||
disconnect (peerInfo, error) { | ||
this._onDisconnect(peerInfo, error) | ||
} | ||
|
||
/** | ||
* Check if a new peer support the multicodecs for this topology. | ||
* @param {Object} props | ||
* @param {PeerInfo} props.peerInfo | ||
* @param {Array<string>} props.protocols | ||
*/ | ||
_onProtocolChange ({ peerInfo, protocols }) { | ||
const existingPeer = this.peers.get(peerInfo.id.toB58String()) | ||
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) | ||
|
||
// Not supporting the protocol anymore? | ||
if (existingPeer && hasProtocol.length === 0) { | ||
this._onDisconnect({ | ||
peerInfo | ||
}) | ||
} | ||
|
||
// New to protocol support | ||
for (const protocol of protocols) { | ||
if (this.multicodecs.includes(protocol)) { | ||
this._updatePeers([peerInfo]) | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
module.exports = Topology |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const debug = require('debug') | ||
const log = debug('libp2p:peer-store') | ||
log.error = debug('libp2p:peer-store:error') | ||
|
||
const { Connection } = require('libp2p-interfaces/src/connection') | ||
const PeerInfo = require('peer-info') | ||
const Toplogy = require('./connection-manager/topology') | ||
|
||
/** | ||
* Responsible for notifying registered protocols of events in the network. | ||
*/ | ||
class Registrar { | ||
/** | ||
* @param {Object} props | ||
* @param {PeerStore} props.peerStore | ||
* @constructor | ||
*/ | ||
constructor ({ peerStore }) { | ||
this.peerStore = peerStore | ||
|
||
/** | ||
* Map of connections per peer | ||
* TODO: this should be handled by connectionManager | ||
* @type {Map<string, Array<conn>>} | ||
*/ | ||
this.connections = new Map() | ||
|
||
/** | ||
* Map of topologies | ||
* | ||
* @type {Map<string, object>} | ||
*/ | ||
this.topologies = new Map() | ||
|
||
this._handle = undefined | ||
} | ||
|
||
get handle () { | ||
return this._handle | ||
} | ||
|
||
set handle (handle) { | ||
this._handle = handle | ||
} | ||
|
||
/** | ||
* Add a new connected peer to the record | ||
* TODO: this should live in the ConnectionManager | ||
* @param {PeerInfo} peerInfo | ||
* @param {Connection} conn | ||
* @returns {void} | ||
*/ | ||
onConnect (peerInfo, conn) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about changing the connection tracking to just use the connection id, since we have those now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we move in that direction, when we want to get a connection from a peer, we will need to iterate the collection and get the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's a good point, I think we can leave this for now. We could potentially create an id dictionary for quicker lookups both ways, but that might be overkill at the moment. |
||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') | ||
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection') | ||
|
||
const id = peerInfo.id.toB58String() | ||
const storedConn = this.connections.get(id) | ||
|
||
if (storedConn) { | ||
storedConn.push(conn) | ||
} else { | ||
this.connections.set(id, [conn]) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The topologies are never informed of new connections here as they are with onDisconnect. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a new connection happens, I was expecting that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can leave this for now. There may be Topologies in the future that need to known when connections are created but that don't care about the multicodecs, but I don't think we have any of those planned right now. A static peer set Topology, such as a Bootstrap or Priority Node topology won't care about the multicodecs, but they also won't care about connections, just expressing their desire to be connected to certain peers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes sense yes. We iterate on this with the new versions of |
||
} | ||
|
||
/** | ||
* Remove a disconnected peer from the record | ||
* TODO: this should live in the ConnectionManager | ||
* @param {PeerInfo} peerInfo | ||
* @param {Connection} connection | ||
* @param {Error} [error] | ||
* @returns {void} | ||
*/ | ||
onDisconnect (peerInfo, connection, error) { | ||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') | ||
|
||
const id = peerInfo.id.toB58String() | ||
let storedConn = this.connections.get(id) | ||
|
||
if (storedConn && storedConn.length > 1) { | ||
storedConn = storedConn.filter((conn) => conn.id === connection.id) | ||
} else if (storedConn) { | ||
for (const [, topology] of this.topologies) { | ||
topology.disconnect(peerInfo, error) | ||
} | ||
|
||
this.connections.delete(peerInfo.id.toB58String()) | ||
} | ||
} | ||
|
||
/** | ||
* Get a connection with a peer. | ||
* @param {PeerInfo} peerInfo | ||
* @returns {Connection} | ||
*/ | ||
getConnection (peerInfo) { | ||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') | ||
|
||
// TODO: what should we return | ||
jacobheun marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return this.connections.get(peerInfo.id.toB58String())[0] | ||
} | ||
|
||
/** | ||
* Register handlers for a set of multicodecs given | ||
* @param {Object} topologyProps properties for topology | ||
* @param {Array<string>|string} topologyProps.multicodecs | ||
* @param {Object} topologyProps.handlers | ||
* @param {function} topologyProps.handlers.onConnect | ||
* @param {function} topologyProps.handlers.onDisconnect | ||
* @return {string} registrar identifier | ||
*/ | ||
register (topologyProps) { | ||
// Create multicodec topology | ||
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() | ||
const topology = new Toplogy(topologyProps) | ||
|
||
this.topologies.set(id, topology) | ||
|
||
// Set registrar | ||
topology.registrar = this | ||
|
||
return id | ||
} | ||
|
||
/** | ||
* Unregister topology. | ||
* @param {string} id registrar identifier | ||
* @return {boolean} unregistered successfully | ||
*/ | ||
unregister (id) { | ||
return this.topologies.delete(id) | ||
} | ||
} | ||
|
||
module.exports = Registrar |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
'use strict' | ||
/* eslint-env mocha */ | ||
|
||
const chai = require('chai') | ||
chai.use(require('dirty-chai')) | ||
const { expect } = chai | ||
const sinon = require('sinon') | ||
|
||
const mergeOptions = require('merge-options') | ||
|
||
const multiaddr = require('multiaddr') | ||
const Libp2p = require('../../src') | ||
|
||
const baseOptions = require('../utils/base-options') | ||
const peerUtils = require('../utils/creators/peer') | ||
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') | ||
|
||
describe('registrar on dial', () => { | ||
let peerInfo | ||
let remotePeerInfo | ||
let libp2p | ||
let remoteLibp2p | ||
let remoteAddr | ||
|
||
before(async () => { | ||
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) | ||
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { | ||
peerInfo: remotePeerInfo | ||
})) | ||
|
||
await remoteLibp2p.transportManager.listen([listenAddr]) | ||
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] | ||
}) | ||
|
||
after(async () => { | ||
sinon.restore() | ||
await remoteLibp2p.stop() | ||
libp2p && await libp2p.stop() | ||
}) | ||
|
||
it('should inform registrar of a new connection', async () => { | ||
libp2p = new Libp2p(mergeOptions(baseOptions, { | ||
peerInfo | ||
})) | ||
|
||
sinon.spy(remoteLibp2p.registrar, 'onConnect') | ||
|
||
await libp2p.dial(remoteAddr) | ||
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) | ||
|
||
const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo) | ||
expect(libp2pConn).to.exist() | ||
|
||
const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo) | ||
expect(remoteConn).to.exist() | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The put here doesn't do anything.
connection.remotePeer
is a PeerId, sogetPeerInfo
is just going to check the peerStore anyway. I would delete thegetPeerInfo
usage and just do:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a
peerB
dialspeerA
thepeerStore
forpeerA
does not have thepeerB
on it. That way, we would havepeerInfo
=undefined
getPeerInfo
puts in thepeerStore
if it is provided. I changed for the following:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wait, if I do that
getPeerInfo
tries toput
invalidPeerInfo
to the store in some scenarios from other tests.