Skip to content

Commit

Permalink
feat!: use ShardingParams on subscriptions, make Decoder/Encoder auto…
Browse files Browse the repository at this point in the history
… sharding friendly by default (#1958)

* fix: use pubsubTopic from current ones if not set

* fix: improve type on dial method

* enforce same pubusb on filter.subscribe, make content topic to pubsub mapping default for decoder / encoder

* fix mapping problem

* update tests

* add error handling

* fix typo

* up lock

* rm lock

* up lock

* remove only

* fix content topic

* fix ephemeral test

* fix filter unsubscribe test

* up utils

* fix subscribe test

* up interfaces and filter api

* remove only

* up ping test

* fix subscribe test

* fix push test

* fix lightPush

* fix multiple pubsub

* remove only, fix subscribe filter test

* remove only

* fix cluster ID selection and named sharding subscription test

* fix unsubscribe test

* fix light push test

* fix light push test

* fix push test

* fix relay publish

* create runNode and fix relay tests

* generalize runNodes, fix some tests

* fix store tests

* fix toAsyncIterator tests

* remove only

* fix lightPush

* use generics

* try fix test

* run failing tests

* remove only

* address failed tests, remove DefaultPubsubTopic dependency in some tests
  • Loading branch information
weboko authored Apr 28, 2024
1 parent 86249df commit f3627c4
Show file tree
Hide file tree
Showing 49 changed files with 1,495 additions and 1,374 deletions.
161 changes: 75 additions & 86 deletions packages/core/src/lib/message/version_0.spec.ts
Original file line number Diff line number Diff line change
@@ -1,108 +1,97 @@
import type { IProtoMessage } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";

import { createDecoder, createEncoder, DecodedMessage } from "./version_0.js";

const contentTopic = "/js-waku/1/tests/bytes";
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);

describe("Waku Message version 0", function () {
it("Round trip binary serialization", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
async (contentTopic, pubsubTopic, payload) => {
const encoder = createEncoder({
contentTopic
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

expect(result.contentTopic).to.eq(contentTopic);
expect(result.pubsubTopic).to.eq(pubsubTopic);
expect(result.version).to.eq(0);
expect(result.ephemeral).to.be.false;
expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined;
}
)
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = createEncoder({
contentTopic
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

expect(result.contentTopic).to.eq(contentTopic);
expect(result.pubsubTopic).to.eq(pubsubTopic);
expect(result.version).to.eq(0);
expect(result.ephemeral).to.be.false;
expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined;
})
);
});

it("Ephemeral field set to true", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
async (contentTopic, pubsubTopic, payload) => {
const encoder = createEncoder({
contentTopic,
ephemeral: true
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

expect(result.ephemeral).to.be.true;
}
)
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = createEncoder({
contentTopic,
ephemeral: true
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

expect(result.ephemeral).to.be.true;
})
);
});

it("Meta field set when metaSetter is specified", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
async (contentTopic, pubsubTopic, payload) => {
// Encode the length of the payload
// Not a relevant real life example
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
): Uint8Array => {
const buffer = new ArrayBuffer(4);
const view = new DataView(buffer);
view.setUint32(0, msg.payload.length, false);
return new Uint8Array(buffer);
};

const encoder = createEncoder({
contentTopic,
ephemeral: true,
metaSetter
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

const expectedMeta = metaSetter({
payload,
timestamp: undefined,
contentTopic: "",
ephemeral: undefined,
meta: undefined,
rateLimitProof: undefined,
version: undefined
});

expect(result.meta).to.deep.eq(expectedMeta);
}
)
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
// Encode the length of the payload
// Not a relevant real life example
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
): Uint8Array => {
const buffer = new ArrayBuffer(4);
const view = new DataView(buffer);
view.setUint32(0, msg.payload.length, false);
return new Uint8Array(buffer);
};

