diff --git a/doc/DIALER.md b/doc/DIALER.md index eb91038a4d..211f07d101 100644 --- a/doc/DIALER.md +++ b/doc/DIALER.md @@ -9,6 +9,7 @@ * As tokens are limited, DialRequests should be given a prioritized list of Multiaddrs to minimize the potential request time. * Once a single Multiaddr Dial has succeeded, all pending dials in that Dial Request should be aborted. All tokens should be immediately released to the Dialer. * If all Multiaddr Dials fail, or the DIAL_TIMEOUT max is reached for the entire DialRequest, all in progress dials for that DialRequest should be aborted. All tokens should immediately be released to the Dialer. +* If a Multiaddr Dial fails and there are no more dials to use its token, that token should be immediately released to the Dialer. ## Multiaddr Confidence diff --git a/package.json b/package.json index 755088762d..4c844ac004 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ }, "dependencies": { "abort-controller": "^3.0.0", + "aggregate-error": "^3.0.1", "async": "^2.6.2", "async-iterator-all": "^1.0.0", "bignumber.js": "^9.0.0", @@ -67,6 +68,7 @@ "p-map": "^3.0.0", "p-queue": "^6.1.1", "p-settle": "^3.1.0", + "paramap-it": "^0.1.1", "peer-id": "^0.13.4", "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", diff --git a/src/constants.js b/src/constants.js index 72c442d275..a66926b18c 100644 --- a/src/constants.js +++ b/src/constants.js @@ -6,6 +6,7 @@ module.exports = { DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take MAX_COLD_CALLS: 50, // How many dials w/o protocols that can be queued MAX_PARALLEL_DIALS: 100, // Maximum allowed concurrent dials + PER_PEER_LIMIT: 4, // Allowed parallel dials per DialRequest QUARTER_HOUR: 15 * 60e3, PRIORITY_HIGH: 10, PRIORITY_LOW: 20 diff --git a/src/dialer.js b/src/dialer.js index 2154554fa6..1766e28de2 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,19 +1,20 @@ 'use strict' -const nextTick = require('async/nextTick') const multiaddr = require('multiaddr') const errCode = require('err-code') -const { default: PQueue } = require('p-queue') const AbortController = require('abort-controller') +const delay = require('delay') const debug = require('debug') const log = debug('libp2p:dialer') log.error = debug('libp2p:dialer:error') -const PeerId = require('peer-id') +const { DialRequest } = require('./dialer/dial-request') +const { anySignal } = require('./util') const { codes } = require('./errors') const { + DIAL_TIMEOUT, MAX_PARALLEL_DIALS, - DIAL_TIMEOUT + PER_PEER_LIMIT } = require('./constants') class Dialer { @@ -29,74 +30,76 @@ class Dialer { transportManager, peerStore, concurrency = MAX_PARALLEL_DIALS, - timeout = DIAL_TIMEOUT + timeout = DIAL_TIMEOUT, + perPeerLimit = PER_PEER_LIMIT }) { this.transportManager = transportManager this.peerStore = peerStore this.concurrency = concurrency this.timeout = timeout - this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true }) + this.perPeerLimit = perPeerLimit + this.tokens = [...new Array(concurrency)].map((_, index) => index) - /** - * @property {IdentifyService} - */ - this._identifyService = null - } - - set identifyService (service) { - this._identifyService = service - } - - /** - * @type {IdentifyService} - */ - get identifyService () { - return this._identifyService + this.releaseToken = this.releaseToken.bind(this) } /** * Connects to a given `Multiaddr`. `addr` should include the id of the peer being * dialed, it will be used for encryption verification. * - * @async * @param {Multiaddr} addr The address to dial * @param {object} [options] * @param {AbortSignal} [options.signal] An AbortController signal * @returns {Promise} */ - async connectToMultiaddr (addr, options = {}) { + connectToMultiaddr (addr, options = {}) { addr = multiaddr(addr) - let conn - let controller - if (!options.signal) { - controller = new AbortController() - options.signal = controller.signal - } + return this.connectToMultiaddrs([addr], options) + } + + /** + * Connects to the first success of a given list of `Multiaddr`. `addrs` should + * include the id of the peer being dialed, it will be used for encryption verification. + * + * @param {Array} addrs + * @param {object} [options] + * @param {AbortSignal} [options.signal] An AbortController signal + * @returns {Promise} + */ + async connectToMultiaddrs (addrs, options = {}) { + const dialAction = (addr, options) => this.transportManager.dial(addr, options) + const dialRequest = new DialRequest({ + addrs, + dialAction, + dialer: this + }) + + // Combine the timeout signal and options.signal, if provided + const timeoutController = new AbortController() + const signals = [timeoutController.signal] + options.signal && signals.push(options.signal) + const signal = anySignal(signals) + const timeoutPromise = delay.reject(this.timeout, { + value: errCode(new Error('Dial timed out'), codes.ERR_TIMEOUT) + }) try { - conn = await this.queue.add(() => this.transportManager.dial(addr, options)) + // Race the dial request and the timeout + const dialResult = await Promise.race([ + dialRequest.run({ + ...options, + signal + }), + timeoutPromise + ]) + timeoutPromise.clear() + return dialResult } catch (err) { - if (err.name === 'TimeoutError') { - controller.abort() - err.code = codes.ERR_TIMEOUT - } - log.error('Error dialing address %s,', addr, err) + log.error(err) + timeoutController.abort() throw err } - - // Perform a delayed Identify handshake - if (this.identifyService) { - nextTick(async () => { - try { - await this.identifyService.identify(conn, conn.remotePeer) - } catch (err) { - log.error(err) - } - }) - } - - return conn } /** @@ -104,31 +107,57 @@ class Dialer { * The dial to the first address that is successfully able to upgrade a connection * will be used. * - * @async - * @param {PeerInfo|PeerId} peer The remote peer to dial + * @param {PeerId} peerId The remote peer id to dial * @param {object} [options] * @param {AbortSignal} [options.signal] An AbortController signal * @returns {Promise} */ - async connectToPeer (peer, options = {}) { - if (PeerId.isPeerId(peer)) { - peer = this.peerStore.get(peer.toB58String()) - } + connectToPeer (peerId, options = {}) { + const addrs = this.peerStore.multiaddrsForPeer(peerId) - const addrs = peer.multiaddrs.toArray() - for (const addr of addrs) { - try { - return await this.connectToMultiaddr(addr, options) - } catch (_) { - // The error is already logged, just move to the next addr - continue - } - } + // TODO: ensure the peer id is on the multiaddr + + return this.connectToMultiaddrs(addrs, options) + } - const err = errCode(new Error('Could not dial peer, all addresses failed'), codes.ERR_CONNECTION_FAILED) - log.error(err) - throw err + getTokens (num) { + const total = Math.min(num, this.perPeerLimit, this.tokens.length) + const tokens = this.tokens.splice(0, total) + log('%d tokens request, returning %d, %d remaining', num, total, this.tokens.length) + return tokens + } + + releaseToken (token) { + log('token %d released', token) + this.tokens.push(token) } } module.exports = Dialer + +// class ActionLimiter { +// constructor(actions, options = {}) { +// this.actions = actions +// this.limit = options.limit || 4 +// this.controller = options.controller || new AbortController() +// } +// async abort () { +// this.controller.abort() +// } +// async run () { +// const limit = pLimit(this.limit) +// let result +// try { +// result = await pAny(this.actions.map(action => limit(action))) +// } catch (err) { +// console.log(err) +// if (!err.code) err.code = codes.ERR_CONNECTION_FAILED +// log.error(err) +// throw err +// } finally { +// console.log('RES', result) +// this.controller.abort() +// } +// return result +// } +// } diff --git a/src/dialer/dial-request.js b/src/dialer/dial-request.js new file mode 100644 index 0000000000..e45282bdc8 --- /dev/null +++ b/src/dialer/dial-request.js @@ -0,0 +1,197 @@ +'use strict' + +const AbortController = require('abort-controller') +const AggregateError = require('aggregate-error') +const pDefer = require('p-defer') +const debug = require('debug') +const log = debug('libp2p:dialer:request') +log.error = debug('libp2p:dialer:request:error') +const { AbortError } = require('libp2p-interfaces/src/transport/errors') + +const { anySignal } = require('../util') +const { TokenHolder } = require('./token-holder') + +class DialRequest { + /** + * + * @param {object} options + * @param {Multiaddr[]} options.addrs + * @param {TransportManager} options.transportManager + * @param {Dialer} options.dialer + */ + constructor ({ + addrs, + dialAction, + dialer + }) { + this.addrs = addrs + this.dialer = dialer + this.dialAction = dialAction + } + + /** + * @async + * @param {object} options + * @param {AbortSignal} options.signal An AbortController signal + * @param {number} options.timeout The max dial time for each request + * @returns {Connection} + */ + async run (options) { + // Determine how many tokens we need + const tokensWanted = Math.min(this.addrs.length, this.dialer.perPeerLimit) + // Get the tokens + const tokens = this.dialer.getTokens(tokensWanted) + // If no tokens are available, throw + if (tokens.length < 1) { + throw Object.assign(new Error('No dial tokens available'), { code: 'ERR_NO_DIAL_TOKENS' }) + } + + // For every token, run a multiaddr dial + // If there are tokens left, release them + // If there are multiaddrs left, wait for tokens to finish + const th = new TokenHolder(tokens, this.dialer.releaseToken) + + // Create the dial functions + const dials = this.addrs.map(addr => { + return () => this._abortableDial(addr, options) + }) + + const dialResolver = new DialResolver() + while (dials.length > 0) { + if (dialResolver.finished) break + // Wait for the next available token + const token = await th.getToken() + const dial = dials.shift() + dialResolver.add(dial, () => th.releaseToken(token)) + } + + // Start giving back the tokens + th.drain() + // Flush all the dials to get the final response + return dialResolver.flush() + } + + /** + * @private + * @param {Multiaddr} addr + * @param {object} options + * @param {AbortSignal} options.signal An AbortController signal + * @param {number} options.timeout The max dial time for each request + * @returns {{abort: function(), promise: Promise}} An AbortableDial + */ + _abortableDial (addr, options) { + log('starting dial to %s', addr) + const controller = new AbortController() + const signals = [controller.signal] + options.signal && signals.push(options.signal) + const signal = anySignal([controller.signal, options.signal]) + + const promise = this.dialAction(addr, { signal, timeout: options.timeout }) + return { + abort: () => controller.abort(), + promise + } + } +} + +class DialResolver { + constructor () { + this.dials = new Set() + this.errors = [] + this.finished = false + this.didFlush = false + this._waiting = null + } + + /** + * Adds a dial function to the resolver. The function will be immediately + * executed and its resolution tracked. + * @async + * @param {function()} dial A function that returns an AbortableDial + * @param {function()} [finallyHandler] Called when the dial resolves or rejects + */ + async add (dial, finallyHandler) { + if (this.finished) return + const abortableDial = dial() + this.dials.add(abortableDial) + try { + this._onResolve(await abortableDial.promise) + } catch (err) { + this._onReject(err) + } finally { + this._onFinally(abortableDial) + finallyHandler && finallyHandler() + } + } + + /** + * Called when a dial resolves + * @param {Connection} result + */ + _onResolve (result) { + this.result = result + } + + /** + * Called when a dial rejects + * @param {Error} err + */ + _onReject (err) { + if (err.code === AbortError.code) return + this.errors.push(err) + } + + _onFinally (dial) { + this.dials.delete(dial) + // If we have a result, or all dials have finished + if (this.result || (this._waiting && this.dials.size === 0)) { + this._onFinish() + } + } + + /** + * Called when dialing is completed, which means one of: + * 1. One dial succeeded + * 2. All dials failed + * 3. All dials were aborted + * @private + */ + _onFinish () { + this.finished = true + // Abort all remaining dials + for (const abortableDial of this.dials) { + abortableDial.abort() + } + this.dials.clear() + + // Flush must be called + if (!this._waiting) return + // If we have a result, or an abort occurred (no errors and no result) + if (this.result || this.errors.length === 0) { + this._waiting.resolve(this.result) + } else { + this._waiting.reject(new AggregateError(this.errors)) + } + } + + /** + * Flushes any remaining dials and resolves the first + * successful `Connection`. Flush should be called after all + * dials have been added. + * @returns {Promise} + */ + flush () { + if (this.finished) { + if (this.result) { + return Promise.resolve(this.result) + } else { + return Promise.reject(new AggregateError(this.errors)) + } + } + this._waiting = pDefer() + return this._waiting.promise + } +} + +module.exports.DialResolver = DialResolver +module.exports.DialRequest = DialRequest diff --git a/src/dialer/token-holder.js b/src/dialer/token-holder.js new file mode 100644 index 0000000000..acef56e296 --- /dev/null +++ b/src/dialer/token-holder.js @@ -0,0 +1,63 @@ +'use strict' + +/** + * @class TokenHolder + * @example + * const th = new TokenHolder(tokens, dialer.releaseToken) + * for (const action of actions) { + * const token = await th.getToken() + * action(token).then(() => th.releaseToken(token)) + * } + * + * await th.drain() + */ +class TokenHolder { + /** + * @param {Array<*>} tokens Tokens to track + * @param {function(*)} release Called when releasing control of the tokens + */ + constructor (tokens, release) { + this.originalTokens = tokens + this.tokens = [...tokens] + this._release = release + } + + /** + * Resolves a token once once is available. Once the token is no + * longer needed it MUST be release with `releaseToken()`. + * @returns {Promise<*>} + */ + getToken () { + if (this.tokens.length) return Promise.resolve(this.tokens.shift()) + return new Promise(resolve => { + const _push = this.tokens.push + this.tokens.push = (token) => { + this.tokens.push = _push + resolve(token) + } + }) + } + + /** + * Makes the token available via `getToken()` + * @param {*} token + */ + releaseToken (token) { + this.tokens.push(token) + } + + /** + * Once tokens are no longer needed for a series of actions, + * drain will release them to the owner via `this._release()` + */ + async drain () { + let drained = 0 + while (drained < this.originalTokens.length) { + this._release(await this.getToken()) + // Remove the token + drained++ + } + } +} + +module.exports.TokenHolder = TokenHolder diff --git a/src/index.js b/src/index.js index 9d99228c55..609edd32b7 100644 --- a/src/index.js +++ b/src/index.js @@ -57,6 +57,12 @@ class Libp2p extends EventEmitter { const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer)) this.registrar.onConnect(peerInfo, connection) this.emit('peer:connect', peerInfo) + + // Run identify for every connection + if (this.identifyService) { + this.identifyService.identify(connection, connection.remotePeer) + .catch(log.error) + } }, onConnectionEnd: (connection) => { const peerInfo = getPeerInfo(connection.remotePeer) @@ -104,12 +110,12 @@ class Libp2p extends EventEmitter { }) // Add the identify service since we can multiplex - this.dialer.identifyService = new IdentifyService({ + this.identifyService = new IdentifyService({ registrar: this.registrar, peerInfo: this.peerInfo, protocols: this.upgrader.protocols }) - this.handle(Object.values(IDENTIFY_PROTOCOLS), this.dialer.identifyService.handleMessage) + this.handle(Object.values(IDENTIFY_PROTOCOLS), this.identifyService.handleMessage) } // Attach private network protector @@ -236,7 +242,7 @@ class Libp2p extends EventEmitter { connection = await this.dialer.connectToMultiaddr(peer, options) } else { peer = await getPeerInfoRemote(peer, this) - connection = await this.dialer.connectToPeer(peer, options) + connection = await this.dialer.connectToPeer(peer.id, options) } const peerInfo = getPeerInfo(connection.remotePeer) @@ -293,8 +299,8 @@ class Libp2p extends EventEmitter { }) // Only push if libp2p is running - if (this.isStarted()) { - this.dialer.identifyService.pushToPeerStore(this.peerStore) + if (this.isStarted() && this.identifyService) { + this.identifyService.pushToPeerStore(this.peerStore) } } @@ -310,8 +316,8 @@ class Libp2p extends EventEmitter { }) // Only push if libp2p is running - if (this.isStarted()) { - this.dialer.identifyService.pushToPeerStore(this.peerStore) + if (this.isStarted() && this.identifyService) { + this.identifyService.pushToPeerStore(this.peerStore) } } diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 27f6d5929e..321fd204e9 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -217,6 +217,16 @@ class PeerStore extends EventEmitter { protocols: Array.from(peerInfo.protocols) }) } + + /** + * Returns the known multiaddrs for a given `PeerId` + * @param {PeerId} peerId + * @returns {Array} + */ + multiaddrsForPeer (peerId) { + const peerInfo = this.get(peerId.toB58String()) + return peerInfo.multiaddrs.toArray() + } } module.exports = PeerStore diff --git a/src/util/index.js b/src/util/index.js index bca13a4530..a53d537ef5 100644 --- a/src/util/index.js +++ b/src/util/index.js @@ -1,5 +1,7 @@ 'use strict' +const AbortController = require('abort-controller') + /** * Converts BufferList messages to Buffers * @param {*} source @@ -13,4 +15,34 @@ function toBuffer (source) { })() } +/** + * Takes an array of AbortSignals and returns a single signal. + * If any signals are aborted, the returned signal will be aborted. + * @param {Array} signals + * @returns {AbortSignal} + */ +function anySignal (signals) { + const controller = new AbortController() + + function onAbort () { + controller.abort() + + // Cleanup + for (const signal of signals) { + signal.removeEventListener('abort', onAbort) + } + } + + for (const signal of signals) { + if (signal.aborted) { + onAbort() + break + } + signal.addEventListener('abort', onAbort) + } + + return controller.signal +} + module.exports.toBuffer = toBuffer +module.exports.anySignal = anySignal diff --git a/test/dialing/dial-resolver.spec.js b/test/dialing/dial-resolver.spec.js new file mode 100644 index 0000000000..105b71e211 --- /dev/null +++ b/test/dialing/dial-resolver.spec.js @@ -0,0 +1,83 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) +const { expect } = chai +const sinon = require('sinon') +const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') +const AggregateError = require('aggregate-error') +const { AbortError } = require('libp2p-interfaces/src/transport/errors') + +const { DialResolver } = require('../../src/dialer/dial-request') + +const mockAbortableDial = () => { + const deferred = pDefer() + function dial () { + return { + promise: deferred.promise, + abort: () => deferred.reject(new AbortError()) + } + } + dial.reject = deferred.reject + dial.resolve = deferred.resolve + return dial +} + +describe('DialResolver', () => { + it('should not run subsequent dials if finished', async () => { + const deferred = pDefer() + const dial = sinon.stub().callsFake(() => { + return deferred + }) + const dialResolver = new DialResolver() + dialResolver.add(dial) + deferred.resolve(true) + + await pWaitFor(() => dialResolver.finished === true) + + dialResolver.add(dial) + expect(dial.callCount).to.equal(1) + }) + + it('.flush should throw if all dials errored', async () => { + const dialResolver = new DialResolver() + const dials = [ + mockAbortableDial(), + mockAbortableDial(), + mockAbortableDial() + ] + for (const dial of dials) { + dialResolver.add(dial) + dial.reject(new Error('transport error')) + } + + await expect(dialResolver.flush()).to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors.length', 3) + }) + + it('.flush should resolve the successful dial', async () => { + const dialResolver = new DialResolver() + const mockConn = {} + const dials = [ + mockAbortableDial(), + mockAbortableDial(), + mockAbortableDial() + ] + + // Make the first succeed + const successfulDial = dials.shift() + dialResolver.add(successfulDial) + successfulDial.resolve(mockConn) + + // Error the rest + for (const dial of dials) { + dialResolver.add(dial) + dial.reject(new Error('transport error')) + } + + await expect(dialResolver.flush()).to.eventually.be(mockConn) + }) +}) diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index 8fbe5d7944..fa903e2f4b 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -15,6 +15,7 @@ const PeerInfo = require('peer-info') const delay = require('delay') const pDefer = require('p-defer') const pipe = require('it-pipe') +const AggregateError = require('aggregate-error') const Libp2p = require('../../src') const Dialer = require('../../src/dialer') @@ -79,15 +80,19 @@ describe('Dialing (direct, TCP)', () => { const dialer = new Dialer({ transportManager: localTM }) await expect(dialer.connectToMultiaddr(unsupportedAddr)) - .to.eventually.be.rejected() - .and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) }) it('should be able to connect to a given peer info', async () => { - const dialer = new Dialer({ transportManager: localTM }) + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + multiaddrsForPeer: () => [remoteAddr] + } + }) const peerId = await PeerId.createFromJSON(Peers[0]) const peerInfo = new PeerInfo(peerId) - peerInfo.multiaddrs.add(remoteAddr) const connection = await dialer.connectToPeer(peerInfo) expect(connection).to.exist() @@ -112,14 +117,18 @@ describe('Dialing (direct, TCP)', () => { }) it('should fail to connect to a given peer with unsupported addresses', async () => { - const dialer = new Dialer({ transportManager: localTM }) + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + multiaddrsForPeer: () => [unsupportedAddr] + } + }) const peerId = await PeerId.createFromJSON(Peers[0]) const peerInfo = new PeerInfo(peerId) - peerInfo.multiaddrs.add(unsupportedAddr) await expect(dialer.connectToPeer(peerInfo)) - .to.eventually.be.rejected() - .and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) }) it('should abort dials on queue task timeout', async () => { @@ -136,7 +145,7 @@ describe('Dialing (direct, TCP)', () => { }) await expect(dialer.connectToMultiaddr(remoteAddr)) - .to.eventually.be.rejected() + .to.eventually.be.rejectedWith(Error) .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT) }) @@ -146,25 +155,21 @@ describe('Dialing (direct, TCP)', () => { concurrency: 2 }) + expect(dialer.tokens).to.have.length(2) + const deferredDial = pDefer() sinon.stub(localTM, 'dial').callsFake(async () => { await deferredDial.promise }) - // Add 3 dials - Promise.all([ - dialer.connectToMultiaddr(remoteAddr), - dialer.connectToMultiaddr(remoteAddr), - dialer.connectToMultiaddr(remoteAddr) - ]) + // Perform 3 multiaddr dials + dialer.connectToMultiaddrs([remoteAddr, remoteAddr, remoteAddr]) // Let the call stack run await delay(0) // We should have 2 in progress, and 1 waiting - expect(localTM.dial.callCount).to.equal(2) - expect(dialer.queue.pending).to.equal(2) - expect(dialer.queue.size).to.equal(1) + expect(dialer.tokens).to.have.length(0) deferredDial.resolve() @@ -172,8 +177,7 @@ describe('Dialing (direct, TCP)', () => { await delay(0) // All dials should have executed expect(localTM.dial.callCount).to.equal(3) - expect(dialer.queue.pending).to.equal(0) - expect(dialer.queue.size).to.equal(0) + expect(dialer.tokens).to.have.length(2) }) describe('libp2p.dialer', () => { @@ -214,7 +218,7 @@ describe('Dialing (direct, TCP)', () => { after(() => remoteLibp2p.stop()) - it('should use the dialer for connecting', async () => { + it('should use the dialer for connecting to a multiaddr', async () => { libp2p = new Libp2p({ peerInfo, modules: { @@ -235,6 +239,29 @@ describe('Dialing (direct, TCP)', () => { expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) }) + it('should use the dialer for connecting to a peer', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + + sinon.spy(libp2p.dialer, 'connectToMultiaddrs') + const remotePeer = new PeerInfo(remoteLibp2p.peerInfo.id) + remotePeer.multiaddrs.add(remoteAddr) + + const connection = await libp2p.dial(remotePeer) + expect(connection).to.exist() + const { stream, protocol } = await connection.newStream('/echo/1.0.0') + expect(stream).to.exist() + expect(protocol).to.equal('/echo/1.0.0') + await connection.close() + expect(libp2p.dialer.connectToMultiaddrs.callCount).to.equal(1) + }) + it('should be able to use hangup to close connections', async () => { libp2p = new Libp2p({ peerInfo, diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index 57ff1d0048..ad538251a6 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -7,6 +7,7 @@ chai.use(require('chai-as-promised')) const { expect } = chai const sinon = require('sinon') const pDefer = require('p-defer') +const pWaitFor = require('p-wait-for') const delay = require('delay') const Transport = require('libp2p-websockets') const Muxer = require('libp2p-mplex') @@ -14,6 +15,7 @@ const Crypto = require('libp2p-secio') const multiaddr = require('multiaddr') const PeerId = require('peer-id') const PeerInfo = require('peer-info') +const AggregateError = require('aggregate-error') const { codes: ErrorCodes } = require('../../src/errors') const Constants = require('../../src/constants') @@ -49,6 +51,22 @@ describe('Dialing (direct, WebSockets)', () => { expect(dialer.timeout).to.equal(Constants.DIAL_TIMEOUT) }) + it('should limit the number of tokens it provides', () => { + const dialer = new Dialer({ transportManager: localTM }) + const maxPerPeer = Constants.PER_PEER_LIMIT + expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS) + const tokens = dialer.getTokens(maxPerPeer + 1) + expect(tokens).to.have.length(maxPerPeer) + expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS - maxPerPeer) + }) + + it('should not return tokens if non are left', () => { + const dialer = new Dialer({ transportManager: localTM }) + sinon.stub(dialer, 'tokens').value([]) + const tokens = dialer.getTokens(1) + expect(tokens.length).to.equal(0) + }) + it('should be able to connect to a remote node via its multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) @@ -69,30 +87,36 @@ describe('Dialing (direct, WebSockets)', () => { const dialer = new Dialer({ transportManager: localTM }) await expect(dialer.connectToMultiaddr(unsupportedAddr)) - .to.eventually.be.rejected() - .and.to.have.property('code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) }) it('should be able to connect to a given peer', async () => { - const dialer = new Dialer({ transportManager: localTM }) + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + multiaddrsForPeer: () => [remoteAddr] + } + }) const peerId = await PeerId.createFromJSON(Peers[0]) - const peerInfo = new PeerInfo(peerId) - peerInfo.multiaddrs.add(remoteAddr) - const connection = await dialer.connectToPeer(peerInfo) + const connection = await dialer.connectToPeer(peerId) expect(connection).to.exist() await connection.close() }) it('should fail to connect to a given peer with unsupported addresses', async () => { - const dialer = new Dialer({ transportManager: localTM }) + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + multiaddrsForPeer: () => [unsupportedAddr] + } + }) const peerId = await PeerId.createFromJSON(Peers[0]) - const peerInfo = new PeerInfo(peerId) - peerInfo.multiaddrs.add(unsupportedAddr) - await expect(dialer.connectToPeer(peerInfo)) - .to.eventually.be.rejected() - .and.to.have.property('code', ErrorCodes.ERR_CONNECTION_FAILED) + await expect(dialer.connectToPeer(peerId)) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) }) it('should abort dials on queue task timeout', async () => { @@ -119,25 +143,21 @@ describe('Dialing (direct, WebSockets)', () => { concurrency: 2 }) + expect(dialer.tokens).to.have.length(2) + const deferredDial = pDefer() sinon.stub(localTM, 'dial').callsFake(async () => { await deferredDial.promise }) - // Add 3 dials - Promise.all([ - dialer.connectToMultiaddr(remoteAddr), - dialer.connectToMultiaddr(remoteAddr), - dialer.connectToMultiaddr(remoteAddr) - ]) + // Perform 3 multiaddr dials + dialer.connectToMultiaddrs([remoteAddr, remoteAddr, remoteAddr]) // Let the call stack run await delay(0) // We should have 2 in progress, and 1 waiting - expect(localTM.dial.callCount).to.equal(2) - expect(dialer.queue.pending).to.equal(2) - expect(dialer.queue.size).to.equal(1) + expect(dialer.tokens).to.have.length(0) deferredDial.resolve() @@ -145,8 +165,7 @@ describe('Dialing (direct, WebSockets)', () => { await delay(0) // All dials should have executed expect(localTM.dial.callCount).to.equal(3) - expect(dialer.queue.pending).to.equal(0) - expect(dialer.queue.size).to.equal(0) + expect(dialer.tokens).to.have.length(2) }) describe('libp2p.dialer', () => { @@ -215,16 +234,18 @@ describe('Dialing (direct, WebSockets)', () => { } }) - sinon.spy(libp2p.dialer.identifyService, 'identify') + sinon.spy(libp2p.identifyService, 'identify') sinon.spy(libp2p.peerStore, 'replace') sinon.spy(libp2p.upgrader, 'onConnection') const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) expect(connection).to.exist() - // Wait for setImmediate to trigger the identify call - await delay(1) - expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) - await libp2p.dialer.identifyService.identify.firstCall.returnValue + + // Wait for onConnection to be called + await pWaitFor(() => libp2p.upgrader.onConnection.callCount === 1) + + expect(libp2p.identifyService.identify.callCount).to.equal(1) + await libp2p.identifyService.identify.firstCall.returnValue expect(libp2p.peerStore.replace.callCount).to.equal(1) }) diff --git a/test/dialing/relay.node.js b/test/dialing/relay.node.js index 8eb0be2dfd..b5ce59522f 100644 --- a/test/dialing/relay.node.js +++ b/test/dialing/relay.node.js @@ -10,6 +10,7 @@ const sinon = require('sinon') const multiaddr = require('multiaddr') const { collect } = require('streaming-iterables') const pipe = require('it-pipe') +const AggregateError = require('aggregate-error') const { createPeerInfo } = require('../utils/creators/peer') const baseOptions = require('../utils/base-options') const Libp2p = require('../../src') @@ -93,8 +94,8 @@ describe('Dialing (via relay, TCP)', () => { .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) await expect(srcLibp2p.dial(dialAddr)) - .to.eventually.be.rejected() - .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) }) it('should not stay connected to a relay when not already connected and HOP fails', async () => { @@ -106,8 +107,8 @@ describe('Dialing (via relay, TCP)', () => { .encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toString()}`) await expect(srcLibp2p.dial(dialAddr)) - .to.eventually.be.rejected() - .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) // We should not be connected to the relay, because we weren't before the dial const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo) @@ -125,8 +126,8 @@ describe('Dialing (via relay, TCP)', () => { await srcLibp2p.dial(relayAddr) await expect(srcLibp2p.dial(dialAddr)) - .to.eventually.be.rejected() - .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) const srcToRelayConn = srcLibp2p.registrar.getConnection(relayLibp2p.peerInfo) expect(srcToRelayConn).to.exist() @@ -152,8 +153,8 @@ describe('Dialing (via relay, TCP)', () => { }]) await expect(srcLibp2p.dial(dialAddr)) - .to.eventually.be.rejected() - .and.to.have.property('code', Errors.ERR_HOP_REQUEST_FAILED) + .to.eventually.be.rejectedWith(AggregateError) + .and.to.have.nested.property('._errors[0].code', Errors.ERR_HOP_REQUEST_FAILED) const dstToRelayConn = dstLibp2p.registrar.getConnection(relayLibp2p.peerInfo) expect(dstToRelayConn).to.exist() diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index d39ac0e295..5eede683db 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -197,15 +197,15 @@ describe('Identify', () => { peerInfo }) - sinon.spy(libp2p.dialer.identifyService, 'identify') + sinon.spy(libp2p.identifyService, 'identify') sinon.spy(libp2p.peerStore, 'replace') const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) expect(connection).to.exist() // Wait for nextTick to trigger the identify call await delay(1) - expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) - await libp2p.dialer.identifyService.identify.firstCall.returnValue + expect(libp2p.identifyService.identify.callCount).to.equal(1) + await libp2p.identifyService.identify.firstCall.returnValue expect(libp2p.peerStore.replace.callCount).to.equal(1) await connection.close() @@ -217,8 +217,8 @@ describe('Identify', () => { peerInfo }) - sinon.spy(libp2p.dialer.identifyService, 'identify') - sinon.spy(libp2p.dialer.identifyService, 'push') + sinon.spy(libp2p.identifyService, 'identify') + sinon.spy(libp2p.identifyService, 'push') sinon.spy(libp2p.peerStore, 'update') const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) @@ -227,15 +227,15 @@ describe('Identify', () => { await delay(1) // Wait for identify to finish - await libp2p.dialer.identifyService.identify.firstCall.returnValue + await libp2p.identifyService.identify.firstCall.returnValue sinon.stub(libp2p, 'isStarted').returns(true) libp2p.handle('/echo/2.0.0', () => {}) libp2p.unhandle('/echo/2.0.0') // Verify the remote peer is notified of both changes - expect(libp2p.dialer.identifyService.push.callCount).to.equal(2) - for (const call of libp2p.dialer.identifyService.push.getCalls()) { + expect(libp2p.identifyService.push.callCount).to.equal(2) + for (const call of libp2p.identifyService.push.getCalls()) { const [connections] = call.args expect(connections.length).to.equal(1) expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId())