Skip to content

Commit

Permalink
feat(rln-relay): use new atomic_operation ffi api (#1733)
Browse files Browse the repository at this point in the history
* chore(rln-relay): bump zerokit

* feat(rln-relay): use new atomic_operations ffi api

* fix(rln-relay): static gm
  • Loading branch information
rymnc authored May 18, 2023
1 parent 665484c commit 611e953
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 107 deletions.
7 changes: 2 additions & 5 deletions tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ else:
{.push raises: [].}

import
std/[options, osproc, streams, strutils, tables],
std/[options, osproc, streams, strutils],
stew/[results, byteutils],
stew/shims/net as stewNet,
testutils/unittests,
Expand Down Expand Up @@ -516,11 +516,8 @@ suite "Onchain group manager":
manager.validRootBuffer.len() == 1

# We can now simulate a chain reorg by calling backfillRootQueue
var blockTable = default(BlockTable)
blockTable[1.uint] = @[Membership(idCommitment: credentials[4].idCommitment, index: 4.uint)]

let expectedLastRoot = manager.validRootBuffer[0]
await manager.backfillRootQueue(blockTable)
await manager.backfillRootQueue(1)

# We should now have 5 roots in the queue, and no partial buffer
check:
Expand Down
14 changes: 14 additions & 0 deletions waku/v2/waku_rln_relay/conversion_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ proc serialize*(roots: seq[MerkleNode]): seq[byte] =
rootsBytes = concat(rootsBytes, @root)
return rootsBytes

# Serializes a sequence of MembershipIndex's
proc serialize*(memIndices: seq[MembershipIndex]): seq[byte] =
var memIndicesBytes = newSeq[byte]()

# serialize the memIndices, with its length prefixed
let len = toBytes(uint64(memIndices.len), Endianness.littleEndian)
memIndicesBytes.add(len)

for memIndex in memIndices:
let memIndexBytes = toBytes(uint64(memIndex), Endianness.littleEndian)
memIndicesBytes = concat(memIndicesBytes, @memIndexBytes)

return memIndicesBytes

proc toEpoch*(t: uint64): Epoch =
## converts `t` to `Epoch` in little-endian order
let bytes = toBytes(t, Endianness.littleEndian)
Expand Down
4 changes: 4 additions & 0 deletions waku/v2/waku_rln_relay/group_manager/group_manager_base.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ method withdraw*(g: GroupManager, identitySecretHash: IdentitySecretHash): Futur
method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretHash]): Future[void] {.base,gcsafe.} =
raise newException(CatchableError, "withdrawBatch proc for " & $g.type & " is not implemented yet")

# This proc is used to insert and remove a set of commitments from the merkle tree
method atomicBatch*(g: GroupManager, idCommitments: seq[IDCommitment], toRemoveIndices: seq[MembershipIndex]): Future[void] {.base,gcsafe.} =
raise newException(CatchableError, "atomicBatch proc for " & $g.type & " is not implemented yet")

# This proc is used to set a callback that will be called when an identity commitment is withdrawn
# The callback may be called multiple times, and should be used to for any post processing
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
Expand Down
120 changes: 52 additions & 68 deletions waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,17 @@ template initializedGuard(g: OnchainGroupManager): untyped =
method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
initializedGuard(g)

let memberInserted = g.rlnInstance.insertMember(idCommitment)
if not memberInserted:
raise newException(ValueError,"member insertion failed")
await g.registerBatch(@[idCommitment])

if g.registerCb.isSome():
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])

g.validRootBuffer = g.slideRootQueue()

g.latestIndex += 1

return

method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
method atomicBatch*(g: OnchainGroupManager,
idCommitments = newSeq[IDCommitment](),
toRemoveIndices = newSeq[MembershipIndex]()): Future[void] {.async.} =
initializedGuard(g)

let membersInserted = g.rlnInstance.insertMembers(g.latestIndex, idCommitments)
if not membersInserted:
raise newException(ValueError, "Failed to insert members into the merkle tree")
let startIndex = g.latestIndex
let operationSuccess = g.rlnInstance.atomicWrite(some(startIndex), idCommitments, toRemoveIndices)
if not operationSuccess:
raise newException(ValueError, "atomic batch operation failed")

if g.registerCb.isSome():
var membersSeq = newSeq[Membership]()
Expand All @@ -100,7 +92,12 @@ method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]):

g.latestIndex += MembershipIndex(idCommitments.len())

return

method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
initializedGuard(g)

await g.atomicBatch(idCommitments)


