Skip to content

Commit

Permalink
feat: gossipsub 1.2: IDONTWANT (#498)
Browse files Browse the repository at this point in the history
* feat: gossipsub 1.2: IDONTWANT

* chore: add unit test

* chore: remove packageManager from package.json

* chore: add idontwants cacheSize metric

* chore: fix lint error

* chore: make test less flaky

* chore: fix comment
  • Loading branch information
wemeetagain authored Sep 13, 2024
1 parent 6326e4d commit 5481add
Show file tree
Hide file tree
Showing 10 changed files with 725 additions and 16 deletions.
360 changes: 360 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ export const GossipsubIDv10 = '/meshsub/1.0.0'
*/
export const GossipsubIDv11 = '/meshsub/1.1.0'

/**
* The protocol ID for version 1.2.0 of the Gossipsub protocol
* See the spec for details about how v1.2.0 compares to v1.1.0:
* https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md
*/
export const GossipsubIDv12 = '/meshsub/1.2.0'

// Overlay parameters

/**
Expand Down Expand Up @@ -249,3 +256,6 @@ export const DEFAULT_METRIC_MESH_MESSAGE_DELIVERIES_WINDOWS = 1000

/** Wait for 1 more heartbeats before clearing a backoff */
export const BACKOFF_SLACK = 1

export const GossipsubIdontwantMinDataSize = 512
export const GossipsubIdontwantMaxMessages = 512
130 changes: 123 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type ReceivedMessageResult =
| ({ code: MessageStatus.invalid, msgIdStr?: MsgIdStr } & RejectReasonObj)
| { code: MessageStatus.valid, messageId: MessageId, msg: Message }

export const multicodec: string = constants.GossipsubIDv11
export const multicodec: string = constants.GossipsubIDv12

export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
/** if dial should fallback to floodsub */
Expand Down Expand Up @@ -211,6 +211,20 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
* It should be a number between 0 and 1, with a reasonable default of 0.25
*/
gossipFactor: number

/**
* The minimum message size in bytes to be considered for sending IDONTWANT messages
*
* @default 512
*/
idontwantMinDataSize?: number

/**
* The maximum number of IDONTWANT messages per heartbeat per peer
*
* @default 512
*/
idontwantMaxMessages?: number
}

export interface GossipsubMessage {
Expand Down Expand Up @@ -274,7 +288,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
* The signature policy to follow by default
*/
public readonly globalSignaturePolicy: typeof StrictSign | typeof StrictNoSign
public multicodecs: string[] = [constants.GossipsubIDv11, constants.GossipsubIDv10]
public multicodecs: string[] = [constants.GossipsubIDv12, constants.GossipsubIDv11, constants.GossipsubIDv10]

private publishConfig: PublishConfig | undefined

Expand Down Expand Up @@ -409,11 +423,24 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
*/
readonly gossipTracer: IWantTracer

/**
* Tracks IDONTWANT messages received by peers in the current heartbeat
*/
private readonly idontwantCounts = new Map<PeerIdStr, number>()

/**
* Tracks IDONTWANT messages received by peers and the heartbeat they were received in
*
* idontwants are stored for `mcacheLength` heartbeats before being pruned,
* so this map is bounded by peerCount * idontwantMaxMessages * mcacheLength
*/
private readonly idontwants = new Map<PeerIdStr, Map<MsgIdStr, number>>()

private readonly components: GossipSubComponents

private directPeerInitial: ReturnType<typeof setTimeout> | null = null

public static multicodec: string = constants.GossipsubIDv11
public static multicodec: string = constants.GossipsubIDv12

// Options
readonly opts: Required<GossipOptions>
Expand Down Expand Up @@ -462,6 +489,8 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
opportunisticGraftTicks: constants.GossipsubOpportunisticGraftTicks,
directConnectTicks: constants.GossipsubDirectConnectTicks,
gossipFactor: constants.GossipsubGossipFactor,
idontwantMinDataSize: constants.GossipsubIdontwantMinDataSize,
idontwantMaxMessages: constants.GossipsubIdontwantMaxMessages,
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
Expand Down Expand Up @@ -750,6 +779,8 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.seenCache.clear()
if (this.fastMsgIdCache != null) this.fastMsgIdCache.clear()
if (this.directPeerInitial != null) clearTimeout(this.directPeerInitial)
this.idontwantCounts.clear()
this.idontwants.clear()

this.log('stopped')
}
Expand Down Expand Up @@ -956,6 +987,9 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.control.delete(id)
// Remove from backoff mapping
this.outbound.delete(id)
// Remove from idontwant tracking
this.idontwantCounts.delete(id)
this.idontwants.delete(id)

// Remove from peer scoring
this.score.removePeer(id)
Expand Down Expand Up @@ -1019,6 +1053,10 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
prune: this.decodeRpcLimits.maxControlMessages,
prune$: {
peers: this.decodeRpcLimits.maxPeerInfos
},
idontwant: this.decodeRpcLimits.maxControlMessages,
idontwant$: {
messageIDs: this.decodeRpcLimits.maxIdontwantMessageIDs
}
}
}
Expand Down Expand Up @@ -1310,6 +1348,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.seenCache.put(msgIdStr)
}

// possibly send IDONTWANTs to mesh peers
if ((rpcMsg.data?.length ?? 0) >= this.opts.idontwantMinDataSize) {
this.sendIDontWants(msgId, rpcMsg.topic, propagationSource.toString())
}

