Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rln-relay): track last seen event #1296

Merged
merged 14 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions tests/v2/test_waku_rln_relay_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ procSuite "Waku-rln-relay":
debug "membership commitment key", pk2 = pk2

var events = [newFuture[void](), newFuture[void]()]
proc handler(pubkey: Uint256, index: Uint256) =
proc handler(pubkey: Uint256, index: Uint256): RlnRelayResult[void] =
debug "handler is called", pubkey = pubkey, index = index
if pubkey == pk:
events[0].complete()
Expand All @@ -214,9 +214,14 @@ procSuite "Waku-rln-relay":
let isSuccessful = rlnPeer.rlnInstance.insertMember(pubkey.toIDCommitment())
check:
isSuccessful
return ok()

# mount the handler for listening to the contract events
await rlnPeer.handleGroupUpdates(handler)
await subscribeToGroupEvents(ethClientUri = EthClient,
ethAccountAddress = some(accounts[0]),
contractAddress = contractAddress,
blockNumber = "0x0",
handler = handler)

# register a member to the contract
let tx = await contractObj.register(pk).send(value = MembershipFee)
Expand Down
2 changes: 2 additions & 0 deletions waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ when defined(rln) or (not defined(rln) and not defined(rlnzerokit)):
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion
lastSeenMembershipIndex*: MembershipIndex # the last seen membership index

when defined(rlnzerokit):
type WakuRLNRelay* = ref object
Expand All @@ -130,6 +131,7 @@ when defined(rlnzerokit):
nullifierLog*: Table[Epoch, seq[ProofMetadata]]
lastEpoch*: Epoch # the epoch of the last published rln message
validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion
lastSeenMembershipIndex*: MembershipIndex # the last seen membership index


type MessageValidationResult* {.pure.} = enum
Expand Down
106 changes: 70 additions & 36 deletions waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ proc inHex*(value: IDKey or IDCommitment or MerkleNode or Nullifier or Epoch or
return valueHex

proc toMembershipIndex(v: UInt256): MembershipIndex =
let result: MembershipIndex = cast[MembershipIndex](v)
return result
let membershipIndex: MembershipIndex = cast[MembershipIndex](v)
return membershipIndex

proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAccountPrivKey: keys.PrivateKey, ethClientAddress: string, membershipContractAddress: Address, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[Result[MembershipIndex, string]] {.async.} =
# TODO may need to also get eth Account Private Key as PrivateKey
Expand Down Expand Up @@ -920,23 +920,62 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul
return err(memberAdded.error())
return ok()

# the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface
type RegistrationEventHandler = proc(pubkey: Uint256, index: Uint256): void {.gcsafe, closure, raises: [Defect].}
type GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].}

proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, handler: RegistrationEventHandler, fromBlock: string = "0x0"): Future[Subscription] {.async, gcsafe} =
var contractObj = web3.contractSender(MembershipContract, contractAddress)
return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.raises: [Defect], gcsafe.}:
proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler =
## assuming all the members arrive in order
## TODO: check the index and the pubkey depending on
## the group update operation
var handler: GroupUpdateHandler
handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.raises: [Defect].} =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the member deletion feature, this handler will soon take in an additional argument, GroupUpdateType, which will be either Insertion or Deletion, and then we can handle those cases separately.

var pk: IDCommitment
try:
debug "onRegister", pubkey = pubkey, index = index
handler(pubkey, index)
except Exception as err:
# chronos still raises exceptions which inherit directly from Exception
error "Error handling new member registration: ", err=err.msg
doAssert false, err.msg
do (err: CatchableError):
error "Error from subscription: ", err=err.msg
pk = pubkey.toIDCommitment()
except:
return err("invalid pubkey")
let isSuccessful = rlnPeer.insertMember(pk)
if isSuccessful.isErr():
return err("failed to add a new member to the Merkle tree")
else:
debug "new member added to the Merkle tree", pubkey=pubkey, index=index
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex)
let membershipIndex = index.toMembershipIndex()
if rlnPeer.lastSeenMembershipIndex != membershipIndex + 1:
warn "membership index gap, may have lost connection", gap = membershipIndex - rlnPeer.lastSeenMembershipIndex
rlnPeer.lastSeenMembershipIndex = membershipIndex
return ok()
return handler

