Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add support for sharded pubsub topics & remove support for named pubsub topics #1697

Merged
merged 17 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { Logger } from "@waku/utils";
Expand Down Expand Up @@ -279,7 +280,9 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = options?.pubsubTopics || [DefaultPubsubTopic];
this.pubsubTopics = options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: [DefaultPubsubTopic];

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
import { PushResponse } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
isMessageSizeUnderCap
isMessageSizeUnderCap,
shardInfoToPubsubTopics
} from "@waku/utils";
import { Logger } from "@waku/utils";
import all from "it-all";
Expand Down Expand Up @@ -50,7 +51,9 @@ class LightPush extends BaseProtocol implements ILightPush {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic];
this.pubsubTopics = options?.shardInfo
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
? shardInfoToPubsubTopics(options.shardInfo)
: [DefaultPubsubTopic];
}

private async preparePushMessage(
Expand Down
34 changes: 29 additions & 5 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import type {
IMetaSetter,
IProtoMessage,
IRateLimitProof,
PubsubTopic
PubsubTopic,
SingleTopicShardInfo
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import { Logger } from "@waku/utils";
import { Logger, singleTopicShardInfoToPubsubTopic } from "@waku/utils";

import { DefaultPubsubTopic } from "../constants.js";

Expand Down Expand Up @@ -124,7 +125,20 @@ export function createEncoder({
ephemeral,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(contentTopic, ephemeral, pubsubTopic, metaSetter);
if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) {
throw new Error(
`Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}`
);
}

return new Encoder(
contentTopic,
ephemeral,
typeof pubsubTopic === "string"
? pubsubTopic
: singleTopicShardInfoToPubsubTopic(pubsubTopic),
metaSetter
);
}

export class Decoder implements IDecoder<DecodedMessage> {
Expand Down Expand Up @@ -182,7 +196,17 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopic: PubsubTopic = DefaultPubsubTopic
pubsubTopic: SingleTopicShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(pubsubTopic, contentTopic);
if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) {
throw new Error(
`Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}`
);
}
return new Decoder(
typeof pubsubTopic === "string"
? pubsubTopic
: singleTopicShardInfoToPubsubTopic(pubsubTopic),
contentTopic
);
}
10 changes: 8 additions & 2 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import {
PubsubTopic
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils";
import {
ensurePubsubTopicIsConfigured,
isDefined,
shardInfoToPubsubTopics
} from "@waku/utils";
import { Logger } from "@waku/utils";
import { concat, utf8ToBytes } from "@waku/utils/bytes";
import all from "it-all";
Expand Down Expand Up @@ -80,7 +84,9 @@ class Store extends BaseProtocol implements IStore {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = options?.pubsubTopics ?? [DefaultPubsubTopic];
this.pubsubTopics = options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: [DefaultPubsubTopic];
}

/**
Expand Down
7 changes: 6 additions & 1 deletion packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import { ShardInfo } from "./enr.js";
import type { PubsubTopic } from "./misc.js";

export interface SingleTopicShardInfo extends Omit<ShardInfo, "indexList"> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
export interface SingleTopicShardInfo extends Omit<ShardInfo, "indexList"> {
export interface SingleTopicShardInfo extends Omit<ShardInfo, "indexList"> {

Topic and Shard are redundant. It's a single shard info, which is also a single topic info.

The redundancy of information makes the interface name more complicated for no added value.

Also, you break the interface with ShardInfo here as you removed the index list.

Considering you only bring one property (cluster) over from ShardInfo, I am not sure to understand the point of extending from ShardInfo. Just create a new type.

Re-using the type is only useful if you want a function that accept ShardInfo to also accept SingleTopicShardInfo. But because of the Omit, you are not getting that.

If that's what you want, then try to override the indexList with a tuple type to specify that it's an array with only one number.

If you don't need interface compatibility between ShardInfo and SingleTopicShardInfo, then don't make the latter extend the former, just redefine a new interface.

Copy link
Collaborator Author

@danisharora099 danisharora099 Nov 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point that since the two interfaces don't share a lot of properties (have just one in common of the two), thus it might make more sense to define a new interface.

Omit was used because I wanted to showcase the link between the two types, while keeping one property. When is a case you think Omit would make sense for a usecase like this here? Perhaps when there are more number of properties?

--

Addressed redefining the interface here: 5dd508c

Copy link
Collaborator

@fryorcraken fryorcraken Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO Omit makes sense when you want common API to be used. e.g.

interface A {
foo: number
bar: string
}

interface B extends <A, "foo"> {}

function func (alpha: Partial<A>) {
...
}

// this works
func(B)

I see your point re code documentation and duplication but with so very few properties, I think it makes it less readable.

index: number;
}

export interface IRateLimitProof {
proof: Uint8Array;
merkleRoot: Uint8Array;
Expand Down Expand Up @@ -38,7 +43,7 @@ export interface IMetaSetter {
}

export interface EncoderOptions {
pubsubTopic?: PubsubTopic;
pubsubTopic?: SingleTopicShardInfo | PubsubTopic;
/** The content topic to set on outgoing messages. */
contentTopic: string;
/**
Expand Down
10 changes: 5 additions & 5 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { Libp2pOptions } from "libp2p";

import type { ShardInfo } from "./enr.js";
import type { IDecodedMessage } from "./message.js";
import type { PubsubTopic } from "./misc.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -23,9 +23,9 @@ export interface IBaseProtocol {

export type ProtocolCreateOptions = {
/**
* Waku supports usage of multiple pubsub topics, but this is still in early stages.
* Waku implements sharding to achieve scalability
* The format of the sharded topic is `/waku/2/rs/<shard_cluster_index>/<shard_number>`
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:
* clusterId: number, shards: number[]
* To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
* The Pubsub Topic to use. Defaults to {@link @waku/core!DefaultPubsubTopic }.
*
Expand All @@ -39,7 +39,7 @@ export type ProtocolCreateOptions = {
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
*/
pubsubTopics?: PubsubTopic[];
shardInfo?: ShardInfo;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
Expand Down
31 changes: 26 additions & 5 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import type {
IDecoder,
IEncoder,
IMessage,
IProtoMessage
IProtoMessage,
SingleTopicShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { Logger, singleTopicShardInfoToPubsubTopic } from "@waku/utils";

import { DecodedMessage } from "./decoded_message.js";
import {
Expand Down Expand Up @@ -104,8 +105,16 @@ export function createEncoder({
ephemeral = false,
metaSetter
}: EncoderOptions): Encoder {
if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) {
throw new Error(
`Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}`
);
}

return new Encoder(
pubsubTopic,
typeof pubsubTopic === "string"
? pubsubTopic
: singleTopicShardInfoToPubsubTopic(pubsubTopic),
contentTopic,
publicKey,
sigPrivKey,
Expand Down Expand Up @@ -193,7 +202,19 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopic: PubsubTopic = DefaultPubsubTopic
pubsubTopic: SingleTopicShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(pubsubTopic, contentTopic, privateKey);
if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) {
throw new Error(
`Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}`
);
}

return new Decoder(
typeof pubsubTopic === "string"
? pubsubTopic
: singleTopicShardInfoToPubsubTopic(pubsubTopic),
contentTopic,
privateKey
);
}
30 changes: 25 additions & 5 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import type {
IMessage,
IMetaSetter,
IProtoMessage,
PubsubTopic
PubsubTopic,
SingleTopicShardInfo
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
import { Logger, singleTopicShardInfoToPubsubTopic } from "@waku/utils";

import { DecodedMessage } from "./decoded_message.js";
import {
Expand Down Expand Up @@ -100,8 +101,15 @@ export function createEncoder({
ephemeral = false,
metaSetter
}: EncoderOptions): Encoder {
if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) {
throw new Error(
`Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}`
);
}
return new Encoder(
pubsubTopic,
typeof pubsubTopic === "string"
? pubsubTopic
: singleTopicShardInfoToPubsubTopic(pubsubTopic),
contentTopic,
symKey,
sigPrivKey,
Expand Down Expand Up @@ -189,7 +197,19 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopic: PubsubTopic = DefaultPubsubTopic
pubsubTopic: SingleTopicShardInfo | PubsubTopic = DefaultPubsubTopic
): Decoder {
return new Decoder(pubsubTopic, contentTopic, symKey);
if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) {
throw new Error(
`Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}`
);
}

return new Decoder(
typeof pubsubTopic === "string"
? pubsubTopic
: singleTopicShardInfoToPubsubTopic(pubsubTopic),
contentTopic,
symKey
);
}
10 changes: 8 additions & 2 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import {
SendError,
SendResult
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import {
isWireSizeUnderCap,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
import { Logger } from "@waku/utils";

Expand Down Expand Up @@ -68,7 +72,9 @@ class Relay implements IRelay {
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = new Set(options?.pubsubTopics ?? [DefaultPubsubTopic]);
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
Expand Down
32 changes: 23 additions & 9 deletions packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
ProtocolCreateOptions,
RelayNode
} from "@waku/interfaces";
import type { PubsubTopic } from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { createLibp2p, Libp2pOptions } from "libp2p";
import { identifyService } from "libp2p/identify";
import { pingService } from "libp2p/ping";
Expand All @@ -46,8 +48,12 @@
): Promise<LightNode> {
options = options ?? {};

if (!options.pubsubTopics) {
options.pubsubTopics = [DefaultPubsubTopic];
let pubsubTopics: PubsubTopic[];

if (!options.shardInfo) {
pubsubTopics = [DefaultPubsubTopic];
} else {
pubsubTopics = shardInfoToPubsubTopics(options.shardInfo);
}

const libp2pOptions = options?.libp2p ?? {};
Expand All @@ -69,7 +75,7 @@

return new WakuNode(
options ?? {},
options.pubsubTopics,
pubsubTopics,
libp2p,
store,
lightPush,
Expand All @@ -86,8 +92,12 @@
): Promise<RelayNode> {
options = options ?? {};

if (!options.pubsubTopics) {
options.pubsubTopics = [DefaultPubsubTopic];
let pubsubTopics: PubsubTopic[];

if (!options.shardInfo) {
pubsubTopics = [DefaultPubsubTopic];
} else {
pubsubTopics = shardInfoToPubsubTopics(options.shardInfo);
}

const libp2pOptions = options?.libp2p ?? {};
Expand All @@ -107,7 +117,7 @@

return new WakuNode(
options,
options.pubsubTopics,
pubsubTopics,
libp2p,
undefined,
undefined,
Expand All @@ -134,8 +144,12 @@
): Promise<FullNode> {
options = options ?? {};

if (!options.pubsubTopics) {
options.pubsubTopics = [DefaultPubsubTopic];
let pubsubTopics: PubsubTopic[];
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved

if (!options.shardInfo) {
pubsubTopics = [DefaultPubsubTopic];
} else {
pubsubTopics = shardInfoToPubsubTopics(options.shardInfo);
}

const libp2pOptions = options?.libp2p ?? {};
Expand All @@ -158,7 +172,7 @@

return new WakuNode(
options ?? {},
options.pubsubTopics,
pubsubTopics,
libp2p,
store,
lightPush,
Expand Down Expand Up @@ -206,5 +220,5 @@
...pubsubService,
...options?.services
}
}) as any as Libp2p; // TODO: make libp2p include it;

Check warning on line 223 in packages/sdk/src/create.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 223 in packages/sdk/src/create.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}
Loading
Loading