diff --git a/package.json b/package.json index 74bd182bf3..f083032c04 100644 --- a/package.json +++ b/package.json @@ -178,7 +178,7 @@ "dependencies": { "@libp2p/components": "^2.0.0", "@libp2p/crypto": "^1.0.0", - "@libp2p/interface-connection": "^2.0.0", + "@libp2p/interface-connection": "^3.0.1", "@libp2p/interface-peer-id": "^1.0.2", "@libp2p/interface-pubsub": "^2.0.0", "@libp2p/interface-registrar": "^2.0.0", @@ -205,8 +205,8 @@ "it-pair": "^2.0.2", "p-defer": "^4.0.0", "p-wait-for": "^5.0.0", - "protons": "^4.0.1", - "protons-runtime": "^2.0.2", + "protons": "^5.0.0", + "protons-runtime": "^3.0.1", "sinon": "^14.0.0", "util": "^0.12.4" } diff --git a/src/index.ts b/src/index.ts index 5a66c1e194..3c825f45fd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -485,13 +485,13 @@ export abstract class PubSubBaseProtocol extends EventEmi * Encode RPC object into a Uint8Array. * This can be override to use a custom router protobuf. */ - abstract encodeRpc (rpc: PubSubRPC): Uint8ArrayList + abstract encodeRpc (rpc: PubSubRPC): Uint8Array /** * Encode RPC object into a Uint8Array. * This can be override to use a custom router protobuf. */ - abstract encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList + abstract encodeMessage (rpc: PubSubRPCMessage): Uint8Array /** * Send an rpc object to a peer diff --git a/src/sign.ts b/src/sign.ts index 7b6bd27900..8a30ef3ad1 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -5,14 +5,13 @@ import type { PeerId } from '@libp2p/interface-peer-id' import { keys } from '@libp2p/crypto' import type { PubSubRPCMessage, SignedMessage } from '@libp2p/interface-pubsub' import { peerIdFromKeys } from '@libp2p/peer-id' -import type { Uint8ArrayList } from 'uint8arraylist' export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:') /** * Signs the provided message with the given `peerId` */ -export async function signMessage (peerId: PeerId, message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList): Promise { +export async function signMessage (peerId: PeerId, message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }, encode: (rpc: PubSubRPCMessage) => Uint8Array): Promise { if (peerId.privateKey == null) { throw new Error('Cannot sign message, no private key present') } @@ -46,7 +45,7 @@ export async function signMessage (peerId: PeerId, message: { from: PeerId, topi /** * Verifies the signature of the given message */ -export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList) { +export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8Array) { if (message.type !== 'signed') { throw new Error('Message type must be "signed" to be verified') } diff --git a/test/instance.spec.ts b/test/instance.spec.ts index 9f2f89780b..b0201dc1f5 100644 --- a/test/instance.spec.ts +++ b/test/instance.spec.ts @@ -8,7 +8,7 @@ class PubsubProtocol extends PubSubBaseProtocol { throw new Error('Method not implemented.') } - encodeRpc (rpc: PubSubRPC): Uint8ArrayList { + encodeRpc (rpc: PubSubRPC): Uint8Array { throw new Error('Method not implemented.') } @@ -16,7 +16,7 @@ class PubsubProtocol extends PubSubBaseProtocol { throw new Error('Method not implemented.') } - encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { + encodeMessage (rpc: PubSubRPCMessage): Uint8Array { throw new Error('Method not implemented.') } diff --git a/test/lifecycle.spec.ts b/test/lifecycle.spec.ts index 6024d8ca86..9e268b4618 100644 --- a/test/lifecycle.spec.ts +++ b/test/lifecycle.spec.ts @@ -19,7 +19,7 @@ class PubsubProtocol extends PubSubBaseProtocol { throw new Error('Method not implemented.') } - encodeRpc (rpc: PubSubRPC): Uint8ArrayList { + encodeRpc (rpc: PubSubRPC): Uint8Array { throw new Error('Method not implemented.') } @@ -27,7 +27,7 @@ class PubsubProtocol extends PubSubBaseProtocol { throw new Error('Method not implemented.') } - encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { + encodeMessage (rpc: PubSubRPCMessage): Uint8Array { throw new Error('Method not implemented.') } diff --git a/test/message/rpc.ts b/test/message/rpc.ts index f4e5283643..ebc741e04b 100644 --- a/test/message/rpc.ts +++ b/test/message/rpc.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, bool, string, bytes, uint64 } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface RPC { subscriptions: RPC.SubOpts[] @@ -18,14 +18,57 @@ export namespace RPC { } export namespace SubOpts { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'subscribe', codec: bool, optional: true }, - 2: { name: 'topic', codec: string, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.subscribe != null) { + writer.uint32(8) + writer.bool(obj.subscribe) + } + + if (obj.topic != null) { + writer.uint32(18) + writer.string(obj.topic) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.subscribe = reader.bool() + break + case 2: + obj.topic = reader.string() + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: SubOpts): Uint8ArrayList => { + export const encode = (obj: SubOpts): Uint8Array => { return encodeMessage(obj, SubOpts.codec()) } @@ -44,18 +87,89 @@ export namespace RPC { } export namespace Message { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'from', codec: bytes, optional: true }, - 2: { name: 'data', codec: bytes, optional: true }, - 3: { name: 'seqno', codec: bytes, optional: true }, - 4: { name: 'topic', codec: string, optional: true }, - 5: { name: 'signature', codec: bytes, optional: true }, - 6: { name: 'key', codec: bytes, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.from != null) { + writer.uint32(10) + writer.bytes(obj.from) + } + + if (obj.data != null) { + writer.uint32(18) + writer.bytes(obj.data) + } + + if (obj.seqno != null) { + writer.uint32(26) + writer.bytes(obj.seqno) + } + + if (obj.topic != null) { + writer.uint32(34) + writer.string(obj.topic) + } + + if (obj.signature != null) { + writer.uint32(42) + writer.bytes(obj.signature) + } + + if (obj.key != null) { + writer.uint32(50) + writer.bytes(obj.key) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.from = reader.bytes() + break + case 2: + obj.data = reader.bytes() + break + case 3: + obj.seqno = reader.bytes() + break + case 4: + obj.topic = reader.string() + break + case 5: + obj.signature = reader.bytes() + break + case 6: + obj.key = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Message): Uint8ArrayList => { + export const encode = (obj: Message): Uint8Array => { return encodeMessage(obj, Message.codec()) } @@ -64,15 +178,86 @@ export namespace RPC { } } + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'subscriptions', codec: RPC.SubOpts.codec(), repeats: true }, - 2: { name: 'messages', codec: RPC.Message.codec(), repeats: true }, - 3: { name: 'control', codec: ControlMessage.codec(), optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.subscriptions != null) { + for (const value of obj.subscriptions) { + writer.uint32(10) + RPC.SubOpts.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "subscriptions" was not found in object') + } + + if (obj.messages != null) { + for (const value of obj.messages) { + writer.uint32(18) + RPC.Message.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "messages" was not found in object') + } + + if (obj.control != null) { + writer.uint32(26) + ControlMessage.codec().encode(obj.control, writer) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.subscriptions = obj.subscriptions ?? [] + obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32())) + break + case 2: + obj.messages = obj.messages ?? [] + obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32())) + break + case 3: + obj.control = ControlMessage.codec().decode(reader, reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.subscriptions = obj.subscriptions ?? [] + obj.messages = obj.messages ?? [] + + if (obj.subscriptions == null) { + throw new Error('Protocol error: value for required field "subscriptions" was not found in protobuf') + } + + if (obj.messages == null) { + throw new Error('Protocol error: value for required field "messages" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: RPC): Uint8ArrayList => { + export const encode = (obj: RPC): Uint8Array => { return encodeMessage(obj, RPC.codec()) } @@ -89,16 +274,114 @@ export interface ControlMessage { } export namespace ControlMessage { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'ihave', codec: ControlIHave.codec(), repeats: true }, - 2: { name: 'iwant', codec: ControlIWant.codec(), repeats: true }, - 3: { name: 'graft', codec: ControlGraft.codec(), repeats: true }, - 4: { name: 'prune', codec: ControlPrune.codec(), repeats: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.ihave != null) { + for (const value of obj.ihave) { + writer.uint32(10) + ControlIHave.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "ihave" was not found in object') + } + + if (obj.iwant != null) { + for (const value of obj.iwant) { + writer.uint32(18) + ControlIWant.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "iwant" was not found in object') + } + + if (obj.graft != null) { + for (const value of obj.graft) { + writer.uint32(26) + ControlGraft.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "graft" was not found in object') + } + + if (obj.prune != null) { + for (const value of obj.prune) { + writer.uint32(34) + ControlPrune.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "prune" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.ihave = obj.ihave ?? [] + obj.ihave.push(ControlIHave.codec().decode(reader, reader.uint32())) + break + case 2: + obj.iwant = obj.iwant ?? [] + obj.iwant.push(ControlIWant.codec().decode(reader, reader.uint32())) + break + case 3: + obj.graft = obj.graft ?? [] + obj.graft.push(ControlGraft.codec().decode(reader, reader.uint32())) + break + case 4: + obj.prune = obj.prune ?? [] + obj.prune.push(ControlPrune.codec().decode(reader, reader.uint32())) + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.ihave = obj.ihave ?? [] + obj.iwant = obj.iwant ?? [] + obj.graft = obj.graft ?? [] + obj.prune = obj.prune ?? [] + + if (obj.ihave == null) { + throw new Error('Protocol error: value for required field "ihave" was not found in protobuf') + } + + if (obj.iwant == null) { + throw new Error('Protocol error: value for required field "iwant" was not found in protobuf') + } + + if (obj.graft == null) { + throw new Error('Protocol error: value for required field "graft" was not found in protobuf') + } + + if (obj.prune == null) { + throw new Error('Protocol error: value for required field "prune" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: ControlMessage): Uint8ArrayList => { + export const encode = (obj: ControlMessage): Uint8Array => { return encodeMessage(obj, ControlMessage.codec()) } @@ -113,14 +396,68 @@ export interface ControlIHave { } export namespace ControlIHave { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'topic', codec: string, optional: true }, - 2: { name: 'messageIDs', codec: bytes, repeats: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.topic != null) { + writer.uint32(10) + writer.string(obj.topic) + } + + if (obj.messageIDs != null) { + for (const value of obj.messageIDs) { + writer.uint32(18) + writer.bytes(value) + } + } else { + throw new Error('Protocol error: required field "messageIDs" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.topic = reader.string() + break + case 2: + obj.messageIDs = obj.messageIDs ?? [] + obj.messageIDs.push(reader.bytes()) + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.messageIDs = obj.messageIDs ?? [] + + if (obj.messageIDs == null) { + throw new Error('Protocol error: value for required field "messageIDs" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: ControlIHave): Uint8ArrayList => { + export const encode = (obj: ControlIHave): Uint8Array => { return encodeMessage(obj, ControlIHave.codec()) } @@ -134,13 +471,60 @@ export interface ControlIWant { } export namespace ControlIWant { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'messageIDs', codec: bytes, repeats: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.messageIDs != null) { + for (const value of obj.messageIDs) { + writer.uint32(10) + writer.bytes(value) + } + } else { + throw new Error('Protocol error: required field "messageIDs" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.messageIDs = obj.messageIDs ?? [] + obj.messageIDs.push(reader.bytes()) + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.messageIDs = obj.messageIDs ?? [] + + if (obj.messageIDs == null) { + throw new Error('Protocol error: value for required field "messageIDs" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: ControlIWant): Uint8ArrayList => { + export const encode = (obj: ControlIWant): Uint8Array => { return encodeMessage(obj, ControlIWant.codec()) } @@ -154,13 +538,49 @@ export interface ControlGraft { } export namespace ControlGraft { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'topic', codec: string, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.topic != null) { + writer.uint32(10) + writer.string(obj.topic) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.topic = reader.string() + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: ControlGraft): Uint8ArrayList => { + export const encode = (obj: ControlGraft): Uint8Array => { return encodeMessage(obj, ControlGraft.codec()) } @@ -176,15 +596,76 @@ export interface ControlPrune { } export namespace ControlPrune { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'topic', codec: string, optional: true }, - 2: { name: 'peers', codec: PeerInfo.codec(), repeats: true }, - 3: { name: 'backoff', codec: uint64, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.topic != null) { + writer.uint32(10) + writer.string(obj.topic) + } + + if (obj.peers != null) { + for (const value of obj.peers) { + writer.uint32(18) + PeerInfo.codec().encode(value, writer) + } + } else { + throw new Error('Protocol error: required field "peers" was not found in object') + } + + if (obj.backoff != null) { + writer.uint32(24) + writer.uint64(obj.backoff) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.topic = reader.string() + break + case 2: + obj.peers = obj.peers ?? [] + obj.peers.push(PeerInfo.codec().decode(reader, reader.uint32())) + break + case 3: + obj.backoff = reader.uint64() + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.peers = obj.peers ?? [] + + if (obj.peers == null) { + throw new Error('Protocol error: value for required field "peers" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: ControlPrune): Uint8ArrayList => { + export const encode = (obj: ControlPrune): Uint8Array => { return encodeMessage(obj, ControlPrune.codec()) } @@ -199,14 +680,57 @@ export interface PeerInfo { } export namespace PeerInfo { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'peerID', codec: bytes, optional: true }, - 2: { name: 'signedPeerRecord', codec: bytes, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.peerID != null) { + writer.uint32(10) + writer.bytes(obj.peerID) + } + + if (obj.signedPeerRecord != null) { + writer.uint32(18) + writer.bytes(obj.signedPeerRecord) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.peerID = reader.bytes() + break + case 2: + obj.signedPeerRecord = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: PeerInfo): Uint8ArrayList => { + export const encode = (obj: PeerInfo): Uint8Array => { return encodeMessage(obj, PeerInfo.codec()) } diff --git a/test/sign.spec.ts b/test/sign.spec.ts index 89e8d53d66..2e0a92588b 100644 --- a/test/sign.spec.ts +++ b/test/sign.spec.ts @@ -12,9 +12,8 @@ import { randomSeqno, toRpcMessage } from '../src/utils.js' import { keys } from '@libp2p/crypto' import type { PubSubRPCMessage } from '@libp2p/interface-pubsub' import type { PeerId } from '@libp2p/interface-peer-id' -import type { Uint8ArrayList } from 'uint8arraylist' -function encodeMessage (message: PubSubRPCMessage): Uint8ArrayList { +function encodeMessage (message: PubSubRPCMessage): Uint8Array { return RPC.Message.encode(message) } diff --git a/test/utils/index.ts b/test/utils/index.ts index f8a2f9b3cd..abeb01b4ac 100644 --- a/test/utils/index.ts +++ b/test/utils/index.ts @@ -6,7 +6,6 @@ import type { IncomingStreamData, Registrar, StreamHandler, StreamHandlerRecord, import type { Connection } from '@libp2p/interface-connection' import type { PeerId } from '@libp2p/interface-peer-id' import type { PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' -import type { Uint8ArrayList } from 'uint8arraylist' export const createPeerId = async (): Promise => { const peerId = await PeerIdFactory.createEd25519PeerId() @@ -25,7 +24,7 @@ export class PubsubImplementation extends PubSubBaseProtocol { return RPC.decode(bytes) } - encodeRpc (rpc: PubSubRPC): Uint8ArrayList { + encodeRpc (rpc: PubSubRPC): Uint8Array { return RPC.encode(rpc) } @@ -33,7 +32,7 @@ export class PubsubImplementation extends PubSubBaseProtocol { return RPC.Message.decode(bytes) } - encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { + encodeMessage (rpc: PubSubRPCMessage): Uint8Array { return RPC.Message.encode(rpc) } }