diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index fee3861ce1..3b96a4b97b 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -383,7 +383,8 @@ proc setupProtocols(node: WakuNode, rlnRelayEthAccountAddress: conf.rlnRelayEthAccountAddress, rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayCredentialsPassword: conf.rlnRelayCredentialsPassword, - rlnRelayTreePath: conf.rlnRelayTreePath + rlnRelayTreePath: conf.rlnRelayTreePath, + rlnRelayBandwidthThreshold: conf.rlnRelayBandwidthThreshold ) try: diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 74c1b5dcdb..67a2cbf4c9 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -211,6 +211,11 @@ type defaultValue: "" name: "rln-relay-tree-path" }: string + rlnRelayBandwidthThreshold* {. + desc: "Message rate in bytes/sec after which verification of proofs should happen", + defaultValue: 0 # to maintain backwards compatibility + name: "rln-relay-bandwidth-threshold" }: int + staticnodes* {. desc: "Peer multiaddr to directly connect with. Argument may be repeated." name: "staticnode" }: seq[string] diff --git a/tests/v2/waku_rln_relay/test_waku_rln_relay.nim b/tests/v2/waku_rln_relay/test_waku_rln_relay.nim index 60be443d8c..9c891aa96b 100644 --- a/tests/v2/waku_rln_relay/test_waku_rln_relay.nim +++ b/tests/v2/waku_rln_relay/test_waku_rln_relay.nim @@ -662,7 +662,8 @@ suite "Waku rln relay": let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, rlnRelayPubsubTopic: RlnRelayPubsubTopic, rlnRelayContentTopic: RlnRelayContentTopic, - rlnRelayCredIndex: index.uint) + rlnRelayCredIndex: index.uint, + rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_2")) let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf) require: wakuRlnRelayRes.isOk() @@ -708,6 +709,63 @@ suite "Waku rln relay": msgValidate2 == MessageValidationResult.Spam msgValidate3 == MessageValidationResult.Valid msgValidate4 == MessageValidationResult.Invalid + + asyncTest "should validate invalid proofs if bandwidth is available": + let index = MembershipIndex(5) + + let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayPubsubTopic: RlnRelayPubsubTopic, + rlnRelayContentTopic: RlnRelayContentTopic, + rlnRelayCredIndex: index.uint, + rlnRelayBandwidthThreshold: 4, + rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3")) + let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf) + require: + wakuRlnRelayRes.isOk() + let wakuRlnRelay = wakuRlnRelayRes.get() + + # get the current epoch time + let time = epochTime() + + # create some messages from the same peer and append rln proof to them, except wm4 + var + # this one will pass through the bandwidth threshold + wm1 = WakuMessage(payload: "Spam".toBytes()) + # this message, will be over the bandwidth threshold, hence has to be verified + wm2 = WakuMessage(payload: "Valid message".toBytes()) + # this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof) + wm3 = WakuMessage(payload: "Invalid message".toBytes()) + wm4 = WakuMessage(payload: "Spam message".toBytes()) + + let + proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time) + proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds) + proofAdded3 = wakuRlnRelay.appendRLNProof(wm4, time) + + # ensure proofs are added + require: + proofAdded1 + proofAdded2 + proofAdded3 + + # validate messages + # validateMessage proc checks the validity of the message fields and adds it to the log (if valid) + let + # this should be no verification, Valid + msgValidate1 = wakuRlnRelay.validateMessage(wm1, some(time)) + # this should be verification, Valid + msgValidate2 = wakuRlnRelay.validateMessage(wm2, some(time)) + # this should be verification, Invalid + msgValidate3 = wakuRlnRelay.validateMessage(wm3, some(time)) + # this should be verification, Spam + msgValidate4 = wakuRlnRelay.validateMessage(wm4, some(time)) + + check: + msgValidate1 == MessageValidationResult.Valid + msgValidate2 == MessageValidationResult.Valid + msgValidate3 == MessageValidationResult.Invalid + msgValidate4 == MessageValidationResult.Spam + test "toIDCommitment and toUInt256": # create an instance of rln diff --git a/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim index 5a7e7d6152..1ac356c747 100644 --- a/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/v2/waku_rln_relay/test_wakunode_rln_relay.nim @@ -142,6 +142,7 @@ procSuite "WakuNode - RLN relay": rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 1.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"), + rlnRelayBandwidthThreshold: 0, )) await node1.start() @@ -154,6 +155,7 @@ procSuite "WakuNode - RLN relay": rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 2.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"), + rlnRelayBandwidthThreshold: 0, )) await node2.start() @@ -166,6 +168,7 @@ procSuite "WakuNode - RLN relay": rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 3.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"), + rlnRelayBandwidthThreshold: 0, )) await node3.start() @@ -248,6 +251,7 @@ procSuite "WakuNode - RLN relay": rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 1.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"), + rlnRelayBandwidthThreshold: 0, )) await node1.start() @@ -261,6 +265,7 @@ procSuite "WakuNode - RLN relay": rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 2.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"), + rlnRelayBandwidthThreshold: 0, )) await node2.start() @@ -274,6 +279,7 @@ procSuite "WakuNode - RLN relay": rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 3.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"), + rlnRelayBandwidthThreshold: 0, )) await node3.start() diff --git a/waku/v2/waku_rln_relay/rln_relay.nim b/waku/v2/waku_rln_relay/rln_relay.nim index 2bef8acc90..0c04c18a06 100644 --- a/waku/v2/waku_rln_relay/rln_relay.nim +++ b/waku/v2/waku_rln_relay/rln_relay.nim @@ -5,7 +5,7 @@ else: import std/[algorithm, sequtils, strutils, tables, times, os, deques], - chronicles, options, chronos, stint, + chronicles, options, chronos, chronos/ratelimit, stint, confutils, web3, json, web3/ethtypes, @@ -23,7 +23,8 @@ import ./protocol_metrics import ../waku_core, - ../waku_keystore + ../waku_keystore, + ../utils/collector logScope: topics = "waku rln_relay" @@ -41,6 +42,7 @@ type WakuRlnConfig* = object rlnRelayCredPath*: string rlnRelayCredentialsPassword*: string rlnRelayTreePath*: string + rlnRelayBandwidthThreshold*: int proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[( seq[RawMembershipCredentials], string @@ -86,6 +88,7 @@ type WakuRLNRelay* = ref object of RootObj nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message groupManager*: GroupManager + messageBucket*: Option[TokenBucket] proc hasDuplicate*(rlnPeer: WakuRLNRelay, proofMetadata: ProofMetadata): RlnRelayResult[bool] = @@ -160,14 +163,16 @@ proc absDiff*(e1, e2: Epoch): uint64 = else: return epoch2 - epoch1 -proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, - timeOption: Option[float64] = none(float64)): MessageValidationResult = +proc validateMessage*(rlnPeer: WakuRLNRelay, + msg: WakuMessage, + timeOption = none(float64)): MessageValidationResult = ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., ## the `msg`'s epoch is within MaxEpochGap of the current epoch ## the `msg` has valid rate limit proof ## the `msg` does not violate the rate limit ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) ## if `timeOption` is supplied, then the current epoch is calculated based on that + let decodeRes = RateLimitProof.init(msg.proof) if decodeRes.isErr(): return MessageValidationResult.Invalid @@ -286,6 +291,18 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, let contentTopic = wakuRlnRelay.contentTopic proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" + + ## Check if enough tokens can be consumed from the message bucket + try: + if wakuRlnRelay.messageBucket.isSome() and + wakuRlnRelay.messageBucket.get().tryConsume(message.data.len): + return pubsub.ValidationResult.Accept + else: + info "message bandwidth limit exceeded, running rate limit proof validation" + except OverflowDefect: # not a problem + debug "not enough bandwidth, running rate limit proof validation" + + let decodeRes = WakuMessage.decode(message.data) if decodeRes.isOk(): let @@ -377,9 +394,14 @@ proc mount(conf: WakuRlnConfig, # Start the group sync await groupManager.startGroupSync() + let messageBucket = if conf.rlnRelayBandwidthThreshold > 0: + some(TokenBucket.new(conf.rlnRelayBandwidthThreshold)) + else: none(TokenBucket) + return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic, contentTopic: conf.rlnRelayContentTopic, - groupManager: groupManager) + groupManager: groupManager, + messageBucket: messageBucket) proc new*(T: type WakuRlnRelay,