diff --git a/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim b/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim index 30ecd0cc91..711b8699de 100644 --- a/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim +++ b/tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim @@ -6,7 +6,7 @@ else: {.push raises: [].} import - std/[options, osproc, streams, strutils, tables], + std/[options, osproc, streams, strutils], stew/[results, byteutils], stew/shims/net as stewNet, testutils/unittests, @@ -516,11 +516,8 @@ suite "Onchain group manager": manager.validRootBuffer.len() == 1 # We can now simulate a chain reorg by calling backfillRootQueue - var blockTable = default(BlockTable) - blockTable[1.uint] = @[Membership(idCommitment: credentials[4].idCommitment, index: 4.uint)] - let expectedLastRoot = manager.validRootBuffer[0] - await manager.backfillRootQueue(blockTable) + await manager.backfillRootQueue(1) # We should now have 5 roots in the queue, and no partial buffer check: diff --git a/vendor/zerokit b/vendor/zerokit index 584c2cf4c0..c2d386cb74 160000 --- a/vendor/zerokit +++ b/vendor/zerokit @@ -1 +1 @@ -Subproject commit 584c2cf4c000b391ca6b415c09d8399fde329e5c +Subproject commit c2d386cb749f551541bb34c4386a3849485356f9 diff --git a/waku/v2/waku_rln_relay/conversion_utils.nim b/waku/v2/waku_rln_relay/conversion_utils.nim index b6b4809388..d70c706f5c 100644 --- a/waku/v2/waku_rln_relay/conversion_utils.nim +++ b/waku/v2/waku_rln_relay/conversion_utils.nim @@ -90,6 +90,20 @@ proc serialize*(roots: seq[MerkleNode]): seq[byte] = rootsBytes = concat(rootsBytes, @root) return rootsBytes +# Serializes a sequence of MembershipIndex's +proc serialize*(memIndices: seq[MembershipIndex]): seq[byte] = + var memIndicesBytes = newSeq[byte]() + + # serialize the memIndices, with its length prefixed + let len = toBytes(uint64(memIndices.len), Endianness.littleEndian) + memIndicesBytes.add(len) + + for memIndex in memIndices: + let memIndexBytes = toBytes(uint64(memIndex), Endianness.littleEndian) + memIndicesBytes = concat(memIndicesBytes, @memIndexBytes) + + return memIndicesBytes + proc toEpoch*(t: uint64): Epoch = ## converts `t` to `Epoch` in little-endian order let bytes = toBytes(t, Endianness.littleEndian) diff --git a/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim b/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim index ac06baecff..545b8ae29f 100644 --- a/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim @@ -83,6 +83,10 @@ method withdraw*(g: GroupManager, identitySecretHash: IdentitySecretHash): Futur method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretHash]): Future[void] {.base,gcsafe.} = raise newException(CatchableError, "withdrawBatch proc for " & $g.type & " is not implemented yet") +# This proc is used to insert and remove a set of commitments from the merkle tree +method atomicBatch*(g: GroupManager, idCommitments: seq[IDCommitment], toRemoveIndices: seq[MembershipIndex]): Future[void] {.base,gcsafe.} = + raise newException(CatchableError, "atomicBatch proc for " & $g.type & " is not implemented yet") + # This proc is used to set a callback that will be called when an identity commitment is withdrawn # The callback may be called multiple times, and should be used to for any post processing method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} = diff --git a/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim index 3ea9c228c0..25147c672b 100644 --- a/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -67,25 +67,17 @@ template initializedGuard(g: OnchainGroupManager): untyped = method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = initializedGuard(g) - let memberInserted = g.rlnInstance.insertMember(idCommitment) - if not memberInserted: - raise newException(ValueError,"member insertion failed") + await g.registerBatch(@[idCommitment]) - if g.registerCb.isSome(): - await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)]) - - g.validRootBuffer = g.slideRootQueue() - - g.latestIndex += 1 - - return - -method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = +method atomicBatch*(g: OnchainGroupManager, + idCommitments = newSeq[IDCommitment](), + toRemoveIndices = newSeq[MembershipIndex]()): Future[void] {.async.} = initializedGuard(g) - let membersInserted = g.rlnInstance.insertMembers(g.latestIndex, idCommitments) - if not membersInserted: - raise newException(ValueError, "Failed to insert members into the merkle tree") + let startIndex = g.latestIndex + let operationSuccess = g.rlnInstance.atomicWrite(some(startIndex), idCommitments, toRemoveIndices) + if not operationSuccess: + raise newException(ValueError, "atomic batch operation failed") if g.registerCb.isSome(): var membersSeq = newSeq[Membership]() @@ -100,7 +92,12 @@ method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): g.latestIndex += MembershipIndex(idCommitments.len()) - return + +method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = + initializedGuard(g) + + await g.atomicBatch(idCommitments) + method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} = initializedGuard(g) @@ -154,7 +151,7 @@ method withdraw*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[voi method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = initializedGuard(g) - # TODO: after slashing is enabled on the contract + # TODO: after slashing is enabled on the contract, use atomicBatch internally proc parseEvent(event: type MemberRegistered, log: JsonNode): GroupManagerResult[Membership] = @@ -178,28 +175,23 @@ proc parseEvent(event: type MemberRegistered, except CatchableError: return err("failed to parse the data field of the MemberRegistered event") -type BlockTable* = OrderedTable[BlockNumber, seq[Membership]] - -proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} = - if blocktable.len() > 0: - for blockNumber, members in blocktable.pairs(): - let deletionSuccess = g.rlnInstance.removeMembers(members.mapIt(it.index)) - debug "deleting members to reconcile state" - if not deletionSuccess: - error "failed to delete members from the tree", success=deletionSuccess - raise newException(ValueError, "failed to delete member from the tree, tree is inconsistent") - # backfill the tree's acceptable roots - for i in 0..blocktable.len()-1: - # remove the last root - g.validRoots.popLast() - for i in 0..blockTable.len()-1: - # add the backfilled root - g.validRoots.addLast(g.validRootBuffer.popLast()) - -proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership) = - if blockTable.hasKeyOrPut(blockNumber, @[member]): +type BlockTable* = OrderedTable[BlockNumber, seq[(Membership, bool)]] + +proc backfillRootQueue*(g: OnchainGroupManager, len: uint): Future[void] {.async.} = + if len > 0: + # backfill the tree's acceptable roots + for i in 0..len-1: + # remove the last root + g.validRoots.popLast() + for i in 0..len-1: + # add the backfilled root + g.validRoots.addLast(g.validRootBuffer.popLast()) + +proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership, removed: bool) = + let memberTuple = (member, removed) + if blockTable.hasKeyOrPut(blockNumber, @[memberTuple]): try: - blockTable[blockNumber].add(member) + blockTable[blockNumber].add(memberTuple) except KeyError: # qed error "could not insert member into block table", blockNumber=blockNumber, member=member @@ -226,19 +218,18 @@ proc getRawEvents(g: OnchainGroupManager, toBlock = some(normalizedToBlock.blockId())) return events -proc getBlockTables(g: OnchainGroupManager, +proc getBlockTable(g: OnchainGroupManager, fromBlock: BlockNumber, - toBlock: Option[BlockNumber] = none(BlockNumber)): Future[(BlockTable, BlockTable)] {.async.} = + toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} = initializedGuard(g) var blockTable = default(BlockTable) - var toRemoveBlockTable = default(BlockTable) let events = await g.getRawEvents(fromBlock, toBlock) if events.len == 0: debug "no events found" - return (blockTable, toRemoveBlockTable) + return blockTable for event in events: let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint @@ -248,52 +239,45 @@ proc getBlockTables(g: OnchainGroupManager, error "failed to parse the MemberRegistered event", error=parsedEventRes.error() raise newException(ValueError, "failed to parse the MemberRegistered event") let parsedEvent = parsedEventRes.get() + blockTable.insert(blockNumber, parsedEvent, removed) - if removed: - # remove the registration from the tree, per block - warn "member removed from the tree as per canonical chain", index=parsedEvent.index - toRemoveBlockTable.insert(blockNumber, parsedEvent) - else: - blockTable.insert(blockNumber, parsedEvent) - - return (blockTable, toRemoveBlockTable) + return blockTable -proc handleValidEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} = +proc handleEvents(g: OnchainGroupManager, + blockTable: BlockTable): Future[void] {.async.} = initializedGuard(g) for blockNumber, members in blockTable.pairs(): - let latestIndex = g.latestIndex - let startingIndex = members[0].index try: - await g.registerBatch(members.mapIt(it.idCommitment)) + await g.atomicBatch(idCommitments = members.mapIt(it[0].idCommitment), + toRemoveIndices = members.filterIt(it[1]).mapIt(it[0].index)) except CatchableError: error "failed to insert members into the tree", error=getCurrentExceptionMsg() raise newException(ValueError, "failed to insert members into the tree") trace "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex - let lastIndex = startingIndex + members.len.uint - 1 - let indexGap = startingIndex - latestIndex - if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)): - raise newException(ValueError, "membership indices are not sequential") - if indexGap != 1.uint and lastIndex != latestIndex and startingIndex != 0.uint: - warn "membership index gap, may have lost connection", lastIndex, currIndex=latestIndex, indexGap = indexGap g.latestProcessedBlock = some(blockNumber) return -proc handleRemovedEvents(g: OnchainGroupManager, toRemoveBlockTable: BlockTable): Future[void] {.async.} = +proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} = initializedGuard(g) - await g.backfillRootQueue(toRemoveBlockTable) + # count number of blocks that have been removed + var numRemovedBlocks: uint = 0 + for blockNumber, members in blockTable.pairs(): + if members.anyIt(it[1]): + numRemovedBlocks += 1 + + await g.backfillRootQueue(numRemovedBlocks) proc getAndHandleEvents(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} = initializedGuard(g) - let (validEvents, removedEvents) = await g.getBlockTables(fromBlock, toBlock) - await g.handleRemovedEvents(removedEvents) - await g.handleValidEvents(validEvents) - return + let blockTable = await g.getBlockTable(fromBlock, toBlock) + await g.handleEvents(blockTable) + await g.handleRemovedEvents(blockTable) proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler = proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} = @@ -435,7 +419,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = try: membershipFee = await contract.MEMBERSHIP_DEPOSIT().call() except CatchableError: - raise newException(ValueError, "could not get the membership deposit") + raise newException(ValueError, "could not get the membership deposit: {}") g.ethRpc = some(ethRpc) diff --git a/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim b/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim index b70f9f600e..757364bb2b 100644 --- a/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim @@ -52,17 +52,8 @@ method startGroupSync*(g: StaticGroupManager): Future[void] = method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void] {.async.} = initializedGuard(g) - let memberInserted = g.rlnInstance.insertMember(idCommitment) - if not memberInserted: - raise newException(ValueError, "Failed to insert member into the merkle tree") + await g.registerBatch(@[idCommitment]) - discard g.slideRootQueue() - - g.latestIndex += 1 - - if g.registerCb.isSome(): - await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)]) - return method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} = initializedGuard(g) @@ -74,12 +65,12 @@ method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): if g.registerCb.isSome(): var memberSeq = newSeq[Membership]() for i in 0..