Skip to content

Commit

Permalink
feat(lightpush): peer management for protocols (#2003)
Browse files Browse the repository at this point in the history
* chore: make `dropConnection` to be a public function

* feat: peers are maintained for protocols
- passes `ConnectionManager` to ProtocolSDK to allow disconnecting from within protocol
- maintains `numPeersToUse` for each protocol within BaseProtocolSDK

* fix: pass options to protocols

* chore: update interfaces to allow public access

* chore: improve logging on protocol

* fix: renew peer upon failure

* chore(tests): allow DefaultPubsubTopic

* feat(lightpush): write peer management tests

* chore: rename test

* feat: add lock to `maintainPeers()` to handle parallelisation of requests
fixes parallelisation of lightpush.send() requests

* fix: concurrent lightpush requests

* fix: test & improve peers fetching

* chore: use getter

* address comments

* chore: smaller improvements

* feat: attempt to improve time for first lightpush.send()

* chore: use `window.interval` for type-safety

* chore: remove delays

* feat: add autoRetry

* feat: `forceUseAllPeers` to wait for all connected peers to be resoled
  • Loading branch information
danisharora099 authored Jun 19, 2024
1 parent 69f9045 commit 93e78c3
Show file tree
Hide file tree
Showing 18 changed files with 542 additions and 157 deletions.
56 changes: 28 additions & 28 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ export class ConnectionManager
return instance;
}

stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
"peer:connect",
this.onEventHandlers["peer:connect"]
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onEventHandlers["peer:disconnect"]
);
this.libp2p.removeEventListener(
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
}

async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
log.info(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) {
log.error(
`Error dropping connection with peer ${peerId.toString()} - ${error}`
);
}
}

public async getPeersByDiscovery(): Promise<PeersByDiscoveryResult> {
const peersDiscovered = await this.libp2p.peerStore.all();
const peersConnected = this.libp2p
Expand Down Expand Up @@ -200,22 +228,6 @@ export class ConnectionManager
this.startPeerDisconnectionListener();
}

stop(): void {
this.keepAliveManager.stopAll();
this.libp2p.removeEventListener(
"peer:connect",
this.onEventHandlers["peer:connect"]
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onEventHandlers["peer:disconnect"]
);
this.libp2p.removeEventListener(
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
}

private async dialPeer(peerId: PeerId): Promise<void> {
this.currentActiveParallelDialCount += 1;
let dialAttempt = 0;
Expand Down Expand Up @@ -298,18 +310,6 @@ export class ConnectionManager
}
}

