1
1
import { logger } from '@libp2p/logger'
2
- import all from 'it-all'
3
- import filter from 'it-filter'
4
- import { pipe } from 'it-pipe'
5
2
import errCode from 'err-code'
6
- import type { Multiaddr , Resolver } from '@multiformats/multiaddr'
7
- import { multiaddr , resolvers } from '@multiformats/multiaddr'
3
+ import { isMultiaddr , Multiaddr , Resolver , multiaddr , resolvers } from '@multiformats/multiaddr'
8
4
import { TimeoutController } from 'timeout-abort-controller'
9
- import { AbortError } from '@libp2p/interfaces/errors'
10
5
import { anySignal } from 'any-signal'
11
6
import { setMaxListeners } from 'events'
12
7
import { DialAction , DialRequest } from './dial-request.js'
@@ -22,10 +17,8 @@ import {
22
17
import type { Connection , ConnectionGater } from '@libp2p/interface-connection'
23
18
import type { AbortOptions } from '@libp2p/interfaces'
24
19
import type { Startable } from '@libp2p/interfaces/startable'
25
- import type { PeerId } from '@libp2p/interface-peer-id'
20
+ import { isPeerId , PeerId } from '@libp2p/interface-peer-id'
26
21
import { getPeer } from '../../get-peer.js'
27
- import sort from 'it-sort'
28
- import map from 'it-map'
29
22
import type { AddressSorter , PeerStore } from '@libp2p/interface-peer-store'
30
23
import type { Metrics } from '@libp2p/interface-metrics'
31
24
import type { Dialer } from '@libp2p/interface-connection-manager'
@@ -98,7 +91,7 @@ export class DefaultDialer implements Startable, Dialer {
98
91
private readonly maxDialsPerPeer : number
99
92
public tokens : number [ ]
100
93
public pendingDials : Map < string , PendingDial >
101
- public pendingDialTargets : Map < string , PendingDialTarget >
94
+ public pendingDialTargets : Map < string , AbortController >
102
95
private started : boolean
103
96
104
97
constructor ( components : DefaultDialerComponents , init : DialerInit = { } ) {
@@ -147,7 +140,7 @@ export class DefaultDialer implements Startable, Dialer {
147
140
this . pendingDials . clear ( )
148
141
149
142
for ( const pendingTarget of this . pendingDialTargets . values ( ) ) {
150
- pendingTarget . reject ( new AbortError ( 'Dialer was destroyed' ) )
143
+ pendingTarget . abort ( )
151
144
}
152
145
this . pendingDialTargets . clear ( )
153
146
}
@@ -157,8 +150,8 @@ export class DefaultDialer implements Startable, Dialer {
157
150
* The dial to the first address that is successfully able to upgrade a connection
158
151
* will be used.
159
152
*/
160
- async dial ( peer : PeerId | Multiaddr , options : AbortOptions = { } ) : Promise < Connection > {
161
- const { id, multiaddrs } = getPeer ( peer )
153
+ async dial ( peerIdOrMultiaddr : PeerId | Multiaddr , options : AbortOptions = { } ) : Promise < Connection > {
154
+ const { id, multiaddrs } = getPeer ( peerIdOrMultiaddr )
162
155
163
156
if ( this . components . peerId . equals ( id ) ) {
164
157
throw errCode ( new Error ( 'Tried to dial self' ) , codes . ERR_DIALED_SELF )
@@ -177,13 +170,35 @@ export class DefaultDialer implements Startable, Dialer {
177
170
178
171
log ( 'creating dial target for %p' , id )
179
172
180
- const dialTarget = await this . _createCancellableDialTarget ( id , options )
173
+ // resolving multiaddrs can involve dns lookups so allow them to be aborted
174
+ const controller = new AbortController ( )
175
+ const controllerId = randomId ( )
176
+ this . pendingDialTargets . set ( controllerId , controller )
177
+ let signal = controller . signal
178
+
179
+ // merge with the passed signal, if any
180
+ if ( options . signal != null ) {
181
+ signal = anySignal ( [ signal , options . signal ] )
182
+ }
183
+
184
+ let dialTarget : DialTarget
185
+
186
+ try {
187
+ dialTarget = await this . _createDialTarget ( peerIdOrMultiaddr , {
188
+ ...options ,
189
+ signal
190
+ } )
191
+ } finally {
192
+ // done resolving the multiaddrs so remove the abort controller
193
+ this . pendingDialTargets . delete ( controllerId )
194
+ }
181
195
182
196
if ( dialTarget . addrs . length === 0 ) {
183
197
throw errCode ( new Error ( 'The dial request has no valid addresses' ) , codes . ERR_NO_VALID_ADDRESSES )
184
198
}
185
199
186
- const pendingDial = this . pendingDials . get ( dialTarget . id ) ?? this . _createPendingDial ( dialTarget , options )
200
+ // try to join an in-flight dial for this peer if one is available
201
+ const pendingDial = this . pendingDials . get ( dialTarget . id . toString ( ) ) ?? this . _createPendingDial ( dialTarget , options )
187
202
188
203
try {
189
204
const connection = await pendingDial . promise
@@ -202,76 +217,77 @@ export class DefaultDialer implements Startable, Dialer {
202
217
}
203
218
}
204
219
205
- /**
206
- * Connects to a given `peer` by dialing all of its known addresses.
207
- * The dial to the first address that is successfully able to upgrade a connection
208
- * will be used.
209
- */
210
- async _createCancellableDialTarget ( peer : PeerId , options : AbortOptions ) : Promise < DialTarget > {
211
- // Make dial target promise cancellable
212
- const id = `${ ( parseInt ( String ( Math . random ( ) * 1e9 ) , 10 ) ) . toString ( ) } ${ Date . now ( ) } `
213
- const cancellablePromise = new Promise < DialTarget > ( ( resolve , reject ) => {
214
- this . pendingDialTargets . set ( id , { resolve, reject } )
215
- } )
216
-
217
- try {
218
- const dialTarget = await Promise . race ( [
219
- this . _createDialTarget ( peer , options ) ,
220
- cancellablePromise
221
- ] )
222
-
223
- return dialTarget
224
- } finally {
225
- this . pendingDialTargets . delete ( id )
226
- }
227
- }
228
-
229
220
/**
230
221
* Creates a DialTarget. The DialTarget is used to create and track
231
222
* the DialRequest to a given peer.
232
- * If a multiaddr is received it should be the first address attempted.
223
+ *
224
+ * If a multiaddr is received it should be the only address attempted.
225
+ *
233
226
* Multiaddrs not supported by the available transports will be filtered out.
234
227
*/
235
- async _createDialTarget ( peer : PeerId , options : AbortOptions ) : Promise < DialTarget > {
228
+ async _createDialTarget ( peerIdOrMultiaddr : PeerId | Multiaddr , options : AbortOptions ) : Promise < DialTarget > {
236
229
const _resolve = this . _resolve . bind ( this )
237
230
238
- let addrs = await pipe (
239
- await this . components . peerStore . addressBook . get ( peer ) ,
240
- ( source ) => filter ( source , async ( address ) => {
241
- return ! ( await this . components . connectionGater . denyDialMultiaddr ( peer , address . multiaddr ) )
242
- } ) ,
243
- // Sort addresses so, for example, we try certified public address first
244
- ( source ) => sort ( source , this . addressSorter ) ,
245
- async function * resolve ( source ) {
246
- for await ( const a of source ) {
247
- yield * await _resolve ( a . multiaddr , options )
248
- }
249
- } ,
250
- // Multiaddrs not supported by the available transports will be filtered out.
251
- ( source ) => filter ( source , ( ma ) => Boolean ( this . components . transportManager . transportForMultiaddr ( ma ) ) ) ,
252
- ( source ) => map ( source , ( ma ) => {
253
- if ( peer . toString ( ) === ma . getPeerId ( ) ) {
254
- return ma
255
- }
231
+ let addrs = isMultiaddr ( peerIdOrMultiaddr ) ? [ peerIdOrMultiaddr ] : await this . _loadAddresses ( peerIdOrMultiaddr )
256
232
257
- return ma . encapsulate ( `/p2p/${ peer . toString ( ) } ` )
258
- } ) ,
259
- async ( source ) => await all ( source )
260
- )
233
+ addrs = ( await Promise . all (
234
+ addrs . map ( async ( ma ) => await _resolve ( ma , options ) )
235
+ ) )
236
+ . flat ( )
237
+ // Multiaddrs not supported by the available transports will be filtered out.
238
+ . filter ( ma => Boolean ( this . components . transportManager . transportForMultiaddr ( ma ) ) )
261
239
240
+ // deduplicate addresses
262
241
addrs = [ ...new Set ( addrs ) ]
263
242
264
243
if ( addrs . length > this . maxAddrsToDial ) {
265
- await this . components . peerStore . delete ( peer )
266
244
throw errCode ( new Error ( 'dial with more addresses than allowed' ) , codes . ERR_TOO_MANY_ADDRESSES )
267
245
}
268
246
247
+ const peerId = isPeerId ( peerIdOrMultiaddr ) ? peerIdOrMultiaddr : undefined
248
+
249
+ if ( peerId != null ) {
250
+ const peerIdMultiaddr = `/p2p/${ peerId . toString ( ) } `
251
+ addrs = addrs . map ( addr => {
252
+ const addressPeerId = addr . getPeerId ( )
253
+
254
+ if ( addressPeerId == null || ! peerId . equals ( addressPeerId ) ) {
255
+ return addr . encapsulate ( peerIdMultiaddr )
256
+ }
257
+
258
+ return addr
259
+ } )
260
+ }
261
+
269
262
return {
270
- id : peer . toString ( ) ,
263
+ id : peerId == null ? randomId ( ) : peerId . toString ( ) ,
271
264
addrs
272
265
}
273
266
}
274
267
268
+ /**
269
+ * Loads a list of addresses from the peer store for the passed peer id
270
+ */
271
+ async _loadAddresses ( peer : PeerId ) : Promise < Multiaddr [ ] > {
272
+ const addresses = await this . components . peerStore . addressBook . get ( peer )
273
+
274
+ return ( await Promise . all (
275
+ addresses . map ( async address => {
276
+ const deny = await this . components . connectionGater . denyDialMultiaddr ( peer , address . multiaddr )
277
+
278
+ if ( deny ) {
279
+ return false
280
+ }
281
+
282
+ return address
283
+ } )
284
+ ) )
285
+ . filter ( isTruthy )
286
+ // Sort addresses so, for example, we try certified public address first
287
+ . sort ( this . addressSorter )
288
+ . map ( address => address . multiaddr )
289
+ }
290
+
275
291
/**
276
292
* Creates a PendingDial that wraps the underlying DialRequest
277
293
*/
@@ -383,3 +399,17 @@ export class DefaultDialer implements Startable, Dialer {
383
399
}
384
400
}
385
401
}
402
+
403
+ /**
404
+ * Type safe version of `list.filter(Boolean)`
405
+ */
406
+ function isTruthy < T > ( e : T | false | null | undefined ) : e is T {
407
+ return Boolean ( e )
408
+ }
409
+
410
+ /**
411
+ * Returns a random string
412
+ */
413
+ function randomId ( ) : string {
414
+ return `${ ( parseInt ( String ( Math . random ( ) * 1e9 ) , 10 ) ) . toString ( ) } ${ Date . now ( ) } `
415
+ }
0 commit comments