From e07b013e0a29d05db6a1e318b7c04c422143bccd Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jul 2024 16:38:25 +0530 Subject: [PATCH 01/12] feat: validate messages for individual filter nodes & perform renewals --- packages/core/src/lib/filter/index.ts | 9 ++- packages/sdk/src/protocols/filter.ts | 106 ++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 7 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 11cada6b70..78d1f61182 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -35,7 +35,8 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { constructor( private handleIncomingMessage: ( pubsubTopic: PubsubTopic, - wakuMessage: WakuMessage + wakuMessage: WakuMessage, + peerIdStr: string ) => Promise, libp2p: Libp2p, options?: ProtocolCreateOptions @@ -78,7 +79,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return; } - await this.handleIncomingMessage(pubsubTopic, wakuMessage); + await this.handleIncomingMessage( + pubsubTopic, + wakuMessage, + connection.remotePeer.toString() + ); } }).then( () => { diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index c10e9582db..4bd810bce1 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -39,6 +39,13 @@ type SubscriptionCallback = { callback: Callback; }; +type ReceivedMessageHashes = { + all: Set; + nodes: { + [peerId: string]: Set; + }; +}; + const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; @@ -51,6 +58,8 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; + private readonly receivedMessagesHashes: ReceivedMessageHashes; + private messageValidationTimer: number | null = null; private peerFailures: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; @@ -67,6 +76,33 @@ export class SubscriptionManager implements ISubscriptionSDK { ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); + const allPeerIdStrs = this.getPeers().map((p) => p.id.toString()); + this.receivedMessagesHashes = { + all: new Set(), + nodes: { + ...Object.fromEntries( + allPeerIdStrs.map((peerId) => [peerId, new Set()]) + ) + } + }; + + this.startMessageValidationCycle(); + } + + get messageHashes(): string[] { + return [...this.receivedMessagesHashes.all]; + } + + addHash(hash: string, peerIdStr?: string): void { + this.receivedMessagesHashes.all.add(hash); + + if (peerIdStr) { + if (peerIdStr in this.receivedMessagesHashes) { + this.receivedMessagesHashes.nodes[peerIdStr].add(hash); + } else { + this.receivedMessagesHashes.nodes[peerIdStr] = new Set([hash]); + } + } } public async subscribe( @@ -146,8 +182,13 @@ export class SubscriptionManager implements ISubscriptionSDK { const results = await Promise.allSettled(promises); const finalResult = this.handleResult(results, "unsubscribe"); - if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) { - this.stopKeepAlivePings(); + if (this.subscriptionCallbacks.size === 0) { + if (this.keepAliveTimer) { + this.stopKeepAlivePings(); + } + if (this.messageValidationTimer) { + this.stopMessageValidationCycle(); + } } return finalResult; @@ -180,11 +221,17 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - async processIncomingMessage(message: WakuMessage): Promise { + async processIncomingMessage( + message: WakuMessage, + peerIdStr: string + ): Promise { const hashedMessageStr = messageHashStr( this.pubsubTopic, message as IProtoMessage ); + + this.addHash(hashedMessageStr, peerIdStr); + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { log.info("Message already received, skipping"); return; @@ -312,6 +359,55 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } + + private startMessageValidationCycle(): void { + const validationCycle = async (): Promise => { + log.info("Starting message validation cycle"); + + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + log.error( + `Message with hash ${hash} not received from peer ${peerIdStr}. Attempting renewal.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + return; + } + try { + await this.renewPeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + }; + + this.messageValidationTimer = setInterval(() => { + void validationCycle().catch((error) => { + log.error("Error in message validation cycle:", error); + }); + }, MINUTE) as unknown as number; + } + + private stopMessageValidationCycle(): void { + if (!this.messageValidationTimer) { + log.info("Already stopped message validation cycle."); + return; + } + + log.info("Stopping message validation cycle."); + clearInterval(this.messageValidationTimer); + this.messageValidationTimer = null; + } } class FilterSDK extends BaseProtocolSDK implements IFilterSDK { @@ -326,7 +422,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ) { super( new FilterCore( - async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => { + async (pubsubTopic, wakuMessage, peerIdStr) => { const subscription = this.getActiveSubscription(pubsubTopic); if (!subscription) { log.error( @@ -335,7 +431,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { return; } - await subscription.processIncomingMessage(wakuMessage); + await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, libp2p, options From 3f76f91c208752e61a0441530dc6837d60330c3f Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jul 2024 17:12:55 +0530 Subject: [PATCH 02/12] chore: fix spell check --- packages/sdk/src/protocols/filter.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 4bd810bce1..0d07115259 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -76,13 +76,11 @@ export class SubscriptionManager implements ISubscriptionSDK { ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); - const allPeerIdStrs = this.getPeers().map((p) => p.id.toString()); + const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); this.receivedMessagesHashes = { all: new Set(), nodes: { - ...Object.fromEntries( - allPeerIdStrs.map((peerId) => [peerId, new Set()]) - ) + ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) } }; From 133f5ee80b86ca319145b96e74df146528c6e140 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 16 Jul 2024 19:08:57 +0530 Subject: [PATCH 03/12] chore: use a max threshold before peer renewal --- packages/sdk/src/protocols/filter.ts | 57 ++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 0d07115259..e436698b3f 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -13,6 +13,7 @@ import { type IProtoMessage, type ISubscriptionSDK, type Libp2p, + PeerIdStr, type ProtocolCreateOptions, ProtocolError, ProtocolUseOptions, @@ -42,7 +43,7 @@ type SubscriptionCallback = { type ReceivedMessageHashes = { all: Set; nodes: { - [peerId: string]: Set; + [peerId: PeerIdStr]: Set; }; }; @@ -50,6 +51,7 @@ const log = new Logger("sdk:filter"); const MINUTE = 60 * 1000; const DEFAULT_MAX_PINGS = 3; +const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: MINUTE @@ -61,7 +63,9 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly receivedMessagesHashes: ReceivedMessageHashes; private messageValidationTimer: number | null = null; private peerFailures: Map = new Map(); + private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; + private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; private subscriptionCallbacks: Map< ContentTopic, @@ -83,6 +87,7 @@ export class SubscriptionManager implements ISubscriptionSDK { ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) } }; + allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); this.startMessageValidationCycle(); } @@ -330,6 +335,13 @@ export class SubscriptionManager implements ISubscriptionSDK { Array.from(this.subscriptionCallbacks.keys()) ); + this.peerFailures.delete(peerId.toString()); + this.missedMessagesByPeer.delete(peerId.toString()); + delete this.receivedMessagesHashes.nodes[peerId.toString()]; + + this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); + this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + return newPeer; } @@ -367,22 +379,25 @@ export class SubscriptionManager implements ISubscriptionSDK { this.receivedMessagesHashes.nodes )) { if (!hashes.has(hash)) { - log.error( - `Message with hash ${hash} not received from peer ${peerIdStr}. Attempting renewal.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` ); - return; - } - try { - await this.renewPeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + return; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } } } } @@ -406,6 +421,16 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.messageValidationTimer); this.messageValidationTimer = null; } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } } class FilterSDK extends BaseProtocolSDK implements IFilterSDK { From 151976fb8ef4b3688ace2e23a64ceef0577c3a88 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 16 Jul 2024 22:18:24 +0530 Subject: [PATCH 04/12] chore: switch from a validation cycle timer to adhoc validation --- packages/interfaces/src/filter.ts | 1 + packages/sdk/src/protocols/filter.ts | 135 ++++++++++++--------------- 2 files changed, 59 insertions(+), 77 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 4c56a3d680..2b770b30c1 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -16,6 +16,7 @@ import type { IReceiver } from "./receiver.js"; export type SubscribeOptions = { keepAlive?: number; pingsBeforePeerRenewed?: number; + maxMissedMessagesThreshold?: number; }; export type IFilter = IReceiver & IBaseProtocolCore; diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index e436698b3f..9487ceff2c 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -61,7 +61,6 @@ export class SubscriptionManager implements ISubscriptionSDK { readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; private readonly receivedMessagesHashes: ReceivedMessageHashes; - private messageValidationTimer: number | null = null; private peerFailures: Map = new Map(); private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; @@ -88,8 +87,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } }; allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - - this.startMessageValidationCycle(); } get messageHashes(): string[] { @@ -100,11 +97,7 @@ export class SubscriptionManager implements ISubscriptionSDK { this.receivedMessagesHashes.all.add(hash); if (peerIdStr) { - if (peerIdStr in this.receivedMessagesHashes) { - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); - } else { - this.receivedMessagesHashes.nodes[peerIdStr] = new Set([hash]); - } + this.receivedMessagesHashes.nodes[peerIdStr].add(hash); } } @@ -113,7 +106,11 @@ export class SubscriptionManager implements ISubscriptionSDK { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { + this.keepAliveTimer = options.keepAlive || MINUTE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; + this.maxMissedMessagesThreshold = + options.maxMissedMessagesThreshold || + DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -189,9 +186,6 @@ export class SubscriptionManager implements ISubscriptionSDK { if (this.keepAliveTimer) { this.stopKeepAlivePings(); } - if (this.messageValidationTimer) { - this.stopMessageValidationCycle(); - } } return finalResult; @@ -224,6 +218,37 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } + private async validateMessage(): Promise { + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + continue; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + } + } + async processIncomingMessage( message: WakuMessage, peerIdStr: string @@ -234,6 +259,7 @@ export class SubscriptionManager implements ISubscriptionSDK { ); this.addHash(hashedMessageStr, peerIdStr); + await this.validateMessage(); if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { log.info("Message already received, skipping"); @@ -327,22 +353,29 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async renewAndSubscribePeer(peerId: PeerId): Promise { - const newPeer = await this.renewPeer(peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); - - this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; + private async renewAndSubscribePeer( + peerId: PeerId + ): Promise { + try { + const newPeer = await this.renewPeer(peerId); + await this.protocol.subscribe( + this.pubsubTopic, + newPeer, + Array.from(this.subscriptionCallbacks.keys()) + ); - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); + this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - return newPeer; + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; + } finally { + this.peerFailures.delete(peerId.toString()); + this.missedMessagesByPeer.delete(peerId.toString()); + delete this.receivedMessagesHashes.nodes[peerId.toString()]; + } } private startKeepAlivePings(options: SubscribeOptions): void { @@ -370,58 +403,6 @@ export class SubscriptionManager implements ISubscriptionSDK { this.keepAliveTimer = null; } - private startMessageValidationCycle(): void { - const validationCycle = async (): Promise => { - log.info("Starting message validation cycle"); - - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - return; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } - }; - - this.messageValidationTimer = setInterval(() => { - void validationCycle().catch((error) => { - log.error("Error in message validation cycle:", error); - }); - }, MINUTE) as unknown as number; - } - - private stopMessageValidationCycle(): void { - if (!this.messageValidationTimer) { - log.info("Already stopped message validation cycle."); - return; - } - - log.info("Stopping message validation cycle."); - clearInterval(this.messageValidationTimer); - this.messageValidationTimer = null; - } - private incrementMissedMessageCount(peerIdStr: string): void { const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); From 1bc2e3324377184630ea8b44a2244c7aaf886732 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 16 Jul 2024 22:22:36 +0530 Subject: [PATCH 05/12] chore: add test --- .../tests/filter/peer_management.spec.ts | 63 ++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index eaf1218458..7eb7d08632 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -1,7 +1,8 @@ import { DefaultPubsubTopic, ISubscriptionSDK, - LightNode + LightNode, + SDKProtocolResult } from "@waku/interfaces"; import { createDecoder, @@ -16,6 +17,7 @@ import { describe } from "mocha"; import { afterEachCustom, beforeEachCustom, + ServiceNode, ServiceNodesFleet } from "../../src/index.js"; import { @@ -177,4 +179,63 @@ describe("Waku Filter: Peer Management: E2E", function () { waku.filter.numPeersToUse ); }); + + it("Renews peer on consistent missed messages", async function () { + const [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + undefined, + undefined, + 2 + ); + const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery"); + await nodeWithoutDiscovery.start({ lightpush: true, filter: true }); + await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId()); + + const { error, subscription: sub } = + await waku.filter.createSubscription(pubsubTopic); + if (!sub || error) { + throw new Error("Could not create subscription"); + } + + const messages: DecodedMessage[] = []; + const { successes } = await sub.subscribe( + [decoder], + (msg) => { + messages.push(msg); + }, + { messageValidation: 100 } + ); + + expect(successes.length).to.be.greaterThan(0); + expect(successes.length).to.be.equal(waku.filter.numPeersToUse); + + const sendMessage: () => Promise = async () => + waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + await sendMessage(); + + successes + .map(async (peerId) => + [ + (await nodeWithoutDiscovery.getPeerId()).toString(), + serviceNodes.nodes.map(async (node) => + (await node.getPeerId()).toString() + ) + ] + .flat() + .includes(peerId.toString()) + ) + .forEach((isConnected) => expect(isConnected).to.be.true); + + // send 2 more messages + await sendMessage(); + await sendMessage(); + + expect(waku.filter.connectedPeers.length).to.equal(2); + expect( + waku.filter.connectedPeers.map((p) => p.id.toString()) + ).to.not.include((await nodeWithoutDiscovery.getPeerId()).toString()); + }); }); From c4777da14d9823d60dd45acea8adb49f0562e9c7 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 12:45:00 +0530 Subject: [PATCH 06/12] fix: test --- packages/tests/tests/filter/peer_management.spec.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 7eb7d08632..d78310ae5d 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -198,13 +198,9 @@ describe("Waku Filter: Peer Management: E2E", function () { } const messages: DecodedMessage[] = []; - const { successes } = await sub.subscribe( - [decoder], - (msg) => { - messages.push(msg); - }, - { messageValidation: 100 } - ); + const { successes } = await sub.subscribe([decoder], (msg) => { + messages.push(msg); + }); expect(successes.length).to.be.greaterThan(0); expect(successes.length).to.be.equal(waku.filter.numPeersToUse); From 8108dc3441dcba8735f6277371392295989a89bb Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 14:49:15 +0530 Subject: [PATCH 07/12] chore: address comments --- packages/sdk/src/protocols/filter.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 7cb48e96e7..ab178500ce 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -49,7 +49,6 @@ type ReceivedMessageHashes = { const log = new Logger("sdk:filter"); -const MINUTE = 60 * 1000; const DEFAULT_MAX_PINGS = 3; const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const DEFAULT_KEEP_ALIVE = 30 * 1000; @@ -107,7 +106,7 @@ export class SubscriptionManager implements ISubscriptionSDK { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { - this.keepAliveTimer = options.keepAlive || MINUTE; + this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; this.maxMissedMessagesThreshold = options.maxMissedMessagesThreshold || From f766e316f0cf560bd7b76fa6640ce5d43365f330 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 15:10:56 +0530 Subject: [PATCH 08/12] fix: renewal without a new peer available --- packages/sdk/src/protocols/base_protocol.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 27ee1b8832..854c0fabce 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -51,14 +51,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); + await this.connectionManager.dropConnection(peerToDisconnect); + const peer = (await this.findAndAddPeers(1))[0]; if (!peer) { - throw new Error( - "Failed to find a new peer to replace the disconnected one" + this.log.error( + "Failed to find a new peer to replace the disconnected one." ); } - await this.connectionManager.dropConnection(peerToDisconnect); this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect)); this.log.info( `Peer ${peerToDisconnect} disconnected and removed from the peer list` From 128dd587d63793661e92c321360d05bdd651d3c2 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 15:16:53 +0530 Subject: [PATCH 09/12] chore: validating messages should be non-blocking --- packages/sdk/src/protocols/filter.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index ab178500ce..af69f1d218 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -4,8 +4,8 @@ import { ConnectionManager, FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, - CoreProtocolResult, - CreateSubscriptionResult, + type CoreProtocolResult, + type CreateSubscriptionResult, type IAsyncIterator, type IDecodedMessage, type IDecoder, @@ -13,14 +13,14 @@ import { type IProtoMessage, type ISubscriptionSDK, type Libp2p, - PeerIdStr, + type PeerIdStr, type ProtocolCreateOptions, ProtocolError, - ProtocolUseOptions, + type ProtocolUseOptions, type PubsubTopic, - SDKProtocolResult, + type SDKProtocolResult, type ShardingParams, - SubscribeOptions, + type SubscribeOptions, type Unsubscribe } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; @@ -93,7 +93,7 @@ export class SubscriptionManager implements ISubscriptionSDK { return [...this.receivedMessagesHashes.all]; } - addHash(hash: string, peerIdStr?: string): void { + private addHash(hash: string, peerIdStr?: string): void { this.receivedMessagesHashes.all.add(hash); if (peerIdStr) { @@ -259,7 +259,7 @@ export class SubscriptionManager implements ISubscriptionSDK { ); this.addHash(hashedMessageStr, peerIdStr); - await this.validateMessage(); + void this.validateMessage(); if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { log.info("Message already received, skipping"); From 47f8b4360c73d158d15093887e3c8e349551ce75 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 22:13:18 +0530 Subject: [PATCH 10/12] chore: minor improvements --- packages/tests/tests/filter/peer_management.spec.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index d78310ae5d..5f3fc505c6 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -180,7 +180,7 @@ describe("Waku Filter: Peer Management: E2E", function () { ); }); - it("Renews peer on consistent missed messages", async function () { + it.only("Renews peer on consistent missed messages", async function () { const [serviceNodes, waku] = await runMultipleNodes( this.ctx, undefined, @@ -189,6 +189,9 @@ describe("Waku Filter: Peer Management: E2E", function () { ); const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery"); await nodeWithoutDiscovery.start({ lightpush: true, filter: true }); + const nodeWithouDiscoveryPeerIdStr = ( + await nodeWithoutDiscovery.getPeerId() + ).toString(); await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId()); const { error, subscription: sub } = @@ -215,7 +218,7 @@ describe("Waku Filter: Peer Management: E2E", function () { successes .map(async (peerId) => [ - (await nodeWithoutDiscovery.getPeerId()).toString(), + nodeWithouDiscoveryPeerIdStr, serviceNodes.nodes.map(async (node) => (await node.getPeerId()).toString() ) @@ -232,6 +235,6 @@ describe("Waku Filter: Peer Management: E2E", function () { expect(waku.filter.connectedPeers.length).to.equal(2); expect( waku.filter.connectedPeers.map((p) => p.id.toString()) - ).to.not.include((await nodeWithoutDiscovery.getPeerId()).toString()); + ).to.not.include(nodeWithouDiscoveryPeerIdStr); }); }); From f565e108561f2fdc5d7179c38975e9a77b32fce9 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 22:35:49 +0530 Subject: [PATCH 11/12] chore: rm only --- packages/tests/tests/filter/peer_management.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 5f3fc505c6..616e8d1fb9 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -180,7 +180,7 @@ describe("Waku Filter: Peer Management: E2E", function () { ); }); - it.only("Renews peer on consistent missed messages", async function () { + it("Renews peer on consistent missed messages", async function () { const [serviceNodes, waku] = await runMultipleNodes( this.ctx, undefined, From c0822aebf9a6d8f13e8cb5331bc83a228978405f Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 17 Jul 2024 23:52:25 +0530 Subject: [PATCH 12/12] chore: fix test --- .../tests/filter/peer_management.spec.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 616e8d1fb9..c676c884a0 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -187,6 +187,11 @@ describe("Waku Filter: Peer Management: E2E", function () { undefined, 2 ); + const serviceNodesPeerIdStr = await Promise.all( + serviceNodes.nodes.map(async (node) => + (await node.getPeerId()).toString() + ) + ); const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery"); await nodeWithoutDiscovery.start({ lightpush: true, filter: true }); const nodeWithouDiscoveryPeerIdStr = ( @@ -216,17 +221,12 @@ describe("Waku Filter: Peer Management: E2E", function () { await sendMessage(); successes - .map(async (peerId) => - [ - nodeWithouDiscoveryPeerIdStr, - serviceNodes.nodes.map(async (node) => - (await node.getPeerId()).toString() - ) - ] - .flat() - .includes(peerId.toString()) + .map((peerId) => + [nodeWithouDiscoveryPeerIdStr, ...serviceNodesPeerIdStr].includes( + peerId.toString() + ) ) - .forEach((isConnected) => expect(isConnected).to.be.true); + .forEach((isConnected) => expect(isConnected).to.eq(true)); // send 2 more messages await sendMessage();