Skip to content

Commit

Permalink
feat: allow ignoring PublishError.Duplicate (#404)
Browse files Browse the repository at this point in the history
* feat: allow for duplicate messages (#402)

* feat: return empty array if ignoreDuplicateMessages is enabled (#402)

* feat: added gauge metric for tracking ignored duplicate messages (#402)

* feat: updated metric name for gauge to include published messages that are ignored (#402)

* chore: renaming

* chore: fix linter error

* chore: updated name to ignoreDuplicatePublishError (#402)

---------

Co-authored-by: Cayman <caymannava@gmail.com>
  • Loading branch information
maschad and wemeetagain authored Feb 21, 2023
1 parent bae1492 commit dcde3c9
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
asyncValidation: boolean
/** Do not throw `InsufficientPeers` error if publishing to zero peers */
allowPublishToZeroPeers: boolean
/** Do not throw `PublishError.Duplicate` if publishing duplicate messages */
ignoreDuplicatePublishError: boolean
/** For a single stream, await processing each RPC before processing the next */
awaitRpcHandler: boolean
/** For a single RPC, await processing each message before processing the next */
Expand Down Expand Up @@ -2013,9 +2015,16 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
const msgId = await this.msgIdFn(msg)
const msgIdStr = this.msgIdToStrFn(msgId)

// Current publish opt takes precedence global opts, while preserving false value
const ignoreDuplicatePublishError = opts?.ignoreDuplicatePublishError ?? this.opts.ignoreDuplicatePublishError

if (this.seenCache.has(msgIdStr)) {
// This message has already been seen. We don't re-publish messages that have already
// been published on the network.
if (ignoreDuplicatePublishError) {
this.metrics?.onPublishDuplicateMsg(topic)
return { recipients: [] }
}
throw Error('PublishError.Duplicate')
}

Expand Down
11 changes: 11 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ export function getMetrics(
labelNames: ['topic']
}),

duplicateMsgIgnored: register.gauge<{ topic: TopicLabel }>({
name: 'gossisub_ignored_published_duplicate_msgs_total',
help: 'Total count of published duplicate message ignored by topic',
labelNames: ['topic']
}),

/* Metrics related to scoring */
/** Total times score() is called */
scoreFnCalls: register.gauge({
Expand Down Expand Up @@ -629,6 +635,11 @@ export function getMetrics(
}
},

onPublishDuplicateMsg(topicStr: TopicStr): void {
const topic = this.toTopic(topicStr)
this.duplicateMsgIgnored.inc({ topic }, 1)
},

onRpcRecv(rpc: IRPC, rpcBytes: number): void {
this.rpcRecvBytes.inc(rpcBytes)
this.rpcRecvCount.inc(1)
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export enum SignaturePolicy {

export type PublishOpts = {
allowPublishToZeroPeers?: boolean
ignoreDuplicatePublishError?: boolean
}

export enum PublishConfigType {
Expand Down

0 comments on commit dcde3c9

Please sign in to comment.