proc subscribeToMemberRegistrations(web3: Web3,
contractAddress: Address,
fromBlock: string = "0x0",
handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} =
## subscribes to member registrations, on a given membership group contract
## `fromBlock` indicates the block number from which the subscription starts
## `handler` is a callback that is called when a new member is registered
## the callback is called with the pubkey and the index of the new member
## TODO: need a similar proc for member deletions
var contractObj = web3.contractSender(MembershipContract, contractAddress)

proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), contractAddress: Address, blockNumber: string = "0x0", handler: RegistrationEventHandler) {.async, gcsafe.} =
let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} =
debug "onRegister", pubkey = pubkey, index = index
let groupUpdateRes = handler(pubkey, index)
if groupUpdateRes.isErr():
error "Error handling new member registration", err=groupUpdateRes.error()

let onError = proc (err: CatchableError) =
error "Error in subscription", err=err.msg

return await contractObj.subscribe(MemberRegistered,
%*{"fromBlock": fromBlock, "address": contractAddress},
onMemberRegistered,
onError)

proc subscribeToGroupEvents*(ethClientUri: string,
ethAccountAddress: Option[Address] = none(Address),
contractAddress: Address,
blockNumber: string = "0x0",
handler: GroupUpdateHandler) {.async, gcsafe.} =
## connects to the eth client whose URI is supplied as `ethClientUri`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the proc description to match the new semantics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 582a6c3

## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress`
## it collects all the events starting from the given `blockNumber`
Expand All @@ -952,21 +991,27 @@ proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Addr

proc startSubscription(web3: Web3) {.async, gcsafe.} =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to have a separate proc for this, why not just call the subscribeToMemberRegistrations proc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, when we have the reconnection logic, it becomes easier to resubscribe to events from the last processed block.

# subscribe to the MemberRegistered events
# TODO can do similarly for deletion events, though it is not yet supported
discard await subscribeToMemberRegistrations(web3, contractAddress, handler, blockNumber)
# TODO: can do similarly for deletion events, though it is not yet supported
# TODO: add block number for reconnection logic
discard await subscribeToMemberRegistrations(web3 = web3,
contractAddress = contractAddress,
handler = handler)

await startSubscription(web3)
web3.onDisconnect = proc() =
debug "connection to ethereum node dropped", lastBlock = latestBlock


proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} =
## generates the groupUpdateHandler which is called when a new member is registered,
## and has the WakuRLNRelay instance as a closure
let handler = generateGroupUpdateHandler(rlnPeer)
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress,
ethAccountAddress = rlnPeer.ethAccountAddress,
contractAddress = rlnPeer.membershipContractAddress,
handler = handler)


proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} =
# mounts the supplied handler for the registration events emitting from the membership contract
await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler)


proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) =
## this procedure is a thin wrapper for the pubsub addValidator method
## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic
Expand Down Expand Up @@ -1063,8 +1108,7 @@ proc mountRlnRelayStatic*(node: WakuNode,
node.addRLNRelayValidator(pubsubTopic, contentTopic, spamHandler)
debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic

node.wakuRlnRelay = rlnPeer

node.wakuRlnRelay = rlnPeer

proc mountRlnRelayDynamic*(node: WakuNode,
ethClientAddr: string = "",
Expand Down Expand Up @@ -1131,17 +1175,7 @@ proc mountRlnRelayDynamic*(node: WakuNode,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic)


proc handler(pubkey: Uint256, index: Uint256) =
debug "a new key is added", pubkey=pubkey
# assuming all the members arrive in order
let pk = pubkey.toIDCommitment()
let isSuccessful = rlnPeer.insertMember(pk)
debug "received pk", pk=pk.inHex, index=index
debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex)
doAssert(isSuccessful.isOk())

asyncSpawn rlnPeer.handleGroupUpdates(handler)
asyncSpawn rlnPeer.handleGroupUpdates()
debug "dynamic group management is started"
# adds a topic validator for the supplied pubsub topic at the relay protocol
# messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped
Expand Down