Skip to content

Commit

Permalink
chore(rln-relay): clean up nullifier table every MaxEpochGap (#1994)
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc authored Sep 6, 2023
1 parent ea31b53 commit 483f40c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 17 deletions.
1 change: 0 additions & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayBandwidthThreshold: conf.rlnRelayBandwidthThreshold
)

try:
Expand Down
108 changes: 96 additions & 12 deletions tests/waku_rln_relay/test_wakunode_rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,12 @@ import
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/peerid,
libp2p/multiaddress,
libp2p/switch,
libp2p/protocols/pubsub/pubsub,
eth/keys
libp2p/protocols/pubsub/pubsub
import
../../../waku/waku_core,
../../../waku/waku_node,
../../../waku/waku_rln_relay,
../../../waku/waku_keystore,
../testlib/wakucore,
../testlib/wakunode

Expand Down Expand Up @@ -206,7 +201,6 @@ procSuite "WakuNode - RLN relay":
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"),
rlnRelayBandwidthThreshold: 0,
))

await node1.start()
Expand All @@ -217,7 +211,6 @@ procSuite "WakuNode - RLN relay":
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"),
rlnRelayBandwidthThreshold: 0,
))

await node2.start()
Expand All @@ -228,7 +221,6 @@ procSuite "WakuNode - RLN relay":
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"),
rlnRelayBandwidthThreshold: 0,
))

await node3.start()
Expand Down Expand Up @@ -308,7 +300,6 @@ procSuite "WakuNode - RLN relay":
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"),
rlnRelayBandwidthThreshold: 0,
))

await node1.start()
Expand All @@ -320,7 +311,6 @@ procSuite "WakuNode - RLN relay":
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"),
rlnRelayBandwidthThreshold: 0,
))

await node2.start()
Expand All @@ -332,7 +322,6 @@ procSuite "WakuNode - RLN relay":
await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"),
rlnRelayBandwidthThreshold: 0,
))

await node3.start()
Expand Down Expand Up @@ -407,3 +396,98 @@ procSuite "WakuNode - RLN relay":
await node1.stop()
await node2.stop()
await node3.stop()

asyncTest "clearNullifierLog: should clear epochs > MaxEpochGap":

let
# publisher node
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0))
# Relay node
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
# Subscriber
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))

contentTopic = ContentTopic("/waku/2/default-content/proto")

# set up 2 nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])

# mount rlnrelay in off-chain mode
await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_10"),
))

await node1.start()

# node 2
await node2.mountRelay(@[DefaultPubsubTopic])

# mount rlnrelay in off-chain mode
await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_11"),
))

await node2.start()

await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

# get the current epoch time
let time = epochTime()
# create some messages with rate limit proofs
var
wm1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic)
proofAdded1 = node1.wakuRlnRelay.appendRLNProof(wm1, time)
# another message in the same epoch as wm1, it will break the messaging rate limit
wm2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic)
proofAdded2 = node1.wakuRlnRelay.appendRLNProof(wm2, time + EpochUnitSeconds)
# wm3 points to the next epoch
wm3 = WakuMessage(payload: "message 3".toBytes(), contentTopic: contentTopic)
proofAdded3 = node1.wakuRlnRelay.appendRLNProof(wm3, time + EpochUnitSeconds * 2)

# check proofs are added correctly
check:
proofAdded1
proofAdded2
proofAdded3

# relay handler for node2
var completionFut1 = newFuture[bool]()
var completionFut2 = newFuture[bool]()
var completionFut3 = newFuture[bool]()
proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == DefaultPubsubTopic:
if msg == wm1:
completionFut1.complete(true)
if msg == wm2:
completionFut2.complete(true)
if msg == wm3:
completionFut3.complete(true)

# mount the relay handler for node2
node2.subscribe(DefaultPubsubTopic, relayHandler)
await sleepAsync(2000.millis)

await node1.publish(DefaultPubsubTopic, wm1)
await sleepAsync(10.seconds)
await node1.publish(DefaultPubsubTopic, wm2)
await sleepAsync(10.seconds)
await node1.publish(DefaultPubsubTopic, wm3)

let
res1 = await completionFut1.withTimeout(10.seconds)
res2 = await completionFut2.withTimeout(10.seconds)
res3 = await completionFut3.withTimeout(10.seconds)

check:
(res1 and res2 and res3) == true # all 3 are valid
node2.wakuRlnRelay.nullifierLog.len() == 1 # after clearing, only 1 is stored

await node1.stop()
await node2.stop()
20 changes: 16 additions & 4 deletions waku/waku_rln_relay/rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type WakuRlnConfig* = object
rlnRelayCredPath*: string
rlnRelayCredPassword*: string
rlnRelayTreePath*: string
rlnRelayBandwidthThreshold*: int

proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[(
seq[RawMembershipCredentials], string
Expand Down Expand Up @@ -77,7 +76,7 @@ proc calcEpoch*(t: float64): Epoch =

type WakuRLNRelay* = ref object of RootObj
# the log of nullifiers and Shamir shares of the past messages grouped per epoch
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
groupManager*: GroupManager

Expand Down Expand Up @@ -300,13 +299,26 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay,
msg.proof = proofGenRes.get().encode().buffer
return true

proc clearNullifierLog(rlnPeer: WakuRlnRelay) =
# clear the first MaxEpochGap epochs of the nullifer log
# if more than MaxEpochGap epochs are in the log
# note: the epochs are ordered ascendingly
if rlnPeer.nullifierLog.len().uint < MaxEpochGap:
return

trace "clearing epochs from the nullifier log", count = MaxEpochGap
let epochsToClear = rlnPeer.nullifierLog.keys().toSeq()[0..<MaxEpochGap]
for epoch in epochsToClear:
rlnPeer.nullifierLog.del(epoch)

proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
spamHandler: Option[SpamHandler] = none(SpamHandler)): WakuValidatorHandler =
spamHandler = none(SpamHandler)): WakuValidatorHandler =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets a validator for waku messages, acting in the registered pubsub topic
## the message validation logic is according to https://rfc.vac.dev/spec/17/
proc validator(topic: string, message: WakuMessage): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called"
wakuRlnRelay.clearNullifierLog()

let decodeRes = RateLimitProof.init(message.proof)

Expand Down Expand Up @@ -342,7 +354,7 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
return validator

proc mount(conf: WakuRlnConfig,
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)
registrationHandler = none(RegistrationHandler)
): Future[WakuRlnRelay] {.async.} =
var
groupManager: GroupManager
Expand Down

0 comments on commit 483f40c

Please sign in to comment.