Skip to content

Commit

Permalink
feat: metadata protocol (#1732)
Browse files Browse the repository at this point in the history
* add proto

* add rpc and interfaces

* add protocol implementation

* update faulty proto def

* add rpc and interfaces

* refactor implementation & write test

* setup the metadata protocol as a service

* fix cases where metadata service needs to be undefined

* remove redundant catch block

* remove addressed TODO

* update import path

* log errors

* remove redundant code from handling incoming metadata request

* update tests

* add test to check for active connections

* change expects

* save remote peer's shard info after successful connection
  • Loading branch information
danisharora099 authored Dec 5, 2023
1 parent 12a5534 commit 9ac2a3f
Show file tree
Hide file tree
Showing 10 changed files with 501 additions and 1 deletion.
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ export { ConnectionManager } from "./lib/connection_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager.js";

export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
105 changes: 105 additions & 0 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import { IncomingStreamData } from "@libp2p/interface/stream-handler";
import { encodeRelayShard } from "@waku/enr";
import type { IMetadata, Libp2pComponents, ShardInfo } from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";

import { BaseProtocol } from "../base_protocol.js";

const log = new Logger("metadata");

export const MetadataCodec = "/vac/waku/metadata/1.0.0";

class Metadata extends BaseProtocol {
private readonly shardInfo: ShardInfo;
private libp2pComponents: Libp2pComponents;
constructor(shardInfo: ShardInfo, libp2p: Libp2pComponents) {
super(MetadataCodec, libp2p.components);
this.libp2pComponents = libp2p;
this.shardInfo = shardInfo;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
});
}

/**
* Handle an incoming metadata request
*/
private async onRequest(streamData: IncomingStreamData): Promise<void> {
try {
const { stream, connection } = streamData;
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode(
this.shardInfo
);

const encodedResponse = await pipe(
[encodedShardInfo],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

const remoteShardInfoResponse =
this.decodeMetadataResponse(encodedResponse);

// add or update the shardInfo to peer store
await this.libp2pComponents.peerStore.merge(connection.remotePeer, {
metadata: {
shardInfo: encodeRelayShard(remoteShardInfoResponse)
}
});
} catch (error) {
log.error("Error handling metadata request", error);
}
}

/**
* Make a metadata query to a peer
*/
async query(peerId: PeerId): Promise<ShardInfo> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.getPeer(peerId);

const stream = await this.getStream(peer);

const encodedResponse = await pipe(
[request],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);

const decodedResponse = this.decodeMetadataResponse(encodedResponse);

return decodedResponse;
}

private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo {
const bytes = new Uint8ArrayList();

encodedResponse.forEach((chunk) => {
bytes.append(chunk);
});
const response = proto_metadata.WakuMetadataResponse.decode(
bytes
) as ShardInfo;

if (!response) log.error("Error decoding metadata response");

return response;
}
}

export function wakuMetadata(
shardInfo: ShardInfo
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
}
1 change: 1 addition & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ export * from "./misc.js";
export * from "./libp2p.js";
export * from "./keep_alive_manager.js";
export * from "./dns_discovery.js";
export * from "./metadata.js";
3 changes: 3 additions & 0 deletions packages/interfaces/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import type { Libp2pInit, Libp2pOptions } from "libp2p";
import type { identifyService } from "libp2p/identify";
import type { PingService } from "libp2p/ping";

import { IMetadata } from "./metadata";

export type Libp2pServices = {
ping: PingService;
metadata?: IMetadata;
pubsub?: GossipSub;
identify: ReturnType<ReturnType<typeof identifyService>>;
};
Expand Down
8 changes: 8 additions & 0 deletions packages/interfaces/src/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import type { PeerId } from "@libp2p/interface/peer-id";

import type { ShardInfo } from "./enr.js";
import type { IBaseProtocol } from "./protocols.js";

export interface IMetadata extends IBaseProtocol {
query(peerId: PeerId): Promise<ShardInfo | undefined>;
}
2 changes: 2 additions & 0 deletions packages/proto/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ export { PushResponse } from "./lib/light_push.js";
export * as proto_store from "./lib/store.js";

export * as proto_peer_exchange from "./lib/peer_exchange.js";

export * as proto_metadata from './lib/metadata.js'
12 changes: 12 additions & 0 deletions packages/proto/src/lib/metadata.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";


message WakuMetadataRequest {
optional uint32 cluster_id = 1;
repeated uint32 shards = 2;
}

