Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: use ShardingParams on subscriptions, make Decoder/Encoder auto sharding friendly by default #1958

Merged
merged 45 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
6acf183
fix: use pubsubTopic from current ones if not set
weboko Apr 15, 2024
a7bf4fd
fix: improve type on dial method
weboko Apr 15, 2024
08397ff
enforce same pubusb on filter.subscribe, make content topic to pubsub…
weboko Apr 18, 2024
c89c8f9
fix mapping problem
weboko Apr 18, 2024
5248761
update tests
weboko Apr 18, 2024
f9b9bda
add error handling
weboko Apr 18, 2024
fd60cc2
fix typo
weboko Apr 18, 2024
f7849c3
merge with master
weboko Apr 22, 2024
22cfb50
up lock
weboko Apr 22, 2024
8db5bf8
rm lock
weboko Apr 22, 2024
0f79e17
up lock
weboko Apr 22, 2024
0c7f4a1
remove only
weboko Apr 22, 2024
a8922ba
fix content topic
weboko Apr 22, 2024
eabe940
Merge branch 'master' of github.com:waku-org/js-waku into weboko/prot…
weboko Apr 24, 2024
68f3619
fix ephemeral test
weboko Apr 24, 2024
364045b
fix filter unsubscribe test
weboko Apr 24, 2024
b324f64
up utils
weboko Apr 24, 2024
f9d5546
fix subscribe test
weboko Apr 24, 2024
a629aa7
up interfaces and filter api
weboko Apr 24, 2024
d1aff71
remove only
weboko Apr 24, 2024
801b286
up ping test
weboko Apr 24, 2024
c10e7d9
fix subscribe test
weboko Apr 25, 2024
5ee014d
fix push test
weboko Apr 25, 2024
32852ad
fix lightPush
weboko Apr 25, 2024
f971624
fix multiple pubsub
weboko Apr 25, 2024
a929da7
remove only, fix subscribe filter test
weboko Apr 25, 2024
6e29167
remove only
weboko Apr 25, 2024
1641074
merge with master
weboko Apr 25, 2024
7d79bda
fix cluster ID selection and named sharding subscription test
weboko Apr 25, 2024
c574863
fix unsubscribe test
weboko Apr 25, 2024
8873aa1
fix light push test
weboko Apr 25, 2024
7d24612
fix light push test
weboko Apr 25, 2024
d8b1952
fix push test
weboko Apr 25, 2024
600c02b
fix relay publish
weboko Apr 25, 2024
88ba615
create runNode and fix relay tests
weboko Apr 25, 2024
90731d7
generalize runNodes, fix some tests
weboko Apr 26, 2024
a4c3d50
fix store tests
weboko Apr 26, 2024
16ffc67
fix toAsyncIterator tests
weboko Apr 26, 2024
e47cc67
remove only
weboko Apr 26, 2024
7e1260b
fix lightPush
weboko Apr 26, 2024
9d02f2b
use generics
weboko Apr 26, 2024
b0381b4
try fix test
weboko Apr 26, 2024
53d4db8
run failing tests
weboko Apr 26, 2024
fd9cb86
remove only
weboko Apr 26, 2024
f7595c2
address failed tests, remove DefaultPubsubTopic dependency in some tests
weboko Apr 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 70 additions & 81 deletions packages/core/src/lib/message/version_0.spec.ts
Original file line number Diff line number Diff line change
@@ -1,108 +1,97 @@
import type { IProtoMessage } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";

import { createDecoder, createEncoder, DecodedMessage } from "./version_0.js";

const contentTopic = "/js-waku/1/tests/bytes";
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);

describe("Waku Message version 0", function () {
it("Round trip binary serialization", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
async (contentTopic, pubsubTopic, payload) => {
const encoder = createEncoder({
contentTopic
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = createEncoder({
contentTopic
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

expect(result.contentTopic).to.eq(contentTopic);
expect(result.pubsubTopic).to.eq(pubsubTopic);
expect(result.version).to.eq(0);
expect(result.ephemeral).to.be.false;
expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined;
}
)
expect(result.contentTopic).to.eq(contentTopic);
expect(result.pubsubTopic).to.eq(pubsubTopic);
expect(result.version).to.eq(0);
expect(result.ephemeral).to.be.false;
expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined;
})
);
});

it("Ephemeral field set to true", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
async (contentTopic, pubsubTopic, payload) => {
const encoder = createEncoder({
contentTopic,
ephemeral: true
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = createEncoder({
contentTopic,
ephemeral: true
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

expect(result.ephemeral).to.be.true;
}
)
expect(result.ephemeral).to.be.true;
})
);
});

it("Meta field set when metaSetter is specified", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
async (contentTopic, pubsubTopic, payload) => {
// Encode the length of the payload
// Not a relevant real life example
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
): Uint8Array => {
const buffer = new ArrayBuffer(4);
const view = new DataView(buffer);
view.setUint32(0, msg.payload.length, false);
return new Uint8Array(buffer);
};
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
// Encode the length of the payload
// Not a relevant real life example
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
): Uint8Array => {
const buffer = new ArrayBuffer(4);
const view = new DataView(buffer);
view.setUint32(0, msg.payload.length, false);
return new Uint8Array(buffer);
};

