Skip to content

Commit

Permalink
chore: support multiple conns
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Nov 5, 2019
1 parent 0ec963e commit 1507fb5
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 113 deletions.
65 changes: 49 additions & 16 deletions src/connection-manager/topology.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,74 @@
'use strict'

const assert = require('assert')

class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {function} props.onConnect protocol "onConnect" handler
* @param {function} props.onDisconnect protocol "onDisconnect" handler
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Registrar} registrar
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor ({
min = 0,
max = Infinity,
onConnect,
onDisconnect,
multicodecs,
registrar,
peerStore
handlers
}) {
this.multicodecs = multicodecs
this.registrar = registrar
assert(multicodecs, 'one or more multicodec should be provided')
assert(handlers, 'the handlers should be provided')
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
'the \'onConnect\' handler must be provided')
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
'the \'onDisconnect\' handler must be provided')

this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
this.min = min
this.max = max
this.peers = new Map()

// Handlers
this._onConnect = onConnect
this._onDisconnect = onDisconnect
this._onConnect = handlers.onConnect
this._onDisconnect = handlers.onDisconnect

this.peers = new Map()
this._registrar = undefined

this._onProtocolChange = this._onProtocolChange.bind(this)
}

set registrar (registrar) {
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)

// Set by the registrar
this._peerStore = peerStore
// Add connected peers to the topology
this._addConnectedPeers()
// TODO: remaining peers in the store
}

this._peerStore.on('change:protocols', this._onProtocolChange)
/**
* Add connected peers to the topology.
*/
_addConnectedPeers () {
const knownPeers = []

for (const [, peer] of this._registrar.peerStore.peers) {
if (this.multicodecs.filter(multicodec => peer.protocols.has(multicodec))) {
knownPeers.push(peer)
}
}

for (const [id, conn] of this._registrar.connections.entries()) {
const targetPeer = knownPeers.find((peerInfo) => peerInfo.id.toB58String() === id)

if (targetPeer) {
// TODO: what should we return
this.tryToConnect(targetPeer, conn[0])
}
}
}

/**
Expand Down Expand Up @@ -85,7 +118,7 @@ class Topology {
// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
this.tryToConnect(peerInfo, this.registrar.getConnection(peerInfo))
this.tryToConnect(peerInfo, this._registrar.getConnection(peerInfo))
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Libp2p extends EventEmitter {
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)

this.registrar.onDisconnect(peerInfo)
this.registrar.onDisconnect(peerInfo, connection)
this.emit('peer:disconnect', peerInfo)
}
})
Expand Down
93 changes: 34 additions & 59 deletions src/registrar.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')
const errCode = require('err-code')

const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info')
Expand All @@ -25,7 +24,7 @@ class Registrar {
/**
* Map of connections per peer
* TODO: this should be handled by connectionManager
* @type {Map<string, conn>}
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()

Expand Down Expand Up @@ -58,24 +57,39 @@ class Registrar {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection')

this.connections.set(peerInfo.id.toB58String(), conn)
const id = peerInfo.id.toB58String()
const storedConn = this.connections.get(id)

if (storedConn) {
storedConn.push(conn)
} else {
this.connections.set(id, [conn])
}
}

/**
* Remove a disconnected peer from the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
onDisconnect (peerInfo, error) {
onDisconnect (peerInfo, connection, error) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

for (const [, topology] of this.topologies) {
topology.disconnect(peerInfo, error)
}
const id = peerInfo.id.toB58String()
let storedConn = this.connections.get(id)

this.connections.delete(peerInfo.id.toB58String())
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id === connection.id)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerInfo, error)
}

this.connections.delete(peerInfo.id.toB58String())
}
}

/**
Expand All @@ -86,71 +100,32 @@ class Registrar {
getConnection (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')

return this.connections.get(peerInfo.id.toB58String())
// TODO: what should we return
return this.connections.get(peerInfo.id.toB58String())[0]
}

/**
* Register handlers for a set of multicodecs given
* @param {Array<string>|string} multicodecs
* @param {object} handlers
* @param {function} handlers.onConnect
* @param {function} handlers.onDisconnect
* @param {object} topologyProps properties for topology
* @param {Object} topologyProps properties for topology
* @param {Array<string>|string} topologyProps.multicodecs
* @param {Object} topologyProps.handlers
* @param {function} topologyProps.handlers.onConnect
* @param {function} topologyProps.handlers.onDisconnect
* @return {string} registrar identifier
*/
register (multicodecs, handlers, topologyProps = {}) {
if (!multicodecs) {
throw errCode(new Error('one or more multicodec should be provided'), 'ERR_NO_MULTICODECS')
} else if (!Array.isArray(multicodecs)) {
multicodecs = [multicodecs]
}

if (!handlers) {
throw errCode(new Error('the handlers should be provided'), 'ERR_NO_HANDLERS')
} else if (!handlers.onConnect || typeof handlers.onConnect !== 'function') {
throw errCode(new Error('the \'onConnect\' handler must be provided'), 'ERR_NO_ONCONNECT_HANDLER')
} else if (!handlers.onDisconnect || typeof handlers.onDisconnect !== 'function') {
throw errCode(new Error('the \'onDisconnect\' handler must be provided'), 'ERR_NO_ONDISCONNECT_HANDLER')
}

register (topologyProps) {
// Create multicodec topology
const topology = new Toplogy({
onConnect: handlers.onConnect,
onDisconnect: handlers.onDisconnect,
registrar: this,
multicodecs,
peerStore: this.peerStore,
...topologyProps
})

const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
this.topologies.set(id, topology)
const topology = new Toplogy(topologyProps)

this._addConnectedPeers(multicodecs, topology)
this.topologies.set(id, topology)

// TODO: try to connect to peers-store peers according to current topology
// Set registrar
topology.registrar = this

return id
}

_addConnectedPeers (multicodecs, topology) {
const knownPeers = []

for (const [, peer] of this.peerStore.peers) {
if (multicodecs.filter(multicodec => peer.protocols.has(multicodec))) {
knownPeers.push(peer)
}
}

for (const [id, conn] of this.connections.entries()) {
const targetPeer = knownPeers.find((peerInfo) => peerInfo.id.toB58String() === id)

if (targetPeer) {
topology.tryToConnect(targetPeer, conn)
}
}
}

/**
* Unregister topology.
* @param {string} id registrar identifier
Expand Down
Loading

0 comments on commit 1507fb5

Please sign in to comment.