Skip to content

Commit

Permalink
refactor!: remove per-peer parallel dialling (libp2p#2090) (libp2p#2251)
Browse files Browse the repository at this point in the history
Since we are now dialling one peer at a time as a means of smarter dialling, we
no longer need the option to control the concurrency of parallel dials across
a single peers' multiaddrs.

Closes libp2p#2090

Co-authored-by: chad <chad.nehemiah94@gmail.com>
  • Loading branch information
achingbrain and maschad committed Nov 27, 2023
1 parent 0e9da4c commit d805a31
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 224 deletions.
11 changes: 10 additions & 1 deletion doc/migrations/v0.46-v1.0.0.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--Specify versions for migration below-->
# Migrating to libp2p@1.0.0 <!-- omit in toc -->
# Migrating to libp2p@1.0 <!-- omit in toc -->

A migration guide for refactoring your application code from libp2p `v0.46` to `v1.0.0`.

Expand All @@ -16,6 +16,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to `
- [Plaintext](#plaintext)
- [Pnet](#pnet)
- [Metrics](#metrics)
- [Connection Manager](#connection-manager)

## AutoNAT

Expand Down Expand Up @@ -311,3 +312,11 @@ The following metrics were renamed:

`libp2p_dialler_pending_dials` => `libp2p_dial_queue_pending_dials`
`libp2p_dialler_in_progress_dials` => `libp2p_dial_queue_in_progress_dials`

## Connection Manager

The observed behavior of dialing peers has been that given a list of supported addresses, if any one routable address would succeed then all would succeed and that if any routable address would fail then all would fail.

Consequently the previous dial behaviour of dialing all available addresses (up to a concurrency limit) and cancelling any in-flight dials when the first succeeds was a very inefficient use of resources.

Since `libp2p@0.46.10` we have only dialed one address at a time for each peer by setting the default value of the `ConnectionManager`'s `maxParallelDialsPerPeer` option to `1`. As of `libp2p@1.0.0` this option has been removed.
5 changes: 0 additions & 5 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 30e3
*/
export const MAX_PEER_ADDRS_TO_DIAL = 25

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDialsPerPeer
*/
export const MAX_PARALLEL_DIALS_PER_PEER = 1

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval
*/
Expand Down
20 changes: 3 additions & 17 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS_PER_PEER,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
Expand Down Expand Up @@ -44,7 +43,6 @@ interface DialerInit {
addressSorter?: AddressSorter
maxParallelDials?: number
maxPeerAddrsToDial?: number
maxParallelDialsPerPeer?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
connections?: PeerMap<Connection[]>
Expand All @@ -54,7 +52,6 @@ const defaultOptions = {
addressSorter: defaultAddressSort,
maxParallelDials: MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
maxParallelDialsPerPeer: MAX_PARALLEL_DIALS_PER_PEER,
dialTimeout: DIAL_TIMEOUT,
resolvers: {
dnsaddr: dnsaddrResolver
Expand All @@ -79,7 +76,6 @@ export class DialQueue {
private readonly transportManager: TransportManager
private readonly addressSorter: AddressSorter
private readonly maxPeerAddrsToDial: number
private readonly maxParallelDialsPerPeer: number
private readonly dialTimeout: number
private readonly inProgressDialCount?: Metric
private readonly pendingDialCount?: Metric
Expand All @@ -90,7 +86,6 @@ export class DialQueue {
constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout
this.connections = init.connections ?? new PeerMap()
this.log = components.logger.forComponent('libp2p:connection-manager:dial-queue')
Expand Down Expand Up @@ -463,18 +458,10 @@ export class DialQueue {
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController())

try {
let success = false

// internal peer dial queue to ensure we only dial the configured number of addresses
// per peer at the same time to prevent one peer with a lot of addresses swamping
// the dial queue
const peerDialQueue = new PQueue({
concurrency: this.maxParallelDialsPerPeer
})
// internal peer dial queue - only one dial per peer at a time
const peerDialQueue = new PQueue({ concurrency: 1 })
peerDialQueue.on('error', (err) => {
if (!success) {
this.log.error('error dialling [%s] %o', pendingDial.multiaddrs, err)
}
this.log.error('error dialing [%s] %o', pendingDial.multiaddrs, err)
})

const conn = await Promise.any(pendingDial.multiaddrs.map(async (addr, i) => {
Expand Down Expand Up @@ -538,7 +525,6 @@ export class DialQueue {
})

this.log('dial to %a succeeded', addr)
success = true

// resolve the connection promise
deferred.resolve(conn)
Expand Down
11 changes: 1 addition & 10 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions, ComponentLogger, Logger } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -89,14 +89,6 @@ export interface ConnectionManagerInit {
*/
maxParallelDials?: number

/**
* To prevent individual peers with large amounts of multiaddrs swamping the
* dial queue, this value controls how many addresses to dial in parallel per
* peer. So for example if two peers have 10 addresses and this value is set
* at 5, we will dial 5 addresses from each at a time. (default: 1)
*/
maxParallelDialsPerPeer?: number

/**
* Maximum number of addresses allowed for a given peer - if a peer has more
* addresses than this then the dial will fail. (default: 25)
Expand Down Expand Up @@ -260,7 +252,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
addressSorter: init.addressSorter ?? defaultAddressSort,
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
maxParallelDialsPerPeer: init.maxParallelDialsPerPeer ?? MAX_PARALLEL_DIALS_PER_PEER,
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
dnsaddr: dnsaddrResolver
Expand Down
97 changes: 0 additions & 97 deletions packages/libp2p/test/connection-manager/dial-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,103 +193,6 @@ describe('dial queue', () => {
expect(reject).to.have.property('callCount', addrs.length)
})

it('should abort all dials when its signal is aborted', async () => {
const signals: Record<string, AbortSignal | undefined> = {}
const slowDial = async (): Promise<void> => {
await delay(1000)
}
const actions: Record<string, (...args: any[]) => Promise<any>> = {
'/ip4/127.0.0.1/tcp/1231': slowDial,
'/ip4/127.0.0.1/tcp/1232': slowDial,
'/ip4/127.0.0.1/tcp/1233': slowDial
}
const controller = new AbortController()

dialer = new DialQueue(components, {
maxParallelDials: 2,
maxParallelDialsPerPeer: 10
})

components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, options = {}) => {
const maStr = ma.toString()
const action = actions[maStr]

if (action != null) {
signals[maStr] = options.signal
return action()
}

throw new Error(`No action found for multiaddr ${maStr}`)
})

setTimeout(() => { controller.abort() }, 100)

await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)), {
signal: controller.signal
})).to.eventually.be.rejected
.with.property('name', 'CodeError')

expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', true)
expect(signals).to.not.have.property('/ip4/127.0.0.1/tcp/1233') // never dialled as above the maxParallelDials limit
})

it('should abort other dials when one succeeds', async () => {
const remotePeer = await createEd25519PeerId()
const connection1 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const connection2 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const connection3 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const actions: Record<string, () => Promise<Connection>> = {
'/ip4/127.0.0.1/tcp/1231': async () => {
// Slow dial
await delay(1000)

return connection1
},
'/ip4/127.0.0.1/tcp/1232': async () => {
// Fast dial
await delay(10)

return connection2
},
'/ip4/127.0.0.1/tcp/1233': async () => {
// Slow dial
await delay(1000)

return connection3
}
}
const signals: Record<string, AbortSignal> = {}

components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, opts = {}) => {
const maStr = ma.toString()
const action = actions[maStr]

if (action != null) {
signals[maStr] = opts.signal
return action()
}

throw new Error(`No action found for multiaddr ${maStr}`)
})

dialer = new DialQueue(components, {
maxParallelDials: 50,
maxParallelDialsPerPeer: 10
})

await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)))).to.eventually.equal(connection2)

// Dial attempt finished without connection
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
// Dial attempt led to connection
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
// Dial attempt finished without connection
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', true)
})

it('should ignore DNS addresses for other peers', async () => {
const remotePeer = await createEd25519PeerId()
const otherRemotePeer = await createEd25519PeerId()
Expand Down
100 changes: 36 additions & 64 deletions packages/libp2p/test/connection-manager/direct.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { MemoryDatastore } from 'datastore-core/memory'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
Expand Down Expand Up @@ -223,84 +222,57 @@ describe('dialing (direct, TCP)', () => {
.and.to.have.property('code', ERR_TIMEOUT)
})

it('should dial to the max concurrency', async () => {
const peerId = await createEd25519PeerId()
const addrs = [
multiaddr('/ip4/0.0.0.0/tcp/8000'),
multiaddr('/ip4/0.0.0.0/tcp/8001'),
multiaddr('/ip4/0.0.0.0/tcp/8002')
]
it('should only dial to the max concurrency', async () => {
const peerId1 = await createEd25519PeerId()
const peerId2 = await createEd25519PeerId()
const peerId3 = await createEd25519PeerId()

const addr1 = multiaddr(`/ip4/127.0.0.1/tcp/1234/p2p/${peerId1}`)
const addr2 = multiaddr(`/ip4/127.0.12.4/tcp/3210/p2p/${peerId2}`)
const addr3 = multiaddr(`/ip4/123.3.11.1/tcp/2010/p2p/${peerId3}`)

const slowDial = async (): Promise<Connection> => {
await delay(100)
return mockConnection(mockMultiaddrConnection(mockDuplex(), peerId1))
}

const actions: Record<string, (...args: any[]) => Promise<any>> = {
[addr1.toString()]: slowDial,
[addr2.toString()]: slowDial,
[addr3.toString()]: async () => mockConnection(mockMultiaddrConnection(mockDuplex(), peerId3))
}

const dialer = new DialQueue(localComponents, {
maxParallelDials: 2,
maxParallelDialsPerPeer: 10
maxParallelDials: 2
})

const deferredDial = pDefer<Connection>()
const transportManagerDialStub = Sinon.stub(localTM, 'dial')
transportManagerDialStub.callsFake(async () => deferredDial.promise)
transportManagerDialStub.callsFake(async ma => {
const maStr = ma.toString()
const action = actions[maStr]

// Perform 3 multiaddr dials
void dialer.dial(addrs)

// Let the call stack run
await delay(0)
if (action != null) {
return action()
}

// We should have 2 in progress, and 1 waiting
expect(transportManagerDialStub).to.have.property('callCount', 2)
throw new Error(`No action found for multiaddr ${maStr}`)
})

deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId)))
// dial 3 different peers
void Promise.all([
dialer.dial(addr1),
dialer.dial(addr2),
dialer.dial(addr3)
])

// Let the call stack run
await delay(0)

// Only two dials should be executed, as the first dial will succeed
// We should have 2 in progress, and 1 waiting
expect(transportManagerDialStub).to.have.property('callCount', 2)
})

it('should append the remote peerId to multiaddrs', async () => {
const addrs = [
multiaddr('/ip4/0.0.0.0/tcp/8000'),
multiaddr('/ip4/0.0.0.0/tcp/8001'),
multiaddr('/ip4/0.0.0.0/tcp/8002'),
multiaddr('/unix/tmp/some/path.sock')
]

// Inject data into the AddressBook
await localComponents.peerStore.merge(remoteComponents.peerId, {
multiaddrs: addrs
})

const dialer = new DialQueue(localComponents, {
maxParallelDialsPerPeer: 10
})

const transportManagerDialStub = Sinon.stub(localTM, 'dial')
transportManagerDialStub.callsFake(async (ma) => {
await delay(10)
return mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId))
})

// Perform dial
await dialer.dial(remoteComponents.peerId)
// stop dials
dialer.stop()

// Dialled each address
expect(transportManagerDialStub).to.have.property('callCount', 4)

for (let i = 0; i < addrs.length; i++) {
const call = transportManagerDialStub.getCall(i)
const ma = call.args[0]

// should not append peerId to path multiaddrs
if (ma.toString().startsWith('/unix')) {
expect(ma.toString()).to.not.endWith(`/p2p/${remoteComponents.peerId.toString()}`)

continue
}

expect(ma.toString()).to.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
}
})
})

Expand Down
Loading

0 comments on commit d805a31

Please sign in to comment.