const encoder = createEncoder({
contentTopic,
ephemeral: true,
metaSetter
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

const expectedMeta = metaSetter({
payload,
timestamp: undefined,
contentTopic: "",
ephemeral: undefined,
meta: undefined,
rateLimitProof: undefined,
version: undefined
});

expect(result.meta).to.deep.eq(expectedMeta);
})
);
});
});
Expand Down
10 changes: 4 additions & 6 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { PeerId } from "@libp2p/interface";

import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic } from "./misc.js";
import type {
Callback,
IBaseProtocolCore,
IBaseProtocolSDK
IBaseProtocolSDK,
ShardingParams
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";

Expand All @@ -31,7 +30,6 @@ export type IFilter = IReceiver & IBaseProtocolCore;
export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic,
peerId?: PeerId
pubsubTopicShardInfo?: ShardingParams | PubsubTopic
): Promise<IFilterSubscription>;
};
2 changes: 1 addition & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export type IBaseProtocolSDK = {
};

export type ContentTopicInfo = {
clusterId: number;
clusterId?: number;
contentTopics: string[];
};

Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { PeerId, Stream } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { MultiaddrInput } from "@multiformats/multiaddr";

import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
Expand All @@ -18,7 +18,7 @@ export interface Waku {

connectionManager: IConnectionManager;

dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
dial(peer: PeerId | MultiaddrInput, protocols?: Protocols[]): Promise<Stream>;

start(): Promise<void>;

Expand Down
22 changes: 7 additions & 15 deletions packages/message-encryption/src/ecies.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import { IProtoMessage } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";

import { getPublicKey } from "./crypto/index.js";
import { createDecoder, createEncoder } from "./ecies.js";

const contentTopic = "/js-waku/1/tests/bytes";
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);

describe("Ecies Encryption", function () {
this.timeout(20000);
it("Round trip binary encryption [ecies, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, privateKey) => {
async (payload, privateKey) => {
const publicKey = getPublicKey(privateKey);

const encoder = createEncoder({
Expand Down Expand Up @@ -46,18 +48,10 @@ describe("Ecies Encryption", function () {

await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (
pubsubTopic,
contentTopic,
payload,
alicePrivateKey,
bobPrivateKey
) => {
async (payload, alicePrivateKey, bobPrivateKey) => {
const alicePublicKey = getPublicKey(alicePrivateKey);
const bobPublicKey = getPublicKey(bobPrivateKey);

Expand Down Expand Up @@ -89,11 +83,9 @@ describe("Ecies Encryption", function () {
it("Check meta is set [ecies]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, privateKey) => {
async (payload, privateKey) => {
const publicKey = getPublicKey(privateKey);
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
Expand Down
3 changes: 1 addition & 2 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0";
import {
type EncoderOptions as BaseEncoderOptions,
DefaultPubsubTopic,
type IDecoder,
type IEncoder,
type IMessage,
Expand Down Expand Up @@ -200,7 +199,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic
pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic
): Decoder {
return new Decoder(
determinePubsubTopic(contentTopic, pubsubTopicShardInfo),
Expand Down
16 changes: 7 additions & 9 deletions packages/message-encryption/src/symmetric.spec.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { IProtoMessage } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";

import { getPublicKey } from "./crypto/index.js";
import { createDecoder, createEncoder } from "./symmetric.js";

const contentTopic = "/js-waku/1/tests/bytes";
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);

describe("Symmetric Encryption", function () {
it("Round trip binary encryption [symmetric, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, symKey) => {
async (payload, symKey) => {
const encoder = createEncoder({
contentTopic,
symKey
Expand Down Expand Up @@ -41,12 +43,10 @@ describe("Symmetric Encryption", function () {
it("Round trip binary encryption [symmetric, signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, sigPrivKey, symKey) => {
async (payload, sigPrivKey, symKey) => {
const sigPubKey = getPublicKey(sigPrivKey);

const encoder = createEncoder({
Expand Down Expand Up @@ -77,11 +77,9 @@ describe("Symmetric Encryption", function () {
it("Check meta is set [symmetric]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, symKey) => {
async (payload, symKey) => {
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
): Uint8Array => {
Expand Down
Loading

0 comments on commit f3627c4

Please sign in to comment.