// (Optional) Provide custom validation here with dynamic validators per topic
// NOTE: This custom topicValidator() must resolve fast (< 100ms) to allow scores
// to not penalize peers for long validation times.
Expand Down Expand Up @@ -1359,10 +1402,11 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
return
}

const iwant = (controlMsg.ihave != null) ? this.handleIHave(id, controlMsg.ihave) : []
const ihave = (controlMsg.iwant != null) ? this.handleIWant(id, controlMsg.iwant) : []
const prune = (controlMsg.graft != null) ? await this.handleGraft(id, controlMsg.graft) : []
;(controlMsg.prune != null) && (await this.handlePrune(id, controlMsg.prune))
const iwant = (controlMsg.ihave?.length > 0) ? this.handleIHave(id, controlMsg.ihave) : []
const ihave = (controlMsg.iwant?.length > 0) ? this.handleIWant(id, controlMsg.iwant) : []
const prune = (controlMsg.graft?.length > 0) ? await this.handleGraft(id, controlMsg.graft) : []
;(controlMsg.prune?.length > 0) && (await this.handlePrune(id, controlMsg.prune))
;(controlMsg.idontwant?.length > 0) && this.handleIdontwant(id, controlMsg.idontwant)

if ((iwant.length === 0) && (ihave.length === 0) && (prune.length === 0)) {
return
Expand Down Expand Up @@ -1691,6 +1735,39 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
}
}

private handleIdontwant (id: PeerIdStr, idontwant: RPC.ControlIDontWant[]): void {
let idontwantCount = this.idontwantCounts.get(id) ?? 0
// return early if we have already received too many IDONTWANT messages from the peer
if (idontwantCount >= this.opts.idontwantMaxMessages) {
return
}
const startIdontwantCount = idontwantCount

let idontwants = this.idontwants.get(id)
if (idontwants == null) {
idontwants = new Map()
this.idontwants.set(id, idontwants)
}
let idonthave = 0
// eslint-disable-next-line no-labels
out: for (const { messageIDs } of idontwant) {
for (const msgId of messageIDs) {
if (idontwantCount >= this.opts.idontwantMaxMessages) {
// eslint-disable-next-line no-labels
break out
}
idontwantCount++

const msgIdStr = this.msgIdToStrFn(msgId)
idontwants.set(msgIdStr, this.heartbeatTicks)
if (!this.mcache.msgs.has(msgIdStr)) idonthave++
}
}
this.idontwantCounts.set(id, idontwantCount)
const total = idontwantCount - startIdontwantCount
this.metrics?.onIdontwantRcv(total, idonthave)
}

/**
* Add standard backoff log for a peer in a topic
*/
Expand Down Expand Up @@ -2353,6 +2430,27 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
this.sendRpc(id, out)
}

private sendIDontWants (msgId: Uint8Array, topic: string, source: PeerIdStr): void {
const ids = this.mesh.get(topic)
if (ids == null) {
return
}

// don't send IDONTWANT to:
// - the source
// - peers that don't support v1.2
const tosend = new Set(ids)
tosend.delete(source)
for (const id of tosend) {
if (this.streamsOutbound.get(id)?.protocol !== constants.GossipsubIDv12) {
tosend.delete(id)
}
}

const idontwantRpc = createGossipRpc([], { idontwant: [{ messageIDs: [msgId] }] })
this.sendRpcInBatch(tosend, idontwantRpc)
}

/**
* Send an rpc object to a peer
*/
Expand Down Expand Up @@ -2701,6 +2799,18 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
// apply IWANT request penalties
this.applyIwantPenalties()

// clean up IDONTWANT counters
this.idontwantCounts.clear()

// clean up old tracked IDONTWANTs
for (const idontwants of this.idontwants.values()) {
for (const [msgId, heartbeatTick] of idontwants) {
if (this.heartbeatTicks - heartbeatTick >= this.opts.mcacheLength) {
idontwants.delete(msgId)
}
}
}

// ensure direct peers are connected
if (this.heartbeatTicks % this.opts.directConnectTicks === 0) {
// we only do this every few ticks to allow pending connections to complete and account for restarts/downtime
Expand Down Expand Up @@ -3069,6 +3179,12 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
}
metrics.cacheSize.set({ cache: 'backoff' }, backoffSize)

let idontwantsCount = 0
for (const idontwant of this.idontwants.values()) {
idontwantsCount += idontwant.size
}
metrics.cacheSize.set({ cache: 'idontwants' }, idontwantsCount)

// Peer counts

for (const [topicStr, peers] of this.topics) {
Expand Down
2 changes: 2 additions & 0 deletions src/message/decodeRpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export interface DecodeRPCLimits {
maxMessages: number
maxIhaveMessageIDs: number
maxIwantMessageIDs: number
maxIdontwantMessageIDs: number
maxControlMessages: number
maxPeerInfos: number
}
Expand All @@ -12,6 +13,7 @@ export const defaultDecodeRpcLimits: DecodeRPCLimits = {
maxMessages: Infinity,
maxIhaveMessageIDs: Infinity,
maxIwantMessageIDs: Infinity,
maxIdontwantMessageIDs: Infinity,
maxControlMessages: Infinity,
maxPeerInfos: Infinity
}
6 changes: 6 additions & 0 deletions src/message/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message RPC {
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
repeated ControlIDontWant idontwant = 5;
}

message ControlIHave {
Expand All @@ -49,4 +50,9 @@ message RPC {
optional bytes peerID = 1;
optional bytes signedPeerRecord = 2;
}

message ControlIDontWant {
repeated bytes messageIDs = 1;
}

}
Loading

0 comments on commit 5481add

Please sign in to comment.