Skip to content

Commit

Permalink
feat: coalescing dial support (#518)
Browse files Browse the repository at this point in the history
* docs: fix spelling in api

* fix: dont create peerstore twice

* feat: add support for dial coalescing

* doc(fix): add setPeerValue to API TOC

* docs: add more jsdocs to dialer

* chore: remove old comment

* fix: ensure connections are closed

* fix: registrar.getConnections returns first open conn

* fix: directly set the closed status

* chore: remove unneeded log

* refactor: peerStore.put takes an options object
  • Loading branch information
jacobheun authored Dec 15, 2019
1 parent a39889c commit 4a871bb
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 172 deletions.
11 changes: 4 additions & 7 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [`pubsub.publish`](#pubsubpublish)
* [`pubsub.subscribe`](#pubsubsubscribe)
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`metrics.global`](#metricsglobal)
* [`metrics.peers`](#metricspeers)
* [`metrics.protocols`](#metricsprotocols)
Expand Down Expand Up @@ -92,7 +93,7 @@ Required keys in the `options` object:

</details>

Once you have a libp2p instance, you are able to listen to several events it emmits, so that you can be noticed of relevant network events.
Once you have a libp2p instance, you are able to listen to several events it emits, so that you can be noticed of relevant network events.

<details><summary>Events</summary>

Expand Down Expand Up @@ -666,12 +667,8 @@ Enables users to change the value of certain peers in a range of 0 to 1. Peers w
#### Example

```js
const topic = 'topic'
const handler = (msg) => {
// msg.data - pubsub data received
}

libp2p.pubsub.unsubscribe(topic, handler)
libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
```

### metrics.global
Expand Down
1 change: 1 addition & 0 deletions doc/DIALER.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# js-libp2p Dialer

**Synopsis**
* Parallel dials to the same peer will yield the same connection/error when the first dial settles.
* All Dial Requests in js-libp2p must request a token(s) from the Dialer.
* The number of tokens requested should be between 1 and the MAX_PER_PEER_DIALS max set in the Dialer.
* If the number of available tokens is less than requested, the Dialer may return less than requested.
Expand Down
2 changes: 1 addition & 1 deletion src/circuit/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class Circuit {
let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer))
if (!relayConnection) {
relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true
}

Expand Down
2 changes: 1 addition & 1 deletion src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module.exports = (circuit) => {
listener.listen = async (addr) => {
const [addrString] = String(addr).split('/p2p-circuit').slice(-1)

const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString))
const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')

listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
Expand Down
158 changes: 115 additions & 43 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const multiaddr = require('multiaddr')
const errCode = require('err-code')
const TimeoutController = require('timeout-abort-controller')
const anySignal = require('any-signal')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const debug = require('debug')
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')
Expand Down Expand Up @@ -38,7 +40,7 @@ class Dialer {
this.timeout = timeout
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)
this._pendingDials = new Set()
this._pendingDials = new Map()
}

/**
Expand All @@ -56,72 +58,111 @@ class Dialer {
}

/**
* Connects to the first success of a given list of `Multiaddr`. `addrs` should
* include the id of the peer being dialed, it will be used for encryption verification.
* Connects to a given `PeerId` or `Multiaddr` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @param {Array<Multiaddr>|Multiaddr} addrs
* @param {PeerInfo|Multiaddr} peer The peer to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddr (addrs, options = {}) {
if (!Array.isArray(addrs)) addrs = [multiaddr(addrs)]

const dialAction = (addr, options) => {
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
return this.transportManager.dial(addr, options)
async connectToPeer (peer, options = {}) {
const dialTarget = this._createDialTarget(peer)
if (dialTarget.addrs.length === 0) {
throw errCode(new Error('The dial request has no addresses'), 'ERR_NO_DIAL_MULTIADDRS')
}
const dialRequest = new DialRequest({
addrs,
dialAction,
dialer: this
})

// Combine the timeout signal and options.signal, if provided
const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)

const dial = {
dialRequest,
controller: timeoutController
}
this._pendingDials.add(dial)
const pendingDial = this._pendingDials.get(dialTarget.id) || this._createPendingDial(dialTarget, options)

try {
const dialResult = await dialRequest.run({ ...options, signal })
log('dial succeeded to %s', dialResult.remoteAddr)
return dialResult
const connection = await pendingDial.promise
log('dial succeeded to %s', dialTarget.id)
return connection
} catch (err) {
// Error is a timeout
if (timeoutController.signal.aborted) {
if (pendingDial.controller.signal.aborted) {
err.code = codes.ERR_TIMEOUT
}
log.error(err)
throw err
} finally {
timeoutController.clear()
this._pendingDials.delete(dial)
pendingDial.destroy()
}
}

/**
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @param {PeerId} peerId The remote peer id to dial
* @typedef DialTarget
* @property {string} id
* @property {Multiaddr[]} addrs
*/

/**
* Creates a DialTarget. The DialTarget is used to create and track
* the DialRequest to a given peer.
* @private
* @param {PeerInfo|Multiaddr} peer A PeerId or Multiaddr
* @returns {DialTarget}
*/
_createDialTarget (peer) {
const dialable = Dialer.getDialable(peer)
if (multiaddr.isMultiaddr(dialable)) {
return {
id: dialable.toString(),
addrs: [dialable]
}
}
const addrs = this.peerStore.multiaddrsForPeer(dialable)
return {
id: dialable.id.toString(),
addrs
}
}

/**
* @typedef PendingDial
* @property {DialRequest} dialRequest
* @property {TimeoutController} controller
* @property {Promise} promise
* @property {function():void} destroy
*/

/**
* Creates a PendingDial that wraps the underlying DialRequest
* @private
* @param {DialTarget} dialTarget
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
* @returns {PendingDial}
*/
connectToPeer (peerId, options = {}) {
const addrs = this.peerStore.multiaddrsForPeer(peerId)
_createPendingDial (dialTarget, options) {
const dialAction = (addr, options) => {
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
return this.transportManager.dial(addr, options)
}

// TODO: ensure the peer id is on the multiaddr
const dialRequest = new DialRequest({
addrs: dialTarget.addrs,
dialAction,
dialer: this
})

// Combine the timeout signal and options.signal, if provided
const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)

return this.connectToMultiaddr(addrs, options)
const pendingDial = {
dialRequest,
controller: timeoutController,
promise: dialRequest.run({ ...options, signal }),
destroy: () => {
timeoutController.clear()
this._pendingDials.delete(dialTarget.id)
}
}
this._pendingDials.set(dialTarget.id, pendingDial)
return pendingDial
}

getTokens (num) {
Expand All @@ -137,6 +178,37 @@ class Dialer {
log('token %d released', token)
this.tokens.push(token)
}

/**
* Converts the given `peer` into a `PeerInfo` or `Multiaddr`.
* @static
* @param {PeerInfo|PeerId|Multiaddr|string} peer
* @returns {PeerInfo|Multiaddr}
*/
static getDialable (peer) {
if (PeerInfo.isPeerInfo(peer)) return peer
if (typeof peer === 'string') {
peer = multiaddr(peer)
}

let addr
if (multiaddr.isMultiaddr(peer)) {
addr = peer
try {
peer = PeerId.createFromCID(peer.getPeerId())
} catch (err) {
// Couldn't get the PeerId, just use the address
return peer
}
}

if (PeerId.isPeerId(peer)) {
peer = new PeerInfo(peer)
}

addr && peer.multiaddrs.add(addr)
return peer
}
}

module.exports = Dialer
37 changes: 14 additions & 23 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ const log = debug('libp2p')
log.error = debug('libp2p:error')

const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')

const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const pubsub = require('./pubsub')
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
const { getPeerInfo } = require('./get-peer-info')
const { validate: validateConfig } = require('./config')
const { codes } = require('./errors')

Expand Down Expand Up @@ -51,8 +50,6 @@ class Libp2p extends EventEmitter {
this._transport = [] // Transport instances/references
this._discovery = new Map() // Discovery service instances/references

this.peerStore = new PeerStore()

if (this._options.metrics.enabled) {
this.metrics = new Metrics(this._options.metrics)
}
Expand All @@ -62,7 +59,7 @@ class Libp2p extends EventEmitter {
localPeer: this.peerInfo.id,
metrics: this.metrics,
onConnection: (connection) => {
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer), { silent: true })
this.registrar.onConnect(peerInfo, connection)
this.connectionManager.onConnect(connection)
this.emit('peer:connect', peerInfo)
Expand All @@ -74,7 +71,7 @@ class Libp2p extends EventEmitter {
}
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
const peerInfo = Dialer.getDialable(connection.remotePeer)
this.registrar.onDisconnect(peerInfo, connection)
this.connectionManager.onDisconnect(connection)

Expand Down Expand Up @@ -266,27 +263,22 @@ class Libp2p extends EventEmitter {
* @returns {Promise<Connection|*>}
*/
async dialProtocol (peer, protocols, options) {
const dialable = Dialer.getDialable(peer)
let connection
if (multiaddr.isMultiaddr(peer)) {
connection = await this.dialer.connectToMultiaddr(peer, options)
} else {
peer = await getPeerInfoRemote(peer, this)
connection = await this.dialer.connectToPeer(peer.id, options)
if (PeerInfo.isPeerInfo(dialable)) {
this.peerStore.put(dialable, { silent: true })
connection = this.registrar.getConnection(dialable)
}

const peerInfo = getPeerInfo(connection.remotePeer)
if (!connection) {
connection = await this.dialer.connectToPeer(dialable, options)
}

// If a protocol was provided, create a new stream
if (protocols) {
const stream = await connection.newStream(protocols)

peerInfo.protocols.add(stream.protocol)
this.peerStore.put(peerInfo)

return stream
return connection.newStream(protocols)
}

this.peerStore.put(peerInfo)
return connection
}

Expand Down Expand Up @@ -428,11 +420,10 @@ class Libp2p extends EventEmitter {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerInfo)) {
const minPeers = this._options.connectionManager.minPeers || 0
// TODO: This does not account for multiple connections to a peer
if (minPeers > this.registrar.connections.size) {
log('connecting to discovered peer')
if (minPeers > this.connectionManager._connections.size) {
log('connecting to discovered peer %s', peerInfo.id.toString())
try {
await this.dialer.connectToPeer(peerInfo.id)
await this.dialer.connectToPeer(peerInfo)
} catch (err) {
log.error('could not connect to discovered peer', err)
}
Expand Down
Loading

0 comments on commit 4a871bb

Please sign in to comment.