Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: coalescing dial support #518

Merged
merged 11 commits into from
Dec 15, 2019
11 changes: 4 additions & 7 deletions doc/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [`pubsub.publish`](#pubsubpublish)
* [`pubsub.subscribe`](#pubsubsubscribe)
* [`pubsub.unsubscribe`](#pubsubunsubscribe)
* [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue)
* [`metrics.global`](#metricsglobal)
* [`metrics.peers`](#metricspeers)
* [`metrics.protocols`](#metricsprotocols)
Expand Down Expand Up @@ -92,7 +93,7 @@ Required keys in the `options` object:

</details>

Once you have a libp2p instance, you are able to listen to several events it emmits, so that you can be noticed of relevant network events.
Once you have a libp2p instance, you are able to listen to several events it emits, so that you can be noticed of relevant network events.

<details><summary>Events</summary>

Expand Down Expand Up @@ -666,12 +667,8 @@ Enables users to change the value of certain peers in a range of 0 to 1. Peers w
#### Example

```js
const topic = 'topic'
const handler = (msg) => {
// msg.data - pubsub data received
}

libp2p.pubsub.unsubscribe(topic, handler)
libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1)
libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0)
```

### metrics.global
Expand Down
1 change: 1 addition & 0 deletions doc/DIALER.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# js-libp2p Dialer

**Synopsis**
* Parallel dials to the same peer will yield the same connection/error when the first dial settles.
* All Dial Requests in js-libp2p must request a token(s) from the Dialer.
* The number of tokens requested should be between 1 and the MAX_PER_PEER_DIALS max set in the Dialer.
* If the number of available tokens is less than requested, the Dialer may return less than requested.
Expand Down
2 changes: 1 addition & 1 deletion src/circuit/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class Circuit {
let disconnectOnFailure = false
let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer))
if (!relayConnection) {
relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options)
relayConnection = await this._dialer.connectToPeer(relayAddr, options)
disconnectOnFailure = true
}

Expand Down
2 changes: 1 addition & 1 deletion src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module.exports = (circuit) => {
listener.listen = async (addr) => {
const [addrString] = String(addr).split('/p2p-circuit').slice(-1)

const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString))
const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')

listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
Expand Down
158 changes: 115 additions & 43 deletions src/dialer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const multiaddr = require('multiaddr')
const errCode = require('err-code')
const TimeoutController = require('timeout-abort-controller')
const anySignal = require('any-signal')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const debug = require('debug')
const log = debug('libp2p:dialer')
log.error = debug('libp2p:dialer:error')
Expand Down Expand Up @@ -38,7 +40,7 @@ class Dialer {
this.timeout = timeout
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)
this._pendingDials = new Set()
this._pendingDials = new Map()
}

/**
Expand All @@ -56,72 +58,111 @@ class Dialer {
}

/**
* Connects to the first success of a given list of `Multiaddr`. `addrs` should
* include the id of the peer being dialed, it will be used for encryption verification.
* Connects to a given `PeerId` or `Multiaddr` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @param {Array<Multiaddr>|Multiaddr} addrs
* @param {PeerInfo|Multiaddr} peer The peer to dial
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
*/
async connectToMultiaddr (addrs, options = {}) {
if (!Array.isArray(addrs)) addrs = [multiaddr(addrs)]

const dialAction = (addr, options) => {
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
return this.transportManager.dial(addr, options)
async connectToPeer (peer, options = {}) {
const dialTarget = this._createDialTarget(peer)
if (dialTarget.addrs.length === 0) {
throw errCode(new Error('The dial request has no addresses'), 'ERR_NO_DIAL_MULTIADDRS')
}
const dialRequest = new DialRequest({
addrs,
dialAction,
dialer: this
})

// Combine the timeout signal and options.signal, if provided
const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)

const dial = {
dialRequest,
controller: timeoutController
}
this._pendingDials.add(dial)
const pendingDial = this._pendingDials.get(dialTarget.id) || this._createPendingDial(dialTarget, options)

try {
const dialResult = await dialRequest.run({ ...options, signal })
log('dial succeeded to %s', dialResult.remoteAddr)
return dialResult
const connection = await pendingDial.promise
log('dial succeeded to %s', dialTarget.id)
return connection
} catch (err) {
// Error is a timeout
if (timeoutController.signal.aborted) {
if (pendingDial.controller.signal.aborted) {
err.code = codes.ERR_TIMEOUT
}
log.error(err)
throw err
} finally {
timeoutController.clear()
this._pendingDials.delete(dial)
pendingDial.destroy()
}
}

/**
* Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses.
* The dial to the first address that is successfully able to upgrade a connection
* will be used.
*
* @param {PeerId} peerId The remote peer id to dial
* @typedef DialTarget
* @property {string} id
* @property {Multiaddr[]} addrs
*/

/**
* Creates a DialTarget. The DialTarget is used to create and track
* the DialRequest to a given peer.
* @private
* @param {PeerInfo|Multiaddr} peer A PeerId or Multiaddr
* @returns {DialTarget}
*/
_createDialTarget (peer) {
const dialable = Dialer.getDialable(peer)
if (multiaddr.isMultiaddr(dialable)) {
return {
id: dialable.toString(),
addrs: [dialable]
}
}
const addrs = this.peerStore.multiaddrsForPeer(dialable)
return {
id: dialable.id.toString(),
addrs
}
}

/**
* @typedef PendingDial
* @property {DialRequest} dialRequest
* @property {TimeoutController} controller
* @property {Promise} promise
* @property {function():void} destroy
*/

/**
* Creates a PendingDial that wraps the underlying DialRequest
* @private
* @param {DialTarget} dialTarget
* @param {object} [options]
* @param {AbortSignal} [options.signal] An AbortController signal
* @returns {Promise<Connection>}
* @returns {PendingDial}
*/
connectToPeer (peerId, options = {}) {
const addrs = this.peerStore.multiaddrsForPeer(peerId)
_createPendingDial (dialTarget, options) {
const dialAction = (addr, options) => {
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
return this.transportManager.dial(addr, options)
}

// TODO: ensure the peer id is on the multiaddr
const dialRequest = new DialRequest({
addrs: dialTarget.addrs,
dialAction,
dialer: this
})

// Combine the timeout signal and options.signal, if provided
const timeoutController = new TimeoutController(this.timeout)
const signals = [timeoutController.signal]
options.signal && signals.push(options.signal)
const signal = anySignal(signals)

return this.connectToMultiaddr(addrs, options)
const pendingDial = {
dialRequest,
controller: timeoutController,
promise: dialRequest.run({ ...options, signal }),
destroy: () => {
timeoutController.clear()
this._pendingDials.delete(dialTarget.id)
}
}
this._pendingDials.set(dialTarget.id, pendingDial)
return pendingDial
}

getTokens (num) {
Expand All @@ -137,6 +178,37 @@ class Dialer {
log('token %d released', token)
this.tokens.push(token)
}

/**
* Converts the given `peer` into a `PeerInfo` or `Multiaddr`.
* @static
* @param {PeerInfo|PeerId|Multiaddr|string} peer
* @returns {PeerInfo|Multiaddr}
*/
static getDialable (peer) {
if (PeerInfo.isPeerInfo(peer)) return peer
if (typeof peer === 'string') {
peer = multiaddr(peer)
}

let addr
if (multiaddr.isMultiaddr(peer)) {
addr = peer
try {
peer = PeerId.createFromCID(peer.getPeerId())
} catch (err) {
// Couldn't get the PeerId, just use the address
return peer
}
}

if (PeerId.isPeerId(peer)) {
peer = new PeerInfo(peer)
}

addr && peer.multiaddrs.add(addr)
return peer
}
}

module.exports = Dialer
37 changes: 14 additions & 23 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ const log = debug('libp2p')
log.error = debug('libp2p:error')

const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')

const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const pubsub = require('./pubsub')
const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info')
const { getPeerInfo } = require('./get-peer-info')
const { validate: validateConfig } = require('./config')
const { codes } = require('./errors')

Expand Down Expand Up @@ -51,8 +50,6 @@ class Libp2p extends EventEmitter {
this._transport = [] // Transport instances/references
this._discovery = new Map() // Discovery service instances/references

this.peerStore = new PeerStore()

if (this._options.metrics.enabled) {
this.metrics = new Metrics(this._options.metrics)
}
Expand All @@ -62,7 +59,7 @@ class Libp2p extends EventEmitter {
localPeer: this.peerInfo.id,
metrics: this.metrics,
onConnection: (connection) => {
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer), { silent: true })
this.registrar.onConnect(peerInfo, connection)
this.connectionManager.onConnect(connection)
this.emit('peer:connect', peerInfo)
Expand All @@ -74,7 +71,7 @@ class Libp2p extends EventEmitter {
}
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
const peerInfo = Dialer.getDialable(connection.remotePeer)
this.registrar.onDisconnect(peerInfo, connection)
this.connectionManager.onDisconnect(connection)

Expand Down Expand Up @@ -266,27 +263,22 @@ class Libp2p extends EventEmitter {
* @returns {Promise<Connection|*>}
*/
async dialProtocol (peer, protocols, options) {
const dialable = Dialer.getDialable(peer)
let connection
if (multiaddr.isMultiaddr(peer)) {
connection = await this.dialer.connectToMultiaddr(peer, options)
} else {
peer = await getPeerInfoRemote(peer, this)
connection = await this.dialer.connectToPeer(peer.id, options)
if (PeerInfo.isPeerInfo(dialable)) {
this.peerStore.put(dialable, { silent: true })
connection = this.registrar.getConnection(dialable)
}

const peerInfo = getPeerInfo(connection.remotePeer)
if (!connection) {
connection = await this.dialer.connectToPeer(dialable, options)
}

// If a protocol was provided, create a new stream
if (protocols) {
const stream = await connection.newStream(protocols)

peerInfo.protocols.add(stream.protocol)
this.peerStore.put(peerInfo)

return stream
return connection.newStream(protocols)
}

this.peerStore.put(peerInfo)
return connection
}

Expand Down Expand Up @@ -428,11 +420,10 @@ class Libp2p extends EventEmitter {
// If auto dialing is on and we have no connection to the peer, check if we should dial
if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerInfo)) {
const minPeers = this._options.connectionManager.minPeers || 0
// TODO: This does not account for multiple connections to a peer
if (minPeers > this.registrar.connections.size) {
log('connecting to discovered peer')
if (minPeers > this.connectionManager._connections.size) {
log('connecting to discovered peer %s', peerInfo.id.toString())
try {
await this.dialer.connectToPeer(peerInfo.id)
await this.dialer.connectToPeer(peerInfo)
} catch (err) {
log.error('could not connect to discovered peer', err)
}
Expand Down
Loading