Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add support for sharded pubsub topics & remove support for named pubsub topics #1697

Merged
merged 17 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import type {
IBaseProtocol,
Libp2pComponents,
PubsubTopic,
ShardInfo
} from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";

import { DefaultPubsubTopic } from "./constants.js";
import { filterPeers } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

Expand Down Expand Up @@ -89,4 +96,10 @@ export class BaseProtocol implements IBaseProtocol {
// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
}

initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
return shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultPubsubTopic];
}
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = options?.pubsubTopics || [DefaultPubsubTopic];
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { IRelay, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Logger, pubsubTopicToSingleTopicShardInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import type { PingService } from "libp2p/ping";

Expand Down Expand Up @@ -123,7 +123,7 @@ export class KeepAliveManager {
if (!meshPeers.includes(peerIdStr)) continue;

const encoder = createEncoder({
pubsubTopic: topic,
pubsubTopicShardInfo: pubsubTopicToSingleTopicShardInfo(topic),
contentTopic: RelayPingContentTopic,
ephemeral: true
});
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";

import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubsubTopic } from "../constants.js";

import { PushRpc } from "./push_rpc.js";

Expand Down Expand Up @@ -50,7 +49,7 @@ class LightPush extends BaseProtocol implements ILightPush {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic];
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
}

private async preparePushMessage(
Expand Down
25 changes: 19 additions & 6 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import type {
IMetaSetter,
IProtoMessage,
IRateLimitProof,
PubsubTopic
PubsubTopic,
SingleTopicShardInfo
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import { Logger } from "@waku/utils";
import { Logger, singleTopicShardInfoToPubsubTopic } from "@waku/utils";

import { DefaultPubsubTopic } from "../constants.js";

Expand Down Expand Up @@ -119,12 +120,19 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
pubsubTopicShardInfo,
contentTopic,
ephemeral,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(contentTopic, ephemeral, pubsubTopic, metaSetter);
return new Encoder(
contentTopic,
ephemeral,
pubsubTopicShardInfo?.index
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would the index not be defined? It's not optional on SingleTopicShardInfo.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this is being extra cautious & is easily swappable with a simple check of pubsubTopicShardInfo but this was just added as an extra check to ensure that if somebody force passes a string, or a structure that doesn't have index, we fallback to using the DefaultPubsubTopic
also something @weboko wanted here: #1697 (comment)

happy to follow up with a change if you prefer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually updated this part of #1742

? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully this goes away and the encoder does the automated conversion from content topic to pubsub topic cc @adklempner.

I think the encoder should do the conversion from shard info to pubsub topic too. The API should be reviewed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the encoder should do the conversion from shard info to pubsub topic too. The API should be reviewed.

can you elaborate?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, API (createEncoder) is looking ok actually. did not realized it was an internal API (new Encoder).

: DefaultPubsubTopic,
metaSetter
);
}

export class Decoder implements IDecoder<DecodedMessage> {
Expand Down Expand Up @@ -182,7 +190,12 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopic: PubsubTopic = DefaultPubsubTopic
pubsubTopicShardInfo?: SingleTopicShardInfo
): Decoder {
return new Decoder(pubsubTopic, contentTopic);
return new Decoder(
pubsubTopicShardInfo?.index
? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
contentTopic
);
}
3 changes: 1 addition & 2 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";

import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubsubTopic } from "../constants.js";
import { toProtoMessage } from "../to_proto_message.js";

import { HistoryRpc, PageDirection, Params } from "./history_rpc.js";
Expand Down Expand Up @@ -80,7 +79,7 @@ class Store extends BaseProtocol implements IStore {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic];
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
}

