diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index b5bfe90fd4..b1497d3e02 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -34,7 +34,7 @@ export class ConnectionManager implements IConnectionManager { private currentActiveParallelDialCount = 0; private pendingPeerDialQueue: Array = []; - private isConnectedToNetwork: boolean = navigator.onLine; + private isConnectedToNetwork: boolean = window.navigator.onLine; private isConnectedToWakuNetwork: boolean = false; private constructor( diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 129e9177a0..79fa7b88be 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -5,6 +5,7 @@ import { type ContentTopic, CoreProtocolResult, CreateSubscriptionResult, + EConnectionStateEvents, type IAsyncIterator, type IDecodedMessage, type IDecoder, @@ -48,6 +49,8 @@ export class SubscriptionManager implements ISubscriptionSDK { readonly peers: Peer[]; readonly receivedMessagesHashStr: string[] = []; + private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS; + private keepAliveTimer: number | null = null; private subscriptionCallbacks: Map< @@ -70,6 +73,8 @@ export class SubscriptionManager implements ISubscriptionSDK { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { + this.subscribeOptions = options; + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; // check that all decoders are configured for the same pubsub topic as this subscription @@ -236,11 +241,40 @@ export class SubscriptionManager implements ISubscriptionSDK { } private startNetworkMonitoring(): void { - // this.protocol.addLibp2pEventListener("waku:connection", (evt) => console.log(evt)); + // @ts-expect-error: tmp change while PR in draft + this.protocol.addLibp2pEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.networkStateListener as () => void + ); } private stopNetworkMonitoring(): void { - // this.protocol.removeLibp2pEventListener("waku:connection", (evt) => console.log(evt)); + // @ts-expect-error: tmp change while PR in draft + this.protocol.removeLibp2pEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.networkStateListener as () => void + ); + } + + private async networkStateListener(isConnected: boolean): Promise { + if (!isConnected) { + this.stopKeepAlivePings(); + return; + } + + const result = await this.ping(); + const renewPeerPromises = result.failures.map((v) => { + if (v.peerId) { + // @ts-expect-error: tmp change while PR in draft + return this.protocol.renewPeer(v.peerId); + } + }); + + await Promise.all(renewPeerPromises); + + this.startKeepAlivePings( + this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive + ); } private startKeepAlivePings(interval: number): void {