Skip to content

Commit

Permalink
feat(filter): use protocol peer management (#2047)
Browse files Browse the repository at this point in the history
* feat: leverage protocol peer management

* chore: add test

* chore: address comments

* chore: add todo
  • Loading branch information
danisharora099 authored Jul 3, 2024
1 parent 42126a6 commit 4db508b
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 53 deletions.
3 changes: 2 additions & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
IBaseProtocolCore,
IBaseProtocolSDK,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult,
ShardingParams
} from "./protocols.js";
Expand Down Expand Up @@ -34,7 +35,7 @@ export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: SubscribeOptions
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult>;
};

Expand Down
27 changes: 27 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,33 @@ export type ApplicationInfo = {

export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;

//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
/**
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

export type ProtocolCreateOptions = {
/**
* @deprecated
Expand Down
30 changes: 2 additions & 28 deletions packages/interfaces/src/sender.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,10 @@
import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js";

export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: SendOptions
sendOptions?: ProtocolUseOptions
) => Promise<SDKProtocolResult>;
}

/**
* Options for using LightPush
*/
export type SendOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};
4 changes: 2 additions & 2 deletions packages/sdk/src/protocols/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces";
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";

interface Options {
Expand Down Expand Up @@ -86,7 +86,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
*/
protected hasPeers = async (
options: Partial<SendOptions> = {}
options: Partial<ProtocolUseOptions> = {}
): Promise<boolean> => {
const {
autoRetry = false,
Expand Down
32 changes: 14 additions & 18 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
ProtocolUseOptions,
type PubsubTopic,
SDKProtocolResult,
type ShardingParams,
Expand Down Expand Up @@ -45,7 +46,6 @@ const DEFAULT_SUBSCRIBE_OPTIONS = {
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;

Expand All @@ -56,10 +56,9 @@ export class SubscriptionManager implements ISubscriptionSDK {

constructor(
pubsubTopic: PubsubTopic,
remotePeers: Peer[],
private peers: Peer[],
private protocol: FilterCore
) {
this.peers = remotePeers;
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
}
Expand Down Expand Up @@ -314,42 +313,39 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic
pubsubTopicShardInfo: ShardingParams | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;

const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];

ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);

let peers: Peer[] = [];
try {
peers = await this.protocol.getPeers();
} catch (error) {
log.error("Error getting peers to initiate subscription: ", error);
return {
error: ProtocolError.GENERIC_FAIL,
subscription: null
};
}
if (peers.length === 0) {
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}

log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);

const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, peers, this.protocol)
new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol)
);

return {
Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/protocols/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
SDKProtocolResult,
SendOptions
ProtocolUseOptions,
SDKProtocolResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";

Expand All @@ -35,12 +35,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
async send(
encoder: IEncoder,
message: IMessage,
_options?: SendOptions
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
const options = {
autoRetry: true,
..._options
} as SendOptions;
} as ProtocolUseOptions;

const successes: PeerId[] = [];
const failures: Failure[] = [];
Expand Down
77 changes: 77 additions & 0 deletions packages/tests/tests/filter/peer_management.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { DefaultPubsubTopic, LightNode } from "@waku/interfaces";
import {
createDecoder,
createEncoder,
DecodedMessage,
utf8ToBytes
} from "@waku/sdk";
import { expect } from "chai";
import { describe } from "mocha";

import {
afterEachCustom,
beforeEachCustom,
ServiceNodesFleet
} from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy
} from "../filter/utils.js";

//TODO: add unit tests,

describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;

beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
5
);
});

afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});

const pubsubTopic = DefaultPubsubTopic;
const contentTopic = "/test";

const encoder = createEncoder({
pubsubTopic,
contentTopic
});

const decoder = createDecoder(contentTopic, pubsubTopic);

it("Number of peers are maintained correctly", async function () {
const { error, subscription } =
await waku.filter.createSubscription(pubsubTopic);
if (!subscription || error) {
expect.fail("Could not create subscription");
}

const messages: DecodedMessage[] = [];
const { failures, successes } = await subscription.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);

await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});

expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);

if (failures) {
expect(failures.length).to.equal(0);
}
});
});

0 comments on commit 4db508b

Please sign in to comment.