From 3030fe3b592767246905bf24abd65a398963d145 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 25 Oct 2022 12:23:33 +0530 Subject: [PATCH 1/5] feat(rln-relay): track last seen event --- .../waku_rln_relay/waku_rln_relay_types.nim | 2 + .../waku_rln_relay/waku_rln_relay_utils.nim | 85 ++++++++++++------- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim index 7d66a44a3f..4713d439ad 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_types.nim @@ -106,6 +106,7 @@ when defined(rln) or (not defined(rln) and not defined(rlnzerokit)): nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion + lastSeenMembershipIndex*: MembershipIndex # the last seen membership index when defined(rlnzerokit): type WakuRLNRelay* = ref object @@ -130,6 +131,7 @@ when defined(rlnzerokit): nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message validMerkleRoots*: Deque[MerkleNode] # An array of valid merkle roots, which are updated in a FIFO fashion + lastSeenMembershipIndex*: MembershipIndex # the last seen membership index type MessageValidationResult* {.pure.} = enum diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 3b4f983c51..b15b6a0bf6 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -194,8 +194,8 @@ proc inHex*(value: IDKey or IDCommitment or MerkleNode or Nullifier or Epoch or return valueHex proc toMembershipIndex(v: UInt256): MembershipIndex = - let result: MembershipIndex = cast[MembershipIndex](v) - return result + let membershipIndex: MembershipIndex = cast[MembershipIndex](v) + return membershipIndex proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAccountPrivKey: keys.PrivateKey, ethClientAddress: string, membershipContractAddress: Address, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[Result[MembershipIndex, string]] {.async.} = # TODO may need to also get eth Account Private Key as PrivateKey @@ -913,23 +913,48 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul return err(memberAdded.error()) return ok() -# the types of inputs to this handler matches the MemberRegistered event/proc defined in the MembershipContract interface -type RegistrationEventHandler = proc(pubkey: Uint256, index: Uint256): void {.gcsafe, closure, raises: [Defect].} +type GroupUpdateHandler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].} -proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, handler: RegistrationEventHandler, fromBlock: string = "0x0"): Future[Subscription] {.async, gcsafe} = - var contractObj = web3.contractSender(MembershipContract, contractAddress) - return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.raises: [Defect], gcsafe.}: +proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = + # assuming all the members arrive in order + # TODO: check the index and the pubkey depending on + # the group update operation + var handler: GroupUpdateHandler + handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.raises: [Defect].} = + var pk: IDCommitment + try: + pk = pubkey.toIDCommitment() + except: + return err("invalid pubkey") + var isSuccessful: RlnRelayResult[void] try: + isSuccessful = rlnPeer.insertMember(pk) + except: + return err("failed to insert member") + if isSuccessful.isErr(): + return err("failed to add a new member to the Merkle tree") + else: + debug "new member added to the Merkle tree", pubkey=pubkey, index=index + debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) + rlnPeer.lastSeenMembershipIndex = index.toMembershipIndex() + return ok() + return handler + +proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, fromBlock: string = "0x0", handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} = + var contractObj = web3.contractSender(MembershipContract, contractAddress) + return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.gcsafe.}: debug "onRegister", pubkey = pubkey, index = index - handler(pubkey, index) - except Exception as err: - # chronos still raises exceptions which inherit directly from Exception - error "Error handling new member registration: ", err=err.msg - doAssert false, err.msg + let groupUpdateRes = handler(pubkey, index) + if groupUpdateRes.isErr(): + error "Error handling new member registration: ", err=groupUpdateRes.error() do (err: CatchableError): error "Error from subscription: ", err=err.msg -proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), contractAddress: Address, blockNumber: string = "0x0", handler: RegistrationEventHandler) {.async, gcsafe.} = +proc subscribeToGroupEvents(ethClientUri: string, + ethAccountAddress: Option[Address] = none(Address), + contractAddress: Address, + blockNumber: string = "0x0", + handler: GroupUpdateHandler) {.async, gcsafe.} = ## connects to the eth client whose URI is supplied as `ethClientUri` ## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress` ## it collects all the events starting from the given `blockNumber` @@ -945,20 +970,25 @@ proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Addr proc startSubscription(web3: Web3) {.async, gcsafe.} = # subscribe to the MemberRegistered events - # TODO can do similarly for deletion events, though it is not yet supported - discard await subscribeToMemberRegistrations(web3, contractAddress, handler, blockNumber) + # TODO: can do similarly for deletion events, though it is not yet supported + # TODO: add block number for reconnection logic + discard await subscribeToMemberRegistrations(web3 = web3, + contractAddress = contractAddress, + handler = handler) await startSubscription(web3) web3.onDisconnect = proc() = debug "connection to ethereum node dropped", lastBlock = latestBlock - - -proc handleGroupUpdates*(rlnPeer: WakuRLNRelay, handler: RegistrationEventHandler) {.async, gcsafe.} = +proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} = # mounts the supplied handler for the registration events emitting from the membership contract - await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler) - + let handler = generateGroupUpdateHandler(rlnPeer) + await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, + ethAccountAddress = rlnPeer.ethAccountAddress, + contractAddress = rlnPeer.membershipContractAddress, + handler = handler) + proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) = ## this procedure is a thin wrapper for the pubsub addValidator method @@ -1056,8 +1086,7 @@ proc mountRlnRelayStatic*(node: WakuNode, node.addRLNRelayValidator(pubsubTopic, contentTopic, spamHandler) debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic - node.wakuRlnRelay = rlnPeer - + node.wakuRlnRelay = rlnPeer proc mountRlnRelayDynamic*(node: WakuNode, ethClientAddr: string = "", @@ -1124,17 +1153,7 @@ proc mountRlnRelayDynamic*(node: WakuNode, pubsubTopic: pubsubTopic, contentTopic: contentTopic) - - proc handler(pubkey: Uint256, index: Uint256) = - debug "a new key is added", pubkey=pubkey - # assuming all the members arrive in order - let pk = pubkey.toIDCommitment() - let isSuccessful = rlnPeer.insertMember(pk) - debug "received pk", pk=pk.inHex, index=index - debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) - doAssert(isSuccessful.isOk()) - - asyncSpawn rlnPeer.handleGroupUpdates(handler) + asyncSpawn rlnPeer.handleGroupUpdates() debug "dynamic group management is started" # adds a topic validator for the supplied pubsub topic at the relay protocol # messages published on this pubsub topic will be relayed upon a successful validation, otherwise they will be dropped From bd43a8a92984960fda39f466169ed1afa341f80a Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 25 Oct 2022 12:38:27 +0530 Subject: [PATCH 2/5] fix(rln-relay): clean up subscribeToMemberRegistrations proc --- .../waku_rln_relay/waku_rln_relay_utils.nim | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index b15b6a0bf6..6448a9d8b9 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -940,15 +940,25 @@ proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = return ok() return handler -proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, fromBlock: string = "0x0", handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} = +proc subscribeToMemberRegistrations(web3: Web3, + contractAddress: Address, + fromBlock: string = "0x0", + handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} = var contractObj = web3.contractSender(MembershipContract, contractAddress) - return await contractObj.subscribe(MemberRegistered, %*{"fromBlock": fromBlock, "address": contractAddress}) do(pubkey: Uint256, index: Uint256){.gcsafe.}: - debug "onRegister", pubkey = pubkey, index = index - let groupUpdateRes = handler(pubkey, index) - if groupUpdateRes.isErr(): - error "Error handling new member registration: ", err=groupUpdateRes.error() - do (err: CatchableError): - error "Error from subscription: ", err=err.msg + + let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} = + debug "onRegister", pubkey = pubkey, index = index + let groupUpdateRes = handler(pubkey, index) + if groupUpdateRes.isErr(): + error "Error handling new member registration: ", err=groupUpdateRes.error() + + let onError = proc (err: CatchableError) = + error "Error in subscription", err=err.msg + + return await contractObj.subscribe(MemberRegistered, + %*{"fromBlock": fromBlock, "address": contractAddress}, + onMemberRegistered, + onError) proc subscribeToGroupEvents(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), From 3b76d3f8c91032eb4b15b3edb7755771a23ea4c2 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Tue, 25 Oct 2022 16:57:45 +0530 Subject: [PATCH 3/5] fix(rln-relay): tests --- tests/v2/test_waku_rln_relay_onchain.nim | 9 +++++++-- waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim | 6 +++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/v2/test_waku_rln_relay_onchain.nim b/tests/v2/test_waku_rln_relay_onchain.nim index f1243a3f63..9b1c927b70 100644 --- a/tests/v2/test_waku_rln_relay_onchain.nim +++ b/tests/v2/test_waku_rln_relay_onchain.nim @@ -205,7 +205,7 @@ procSuite "Waku-rln-relay": debug "membership commitment key", pk2 = pk2 var events = [newFuture[void](), newFuture[void]()] - proc handler(pubkey: Uint256, index: Uint256) = + proc handler(pubkey: Uint256, index: Uint256): RlnRelayResult[void] = debug "handler is called", pubkey = pubkey, index = index if pubkey == pk: events[0].complete() @@ -214,9 +214,14 @@ procSuite "Waku-rln-relay": let isSuccessful = rlnPeer.rlnInstance.insertMember(pubkey.toIDCommitment()) check: isSuccessful + return ok() # mount the handler for listening to the contract events - await rlnPeer.handleGroupUpdates(handler) + await subscribeToGroupEvents(ethClientUri = EthClient, + ethAccountAddress = some(accounts[0]), + contractAddress = contractAddress, + blockNumber = "0x0", + handler = handler) # register a member to the contract let tx = await contractObj.register(pk).send(value = MembershipFee) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 6448a9d8b9..6b69e7a586 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -913,7 +913,7 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul return err(memberAdded.error()) return ok() -type GroupUpdateHandler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].} +type GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].} proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = # assuming all the members arrive in order @@ -950,7 +950,7 @@ proc subscribeToMemberRegistrations(web3: Web3, debug "onRegister", pubkey = pubkey, index = index let groupUpdateRes = handler(pubkey, index) if groupUpdateRes.isErr(): - error "Error handling new member registration: ", err=groupUpdateRes.error() + error "Error handling new member registration", err=groupUpdateRes.error() let onError = proc (err: CatchableError) = error "Error in subscription", err=err.msg @@ -960,7 +960,7 @@ proc subscribeToMemberRegistrations(web3: Web3, onMemberRegistered, onError) -proc subscribeToGroupEvents(ethClientUri: string, +proc subscribeToGroupEvents*(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), contractAddress: Address, blockNumber: string = "0x0", From 750199eeb35cc4e63ec354a5096ee929768e99c3 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Thu, 27 Oct 2022 17:25:59 +0530 Subject: [PATCH 4/5] fix(rln-relay): unnecessary try-except --- waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 6b69e7a586..38c6572a7f 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -926,11 +926,7 @@ proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = pk = pubkey.toIDCommitment() except: return err("invalid pubkey") - var isSuccessful: RlnRelayResult[void] - try: - isSuccessful = rlnPeer.insertMember(pk) - except: - return err("failed to insert member") + let isSuccessful = rlnPeer.insertMember(pk) if isSuccessful.isErr(): return err("failed to add a new member to the Merkle tree") else: From 582a6c322025d6d125f97ad434311170bb454c26 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Fri, 28 Oct 2022 12:00:22 +0530 Subject: [PATCH 5/5] fix(rln-relay): proc descriptions, logging --- .../waku_rln_relay/waku_rln_relay_utils.nim | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim index 38c6572a7f..d20f123988 100644 --- a/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim +++ b/waku/v2/protocol/waku_rln_relay/waku_rln_relay_utils.nim @@ -916,9 +916,9 @@ proc addAll*(wakuRlnRelay: WakuRLNRelay, list: seq[IDCommitment]): RlnRelayResul type GroupUpdateHandler* = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.gcsafe, raises: [Defect].} proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = - # assuming all the members arrive in order - # TODO: check the index and the pubkey depending on - # the group update operation + ## assuming all the members arrive in order + ## TODO: check the index and the pubkey depending on + ## the group update operation var handler: GroupUpdateHandler handler = proc(pubkey: Uint256, index: Uint256): RlnRelayResult[void] {.raises: [Defect].} = var pk: IDCommitment @@ -932,7 +932,10 @@ proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = else: debug "new member added to the Merkle tree", pubkey=pubkey, index=index debug "acceptable window", validRoots=rlnPeer.validMerkleRoots.mapIt(it.inHex) - rlnPeer.lastSeenMembershipIndex = index.toMembershipIndex() + let membershipIndex = index.toMembershipIndex() + if rlnPeer.lastSeenMembershipIndex != membershipIndex + 1: + warn "membership index gap, may have lost connection", gap = membershipIndex - rlnPeer.lastSeenMembershipIndex + rlnPeer.lastSeenMembershipIndex = membershipIndex return ok() return handler @@ -940,6 +943,11 @@ proc subscribeToMemberRegistrations(web3: Web3, contractAddress: Address, fromBlock: string = "0x0", handler: GroupUpdateHandler): Future[Subscription] {.async, gcsafe.} = + ## subscribes to member registrations, on a given membership group contract + ## `fromBlock` indicates the block number from which the subscription starts + ## `handler` is a callback that is called when a new member is registered + ## the callback is called with the pubkey and the index of the new member + ## TODO: need a similar proc for member deletions var contractObj = web3.contractSender(MembershipContract, contractAddress) let onMemberRegistered = proc (pubkey: Uint256, index: Uint256) {.gcsafe.} = @@ -988,7 +996,8 @@ proc subscribeToGroupEvents*(ethClientUri: string, proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} = - # mounts the supplied handler for the registration events emitting from the membership contract + ## generates the groupUpdateHandler which is called when a new member is registered, + ## and has the WakuRLNRelay instance as a closure let handler = generateGroupUpdateHandler(rlnPeer) await subscribeToGroupEvents(ethClientUri = rlnPeer.ethClientAddress, ethAccountAddress = rlnPeer.ethAccountAddress,