Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable event emission for peer discovery/connection in ConnectionManager #1438

Merged
merged 20 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces";
import type { Peer } from "@libp2p/interface-peer-store";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import {
ConnectionManagerOptions,
EPeersByDiscoveryEvents,
IPeersByDiscoveryEvents,
IRelay,
PeersByDiscoveryResult,
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import debug from "debug";

Expand All @@ -12,7 +20,7 @@ export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3;

export class ConnectionManager {
export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
Expand Down Expand Up @@ -44,12 +52,57 @@ export class ConnectionManager {
return instance;
}

public async getPeersByDiscovery(): Promise<PeersByDiscoveryResult> {
const peersDiscovered = await this.libp2p.peerStore.all();
const peersConnected = this.libp2p
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can optimize here and just getConnections which returns tags as well so that later there is no reason to await for peer store

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. that is a more efficient way, agreed.
related comment: #1438 (comment)

.getConnections()
.map((conn) => conn.remotePeer);

const peersDiscoveredByBootstrap: Peer[] = [];
const peersDiscoveredByPeerExchange: Peer[] = [];
const peersConnectedByBootstrap: Peer[] = [];
const peersConnectedByPeerExchange: Peer[] = [];

for (const peer of peersDiscovered) {
const tags = await this.getTagNamesForPeer(peer.id);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need to use getTagNamesForPeer but you can directly refer to peer.tags.keys() to avoid async operation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is done to keep fetching tags consistent through this function; agreed that it is async, but it is computationally low demand so should be ok

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering what is the difference if the same tags are present on peer in the loop as well as those you'd get from this.getTagNamesForPeer?


if (tags.includes(Tags.BOOTSTRAP)) {
peersDiscoveredByBootstrap.push(peer);
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
peersDiscoveredByPeerExchange.push(peer);
}
}

for (const peerId of peersConnected) {
const peer = await this.libp2p.peerStore.get(peerId);
const tags = await this.getTagNamesForPeer(peerId);

if (tags.includes(Tags.BOOTSTRAP)) {
peersConnectedByBootstrap.push(peer);
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
peersConnectedByPeerExchange.push(peer);
}
}

return {
DISCOVERED: {
[Tags.BOOTSTRAP]: peersDiscoveredByBootstrap,
[Tags.PEER_EXCHANGE]: peersDiscoveredByPeerExchange,
},
CONNECTED: {
[Tags.BOOTSTRAP]: peersConnectedByBootstrap,
[Tags.PEER_EXCHANGE]: peersConnectedByPeerExchange,
},
};
}

private constructor(
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {
super();
this.libp2p = libp2p;
this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
Expand Down Expand Up @@ -240,6 +293,30 @@ export class ConnectionManager {
void (async () => {
const { id: peerId } = evt.detail;

const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);

if (isBootstrap) {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP,
{
detail: peerId,
}
)
);
} else {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
{
detail: peerId,
}
)
);
}

try {
await this.attemptDial(peerId);
} catch (error) {
Expand Down Expand Up @@ -267,7 +344,25 @@ export class ConnectionManager {
bootstrapConnections.length > this.options.maxBootstrapPeersAllowed
) {
await this.dropConnection(peerId);
} else {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP,
{
detail: peerId,
}
)
);
}
} else {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE,
{
detail: peerId,
}
)
);
}
})();
},
Expand Down
28 changes: 28 additions & 0 deletions packages/interfaces/src/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";

export enum Tags {
BOOTSTRAP = "bootstrap",
PEER_EXCHANGE = "peer-exchange",
Expand All @@ -19,3 +22,28 @@ export interface ConnectionManagerOptions {
*/
maxParallelDials: number;
}

export enum EPeersByDiscoveryEvents {
PEER_DISCOVERY_BOOTSTRAP = "peer:discovery:bootstrap",
PEER_DISCOVERY_PEER_EXCHANGE = "peer:discovery:peer-exchange",
PEER_CONNECT_BOOTSTRAP = "peer:connected:bootstrap",
PEER_CONNECT_PEER_EXCHANGE = "peer:connected:peer-exchange",
}

export interface IPeersByDiscoveryEvents {
[EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP]: CustomEvent<PeerId>;
[EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE]: CustomEvent<PeerId>;
[EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP]: CustomEvent<PeerId>;
[EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE]: CustomEvent<PeerId>;
}

export interface PeersByDiscoveryResult {
DISCOVERED: {
[Tags.BOOTSTRAP]: Peer[];
[Tags.PEER_EXCHANGE]: Peer[];
};
CONNECTED: {
[Tags.BOOTSTRAP]: Peer[];
[Tags.PEER_EXCHANGE]: Peer[];
};
}
12 changes: 10 additions & 2 deletions packages/peer-exchange/src/waku_peer_exchange_discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { peerDiscovery as symbol } from "@libp2p/interface-peer-discovery";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import type { Libp2pComponents } from "@waku/interfaces";
import { Libp2pComponents, Tags } from "@waku/interfaces";
import debug from "debug";

import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
Expand Down Expand Up @@ -45,7 +45,7 @@ export interface Options {
maxRetries?: number;
}

export const DEFAULT_PEER_EXCHANGE_TAG_NAME = "peer-exchange";
export const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE;
const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50;
const DEFAULT_PEER_EXCHANGE_TAG_TTL = 120000;

Expand Down Expand Up @@ -134,6 +134,12 @@ export class PeerExchangeDiscovery
maxRetries = DEFAULT_MAX_RETRIES,
} = this.options;

log(
`Querying peer: ${peerIdStr} (attempt ${
this.queryAttempts.get(peerIdStr) ?? 1
})`
);

await this.query(peerId);

const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1;
Expand Down Expand Up @@ -189,6 +195,8 @@ export class PeerExchangeDiscovery
},
});

log(`Discovered peer: ${peerId.toString()}`);

this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", {
detail: {
Expand Down
Loading