Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: remove per-peer parallel dialling #2163

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion doc/migrations/v0.46-v1.0.0.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--Specify versions for migration below-->
# Migrating to libp2p@1.0.0 <!-- omit in toc -->
# Migrating to libp2p@1.0 <!-- omit in toc -->

A migration guide for refactoring your application code from libp2p `v0.46` to `v1.0.0`.

Expand All @@ -9,6 +9,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to `
- [KeyChain](#keychain)
- [Pnet](#pnet)
- [Metrics](#metrics)
- [Connection Manager](#connection-manager)

## AutoNAT

Expand Down Expand Up @@ -77,3 +78,7 @@ The following metrics were renamed:

`libp2p_dialler_pending_dials` => `libp2p_dial_queue_pending_dials`
`libp2p_dialler_in_progress_dials` => `libp2p_dial_queue_in_progress_dials`

## Connection Manager

We now only dial one peer at a time as this is a more efficient use of resources. This means that the `maParallelDialsPerPeer` option has been removed.
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@ export const INBOUND_UPGRADE_TIMEOUT = 30e3
*/
export const MAX_PEER_ADDRS_TO_DIAL = 25

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDialsPerPeer
*/
export const MAX_PARALLEL_DIALS_PER_PEER = 1

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#autoDialInterval
*/
Expand Down
13 changes: 2 additions & 11 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS_PER_PEER,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
Expand Down Expand Up @@ -45,7 +44,6 @@ interface DialerInit {
addressSorter?: AddressSorter
maxParallelDials?: number
maxPeerAddrsToDial?: number
maxParallelDialsPerPeer?: number
dialTimeout?: number
resolvers?: Record<string, Resolver>
}
Expand All @@ -54,7 +52,6 @@ const defaultOptions = {
addressSorter: defaultAddressSort,
maxParallelDials: MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: MAX_PEER_ADDRS_TO_DIAL,
maxParallelDialsPerPeer: MAX_PARALLEL_DIALS_PER_PEER,
dialTimeout: DIAL_TIMEOUT,
resolvers: {
dnsaddr: dnsaddrResolver
Expand All @@ -78,7 +75,6 @@ export class DialQueue {
private readonly transportManager: TransportManager
private readonly addressSorter: AddressSorter
private readonly maxPeerAddrsToDial: number
private readonly maxParallelDialsPerPeer: number
private readonly dialTimeout: number
private readonly inProgressDialCount?: Metric
private readonly pendingDialCount?: Metric
Expand All @@ -87,7 +83,6 @@ export class DialQueue {
constructor (components: DialQueueComponents, init: DialerInit = {}) {
this.addressSorter = init.addressSorter ?? defaultOptions.addressSorter
this.maxPeerAddrsToDial = init.maxPeerAddrsToDial ?? defaultOptions.maxPeerAddrsToDial
this.maxParallelDialsPerPeer = init.maxParallelDialsPerPeer ?? defaultOptions.maxParallelDialsPerPeer
this.dialTimeout = init.dialTimeout ?? defaultOptions.dialTimeout

this.peerId = components.peerId
Expand Down Expand Up @@ -421,12 +416,8 @@ export class DialQueue {
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController())

try {
// internal peer dial queue to ensure we only dial the configured number of addresses
// per peer at the same time to prevent one peer with a lot of addresses swamping
// the dial queue
const peerDialQueue = new PQueue({
concurrency: this.maxParallelDialsPerPeer
})
// internal peer dial queue - only one dial per peer at a time
const peerDialQueue = new PQueue({ concurrency: 1 })
peerDialQueue.on('error', (err) => {
log.error('error dialling', err)
})
Expand Down
11 changes: 1 addition & 10 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
import { ConnectionPruner } from './connection-pruner.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PARALLEL_DIALS_PER_PEER, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, DIAL_TIMEOUT, INBOUND_CONNECTION_THRESHOLD, MAX_CONNECTIONS, MAX_INCOMING_PENDING_CONNECTIONS, MAX_PARALLEL_DIALS, MAX_PEER_ADDRS_TO_DIAL, MIN_CONNECTIONS } from './constants.js'
import { DialQueue } from './dial-queue.js'
import type { PendingDial, AddressSorter, Libp2pEvents, AbortOptions } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -92,14 +92,6 @@ export interface ConnectionManagerInit {
*/
maxParallelDials?: number

/**
* To prevent individual peers with large amounts of multiaddrs swamping the
* dial queue, this value controls how many addresses to dial in parallel per
* peer. So for example if two peers have 10 addresses and this value is set
* at 5, we will dial 5 addresses from each at a time. (default: 1)
*/
maxParallelDialsPerPeer?: number

/**
* Maximum number of addresses allowed for a given peer - if a peer has more
* addresses than this then the dial will fail. (default: 25)
Expand Down Expand Up @@ -257,7 +249,6 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
addressSorter: init.addressSorter ?? defaultAddressSort,
maxParallelDials: init.maxParallelDials ?? MAX_PARALLEL_DIALS,
maxPeerAddrsToDial: init.maxPeerAddrsToDial ?? MAX_PEER_ADDRS_TO_DIAL,
maxParallelDialsPerPeer: init.maxParallelDialsPerPeer ?? MAX_PARALLEL_DIALS_PER_PEER,
dialTimeout: init.dialTimeout ?? DIAL_TIMEOUT,
resolvers: init.resolvers ?? {
dnsaddr: dnsaddrResolver
Expand Down
97 changes: 0 additions & 97 deletions packages/libp2p/test/connection-manager/dial-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,103 +186,6 @@ describe('dial queue', () => {
expect(reject).to.have.property('callCount', addrs.length)
})

it('should abort all dials when its signal is aborted', async () => {
const signals: Record<string, AbortSignal | undefined> = {}
const slowDial = async (): Promise<void> => {
await delay(1000)
}
const actions: Record<string, (...args: any[]) => Promise<any>> = {
'/ip4/127.0.0.1/tcp/1231': slowDial,
'/ip4/127.0.0.1/tcp/1232': slowDial,
'/ip4/127.0.0.1/tcp/1233': slowDial
}
const controller = new AbortController()

dialer = new DialQueue(components, {
maxParallelDials: 2,
maxParallelDialsPerPeer: 10
})

components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, options = {}) => {
const maStr = ma.toString()
const action = actions[maStr]

if (action != null) {
signals[maStr] = options.signal
return action()
}

throw new Error(`No action found for multiaddr ${maStr}`)
})

setTimeout(() => { controller.abort() }, 100)

await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)), {
signal: controller.signal
})).to.eventually.be.rejected
.with.property('name', 'CodeError')

expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', true)
expect(signals).to.not.have.property('/ip4/127.0.0.1/tcp/1233') // never dialled as above the maxParallelDials limit
})

it('should abort other dials when one succeeds', async () => {
const remotePeer = await createEd25519PeerId()
const connection1 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const connection2 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const connection3 = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeer))
const actions: Record<string, () => Promise<Connection>> = {
'/ip4/127.0.0.1/tcp/1231': async () => {
// Slow dial
await delay(1000)

return connection1
},
'/ip4/127.0.0.1/tcp/1232': async () => {
// Fast dial
await delay(10)

return connection2
},
'/ip4/127.0.0.1/tcp/1233': async () => {
// Slow dial
await delay(1000)

return connection3
}
}
const signals: Record<string, AbortSignal> = {}

components.transportManager.transportForMultiaddr.returns(stubInterface<Transport>())
components.transportManager.dial.callsFake(async (ma, opts = {}) => {
const maStr = ma.toString()
const action = actions[maStr]

if (action != null) {
signals[maStr] = opts.signal
return action()
}

throw new Error(`No action found for multiaddr ${maStr}`)
})

dialer = new DialQueue(components, {
maxParallelDials: 50,
maxParallelDialsPerPeer: 10
})

await expect(dialer.dial(Object.keys(actions).map(str => multiaddr(str)))).to.eventually.equal(connection2)

// Dial attempt finished without connection
expect(signals['/ip4/127.0.0.1/tcp/1231']).to.have.property('aborted', true)
// Dial attempt led to connection
expect(signals['/ip4/127.0.0.1/tcp/1232']).to.have.property('aborted', false)
// Dial attempt finished without connection
expect(signals['/ip4/127.0.0.1/tcp/1233']).to.have.property('aborted', true)
})

it('should ignore DNS addresses for other peers', async () => {
const remotePeer = await createEd25519PeerId()
const otherRemotePeer = await createEd25519PeerId()
Expand Down
100 changes: 36 additions & 64 deletions packages/libp2p/test/connection-manager/direct.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import delay from 'delay'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
Expand Down Expand Up @@ -218,84 +217,57 @@
.and.to.have.property('code', ERR_TIMEOUT)
})

it('should dial to the max concurrency', async () => {
const peerId = await createEd25519PeerId()
const addrs = [
multiaddr('/ip4/0.0.0.0/tcp/8000'),
multiaddr('/ip4/0.0.0.0/tcp/8001'),
multiaddr('/ip4/0.0.0.0/tcp/8002')
]
it('should only dial to the max concurrency', async () => {
const peerId1 = await createEd25519PeerId()
const peerId2 = await createEd25519PeerId()
const peerId3 = await createEd25519PeerId()

const addr1 = multiaddr(`/ip4/127.0.0.1/tcp/1234/p2p/${peerId1}`)
const addr2 = multiaddr(`/ip4/127.0.12.4/tcp/3210/p2p/${peerId2}`)
const addr3 = multiaddr(`/ip4/123.3.11.1/tcp/2010/p2p/${peerId3}`)

const slowDial = async (): Promise<Connection> => {
await delay(100)
return mockConnection(mockMultiaddrConnection(mockDuplex(), peerId1))
}

const actions: Record<string, (...args: any[]) => Promise<any>> = {
[addr1.toString()]: slowDial,
[addr2.toString()]: slowDial,
[addr3.toString()]: async () => mockConnection(mockMultiaddrConnection(mockDuplex(), peerId3))
}

const dialer = new DialQueue(localComponents, {
maxParallelDials: 2,
maxParallelDialsPerPeer: 10
maxParallelDials: 2
})

const deferredDial = pDefer<Connection>()
const transportManagerDialStub = sinon.stub(localTM, 'dial')
transportManagerDialStub.callsFake(async () => deferredDial.promise)
transportManagerDialStub.callsFake(async ma => {
const maStr = ma.toString()
const action = actions[maStr]

// Perform 3 multiaddr dials
void dialer.dial(addrs)

// Let the call stack run
await delay(0)
if (action != null) {
return action()
}

// We should have 2 in progress, and 1 waiting
expect(transportManagerDialStub).to.have.property('callCount', 2)
throw new Error(`No action found for multiaddr ${maStr}`)

Check warning on line 253 in packages/libp2p/test/connection-manager/direct.node.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/test/connection-manager/direct.node.ts#L253

Added line #L253 was not covered by tests
})

deferredDial.resolve(mockConnection(mockMultiaddrConnection(mockDuplex(), peerId)))
// dial 3 different peers
void Promise.all([
dialer.dial(addr1),
dialer.dial(addr2),
dialer.dial(addr3)
])

// Let the call stack run
await delay(0)

// Only two dials should be executed, as the first dial will succeed
// We should have 2 in progress, and 1 waiting
expect(transportManagerDialStub).to.have.property('callCount', 2)
})

it('should append the remote peerId to multiaddrs', async () => {
const addrs = [
multiaddr('/ip4/0.0.0.0/tcp/8000'),
multiaddr('/ip4/0.0.0.0/tcp/8001'),
multiaddr('/ip4/0.0.0.0/tcp/8002'),
multiaddr('/unix/tmp/some/path.sock')
]

// Inject data into the AddressBook
await localComponents.peerStore.merge(remoteComponents.peerId, {
multiaddrs: addrs
})

const dialer = new DialQueue(localComponents, {
maxParallelDialsPerPeer: 10
})

const transportManagerDialStub = sinon.stub(localTM, 'dial')
transportManagerDialStub.callsFake(async (ma) => {
await delay(10)
return mockConnection(mockMultiaddrConnection(mockDuplex(), remoteComponents.peerId))
})

// Perform dial
await dialer.dial(remoteComponents.peerId)
// stop dials
dialer.stop()

// Dialled each address
expect(transportManagerDialStub).to.have.property('callCount', 4)

for (let i = 0; i < addrs.length; i++) {
const call = transportManagerDialStub.getCall(i)
const ma = call.args[0]

// should not append peerId to path multiaddrs
if (ma.toString().startsWith('/unix')) {
expect(ma.toString()).to.not.endWith(`/p2p/${remoteComponents.peerId.toString()}`)

continue
}

expect(ma.toString()).to.endWith(`/p2p/${remoteComponents.peerId.toString()}`)
}
})
})

Expand Down
Loading
Loading