method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential): Future[void] {.async.} =
initializedGuard(g)
Expand Down Expand Up @@ -154,7 +151,7 @@ method withdraw*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[voi
method withdrawBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
initializedGuard(g)

# TODO: after slashing is enabled on the contract
# TODO: after slashing is enabled on the contract, use atomicBatch internally

proc parseEvent(event: type MemberRegistered,
log: JsonNode): GroupManagerResult[Membership] =
Expand All @@ -178,28 +175,23 @@ proc parseEvent(event: type MemberRegistered,
except CatchableError:
return err("failed to parse the data field of the MemberRegistered event")

type BlockTable* = OrderedTable[BlockNumber, seq[Membership]]

proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
if blocktable.len() > 0:
for blockNumber, members in blocktable.pairs():
let deletionSuccess = g.rlnInstance.removeMembers(members.mapIt(it.index))
debug "deleting members to reconcile state"
if not deletionSuccess:
error "failed to delete members from the tree", success=deletionSuccess
raise newException(ValueError, "failed to delete member from the tree, tree is inconsistent")
# backfill the tree's acceptable roots
for i in 0..blocktable.len()-1:
# remove the last root
g.validRoots.popLast()
for i in 0..blockTable.len()-1:
# add the backfilled root
g.validRoots.addLast(g.validRootBuffer.popLast())

proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership) =
if blockTable.hasKeyOrPut(blockNumber, @[member]):
type BlockTable* = OrderedTable[BlockNumber, seq[(Membership, bool)]]

proc backfillRootQueue*(g: OnchainGroupManager, len: uint): Future[void] {.async.} =
if len > 0:
# backfill the tree's acceptable roots
for i in 0..len-1:
# remove the last root
g.validRoots.popLast()
for i in 0..len-1:
# add the backfilled root
g.validRoots.addLast(g.validRootBuffer.popLast())

proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Membership, removed: bool) =
let memberTuple = (member, removed)
if blockTable.hasKeyOrPut(blockNumber, @[memberTuple]):
try:
blockTable[blockNumber].add(member)
blockTable[blockNumber].add(memberTuple)
except KeyError: # qed
error "could not insert member into block table", blockNumber=blockNumber, member=member

Expand All @@ -226,19 +218,18 @@ proc getRawEvents(g: OnchainGroupManager,
toBlock = some(normalizedToBlock.blockId()))
return events

proc getBlockTables(g: OnchainGroupManager,
proc getBlockTable(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[(BlockTable, BlockTable)] {.async.} =
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
initializedGuard(g)

var blockTable = default(BlockTable)
var toRemoveBlockTable = default(BlockTable)

let events = await g.getRawEvents(fromBlock, toBlock)

if events.len == 0:
debug "no events found"
return (blockTable, toRemoveBlockTable)
return blockTable

for event in events:
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
Expand All @@ -248,52 +239,45 @@ proc getBlockTables(g: OnchainGroupManager,
error "failed to parse the MemberRegistered event", error=parsedEventRes.error()
raise newException(ValueError, "failed to parse the MemberRegistered event")
let parsedEvent = parsedEventRes.get()
blockTable.insert(blockNumber, parsedEvent, removed)

if removed:
# remove the registration from the tree, per block
warn "member removed from the tree as per canonical chain", index=parsedEvent.index
toRemoveBlockTable.insert(blockNumber, parsedEvent)
else:
blockTable.insert(blockNumber, parsedEvent)

return (blockTable, toRemoveBlockTable)
return blockTable

proc handleValidEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
proc handleEvents(g: OnchainGroupManager,
blockTable: BlockTable): Future[void] {.async.} =
initializedGuard(g)

for blockNumber, members in blockTable.pairs():
let latestIndex = g.latestIndex
let startingIndex = members[0].index
try:
await g.registerBatch(members.mapIt(it.idCommitment))
await g.atomicBatch(idCommitments = members.mapIt(it[0].idCommitment),
toRemoveIndices = members.filterIt(it[1]).mapIt(it[0].index))
except CatchableError:
error "failed to insert members into the tree", error=getCurrentExceptionMsg()
raise newException(ValueError, "failed to insert members into the tree")
trace "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex
let lastIndex = startingIndex + members.len.uint - 1
let indexGap = startingIndex - latestIndex
if not (toSeq(startingIndex..lastIndex) == members.mapIt(it.index)):
raise newException(ValueError, "membership indices are not sequential")
if indexGap != 1.uint and lastIndex != latestIndex and startingIndex != 0.uint:
warn "membership index gap, may have lost connection", lastIndex, currIndex=latestIndex, indexGap = indexGap
g.latestProcessedBlock = some(blockNumber)

return

proc handleRemovedEvents(g: OnchainGroupManager, toRemoveBlockTable: BlockTable): Future[void] {.async.} =
proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
initializedGuard(g)

await g.backfillRootQueue(toRemoveBlockTable)
# count number of blocks that have been removed
var numRemovedBlocks: uint = 0
for blockNumber, members in blockTable.pairs():
if members.anyIt(it[1]):
numRemovedBlocks += 1

await g.backfillRootQueue(numRemovedBlocks)

proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
initializedGuard(g)

let (validEvents, removedEvents) = await g.getBlockTables(fromBlock, toBlock)
await g.handleRemovedEvents(removedEvents)
await g.handleValidEvents(validEvents)
return
let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)

proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
Expand Down Expand Up @@ -435,7 +419,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
try:
membershipFee = await contract.MEMBERSHIP_DEPOSIT().call()
except CatchableError:
raise newException(ValueError, "could not get the membership deposit")
raise newException(ValueError, "could not get the membership deposit: {}")


g.ethRpc = some(ethRpc)
Expand Down
15 changes: 3 additions & 12 deletions waku/v2/waku_rln_relay/group_manager/static/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,8 @@ method startGroupSync*(g: StaticGroupManager): Future[void] =
method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void] {.async.} =
initializedGuard(g)

let memberInserted = g.rlnInstance.insertMember(idCommitment)
if not memberInserted:
raise newException(ValueError, "Failed to insert member into the merkle tree")
await g.registerBatch(@[idCommitment])

discard g.slideRootQueue()

g.latestIndex += 1

if g.registerCb.isSome():
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])
return