const encoder = createEncoder({
contentTopic,
ephemeral: true,
metaSetter
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;
const encoder = createEncoder({
contentTopic,
ephemeral: true,
metaSetter
});
const bytes = await encoder.toWire({ payload });
const decoder = createDecoder(contentTopic);
const protoResult = await decoder.fromWireToProtoObj(bytes);
const result = (await decoder.fromProtoObj(
pubsubTopic,
protoResult!
)) as DecodedMessage;

const expectedMeta = metaSetter({
payload,
timestamp: undefined,
contentTopic: "",
ephemeral: undefined,
meta: undefined,
rateLimitProof: undefined,
version: undefined
});
const expectedMeta = metaSetter({
payload,
timestamp: undefined,
contentTopic: "",
ephemeral: undefined,
meta: undefined,
rateLimitProof: undefined,
version: undefined
});

expect(result.meta).to.deep.eq(expectedMeta);
}
)
expect(result.meta).to.deep.eq(expectedMeta);
})
);
});
});
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { PeerId, Stream } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { MultiaddrInput } from "@multiformats/multiaddr";

import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
Expand All @@ -18,7 +18,7 @@ export interface Waku {

connectionManager: IConnectionManager;

dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
dial(peer: PeerId | MultiaddrInput, protocols?: Protocols[]): Promise<Stream>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix for discrepancy in types - this prevents us from doing .dial(multiAddrStr) in TS


start(): Promise<void>;

Expand Down
22 changes: 7 additions & 15 deletions packages/message-encryption/src/ecies.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import { IProtoMessage } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";

import { getPublicKey } from "./crypto/index.js";
import { createDecoder, createEncoder } from "./ecies.js";

const contentTopic = "/js-waku/1/tests/bytes";
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);

describe("Ecies Encryption", function () {
this.timeout(20000);
it("Round trip binary encryption [ecies, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, privateKey) => {
async (payload, privateKey) => {
const publicKey = getPublicKey(privateKey);

const encoder = createEncoder({
Expand Down Expand Up @@ -46,18 +48,10 @@ describe("Ecies Encryption", function () {

await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (
pubsubTopic,
contentTopic,
payload,
alicePrivateKey,
bobPrivateKey
) => {
async (payload, alicePrivateKey, bobPrivateKey) => {
const alicePublicKey = getPublicKey(alicePrivateKey);
const bobPublicKey = getPublicKey(bobPrivateKey);

Expand Down Expand Up @@ -89,11 +83,9 @@ describe("Ecies Encryption", function () {
it("Check meta is set [ecies]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, privateKey) => {
async (payload, privateKey) => {
const publicKey = getPublicKey(privateKey);
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
Expand Down
16 changes: 7 additions & 9 deletions packages/message-encryption/src/symmetric.spec.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import { IProtoMessage } from "@waku/interfaces";
import { contentTopicToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";

import { getPublicKey } from "./crypto/index.js";
import { createDecoder, createEncoder } from "./symmetric.js";

const contentTopic = "/js-waku/1/tests/bytes";
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);

describe("Symmetric Encryption", function () {
it("Round trip binary encryption [symmetric, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, symKey) => {
async (payload, symKey) => {
const encoder = createEncoder({
contentTopic,
symKey
Expand Down Expand Up @@ -41,12 +43,10 @@ describe("Symmetric Encryption", function () {
it("Round trip binary encryption [symmetric, signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, sigPrivKey, symKey) => {
async (payload, sigPrivKey, symKey) => {
const sigPubKey = getPublicKey(sigPrivKey);

const encoder = createEncoder({
Expand Down Expand Up @@ -77,11 +77,9 @@ describe("Symmetric Encryption", function () {
it("Check meta is set [symmetric]", async function () {
await fc.assert(
fc.asyncProperty(
fc.string({ minLength: 1 }),
fc.string({ minLength: 1 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (pubsubTopic, contentTopic, payload, symKey) => {
async (payload, symKey) => {
const metaSetter = (
msg: IProtoMessage & { meta: undefined }
): Uint8Array => {
Expand Down
30 changes: 29 additions & 1 deletion packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,21 @@
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): Promise<Unsubscribe> {
const subscription = await this.createSubscription();
const pubsubTopics = this.getPubsubTopics(decoders);
weboko marked this conversation as resolved.
Show resolved Hide resolved

if (pubsubTopics.length === 0) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
);
}

if (pubsubTopics.length > 1) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
);
}

const subscription = await this.createSubscription(pubsubTopics[0]);

await subscription.subscribe(decoders, callback);

Expand All @@ -314,6 +328,20 @@
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}

private getPubsubTopics(decoders: IDecoder<any> | IDecoder<any>[]): string[] {

Check warning on line 332 in packages/sdk/src/protocols/filter.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 332 in packages/sdk/src/protocols/filter.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 332 in packages/sdk/src/protocols/filter.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 332 in packages/sdk/src/protocols/filter.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
weboko marked this conversation as resolved.
Show resolved Hide resolved
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}

if (decoders.length === 0) {
return [];
}

const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));

return [...pubsubTopics];
}
}

export function wakuFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe("Connection state", function () {
expect(eventCount).to.be.eq(1);
});

it("`waku:online` bwtween 2 js-waku relay nodes", async function () {
it("`waku:online` between 2 js-waku relay nodes", async function () {
const waku1 = await createRelayNode({
staticNoiseKey: NOISE_KEY_1
});
Expand Down Expand Up @@ -159,7 +159,7 @@ describe("Connection state", function () {
expect(waku.isConnected()).to.be.false;
});

it("isConnected bwtween 2 js-waku relay nodes", async function () {
it("isConnected between 2 js-waku relay nodes", async function () {
const waku1 = await createRelayNode({
staticNoiseKey: NOISE_KEY_1
});
Expand Down
Loading
Loading