Skip to content

Commit

Permalink
feat(relay): ordered validator execution (#1966)
Browse files Browse the repository at this point in the history
* feat(relay): ordered validator execution

* fix: make more readable

* test: ignore accepts only

* fix: idempotent .subscribe

* fix(rln-relay): make validators private

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

* fix: include comments, unsubscribe behaviour

* fix: compilation

---------

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
  • Loading branch information
rymnc and Ivansete-status authored Sep 5, 2023
1 parent 97a7c9d commit debc5f1
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 144 deletions.
18 changes: 8 additions & 10 deletions apps/wakunode2/wakunode2_validator_signed.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,16 @@ proc withinTimeWindow*(msg: WakuMessage): bool =
proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) =
debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey

proc validator(topic: string, message: messages.Message): Future[errors.ValidationResult] {.async.} =
let msg = WakuMessage.decode(message.data)
proc validator(topic: string, msg: WakuMessage): Future[errors.ValidationResult] {.async.} =
var outcome = errors.ValidationResult.Reject

if msg.isOk():
if msg.get.timestamp != 0:
if msg.get.withinTimeWindow():
let msgHash = SkMessage(topic.msgHash(msg.get))
let recoveredSignature = SkSignature.fromRaw(msg.get.meta)
if recoveredSignature.isOk():
if recoveredSignature.get.verify(msgHash, publicTopicKey):
outcome = errors.ValidationResult.Accept
if msg.timestamp != 0:
if msg.withinTimeWindow():
let msgHash = SkMessage(topic.msgHash(msg))
let recoveredSignature = SkSignature.fromRaw(msg.meta)
if recoveredSignature.isOk():
if recoveredSignature.get.verify(msgHash, publicTopicKey):
outcome = errors.ValidationResult.Accept

waku_msg_validator_signed_outcome.inc(labelValues = [$outcome])
return outcome
Expand Down
8 changes: 2 additions & 6 deletions tests/waku_relay/test_waku_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,9 @@ suite "Waku Relay":
await sleepAsync(500.millis)

# Validator
proc validator(topic: PubsubTopic, msg: Message): Future[ValidationResult] {.async.} =
let msg = WakuMessage.decode(msg.data)
if msg.isErr():
return ValidationResult.Ignore

proc validator(topic: PubsubTopic, msg: WakuMessage): Future[ValidationResult] {.async.} =
# only relay messages with contentTopic1
if msg.value.contentTopic != contentTopic:
if msg.contentTopic != contentTopic:
return ValidationResult.Reject

return ValidationResult.Accept
Expand Down
9 changes: 2 additions & 7 deletions tests/waku_relay/test_wakunode_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,13 @@ suite "WakuNode - Relay":
var completionFutValidatorRej = newFuture[bool]()

# set a topic validator for pubSubTopic
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
proc validator(topic: string, msg: WakuMessage): Future[ValidationResult] {.async.} =
## the validator that only allows messages with contentTopic1 to be relayed
check:
topic == pubSubTopic

let msg = WakuMessage.decode(message.data)
if msg.isErr():
completionFutValidatorAcc.complete(false)
return ValidationResult.Reject

# only relay messages with contentTopic1
if msg.value.contentTopic != contentTopic1:
if msg.contentTopic != contentTopic1:
completionFutValidatorRej.complete(true)
return ValidationResult.Reject

Expand Down
55 changes: 0 additions & 55 deletions tests/waku_rln_relay/test_waku_rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -710,61 +710,6 @@ suite "Waku rln relay":
msgValidate3 == MessageValidationResult.Valid
msgValidate4 == MessageValidationResult.Invalid

asyncTest "should validate invalid proofs if bandwidth is available":
let index = MembershipIndex(5)

let rlnConf = WakuRlnConfig(rlnRelayDynamic: false,
rlnRelayCredIndex: some(index),
rlnRelayBandwidthThreshold: 4,
rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3"))
let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf)
require:
wakuRlnRelayRes.isOk()
let wakuRlnRelay = wakuRlnRelayRes.get()

# get the current epoch time
let time = epochTime()

# create some messages from the same peer and append rln proof to them, except wm4
var
# this one will pass through the bandwidth threshold
wm1 = WakuMessage(payload: "Spam".toBytes())
# this message, will be over the bandwidth threshold, hence has to be verified
wm2 = WakuMessage(payload: "Valid message".toBytes())
# this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof)
wm3 = WakuMessage(payload: "Invalid message".toBytes())
wm4 = WakuMessage(payload: "Spam message".toBytes())