/**
Expand Down
15 changes: 12 additions & 3 deletions packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import type {
IStore,
Libp2p,
PubsubTopic,
ShardInfo,
Waku
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";

import { ConnectionManager } from "./connection_manager.js";
import { DefaultPubsubTopic } from "./constants.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -50,16 +52,23 @@ export class WakuNode implements Waku {
public filter?: IFilter;
public lightPush?: ILightPush;
public connectionManager: ConnectionManager;
public readonly pubsubTopics: PubsubTopic[];

constructor(
options: WakuOptions,
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p,
pubsubShardInfo?: ShardInfo,
store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter,
relay?: (libp2p: Libp2p) => IRelay
) {
if (!pubsubShardInfo) {
this.pubsubTopics = [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
}

this.libp2p = libp2p;

if (store) {
Expand Down Expand Up @@ -88,7 +97,7 @@ export class WakuNode implements Waku {
peerId,
libp2p,
{ pingKeepAlive, relayKeepAlive },
pubsubTopics,
this.pubsubTopics,
this.relay
);

Expand Down
7 changes: 6 additions & 1 deletion packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { ShardInfo } from "./enr.js";
import type { PubsubTopic } from "./misc.js";

export interface SingleTopicShardInfo extends Omit<ShardInfo, "indexList"> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
export interface SingleTopicShardInfo extends Omit<ShardInfo, "indexList"> {
export interface SingleTopicShardInfo extends Omit<ShardInfo, "indexList"> {

Topic and Shard are redundant. It's a single shard info, which is also a single topic info.

The redundancy of information makes the interface name more complicated for no added value.

Also, you break the interface with ShardInfo here as you removed the index list.

Considering you only bring one property (cluster) over from ShardInfo, I am not sure to understand the point of extending from ShardInfo. Just create a new type.

Re-using the type is only useful if you want a function that accept ShardInfo to also accept SingleTopicShardInfo. But because of the Omit, you are not getting that.

If that's what you want, then try to override the indexList with a tuple type to specify that it's an array with only one number.

If you don't need interface compatibility between ShardInfo and SingleTopicShardInfo, then don't make the latter extend the former, just redefine a new interface.

Copy link
Collaborator Author

@danisharora099 danisharora099 Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point that since the two interfaces don't share a lot of properties (have just one in common of the two), thus it might make more sense to define a new interface.

Omit was used because I wanted to showcase the link between the two types, while keeping one property. When is a case you think Omit would make sense for a usecase like this here? Perhaps when there are more number of properties?

--

Addressed redefining the interface here: 5dd508c

Copy link
Collaborator

@fryorcraken fryorcraken Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO Omit makes sense when you want common API to be used. e.g.

interface A {
foo: number
bar: string
}

interface B extends <A, "foo"> {}

function func (alpha: Partial<A>) {
...
}

// this works
func(B)

I see your point re code documentation and duplication but with so very few properties, I think it makes it less readable.

index: number;
}

export interface IRateLimitProof {
proof: Uint8Array;
merkleRoot: Uint8Array;
Expand Down Expand Up @@ -38,7 +43,7 @@ export interface IMetaSetter {
}

export interface EncoderOptions {
pubsubTopic?: PubsubTopic;
pubsubTopicShardInfo?: SingleTopicShardInfo;
/** The content topic to set on outgoing messages. */
contentTopic: string;
/**
Expand Down
10 changes: 5 additions & 5 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { Libp2pOptions } from "libp2p";

import type { ShardInfo } from "./enr.js";
import type { IDecodedMessage } from "./message.js";
import type { PubsubTopic } from "./misc.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -23,9 +23,9 @@ export interface IBaseProtocol {

export type ProtocolCreateOptions = {
/**
* Waku supports usage of multiple pubsub topics, but this is still in early stages.
* Waku implements sharding to achieve scalability
* The format of the sharded topic is `/waku/2/rs/<shard_cluster_index>/<shard_number>`
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:
* clusterId: number, shards: number[]
* To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
* The Pubsub Topic to use. Defaults to {@link @waku/core!DefaultPubsubTopic }.
*
Expand All @@ -39,7 +39,7 @@ export type ProtocolCreateOptions = {
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
*/
pubsubTopics?: PubsubTopic[];
shardInfo?: ShardInfo;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
Expand Down
21 changes: 15 additions & 6 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import type {
IDecoder,
IEncoder,
IMessage,
IProtoMessage
IProtoMessage,
SingleTopicShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { Logger, singleTopicShardInfoToPubsubTopic } from "@waku/utils";

import { DecodedMessage } from "./decoded_message.js";
import {
Expand Down Expand Up @@ -97,15 +98,17 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
pubsubTopicShardInfo,
contentTopic,
publicKey,
sigPrivKey,
ephemeral = false,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubsubTopic,
pubsubTopicShardInfo?.index
? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
contentTopic,
publicKey,
sigPrivKey,
Expand Down Expand Up @@ -193,7 +196,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopic: PubsubTopic = DefaultPubsubTopic
pubsubTopicShardInfo?: SingleTopicShardInfo
): Decoder {
return new Decoder(pubsubTopic, contentTopic, privateKey);
return new Decoder(
pubsubTopicShardInfo?.index
? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
contentTopic,
privateKey
);
}
21 changes: 15 additions & 6 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import type {
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic
PubsubTopic,
SingleTopicShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { Logger, singleTopicShardInfoToPubsubTopic } from "@waku/utils";

import { DecodedMessage } from "./decoded_message.js";
import {
Expand Down Expand Up @@ -93,15 +94,17 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
pubsubTopicShardInfo,
contentTopic,
symKey,
sigPrivKey,
ephemeral = false,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
pubsubTopic,
pubsubTopicShardInfo?.index
? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
contentTopic,
symKey,
sigPrivKey,
Expand Down Expand Up @@ -189,7 +192,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopic: PubsubTopic = DefaultPubsubTopic
pubsubTopicShardInfo?: SingleTopicShardInfo
): Decoder {
return new Decoder(pubsubTopic, contentTopic, symKey);
return new Decoder(
pubsubTopicShardInfo?.index
? singleTopicShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
contentTopic,
symKey
);
}
10 changes: 8 additions & 2 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import {
SendError,
SendResult
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import {
isWireSizeUnderCap,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
import { Logger } from "@waku/utils";

Expand Down Expand Up @@ -68,7 +72,9 @@ class Relay implements IRelay {
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = new Set(options?.pubsubTopics ?? [DefaultPubsubTopic]);
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
Expand Down
Loading
Loading