method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]): Future[void] {.async.} =
initializedGuard(g)
Expand All @@ -74,12 +65,12 @@ method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]):
if g.registerCb.isSome():
var memberSeq = newSeq[Membership]()
for i in 0..<idCommitments.len():
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i)))
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i) + 1))
await g.registerCb.get()(memberSeq)

discard g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len() - 1)
g.latestIndex += MembershipIndex(idCommitments.len())

return

Expand Down
10 changes: 6 additions & 4 deletions waku/v2/waku_rln_relay/rln/rln_interface.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ proc init_tree_with_leaves*(ctx: ptr RLN, input_buffer: ptr Buffer): bool {.impo
## leaves are set one after each other starting from index 0
## the return bool value indicates the success or failure of the operation

proc set_leaves_from*(ctx: ptr RLN, index: uint, input_buffer: ptr Buffer): bool {.importc: "set_leaves_from".}
## sets multiple leaves in the tree stored by ctx to the value passed by input_buffer
## the input_buffer holds a serialized vector of leaves (32 bytes each)
## the input_buffer size is prefixed by a 8 bytes integer indicating the number of leaves
proc atomic_write*(ctx: ptr RLN, index: uint, leaves_buffer: ptr Buffer, indices_buffer: ptr Buffer): bool {.importc: "atomic_operation".}
## sets multiple leaves, and zeroes out indices in the tree stored by ctx to the value passed by input_buffer
## the leaves_buffer holds a serialized vector of leaves (32 bytes each)
## the leaves_buffer size is prefixed by a 8 bytes integer indicating the number of leaves
## the indices_bufffer holds a serialized vector of indices (8 bytes each)
## the indices_buffer size is prefixed by a 8 bytes integer indicating the number of indices
## leaves are set one after each other starting from index `index`
## the return bool value indicates the success or failure of the operation

Expand Down
48 changes: 31 additions & 17 deletions waku/v2/waku_rln_relay/rln/wrappers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -230,34 +230,48 @@ proc insertMember*(rlnInstance: ptr RLN, idComm: IDCommitment): bool =
let memberAdded = update_next_member(rlnInstance, pkBufferPtr)
return memberAdded

proc atomicWrite*(rlnInstance: ptr RLN,
index = none(MembershipIndex),
idComms = newSeq[IDCommitment](),
toRemoveIndices = newSeq[MembershipIndex]()): bool =
## Insert multiple members i.e., identity commitments, and remove multiple members
## returns true if the operation is successful
## returns false if the operation fails

let startIndex = if index.isNone(): MembershipIndex(0) else: index.get()

# serialize the idComms
let idCommsBytes = serialize(idComms)
var idCommsBuffer = idCommsBytes.toBuffer()
let idCommsBufferPtr = addr idCommsBuffer

# serialize the toRemoveIndices
let indicesBytes = serialize(toRemoveIndices)
var indicesBuffer = indicesBytes.toBuffer()
let indicesBufferPtr = addr indicesBuffer

let operationSuccess = atomic_write(rlnInstance,
startIndex,
idCommsBufferPtr,
indicesBufferPtr)
return operationSuccess

proc insertMembers*(rlnInstance: ptr RLN,
index: MembershipIndex,
idComms: seq[IDCommitment]): bool =
index: MembershipIndex,
idComms: seq[IDCommitment]): bool =
## Insert multiple members i.e., identity commitments
## returns true if the insertion is successful
## returns false if any of the insertions fails
## Note: This proc is atomic, i.e., if any of the insertions fails, all the previous insertions are rolled back

# serialize the idComms
let idCommsBytes = serialize(idComms)

var idCommsBuffer = idCommsBytes.toBuffer()
let idCommsBufferPtr = addr idCommsBuffer
# add the member to the tree
let membersAdded = set_leaves_from(rlnInstance, index, idCommsBufferPtr)
return membersAdded
return atomicWrite(rlnInstance, some(index), idComms)

proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool =
let deletion_success = delete_member(rlnInstance, index)
return deletion_success
let deletionSuccess = delete_member(rlnInstance, index)
return deletionSuccess

proc removeMembers*(rlnInstance: ptr RLN, indices: seq[MembershipIndex]): bool =
for index in indices:
let deletion_success = delete_member(rlnInstance, index)
if not deletion_success:
return false
return true
return atomicWrite(rlnInstance, idComms = @[], toRemoveIndices = indices)

proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult =
# read the Merkle Tree root after insertion
Expand Down

0 comments on commit 611e953

Please sign in to comment.