let
proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time)
proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds)
proofAdded3 = wakuRlnRelay.appendRLNProof(wm4, time)

# ensure proofs are added
require:
proofAdded1
proofAdded2
proofAdded3

# validate messages
# validateMessage proc checks the validity of the message fields and adds it to the log (if valid)
let
# this should be no verification, Valid
msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time))
# this should be verification, Valid
msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time))
# this should be verification, Invalid
msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time))
# this should be verification, Spam
msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time))

check:
msgValidate1 == MessageValidationResult.Valid
msgValidate2 == MessageValidationResult.Valid
msgValidate3 == MessageValidationResult.Invalid
msgValidate4 == MessageValidationResult.Spam


test "toIDCommitment and toUInt256":
# create an instance of rln
let rlnInstance = createRLNInstanceWrapper()
Expand Down
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ when defined(rln):
# register rln validator for all subscribed relay pubsub topics
for pubsubTopic in node.wakuRelay.subscribedTopics:
debug "Registering RLN validator for topic", pubsubTopic=pubsubTopic
procCall GossipSub(node.wakuRelay).addValidator(pubsubTopic, validator)
node.wakuRelay.addValidator(pubsubTopic, validator)
node.wakuRlnRelay = rlnRelay

## Waku peer-exchange
Expand Down
53 changes: 38 additions & 15 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ else:
{.push raises: [].}

import
std/sequtils,
stew/results,
sequtils,
chronos,
chronicles,
metrics,
Expand Down Expand Up @@ -122,7 +122,14 @@ const GossipsubParameters = GossipSubParams(
type
WakuRelayResult*[T] = Result[T, string]
WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].}
WakuValidatorHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[ValidationResult] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
# a map of PubsubTopic's => seq[WakuValidatorHandler] that are
# called in order every time a message is received on a given pubsub topic
wakuValidators: Table[PubsubTopic, seq[WakuValidatorHandler]]
# a map that stores whether the ordered validator has been inserted
# for a given PubsubTopic
validatorInserted: Table[PubsubTopic, bool]

proc initProtocolHandler(w: WakuRelay) =
proc handler(conn: Connection, proto: string) {.async.} =
Expand Down Expand Up @@ -167,9 +174,11 @@ proc new*(T: type WakuRelay, switch: Switch): WakuRelayResult[T] =

return ok(w)

method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} =
procCall GossipSub(w).addValidator(topic, handler)

proc addValidator*(w: WakuRelay,
topic: varargs[string],
handler: WakuValidatorHandler) {.gcsafe.} =
for t in topic:
w.wakuValidators.mgetOrPut(t, @[]).add(handler)

method start*(w: WakuRelay) {.async.} =
debug "start"
Expand All @@ -179,21 +188,32 @@ method stop*(w: WakuRelay) {.async.} =
debug "stop"
await procCall GossipSub(w).stop()

# rejects messages that are not WakuMessage
proc validator(pubsubTopic: string, message: messages.Message): Future[ValidationResult] {.async.} =
# can be optimized by checking if the message is a WakuMessage without allocating memory
# see nim-libp2p protobuf library
let msg = WakuMessage.decode(message.data)
if msg.isOk():
return ValidationResult.Accept
return ValidationResult.Reject

proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)

proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
return toSeq(GossipSub(w).topics.keys())

proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
# rejects messages that are not WakuMessage
let wrappedValidator = proc(pubsubTopic: string,
message: messages.Message): Future[ValidationResult] {.async.} =
# can be optimized by checking if the message is a WakuMessage without allocating memory
# see nim-libp2p protobuf library
let msgRes = WakuMessage.decode(message.data)
if msgRes.isErr():
return ValidationResult.Reject
let msg = msgRes.get()

# now sequentially validate the message
if w.wakuValidators.hasKey(pubsubTopic):
for validator in w.wakuValidators[pubsubTopic]:
let validatorRes = await validator(pubsubTopic, msg)
if validatorRes != ValidationResult.Accept:
return validatorRes
return ValidationResult.Accept
return wrappedValidator

proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
debug "subscribe", pubsubTopic=pubsubTopic

Expand All @@ -209,8 +229,10 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
else:
return handler(pubsubTopic, decMsg.get())

