diff --git a/package.json b/package.json index 6b82d04068..ca3d3d0bb6 100644 --- a/package.json +++ b/package.json @@ -176,7 +176,6 @@ "release": "aegir release" }, "dependencies": { - "@libp2p/components": "^3.1.1", "@libp2p/crypto": "^1.0.0", "@libp2p/interface-connection": "^3.0.1", "@libp2p/interface-peer-id": "^1.0.2", @@ -193,10 +192,10 @@ "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.3", "it-pushable": "^3.0.0", - "multiformats": "^9.6.3", + "multiformats": "^10.0.0", "p-queue": "^7.2.0", "uint8arraylist": "^2.0.0", - "uint8arrays": "^3.0.0" + "uint8arrays": "^4.0.2" }, "devDependencies": { "@libp2p/peer-id-factory": "^1.0.0", @@ -205,8 +204,8 @@ "it-pair": "^2.0.2", "p-defer": "^4.0.0", "p-wait-for": "^5.0.0", - "protons": "^5.1.0", - "protons-runtime": "^3.1.0", + "protons": "^6.0.0", + "protons-runtime": "^4.0.1", "sinon": "^14.0.0", "util": "^0.12.4" } diff --git a/src/index.ts b/src/index.ts index a05f726b7d..e73509f6b3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -12,20 +12,24 @@ import { verifySignature } from './sign.js' import type { PeerId } from '@libp2p/interface-peer-id' -import type { IncomingStreamData } from '@libp2p/interface-registrar' +import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar' import type { Connection } from '@libp2p/interface-connection' import { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, TopicValidatorResult } from '@libp2p/interface-pubsub' import { PeerMap, PeerSet } from '@libp2p/peer-collections' -import { Components, Initializable } from '@libp2p/components' import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:pubsub') +export interface PubSubComponents { + peerId: PeerId + registrar: Registrar +} + /** * PubSubBaseProtocol handles the peers and connections logic for pubsub routers * and specifies the API that pubsub routers should have. */ -export abstract class PubSubBaseProtocol extends EventEmitter implements PubSub, Initializable { +export abstract class PubSubBaseProtocol extends EventEmitter implements PubSub { public started: boolean /** * Map of topics to which peers are subscribed to @@ -60,14 +64,14 @@ export abstract class PubSubBaseProtocol public queue: Queue public multicodecs: string[] - public components: Components = new Components() + public components: PubSubComponents private _registrarTopologyIds: string[] | undefined protected enabled: boolean private readonly maxInboundStreams: number private readonly maxOutboundStreams: number - constructor (props: PubSubInit) { + constructor (components: PubSubComponents, props: PubSubInit) { super() const { @@ -80,6 +84,7 @@ export abstract class PubSubBaseProtocol await registrar.handle(multicodec, this._onIncomingStream, { @@ -145,7 +146,7 @@ export abstract class PubSubBaseProtocol('message', { @@ -584,7 +585,7 @@ export abstract class PubSubBaseProtocol { const peerId = await createPeerId() pubsub = new PubsubImplementation({ + peerId, + registrar: new MockRegistrar() + }, { multicodecs: [protocol], emitSelf: true }) - pubsub.init(new Components({ - peerId, - registrar: new MockRegistrar() - })) }) before(async () => { @@ -77,13 +75,12 @@ describe('emitSelf', () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ + peerId, + registrar: new MockRegistrar() + }, { multicodecs: [protocol], emitSelf: false }) - pubsub.init(new Components({ - peerId, - registrar: new MockRegistrar() - })) }) before(async () => { diff --git a/test/instance.spec.ts b/test/instance.spec.ts index b0201dc1f5..da6eb290b9 100644 --- a/test/instance.spec.ts +++ b/test/instance.spec.ts @@ -2,6 +2,8 @@ import { expect } from 'aegir/chai' import { PubSubBaseProtocol } from '../src/index.js' import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' import type { Uint8ArrayList } from 'uint8arraylist' +import { MockRegistrar } from './utils/index.js' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' class PubsubProtocol extends PubSubBaseProtocol { decodeRpc (bytes: Uint8Array): PubSubRPC { @@ -33,9 +35,14 @@ describe('pubsub instance', () => { }).to.throw() }) - it('should accept valid parameters', () => { + it('should accept valid parameters', async () => { + const peerId = await createEd25519PeerId() + expect(() => { - new PubsubProtocol({ // eslint-disable-line no-new + return new PubsubProtocol({ + peerId, + registrar: new MockRegistrar() + }, { // eslint-disable-line no-new multicodecs: ['/pubsub/1.0.0'] }) }).not.to.throw() diff --git a/test/lifecycle.spec.ts b/test/lifecycle.spec.ts index 9e268b4618..d4b94a8744 100644 --- a/test/lifecycle.spec.ts +++ b/test/lifecycle.spec.ts @@ -11,7 +11,6 @@ import { import type { PeerId } from '@libp2p/interface-peer-id' import type { Registrar } from '@libp2p/interface-registrar' import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' -import { Components } from '@libp2p/components' import type { Uint8ArrayList } from 'uint8arraylist' class PubsubProtocol extends PubSubBaseProtocol { @@ -52,12 +51,11 @@ describe('pubsub base lifecycle', () => { } pubsub = new PubsubProtocol({ - multicodecs: ['/pubsub/1.0.0'] - }) - pubsub.init(new Components({ peerId: peerId, registrar: sinonMockRegistrar - })) + }, { + multicodecs: ['/pubsub/1.0.0'] + }) expect(pubsub.peers.size).to.be.eql(0) }) @@ -112,19 +110,17 @@ describe('pubsub base lifecycle', () => { registrarB = new MockRegistrar() pubsubA = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsubA.init(new Components({ peerId: peerIdA, registrar: registrarA - })) - pubsubB = new PubsubImplementation({ + }, { multicodecs: [protocol] }) - pubsubB.init(new Components({ + pubsubB = new PubsubImplementation({ peerId: peerIdB, registrar: registrarB - })) + }, { + multicodecs: [protocol] + }) }) // start pubsub diff --git a/test/message.spec.ts b/test/message.spec.ts index 236d6dae8a..6ae2eda114 100644 --- a/test/message.spec.ts +++ b/test/message.spec.ts @@ -9,7 +9,6 @@ import { } from './utils/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import type { Message } from '@libp2p/interface-pubsub' -import { Components } from '@libp2p/components' import { randomSeqno } from '../src/utils.js' describe('pubsub base messages', () => { @@ -19,12 +18,11 @@ describe('pubsub base messages', () => { before(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: ['/pubsub/1.0.0'] - }) - pubsub.init(new Components({ peerId: peerId, registrar: new MockRegistrar() - })) + }, { + multicodecs: ['/pubsub/1.0.0'] + }) }) afterEach(() => { diff --git a/test/message/rpc.ts b/test/message/rpc.ts index e9798c9ddd..adfbb13f57 100644 --- a/test/message/rpc.ts +++ b/test/message/rpc.ts @@ -1,5 +1,7 @@ /* eslint-disable import/export */ +/* eslint-disable complexity */ /* eslint-disable @typescript-eslint/no-namespace */ +/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ import { encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' @@ -22,23 +24,23 @@ export namespace RPC { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.subscribe != null) { - writer.uint32(8) - writer.bool(obj.subscribe) + w.uint32(8) + w.bool(obj.subscribe) } if (obj.topic != null) { - writer.uint32(18) - writer.string(obj.topic) + w.uint32(18) + w.string(obj.topic) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} @@ -91,43 +93,43 @@ export namespace RPC { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.from != null) { - writer.uint32(10) - writer.bytes(obj.from) + w.uint32(10) + w.bytes(obj.from) } if (obj.data != null) { - writer.uint32(18) - writer.bytes(obj.data) + w.uint32(18) + w.bytes(obj.data) } if (obj.seqno != null) { - writer.uint32(26) - writer.bytes(obj.seqno) + w.uint32(26) + w.bytes(obj.seqno) } if (obj.topic != null) { - writer.uint32(34) - writer.string(obj.topic) + w.uint32(34) + w.string(obj.topic) } if (obj.signature != null) { - writer.uint32(42) - writer.bytes(obj.signature) + w.uint32(42) + w.bytes(obj.signature) } if (obj.key != null) { - writer.uint32(50) - writer.bytes(obj.key) + w.uint32(50) + w.bytes(obj.key) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} @@ -182,36 +184,38 @@ export namespace RPC { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.subscriptions != null) { for (const value of obj.subscriptions) { - writer.uint32(10) - RPC.SubOpts.codec().encode(value, writer) + w.uint32(10) + RPC.SubOpts.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "subscriptions" was not found in object') } if (obj.messages != null) { for (const value of obj.messages) { - writer.uint32(18) - RPC.Message.codec().encode(value, writer) + w.uint32(18) + RPC.Message.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "messages" was not found in object') } if (obj.control != null) { - writer.uint32(26) - ControlMessage.codec().encode(obj.control, writer) + w.uint32(26) + ControlMessage.codec().encode(obj.control, w, { + writeDefaults: false + }) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -268,49 +272,49 @@ export namespace ControlMessage { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.ihave != null) { for (const value of obj.ihave) { - writer.uint32(10) - ControlIHave.codec().encode(value, writer) + w.uint32(10) + ControlIHave.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "ihave" was not found in object') } if (obj.iwant != null) { for (const value of obj.iwant) { - writer.uint32(18) - ControlIWant.codec().encode(value, writer) + w.uint32(18) + ControlIWant.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "iwant" was not found in object') } if (obj.graft != null) { for (const value of obj.graft) { - writer.uint32(26) - ControlGraft.codec().encode(value, writer) + w.uint32(26) + ControlGraft.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "graft" was not found in object') } if (obj.prune != null) { for (const value of obj.prune) { - writer.uint32(34) - ControlPrune.codec().encode(value, writer) + w.uint32(34) + ControlPrune.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "prune" was not found in object') } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -370,27 +374,25 @@ export namespace ControlIHave { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.topic != null) { - writer.uint32(10) - writer.string(obj.topic) + w.uint32(10) + w.string(obj.topic) } if (obj.messageIDs != null) { for (const value of obj.messageIDs) { - writer.uint32(18) - writer.bytes(value) + w.uint32(18) + w.bytes(value) } - } else { - throw new Error('Protocol error: required field "messageIDs" was not found in object') } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -440,22 +442,20 @@ export namespace ControlIWant { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.messageIDs != null) { for (const value of obj.messageIDs) { - writer.uint32(10) - writer.bytes(value) + w.uint32(10) + w.bytes(value) } - } else { - throw new Error('Protocol error: required field "messageIDs" was not found in object') } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -502,18 +502,18 @@ export namespace ControlGraft { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.topic != null) { - writer.uint32(10) - writer.string(obj.topic) + w.uint32(10) + w.string(obj.topic) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} @@ -560,32 +560,32 @@ export namespace ControlPrune { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.topic != null) { - writer.uint32(10) - writer.string(obj.topic) + w.uint32(10) + w.string(obj.topic) } if (obj.peers != null) { for (const value of obj.peers) { - writer.uint32(18) - PeerInfo.codec().encode(value, writer) + w.uint32(18) + PeerInfo.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "peers" was not found in object') } if (obj.backoff != null) { - writer.uint32(24) - writer.uint64(obj.backoff) + w.uint32(24) + w.uint64(obj.backoff) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -639,23 +639,23 @@ export namespace PeerInfo { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.peerID != null) { - writer.uint32(10) - writer.bytes(obj.peerID) + w.uint32(10) + w.bytes(obj.peerID) } if (obj.signedPeerRecord != null) { - writer.uint32(18) - writer.bytes(obj.signedPeerRecord) + w.uint32(18) + w.bytes(obj.signedPeerRecord) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} diff --git a/test/pubsub.spec.ts b/test/pubsub.spec.ts index 63f9e59105..c3a32a3a94 100644 --- a/test/pubsub.spec.ts +++ b/test/pubsub.spec.ts @@ -13,7 +13,6 @@ import { } from './utils/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import { PeerSet } from '@libp2p/peer-collections' -import { Components } from '@libp2p/components' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { noSignMsgId } from '../src/utils.js' import type { Message, PubSubRPC } from '@libp2p/interface-pubsub' @@ -31,13 +30,12 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ + peerId: peerId, + registrar: new MockRegistrar() + }, { multicodecs: [protocol], emitSelf: true }) - pubsub.init(new Components({ - peerId: peerId, - registrar: new MockRegistrar() - })) }) afterEach(async () => await pubsub.stop()) @@ -72,7 +70,7 @@ describe('pubsub base implementation', () => { // Get the first message sent to _publish, and validate it const signedMessage: Message = publishMessageSpy.getCall(0).lastArg - await expect(pubsub.validate(pubsub.components.getPeerId(), signedMessage)).to.eventually.be.undefined() + await expect(pubsub.validate(pubsub.components.peerId, signedMessage)).to.eventually.be.undefined() }) it('calls publishes messages twice', async () => { @@ -105,12 +103,11 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsub.init(new Components({ peerId: peerId, registrar: new MockRegistrar() - })) + }, { + multicodecs: [protocol] + }) await pubsub.start() }) @@ -138,19 +135,17 @@ describe('pubsub base implementation', () => { registrarB = new MockRegistrar() pubsubA = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsubA.init(new Components({ peerId: peerIdA, registrar: registrarA - })) - pubsubB = new PubsubImplementation({ + }, { multicodecs: [protocol] }) - pubsubB.init(new Components({ + pubsubB = new PubsubImplementation({ peerId: peerIdB, registrar: registrarB - })) + }, { + multicodecs: [protocol] + }) }) // start pubsub and connect nodes @@ -210,12 +205,11 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsub.init(new Components({ peerId: peerId, registrar: new MockRegistrar() - })) + }, { + multicodecs: [protocol] + }) await pubsub.start() }) @@ -247,19 +241,17 @@ describe('pubsub base implementation', () => { registrarB = new MockRegistrar() pubsubA = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsubA.init(new Components({ peerId: peerIdA, registrar: registrarA - })) - pubsubB = new PubsubImplementation({ + }, { multicodecs: [protocol] }) - pubsubB.init(new Components({ + pubsubB = new PubsubImplementation({ peerId: peerIdB, registrar: registrarB - })) + }, { + multicodecs: [protocol] + }) }) // start pubsub and connect nodes @@ -343,12 +335,11 @@ describe('pubsub base implementation', () => { beforeEach(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsub.init(new Components({ peerId: peerId, registrar: new MockRegistrar() - })) + }, { + multicodecs: [protocol] + }) await pubsub.start() }) @@ -373,12 +364,11 @@ describe('pubsub base implementation', () => { beforeEach(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsub.init(new Components({ peerId: peerId, registrar: new MockRegistrar() - })) + }, { + multicodecs: [protocol] + }) }) afterEach(async () => await pubsub.stop()) @@ -445,12 +435,11 @@ describe('pubsub base implementation', () => { beforeEach(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - multicodecs: [protocol] - }) - pubsub.init(new Components({ peerId: peerId, registrar: new MockRegistrar() - })) + }, { + multicodecs: [protocol] + }) await pubsub.start() }) diff --git a/test/topic-validators.spec.ts b/test/topic-validators.spec.ts index 6896300ab6..35cd9e6aee 100644 --- a/test/topic-validators.spec.ts +++ b/test/topic-validators.spec.ts @@ -11,7 +11,6 @@ import { } from './utils/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import { PubSubRPC, TopicValidatorResult } from '@libp2p/interface-pubsub' -import { Components } from '@libp2p/components' const protocol = '/pubsub/1.0.0' @@ -25,13 +24,12 @@ describe('topic validators', () => { otherPeerId = await createEd25519PeerId() pubsub = new PubsubImplementation({ + peerId: peerId, + registrar: new MockRegistrar() + }, { multicodecs: [protocol], globalSignaturePolicy: 'StrictNoSign' }) - pubsub.init(new Components({ - peerId: peerId, - registrar: new MockRegistrar() - })) await pubsub.start() })