Skip to content

Commit

Permalink
feat!: update to libp2p@2.x.x (#504)
Browse files Browse the repository at this point in the history
* feat!: update to libp2p@2.x.x

Incorporates API changes from the upcoming libp2p@2.x.x release.

BREAKING CHANGE: Can only be used with libp2p@2.x.x or later

* chore: remove CodeError

* chore: update deps

* chore: fix linting
  • Loading branch information
achingbrain authored Sep 11, 2024
1 parent 9e57215 commit 1f8f634
Show file tree
Hide file tree
Showing 20 changed files with 5,320 additions and 7,331 deletions.
12,257 changes: 5,121 additions & 7,136 deletions package-lock.json

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,44 +73,44 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/crypto": "^4.0.1",
"@libp2p/interface": "^1.5.0",
"@libp2p/interface-internal": "^1.0.7",
"@libp2p/peer-id": "^4.0.5",
"@libp2p/pubsub": "^9.0.8",
"@libp2p/crypto": "^5.0.0",
"@libp2p/interface": "^2.0.0",
"@libp2p/interface-internal": "^2.0.0",
"@libp2p/peer-id": "^5.0.0",
"@libp2p/pubsub": "^10.0.0",
"@multiformats/multiaddr": "^12.1.14",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
"protons-runtime": "5.4.0",
"protons-runtime": "^5.5.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.1"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.4.1",
"@dapplion/benchmark": "^0.2.4",
"@libp2p/floodsub": "^9.0.9",
"@libp2p/interface-compliance-tests": "^5.2.0",
"@libp2p/logger": "^4.0.5",
"@libp2p/peer-id-factory": "^4.0.5",
"@libp2p/peer-store": "^10.0.8",
"@libp2p/floodsub": "^10.0.0",
"@libp2p/interface-compliance-tests": "^6.0.0",
"@libp2p/logger": "^5.0.0",
"@libp2p/peer-store": "^11.0.0",
"@types/node": "^20.11.6",
"@types/sinon": "^17.0.3",
"abortable-iterator": "^5.1.0",
"aegir": "^42.2.2",
"datastore-core": "^9.2.7",
"aegir": "^44.1.1",
"datastore-core": "^10.0.0",
"delay": "^6.0.0",
"mkdirp": "^3.0.1",
"it-all": "^3.0.6",
"mkdirp": "^3.0.1",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-retry": "^6.2.0",
"p-wait-for": "^5.0.2",
"protons": "^7.5.0",
"sinon": "^17.0.1",
"time-cache": "^0.3.0",
"ts-sinon": "^2.0.2"
"sinon": "^18.0.1",
"sinon-ts": "^2.0.0",
"time-cache": "^0.3.0"
},
"engines": {
"npm": ">=8.7.0"
Expand Down
17 changes: 17 additions & 0 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export class InvalidPeerScoreParamsError extends Error {
static name = 'InvalidPeerScoreParamsError'

constructor (message = 'Invalid peer score params') {
super(message)
this.name = 'InvalidPeerScoreParamsError'
}
}

export class InvalidPeerScoreThresholdsError extends Error {
static name = 'InvalidPeerScoreThresholdsError'

constructor (message = 'Invalid peer score thresholds') {
super(message)
this.name = 'InvalidPeerScoreThresholdsError'
}
}
35 changes: 19 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { TypedEventEmitter, StrictSign, StrictNoSign, TopicValidatorResult, serviceCapabilities, serviceDependencies } from '@libp2p/interface'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { peerIdFromMultihash, peerIdFromString } from '@libp2p/peer-id'
import { encode } from 'it-length-prefixed'
import { pipe } from 'it-pipe'
import { pushable } from 'it-pushable'
import * as Digest from 'multiformats/hashes/digest'
import * as constants from './constants.js'
import {
ACCEPT_FROM_WHITELIST_DURATION_MS,
Expand Down Expand Up @@ -73,7 +74,8 @@ import type {
TopicValidatorFn,
Logger,
ComponentLogger,
Topology
Topology,
PrivateKey
} from '@libp2p/interface'
import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -166,13 +168,13 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
maxOutboundStreams?: number

/**
* Pass true to run on transient connections - data or time-limited
* Pass true to run on limited connections - data or time-limited
* connections that may be closed at any time such as circuit relay
* connections.
*
* @default false
*/
runOnTransientConnection?: boolean
runOnLimitedConnection?: boolean

/**
* Specify max buffer size in bytes for OutboundStream.
Expand Down Expand Up @@ -259,6 +261,7 @@ interface AcceptFromWhitelistEntry {
}

export interface GossipSubComponents {
privateKey: PrivateKey
peerId: PeerId
peerStore: PeerStore
registrar: Registrar
Expand Down Expand Up @@ -420,7 +423,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
private status: GossipStatus = { code: GossipStatusCode.stopped }
private readonly maxInboundStreams?: number
private readonly maxOutboundStreams?: number
private readonly runOnTransientConnection?: boolean
private readonly runOnLimitedConnection?: boolean
private readonly allowedTopics: Set<TopicStr> | null

private heartbeatTimer: {
Expand Down Expand Up @@ -554,7 +557,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.maxInboundStreams = options.maxInboundStreams
this.maxOutboundStreams = options.maxOutboundStreams
this.runOnTransientConnection = options.runOnTransientConnection
this.runOnLimitedConnection = options.runOnLimitedConnection

this.allowedTopics = (opts.allowedTopics != null) ? new Set(opts.allowedTopics) : null
}
Expand Down Expand Up @@ -591,7 +594,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.log('starting')

this.publishConfig = await getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.peerId)
this.publishConfig = getPublishConfigFromPeerId(this.globalSignaturePolicy, this.components.peerId, this.components.privateKey)

// Create the outbound inflight queue
// This ensures that outbound stream creation happens sequentially
Expand Down Expand Up @@ -619,7 +622,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
registrar.handle(multicodec, this.onIncomingStream.bind(this), {
maxInboundStreams: this.maxInboundStreams,
maxOutboundStreams: this.maxOutboundStreams,
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
})
)
)
Expand All @@ -646,7 +649,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const topology: Topology = {
onConnect: this.onPeerConnected.bind(this),
onDisconnect: this.onPeerDisconnected.bind(this),
notifyOnTransient: this.runOnTransientConnection
notifyOnLimitedConnection: this.runOnLimitedConnection
}
const registrarTopologyIds = await Promise.all(
this.multicodecs.map(async (multicodec) => registrar.register(multicodec, topology))
Expand Down Expand Up @@ -817,7 +820,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
try {
const stream = new OutboundStream(
await connection.newStream(this.multicodecs, {
runOnTransientConnection: this.runOnTransientConnection
runOnLimitedConnection: this.runOnLimitedConnection
}),
(e) => { this.log.error('outbound pipe error', e) },
{ maxBufferSize: this.opts.maxOutboundBufferSize }
Expand Down Expand Up @@ -1778,7 +1781,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
return
}

const peer = peerIdFromBytes(pi.peerID)
const peer = peerIdFromMultihash(Digest.decode(pi.peerID))
const p = peer.toString()

if (this.peers.has(p)) {
Expand Down Expand Up @@ -1895,7 +1898,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

// remove explicit peers, peers with negative scores, and backoffed peers
fanoutPeers.forEach((id) => {
if (!this.direct.has(id) && this.score.score(id) >= 0 && ((backoff == null) || !backoff.has(id))) {
if (!this.direct.has(id) && this.score.score(id) >= 0 && backoff?.has(id) !== true) {
toAdd.add(id)
}
})
Expand All @@ -1911,7 +1914,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.opts.D,
(id: PeerIdStr): boolean =>
// filter direct peers and peers with negative score
!toAdd.has(id) && !this.direct.has(id) && this.score.score(id) >= 0 && ((backoff == null) || !backoff.has(id))
!toAdd.has(id) && !this.direct.has(id) && this.score.score(id) >= 0 && backoff?.has(id) !== true
)

newPeers.forEach((peer) => {
Expand Down Expand Up @@ -2610,13 +2613,13 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
try {
peerInfo = await this.components.peerStore.get(id)
} catch (err: any) {
if (err.code !== 'ERR_NOT_FOUND') {
if (err.name !== 'NotFoundError') {
throw err
}
}

return {
peerID: id.toBytes(),
peerID: id.toMultihash().bytes,
signedPeerRecord: peerInfo?.peerRecordEnvelope
}
})
Expand Down Expand Up @@ -2741,7 +2744,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
!this.direct.has(id)
) {
const score = getScore(id)
if (((backoff == null) || !backoff.has(id)) && score >= 0) candidateMeshPeers.add(id)
if (backoff?.has(id) !== true && score >= 0) candidateMeshPeers.add(id)
// instead of having to find gossip peers after heartbeat which require another loop
// we prepare peers to gossip in a topic within heartbeat to improve performance
if (score >= this.opts.scoreThresholds.gossipThreshold) peersToGossip.add(id)
Expand Down
52 changes: 34 additions & 18 deletions src/message/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */
/* eslint-disable @typescript-eslint/no-empty-interface */

import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, message } from 'protons-runtime'
import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, MaxLengthError, message } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface RPC {
Expand Down Expand Up @@ -256,34 +256,42 @@ export namespace RPC {
switch (tag >>> 3) {
case 1: {
if (opts.limits?.ihave != null && obj.ihave.length === opts.limits.ihave) {
throw new CodeError('decode error - map field "ihave" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "ihave" had too many elements')
}

obj.ihave.push(RPC.ControlIHave.codec().decode(reader, reader.uint32()))
obj.ihave.push(RPC.ControlIHave.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.ihave$
}))
break
}
case 2: {
if (opts.limits?.iwant != null && obj.iwant.length === opts.limits.iwant) {
throw new CodeError('decode error - map field "iwant" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "iwant" had too many elements')
}

obj.iwant.push(RPC.ControlIWant.codec().decode(reader, reader.uint32()))
obj.iwant.push(RPC.ControlIWant.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.iwant$
}))
break
}
case 3: {
if (opts.limits?.graft != null && obj.graft.length === opts.limits.graft) {
throw new CodeError('decode error - map field "graft" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "graft" had too many elements')
}

obj.graft.push(RPC.ControlGraft.codec().decode(reader, reader.uint32()))
obj.graft.push(RPC.ControlGraft.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.graft$
}))
break
}
case 4: {
if (opts.limits?.prune != null && obj.prune.length === opts.limits.prune) {
throw new CodeError('decode error - map field "prune" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "prune" had too many elements')
}

obj.prune.push(RPC.ControlPrune.codec().decode(reader, reader.uint32()))
obj.prune.push(RPC.ControlPrune.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.prune$
}))
break
}
default: {
Expand Down Expand Up @@ -356,7 +364,7 @@ export namespace RPC {
}
case 2: {
if (opts.limits?.messageIDs != null && obj.messageIDs.length === opts.limits.messageIDs) {
throw new CodeError('decode error - map field "messageIDs" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "messageIDs" had too many elements')
}

obj.messageIDs.push(reader.bytes())
Expand Down Expand Up @@ -422,7 +430,7 @@ export namespace RPC {
switch (tag >>> 3) {
case 1: {
if (opts.limits?.messageIDs != null && obj.messageIDs.length === opts.limits.messageIDs) {
throw new CodeError('decode error - map field "messageIDs" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "messageIDs" had too many elements')
}

obj.messageIDs.push(reader.bytes())
Expand Down Expand Up @@ -562,10 +570,12 @@ export namespace RPC {
}
case 2: {
if (opts.limits?.peers != null && obj.peers.length === opts.limits.peers) {
throw new CodeError('decode error - map field "peers" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "peers" had too many elements')
}

obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32()))
obj.peers.push(RPC.PeerInfo.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.peers$
}))
break
}
case 3: {
Expand Down Expand Up @@ -708,22 +718,28 @@ export namespace RPC {
switch (tag >>> 3) {
case 1: {
if (opts.limits?.subscriptions != null && obj.subscriptions.length === opts.limits.subscriptions) {
throw new CodeError('decode error - map field "subscriptions" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "subscriptions" had too many elements')
}

obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32()))
obj.subscriptions.push(RPC.SubOpts.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.subscriptions$
}))
break
}
case 2: {
if (opts.limits?.messages != null && obj.messages.length === opts.limits.messages) {
throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH')
throw new MaxLengthError('Decode error - map field "messages" had too many elements')
}

obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32()))
obj.messages.push(RPC.Message.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.messages$
}))
break
}
case 3: {
obj.control = RPC.ControlMessage.codec().decode(reader, reader.uint32())
obj.control = RPC.ControlMessage.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.control
})
break
}
default: {
Expand Down
2 changes: 0 additions & 2 deletions src/score/constants.ts

This file was deleted.

Loading

0 comments on commit 1f8f634

Please sign in to comment.