# add the default validator to the topic
procCall GossipSub(w).addValidator(pubSubTopic, validator)
# add the ordered validator to the topic
if not w.validatorInserted.hasKey(pubSubTopic):
procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator())
w.validatorInserted[pubSubTopic] = true

# set this topic parameters for scoring
w.topicParams[pubsubTopic] = TopicParameters
Expand All @@ -222,6 +244,7 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
debug "unsubscribe", pubsubTopic=pubsubTopic

procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.validatorInserted.del(pubsubTopic)

proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} =
trace "publish", pubsubTopic=pubsubTopic
Expand Down
82 changes: 32 additions & 50 deletions waku/waku_rln_relay/rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import
./protocol_types,
./protocol_metrics
import
../waku_relay, # for WakuRelayHandler
../waku_core,
../waku_keystore,
../utils/collector
Expand Down Expand Up @@ -79,7 +80,6 @@ type WakuRLNRelay* = ref object of RootObj
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
groupManager*: GroupManager
messageBucket*: Option[TokenBucket]

method stop*(rlnPeer: WakuRLNRelay) {.async.} =
## stops the rln-relay protocol
Expand Down Expand Up @@ -301,57 +301,44 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay,
return true

proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay,
spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler =
spamHandler: Option[SpamHandler] = none(SpamHandler)): WakuValidatorHandler =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets a validator for waku messages, acting in the registered pubsub topic
## the message validation logic is according to https://rfc.vac.dev/spec/17/
proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} =
proc validator(topic: string, message: WakuMessage): Future[pubsub.ValidationResult] {.async.} =
trace "rln-relay topic validator is called"

## Check if enough tokens can be consumed from the message bucket
try:
if wakuRlnRelay.messageBucket.isSome() and
wakuRlnRelay.messageBucket.get().tryConsume(message.data.len):
return pubsub.ValidationResult.Accept
else:
trace "message bandwidth limit exceeded, running rate limit proof validation"
except OverflowDefect: # not a problem
trace "not enough bandwidth, running rate limit proof validation"
let decodeRes = RateLimitProof.init(message.proof)

let decodeRes = WakuMessage.decode(message.data)
if decodeRes.isOk():
let wakumessage = decodeRes.value
let decodeRes = RateLimitProof.init(wakumessage.proof)
if decodeRes.isErr():
return pubsub.ValidationResult.Reject

if decodeRes.isErr():
return pubsub.ValidationResult.Reject
let msgProof = decodeRes.get()

# validate the message and update log
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(message)

let msgProof = decodeRes.get()

# validate the message and update log
let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(wakumessage)

let
proof = toHex(msgProof.proof)
epoch = fromEpoch(msgProof.epoch)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)
shareY = inHex(msgProof.shareY)
nullifier = inHex(msgProof.nullifier)
payload = string.fromBytes(wakumessage.payload)
case validationRes:
of Valid:
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
return pubsub.ValidationResult.Accept
of Invalid:
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
return pubsub.ValidationResult.Reject
of Spam:
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
if spamHandler.isSome():
let handler = spamHandler.get()
handler(wakumessage)
return pubsub.ValidationResult.Reject
let
proof = toHex(msgProof.proof)
epoch = fromEpoch(msgProof.epoch)
root = inHex(msgProof.merkleRoot)
shareX = inHex(msgProof.shareX)
shareY = inHex(msgProof.shareY)
nullifier = inHex(msgProof.nullifier)
payload = string.fromBytes(message.payload)
case validationRes:
of Valid:
trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
return pubsub.ValidationResult.Accept
of Invalid:
trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
return pubsub.ValidationResult.Reject
of Spam:
trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier
if spamHandler.isSome():
let handler = spamHandler.get()
handler(message)
return pubsub.ValidationResult.Reject
return validator

proc mount(conf: WakuRlnConfig,
Expand Down Expand Up @@ -393,12 +380,7 @@ proc mount(conf: WakuRlnConfig,
# Start the group sync
await groupManager.startGroupSync()

let messageBucket = if conf.rlnRelayBandwidthThreshold > 0:
some(TokenBucket.new(conf.rlnRelayBandwidthThreshold))
else: none(TokenBucket)

return WakuRLNRelay(groupManager: groupManager,
messageBucket: messageBucket)
return WakuRLNRelay(groupManager: groupManager)


proc new*(T: type WakuRlnRelay,
Expand Down

0 comments on commit debc5f1

Please sign in to comment.