Skip to content

Commit 6ce898d

Browse files
feat: enable event emission for peer discovery/connection in ConnectionManager (#1438)
* disable libp2p autodial * improve logs for peer-exchange * add a function to fetch discovered and connected peers by discovery * connection-manager: introduce event emissions by discovery * write a spec test for events * minor code improvement for peer-exchange * rm: comment * rename peer event result interface * switch to using libp2p EventEmitter * rename variables for readability * reset peer-exchange spec file * address review * test: minor refactor * fix: failing test * increase peer IDs to test against for attemptDial * improve structuring
1 parent 793abe7 commit 6ce898d

File tree

4 files changed

+384
-117
lines changed

4 files changed

+384
-117
lines changed

packages/core/src/lib/connection_manager.ts

+97-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
import type { PeerId } from "@libp2p/interface-peer-id";
22
import type { PeerInfo } from "@libp2p/interface-peer-info";
3-
import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces";
3+
import type { Peer } from "@libp2p/interface-peer-store";
4+
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
5+
import {
6+
ConnectionManagerOptions,
7+
EPeersByDiscoveryEvents,
8+
IPeersByDiscoveryEvents,
9+
IRelay,
10+
PeersByDiscoveryResult,
11+
} from "@waku/interfaces";
412
import { Libp2p, Tags } from "@waku/interfaces";
513
import debug from "debug";
614

@@ -12,7 +20,7 @@ export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
1220
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
1321
export const DEFAULT_MAX_PARALLEL_DIALS = 3;
1422

15-
export class ConnectionManager {
23+
export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
1624
private static instances = new Map<string, ConnectionManager>();
1725
private keepAliveManager: KeepAliveManager;
1826
private options: ConnectionManagerOptions;
@@ -44,12 +52,57 @@ export class ConnectionManager {
4452
return instance;
4553
}
4654

55+
public async getPeersByDiscovery(): Promise<PeersByDiscoveryResult> {
56+
const peersDiscovered = await this.libp2p.peerStore.all();
57+
const peersConnected = this.libp2p
58+
.getConnections()
59+
.map((conn) => conn.remotePeer);
60+
61+
const peersDiscoveredByBootstrap: Peer[] = [];
62+
const peersDiscoveredByPeerExchange: Peer[] = [];
63+
const peersConnectedByBootstrap: Peer[] = [];
64+
const peersConnectedByPeerExchange: Peer[] = [];
65+
66+
for (const peer of peersDiscovered) {
67+
const tags = await this.getTagNamesForPeer(peer.id);
68+
69+
if (tags.includes(Tags.BOOTSTRAP)) {
70+
peersDiscoveredByBootstrap.push(peer);
71+
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
72+
peersDiscoveredByPeerExchange.push(peer);
73+
}
74+
}
75+
76+
for (const peerId of peersConnected) {
77+
const peer = await this.libp2p.peerStore.get(peerId);
78+
const tags = await this.getTagNamesForPeer(peerId);
79+
80+
if (tags.includes(Tags.BOOTSTRAP)) {
81+
peersConnectedByBootstrap.push(peer);
82+
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
83+
peersConnectedByPeerExchange.push(peer);
84+
}
85+
}
86+
87+
return {
88+
DISCOVERED: {
89+
[Tags.BOOTSTRAP]: peersDiscoveredByBootstrap,
90+
[Tags.PEER_EXCHANGE]: peersDiscoveredByPeerExchange,
91+
},
92+
CONNECTED: {
93+
[Tags.BOOTSTRAP]: peersConnectedByBootstrap,
94+
[Tags.PEER_EXCHANGE]: peersConnectedByPeerExchange,
95+
},
96+
};
97+
}
98+
4799
private constructor(
48100
libp2p: Libp2p,
49101
keepAliveOptions: KeepAliveOptions,
50102
relay?: IRelay,
51103
options?: Partial<ConnectionManagerOptions>
52104
) {
105+
super();
53106
this.libp2p = libp2p;
54107
this.options = {
55108
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
@@ -240,6 +293,30 @@ export class ConnectionManager {
240293
void (async () => {
241294
const { id: peerId } = evt.detail;
242295

296+
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
297+
Tags.BOOTSTRAP
298+
);
299+
300+
if (isBootstrap) {
301+
this.dispatchEvent(
302+
new CustomEvent<PeerId>(
303+
EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP,
304+
{
305+
detail: peerId,
306+
}
307+
)
308+
);
309+
} else {
310+
this.dispatchEvent(
311+
new CustomEvent<PeerId>(
312+
EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
313+
{
314+
detail: peerId,
315+
}
316+
)
317+
);
318+
}
319+
243320
try {
244321
await this.attemptDial(peerId);
245322
} catch (error) {
@@ -267,7 +344,25 @@ export class ConnectionManager {
267344
bootstrapConnections.length > this.options.maxBootstrapPeersAllowed
268345
) {
269346
await this.dropConnection(peerId);
347+
} else {
348+
this.dispatchEvent(
349+
new CustomEvent<PeerId>(
350+
EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP,
351+
{
352+
detail: peerId,
353+
}
354+
)
355+
);
270356
}
357+
} else {
358+
this.dispatchEvent(
359+
new CustomEvent<PeerId>(
360+
EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE,
361+
{
362+
detail: peerId,
363+
}
364+
)
365+
);
271366
}
272367
})();
273368
},

packages/interfaces/src/connection_manager.ts

+28
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import type { PeerId } from "@libp2p/interface-peer-id";
2+
import type { Peer } from "@libp2p/interface-peer-store";
3+
14
export enum Tags {
25
BOOTSTRAP = "bootstrap",
36
PEER_EXCHANGE = "peer-exchange",
@@ -19,3 +22,28 @@ export interface ConnectionManagerOptions {
1922
*/
2023
maxParallelDials: number;
2124
}
25+
26+
export enum EPeersByDiscoveryEvents {
27+
PEER_DISCOVERY_BOOTSTRAP = "peer:discovery:bootstrap",
28+
PEER_DISCOVERY_PEER_EXCHANGE = "peer:discovery:peer-exchange",
29+
PEER_CONNECT_BOOTSTRAP = "peer:connected:bootstrap",
30+
PEER_CONNECT_PEER_EXCHANGE = "peer:connected:peer-exchange",
31+
}
32+
33+
export interface IPeersByDiscoveryEvents {
34+
[EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP]: CustomEvent<PeerId>;
35+
[EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE]: CustomEvent<PeerId>;
36+
[EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP]: CustomEvent<PeerId>;
37+
[EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE]: CustomEvent<PeerId>;
38+
}
39+
40+
export interface PeersByDiscoveryResult {
41+
DISCOVERED: {
42+
[Tags.BOOTSTRAP]: Peer[];
43+
[Tags.PEER_EXCHANGE]: Peer[];
44+
};
45+
CONNECTED: {
46+
[Tags.BOOTSTRAP]: Peer[];
47+
[Tags.PEER_EXCHANGE]: Peer[];
48+
};
49+
}

packages/peer-exchange/src/waku_peer_exchange_discovery.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { peerDiscovery as symbol } from "@libp2p/interface-peer-discovery";
77
import type { PeerId } from "@libp2p/interface-peer-id";
88
import type { PeerInfo } from "@libp2p/interface-peer-info";
99
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
10-
import type { Libp2pComponents } from "@waku/interfaces";
10+
import { Libp2pComponents, Tags } from "@waku/interfaces";
1111
import debug from "debug";
1212

1313
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
@@ -45,7 +45,7 @@ export interface Options {
4545
maxRetries?: number;
4646
}
4747

48-
export const DEFAULT_PEER_EXCHANGE_TAG_NAME = "peer-exchange";
48+
export const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE;
4949
const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50;
5050
const DEFAULT_PEER_EXCHANGE_TAG_TTL = 120000;
5151

@@ -134,6 +134,12 @@ export class PeerExchangeDiscovery
134134
maxRetries = DEFAULT_MAX_RETRIES,
135135
} = this.options;
136136

137+
log(
138+
`Querying peer: ${peerIdStr} (attempt ${
139+
this.queryAttempts.get(peerIdStr) ?? 1
140+
})`
141+
);
142+
137143
await this.query(peerId);
138144

139145
const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1;
@@ -189,6 +195,8 @@ export class PeerExchangeDiscovery
189195
},
190196
});
191197

198+
log(`Discovered peer: ${peerId.toString()}`);
199+
192200
this.dispatchEvent(
193201
new CustomEvent<PeerInfo>("peer", {
194202
detail: {

0 commit comments

Comments
 (0)