-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: discover and connect to closest peers #798
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,126 @@ | ||
'use strict' | ||
|
||
const errCode = require('err-code') | ||
const debug = require('debug') | ||
const log = debug('libp2p:peer-routing') | ||
log.error = debug('libp2p:peer-routing:error') | ||
|
||
const all = require('it-all') | ||
const pAny = require('p-any') | ||
const { | ||
setDelayedInterval, | ||
clearDelayedInterval | ||
} = require('set-delayed-interval') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created this module for us to use here, but also in:
It is always tricky to implement a "setInterval" when the recurrent task is a promise. Other than that, if we need a delay beforehand, things get even more complex as we need to guarantee that the timer can be stopped for any of the pauses. When looking at the It would be great to have your 👀 on the module: vasco-santos/set-delayed-interval#1 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I'd really like to get rid of this whole boot delay logic. In reality we should probably have some async boot function and when that finishes these other systems initiate, but I think we can revisit this when we update the config logic. I'll review the module There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great to orchestrate subsystems like that. I will add this comment to the configuration issue for us to remember |
||
|
||
/** | ||
* Responsible for managing the usage of the available Peer Routing modules. | ||
*/ | ||
class PeerRouting { | ||
/** | ||
* @class | ||
* @param {Libp2p} libp2p | ||
*/ | ||
constructor (libp2p) { | ||
this._peerId = libp2p.peerId | ||
this._peerStore = libp2p.peerStore | ||
this._routers = libp2p._modules.peerRouting || [] | ||
|
||
// If we have the dht, make it first | ||
if (libp2p._dht) { | ||
this._routers.unshift(libp2p._dht) | ||
} | ||
|
||
this._refreshManagerOptions = libp2p._options.peerRouting.refreshManager | ||
|
||
this._findClosestPeersTask = this._findClosestPeersTask.bind(this) | ||
} | ||
|
||
/** | ||
* Start peer routing service. | ||
*/ | ||
start () { | ||
if (!this._routers.length || this._timeoutId || !this._refreshManagerOptions.enabled) { | ||
return | ||
} | ||
|
||
this._timeoutId = setDelayedInterval( | ||
this._findClosestPeersTask, this._refreshManagerOptions.interval, this._refreshManagerOptions.bootDelay | ||
) | ||
} | ||
|
||
module.exports = (node) => { | ||
const routers = node._modules.peerRouting || [] | ||
/** | ||
* Recurrent task to find closest peers and add their addresses to the Address Book. | ||
*/ | ||
async _findClosestPeersTask () { | ||
try { | ||
for await (const { id, multiaddrs } of this.getClosestPeers(this._peerId.id)) { | ||
this._peerStore.addressBook.add(id, multiaddrs) | ||
} | ||
} catch (err) { | ||
log.error(err) | ||
} | ||
} | ||
|
||
// If we have the dht, make it first | ||
if (node._dht) { | ||
routers.unshift(node._dht) | ||
/** | ||
* Stop peer routing service. | ||
*/ | ||
stop () { | ||
clearDelayedInterval(this._timeoutId) | ||
} | ||
|
||
return { | ||
/** | ||
* Iterates over all peer routers in series to find the given peer. | ||
* | ||
* @param {string} id - The id of the peer to find | ||
* @param {object} [options] | ||
* @param {number} [options.timeout] - How long the query should run | ||
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
findPeer: async (id, options) => { // eslint-disable-line require-await | ||
if (!routers.length) { | ||
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') | ||
/** | ||
* Iterates over all peer routers in series to find the given peer. | ||
* | ||
* @param {string} id - The id of the peer to find | ||
* @param {object} [options] | ||
* @param {number} [options.timeout] - How long the query should run | ||
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
async findPeer (id, options) { // eslint-disable-line require-await | ||
if (!this._routers.length) { | ||
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') | ||
} | ||
|
||
return pAny(this._routers.map(async (router) => { | ||
const result = await router.findPeer(id, options) | ||
|
||
// If we don't have a result, we need to provide an error to keep trying | ||
if (!result || Object.keys(result).length === 0) { | ||
throw errCode(new Error('not found'), 'NOT_FOUND') | ||
} | ||
|
||
return pAny(routers.map(async (router) => { | ||
const result = await router.findPeer(id, options) | ||
return result | ||
})) | ||
} | ||
|
||
/** | ||
* Attempt to find the closest peers on the network to the given key. | ||
* | ||
* @param {Uint8Array} key - A CID like key | ||
* @param {Object} [options] | ||
* @param {number} [options.timeout=30e3] - How long the query can take. | ||
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>} | ||
*/ | ||
async * getClosestPeers (key, options = { timeout: 30e3 }) { | ||
if (!this._routers.length) { | ||
throw errCode(new Error('No peer routers available'), 'NO_ROUTERS_AVAILABLE') | ||
} | ||
|
||
// If we don't have a result, we need to provide an error to keep trying | ||
if (!result || Object.keys(result).length === 0) { | ||
const result = await pAny( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not going to block this PR on this (as we're doing it in other content/peer routing calls), but we should think about the parallel actions this is doing. Most people shouldn't be using two, but in the change they are, the parallel calls could cause some odd behavior. This is probably worth discussing in more depth when we look at the discovery api / configuration updates. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense, I will cross post it here for us to discuss |
||
this._routers.map(async (router) => { | ||
const peers = await all(router.getClosestPeers(key, options)) | ||
|
||
if (!peers || !peers.length) { | ||
throw errCode(new Error('not found'), 'NOT_FOUND') | ||
} | ||
return peers | ||
}) | ||
) | ||
|
||
return result | ||
})) | ||
for (const peer of result) { | ||
yield peer | ||
} | ||
} | ||
} | ||
|
||
module.exports = PeerRouting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit confusing as it's not quite peer routing. This is really the refresh manager. In go-libp2p since they just have the DHT right now they run the table refreshes when they search for their closest peers. This happens when DHT bootstrap is run at startup and then on an interval. Ideally we would not use a boot delay timer as this is finicky, and instead run once we've finished a bootstrap phase. Since we don't really have this concept atm, I think a delay is fine for now.
Go runs this every 10 minutes by default right now, we should just match that for consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to change this into 10 minutes yes. Regarding the namings, thinking on changing this to:
The peerRouting is the scope where we make these queries, so it seems that this configs should live bound to it. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as suggested for now