message WakuMetadataResponse {
optional uint32 cluster_id = 1;
repeated uint32 shards = 2;
}
147 changes: 147 additions & 0 deletions packages/proto/src/lib/metadata.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* eslint-disable import/export */
/* eslint-disable complexity */
/* eslint-disable @typescript-eslint/no-namespace */
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */

import { encodeMessage, decodeMessage, message } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface WakuMetadataRequest {
clusterId?: number
shards: number[]
}

export namespace WakuMetadataRequest {
let _codec: Codec<WakuMetadataRequest>

export const codec = (): Codec<WakuMetadataRequest> => {
if (_codec == null) {
_codec = message<WakuMetadataRequest>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if (obj.clusterId != null) {
w.uint32(8)
w.uint32(obj.clusterId)
}

if (obj.shards != null) {
for (const value of obj.shards) {
w.uint32(16)
w.uint32(value)
}
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length) => {
const obj: any = {
shards: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.clusterId = reader.uint32()
break
case 2:
obj.shards.push(reader.uint32())
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<WakuMetadataRequest>): Uint8Array => {
return encodeMessage(obj, WakuMetadataRequest.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataRequest => {
return decodeMessage(buf, WakuMetadataRequest.codec())
}
}

export interface WakuMetadataResponse {
clusterId?: number
shards: number[]
}

export namespace WakuMetadataResponse {
let _codec: Codec<WakuMetadataResponse>

export const codec = (): Codec<WakuMetadataResponse> => {
if (_codec == null) {
_codec = message<WakuMetadataResponse>((obj, w, opts = {}) => {
if (opts.lengthDelimited !== false) {
w.fork()
}

if (obj.clusterId != null) {
w.uint32(8)
w.uint32(obj.clusterId)
}

if (obj.shards != null) {
for (const value of obj.shards) {
w.uint32(16)
w.uint32(value)
}
}

if (opts.lengthDelimited !== false) {
w.ldelim()
}
}, (reader, length) => {
const obj: any = {
shards: []
}

const end = length == null ? reader.len : reader.pos + length

while (reader.pos < end) {
const tag = reader.uint32()

switch (tag >>> 3) {
case 1:
obj.clusterId = reader.uint32()
break
case 2:
obj.shards.push(reader.uint32())
break
default:
reader.skipType(tag & 7)
break
}
}

return obj
})
}

return _codec
}

export const encode = (obj: Partial<WakuMetadataResponse>): Uint8Array => {
return encodeMessage(obj, WakuMetadataResponse.codec())
}

export const decode = (buf: Uint8Array | Uint8ArrayList): WakuMetadataResponse => {
return decodeMessage(buf, WakuMetadataResponse.codec())
}
}
18 changes: 17 additions & 1 deletion packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DefaultUserAgent,
wakuFilter,
wakuLightPush,
wakuMetadata,
WakuNode,
WakuOptions,
wakuStore
Expand All @@ -16,11 +17,13 @@ import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import type {
CreateLibp2pOptions,
FullNode,
IMetadata,
Libp2p,
Libp2pComponents,
LightNode,
ProtocolCreateOptions,
RelayNode
RelayNode,
ShardInfo
} from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
Expand Down Expand Up @@ -54,6 +57,7 @@ export async function createLightNode(
}

const libp2p = await defaultLibp2p(
options.shardInfo,
undefined,
libp2pOptions,
options?.userAgent
Expand Down Expand Up @@ -90,6 +94,7 @@ export async function createRelayNode(
}

const libp2p = await defaultLibp2p(
options.shardInfo,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
Expand Down Expand Up @@ -134,6 +139,7 @@ export async function createFullNode(
}

const libp2p = await defaultLibp2p(
options.shardInfo,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
Expand Down Expand Up @@ -169,7 +175,12 @@ type PubsubService = {
pubsub?: (components: Libp2pComponents) => GossipSub;
};

type MetadataService = {
metadata?: (components: Libp2pComponents) => IMetadata;
};

export async function defaultLibp2p(
shardInfo?: ShardInfo,
wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<CreateLibp2pOptions>,
userAgent?: string
Expand All @@ -191,6 +202,10 @@ export async function defaultLibp2p(
? { pubsub: wakuGossipSub }
: {};

const metadataService: MetadataService = shardInfo
? { metadata: wakuMetadata(shardInfo) }
: {};

return createLibp2p({
connectionManager: {
minConnections: 1
Expand All @@ -204,6 +219,7 @@ export async function defaultLibp2p(
agentVersion: userAgent ?? DefaultUserAgent
}),
ping: pingService(),
...metadataService,
...pubsubService,
...options?.services
}
Expand Down
Loading

0 comments on commit 9ac2a3f

Please sign in to comment.