private async dropConnection(peerId: PeerId): Promise<void> {
try {
this.keepAliveManager.stop(peerId);
await this.libp2p.hangUp(peerId);
log.info(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) {
log.error(
`Error dropping connection with peer ${peerId.toString()} - ${error}`
);
}
}

private processDialQueue(): void {
if (
this.pendingPeerDialQueue.length > 0 &&
Expand Down
1 change: 1 addition & 0 deletions packages/interfaces/src/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export interface IConnectionStateEvents {

export interface IConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;
}
4 changes: 3 additions & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ export type IBaseProtocolCore = {
};

export type IBaseProtocolSDK = {
numPeers: number;
renewPeer: (peerToDisconnect: PeerId) => Promise<void>;
readonly connectedPeers: Peer[];
readonly numPeersToUse: number;
};

export type ContentTopicInfo = {
Expand Down
32 changes: 31 additions & 1 deletion packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,35 @@ import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";

export interface ISender {
send: (encoder: IEncoder, message: IMessage) => Promise<SDKProtocolResult>;
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: SendOptions
) => Promise<SDKProtocolResult>;
}

/**
* Options for using LightPush
*/
export type SendOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};
19 changes: 5 additions & 14 deletions packages/sdk/src/light-node/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { type Libp2pComponents, type LightNode } from "@waku/interfaces";

import { wakuFilter } from "../protocols/filter.js";
import { wakuLightPush } from "../protocols/light_push.js";
import { wakuStore } from "../protocols/store.js";
import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";

Expand All @@ -18,15 +15,9 @@ export async function createLightNode(
): Promise<LightNode> {
const libp2p = await createLibp2pAndUpdateOptions(options);

const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);

return new WakuNode(
options as WakuOptions,
libp2p,
store,
lightPush,
filter
) as LightNode;
return new WakuNode(options as WakuOptions, libp2p, {
store: true,
lightpush: true,
filter: true
}) as LightNode;
}
213 changes: 209 additions & 4 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,220 @@
import { IBaseProtocolSDK } from "@waku/interfaces";
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";

interface Options {
numPeersToUse?: number;
maintainPeersInterval?: number;
}

const DEFAULT_NUM_PEERS_TO_USE = 3;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

export class BaseProtocolSDK implements IBaseProtocolSDK {
public readonly numPeers: number;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
typeof window.setInterval
> | null = null;
log: Logger;

constructor(options: Options) {
this.numPeers = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
private maintainPeersLock = false;

constructor(
protected core: BaseProtocol,
private connectionManager: ConnectionManager,
options: Options
) {
this.log = new Logger(`sdk:${core.multicodec}`);
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;

void this.startMaintainPeersInterval(maintainPeersInterval);
}

get connectedPeers(): Peer[] {
return this.peers;
}

/**
* Disconnects from a peer and tries to find a new one to replace it.
* @param peerToDisconnect The peer to disconnect from.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<void> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
try {
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);

await this.findAndAddPeers(1);
} catch (error) {
this.log.info(
"Peer renewal failed, relying on the interval to find a new peer"
);
}
}

/**
* Stops the maintain peers interval.
*/
public stopMaintainPeersInterval(): void {
if (this.maintainPeersIntervalId) {
clearInterval(this.maintainPeersIntervalId);
this.maintainPeersIntervalId = null;
this.log.info("Maintain peers interval stopped");
}
}

/**
* Checks if there are peers to send a message to.
* If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`.
* If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager.
* If `autoRetry` is `false`, returns `false` if no peers are found.
* If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff.
* Returns `true` if peers are found, `false` otherwise.
* @param options Optional options object
* @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false)
* @param options.forceUseAllPeers Optional flag to force using all available peers (default: false)
* @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10)
* @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3)
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
*/
protected hasPeers = async (
options: Partial<SendOptions> = {}
): Promise<boolean> => {
const {
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;

if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;

let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
if (this.peers.length < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}`
);
}
return true;
}
if (!autoRetry) return false;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
}

this.log.error("Failed to find peers to send message to");
return false;
};

/**
* Starts an interval to maintain the peers list to `numPeersToUse`.
* @param interval The interval in milliseconds to maintain the peers.
*/
private async startMaintainPeersInterval(interval: number): Promise<void> {
this.log.info("Starting maintain peers interval");
try {
await this.maintainPeers();
this.maintainPeersIntervalId = setInterval(() => {
this.maintainPeers().catch((error) => {
this.log.error("Error during maintain peers interval:", error);
});
}, interval);
this.log.info(
`Maintain peers interval started with interval ${interval}ms`
);
} catch (error) {
this.log.error("Error starting maintain peers interval:", error);
throw error;
}
}

/**
* Maintains the peers list to `numPeersToUse`.
*/
private async maintainPeers(): Promise<boolean> {
if (this.maintainPeersLock) {
return false;
}

this.maintainPeersLock = true;
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
try {
const numPeersToAdd = this.numPeersToUse - this.peers.length;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
this.log.info(
`Peer maintenance completed, current count: ${this.peers.length}`
);
} finally {
this.maintainPeersLock = false;
}
return true;
}

/**
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<void> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
this.peers = [...this.peers, ...additionalPeers];
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
}
}

/**
* Finds additional peers.
* Attempts to find peers without using bootstrap peers first,
* If no peers are found,
* tries with bootstrap peers.
* @param numPeers The number of peers to find.
*/
private async findAdditionalPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding ${numPeers} additional peers`);
try {
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: numPeers
});

if (newPeers.length === 0) {
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: numPeers
});
}

newPeers = newPeers.filter(
(peer) => this.peers.some((p) => p.id === peer.id) === false
);
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);
throw error;
}
}
}
Loading

0 comments on commit 93e78c3

Please sign in to comment.