Skip to content

Commit

Permalink
chore(rln-relay): gracefully handle chain forks (#1623)
Browse files Browse the repository at this point in the history
* chore(rln-relay): gracefully handle chain forks

* fix(rln-relay): better root windowing

* fix(rln-relay): better future generation for test

* fix(rln-relay): reduced width

* fix: better naming of futs, collision free
  • Loading branch information
rymnc authored Mar 31, 2023
1 parent 11ff93c commit 00a3812
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 24 deletions.
62 changes: 57 additions & 5 deletions tests/v2/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ else:
{.push raises: [].}

import
std/[options, osproc, streams, strutils],
std/[options, osproc, streams, strutils, tables],
stew/[results, byteutils],
stew/shims/net as stewNet,
testutils/unittests,
Expand Down Expand Up @@ -246,17 +246,20 @@ suite "Onchain group manager":

asyncTest "startGroupSync: should fetch history correctly":
let manager = await setup()
let credentials = generateCredentials(manager.rlnInstance, 5)
const credentialCount = 6
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
await manager.init()

let merkleRootBeforeRes = manager.rlnInstance.getMerkleRoot()
require:
merkleRootBeforeRes.isOk()
let merkleRootBefore = merkleRootBeforeRes.get()

var futures = [newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void](), newFuture[void]()]

proc generateCallback(futs: array[0..4, Future[system.void]], credentials: seq[IdentityCredential]): OnRegisterCallback =
type TestGroupSyncFuts = array[0..credentialCount - 1, Future[void]]
var futures: TestGroupSyncFuts
for i in 0 ..< futures.len():
futures[i] = newFuture[void]()
proc generateCallback(futs: TestGroupSyncFuts, credentials: seq[IdentityCredential]): OnRegisterCallback =
var futureIndex = 0
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
Expand All @@ -281,6 +284,7 @@ suite "Onchain group manager":

check:
merkleRootBefore != merkleRootAfter
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize

asyncTest "register: should guard against uninitialized state":
let manager = await setup()
Expand Down Expand Up @@ -477,6 +481,54 @@ suite "Onchain group manager":
check:
verifiedRes.get() == false

asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
let manager = await setup()
const credentialCount = 6
let credentials = generateCredentials(manager.rlnInstance, credentialCount)
await manager.init()

type TestBackfillFuts = array[0..credentialCount - 1, Future[void]]
var futures: TestBackfillFuts
for i in 0 ..< futures.len():
futures[i] = newFuture[void]()

proc generateCallback(futs: TestBackfillFuts, credentials: seq[IdentityCredential]): OnRegisterCallback =
var futureIndex = 0
proc callback(registrations: seq[Membership]): Future[void] {.async.} =
if registrations.len == 1 and
registrations[0].idCommitment == credentials[futureIndex].idCommitment and
registrations[0].index == MembershipIndex(futureIndex + 1):
futs[futureIndex].complete()
futureIndex += 1
return callback

manager.onRegister(generateCallback(futures, credentials))
await manager.startGroupSync()

for i in 0 ..< credentials.len():
await manager.register(credentials[i])

await allFutures(futures)

# At this point, we should have a full root queue, 5 roots, and partial buffer of 1 root
require:
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 1

# We can now simulate a chain reorg by calling backfillRootQueue
var blockTable = default(BlockTable)
blockTable[1.uint] = @[Membership(idCommitment: credentials[4].idCommitment, index: 4.uint)]

let expectedLastRoot = manager.validRootBuffer[0]
await manager.backfillRootQueue(blockTable)

# We should now have 5 roots in the queue, and no partial buffer
check:
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 0
manager.validRoots[credentialCount - 2] == expectedLastRoot


################################
## Terminating/removing Ganache
################################
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import
../protocol_types,
../constants,
../rln
import
options,
Expand Down Expand Up @@ -87,18 +88,18 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH
method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} =
g.withdrawCb = some(cb)

# Acceptable roots for merkle root validation of incoming messages
const AcceptableRootWindowSize* = 5

proc updateValidRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): void =
proc slideRootQueue*(rootQueue: var Deque[MerkleNode], root: MerkleNode): seq[MerkleNode] =
## updates the root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached
let overflowCount = rootQueue.len() - AcceptableRootWindowSize
if overflowCount >= 0:
# Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`)
for i in 0..overflowCount:
rootQueue.popFirst()
let overflowCount = rootQueue.len() - AcceptableRootWindowSize + 1
var overflowedRoots = newSeq[MerkleNode]()
if overflowCount > 0:
# Delete the oldest `overflowCount` roots in the deque (index 0..`overflowCount`)
# insert into overflowedRoots seq and return
for i in 0 ..< overflowCount:
overFlowedRoots.add(rootQueue.popFirst())
# Push the next root into the queue
rootQueue.addLast(root)
return overFlowedRoots

method indexOfRoot*(g: GroupManager, root: MerkleNode): int {.base,gcsafe,raises:[].} =
## returns the index of the root in the merkle tree.
Expand All @@ -112,12 +113,18 @@ method validateRoot*(g: GroupManager, root: MerkleNode): bool {.base,gcsafe,rais
return true
return false

template updateValidRootQueue*(g: GroupManager) =
template slideRootQueue*(g: GroupManager): untyped =
let rootRes = g.rlnInstance.getMerkleRoot()
if rootRes.isErr():
raise newException(ValueError, "failed to get merkle root")
let rootAfterUpdate = rootRes.get()
updateValidRootQueue(g.validRoots, rootAfterUpdate)

var rootBuffer: Deque[MerkleNode]
let overflowedRoots = slideRootQueue(g.validRoots, rootAfterUpdate)
if overflowedRoots.len > 0:
for root in overflowedRoots:
discard rootBuffer.slideRootQueue(root)
rootBuffer

method verifyProof*(g: GroupManager,
input: openArray[byte],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type
keystorePassword*: Option[string]
saveKeystore*: bool
registrationHandler*: Option[RegistrationHandler]
# this buffer exists to backfill appropriate roots for the merkle tree,
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
# because the average reorg depth is 1 to 2 blocks.
validRootBuffer*: Deque[MerkleNode]

const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"
Expand All @@ -70,7 +74,7 @@ method register*(g: OnchainGroupManager, idCommitment: IDCommitment): Future[voi
if g.registerCb.isSome():
await g.registerCb.get()(@[Membership(idCommitment: idCommitment, index: g.latestIndex)])

g.updateValidRootQueue()
g.validRootBuffer = g.slideRootQueue()

g.latestIndex += 1

Expand All @@ -92,7 +96,7 @@ method registerBatch*(g: OnchainGroupManager, idCommitments: seq[IDCommitment]):
membersSeq.add(member)
await g.registerCb.get()(membersSeq)

g.updateValidRootQueue()
g.validRootBuffer = g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len())

Expand Down Expand Up @@ -176,6 +180,22 @@ proc parseEvent*(event: type MemberRegistered,

type BlockTable* = OrderedTable[BlockNumber, seq[Membership]]

proc backfillRootQueue*(g: OnchainGroupManager, blockTable: BlockTable): Future[void] {.async.} =
if blocktable.len() > 0:
for blockNumber, members in blocktable.pairs():
let deletionSuccess = g.rlnInstance.removeMembers(members.mapIt(it.index))
debug "deleting members to reconcile state"
if not deletionSuccess:
error "failed to delete members from the tree", success=deletionSuccess
raise newException(ValueError, "failed to delete member from the tree, tree is inconsistent")
# backfill the tree's acceptable roots
for i in 0..blocktable.len()-1:
# remove the last root
g.validRoots.popLast()
for i in 0..blockTable.len()-1:
# add the backfilled root
g.validRoots.addLast(g.validRootBuffer.popLast())

proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
initializedGuard(g)

Expand All @@ -193,19 +213,32 @@ proc getEvents*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[
normalizedToBlock = fromBlock

var blockTable = default(BlockTable)
var toRemoveBlockTable = default(BlockTable)

let events = await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(normalizedToBlock.blockId()))
if events.len == 0:
debug "no events found"
return blockTable

for event in events:
let blockNumber = parseHexInt(event["blockNumber"].getStr()).uint
let removed = event["removed"].getBool()
let parsedEventRes = parseEvent(MemberRegistered, event)
if parsedEventRes.isErr():
error "failed to parse the MemberRegistered event", error=parsedEventRes.error()
raise newException(ValueError, "failed to parse the MemberRegistered event")
let parsedEvent = parsedEventRes.get()

if removed:
# remove the registration from the tree, per block
warn "member removed from the tree as per canonical chain", index=parsedEvent.index
if toRemoveBlockTable.hasKey(blockNumber):
toRemoveBlockTable[blockNumber].add(parsedEvent)
else:
toRemoveBlockTable[blockNumber] = @[parsedEvent]

await g.backfillRootQueue(toRemoveBlockTable)

if blockTable.hasKey(blockNumber):
blockTable[blockNumber].add(parsedEvent)
else:
Expand All @@ -221,8 +254,8 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu
let startingIndex = members[0].index
try:
await g.registerBatch(members.mapIt(it.idCommitment))
except:
error "failed to insert members into the tree"
except CatchableError:
error "failed to insert members into the tree", error=getCurrentExceptionMsg()
raise newException(ValueError, "failed to insert members into the tree")
trace "new members added to the Merkle tree", commitments=members.mapIt(it.idCommitment.inHex()) , startingIndex=startingIndex
let lastIndex = startingIndex + members.len.uint - 1
Expand All @@ -235,7 +268,9 @@ proc seedBlockTableIntoTree*(g: OnchainGroupManager, blockTable: BlockTable): Fu

return

proc getEventsAndSeedIntoTree*(g: OnchainGroupManager, fromBlock: BlockNumber, toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
proc getEventsAndSeedIntoTree*(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
initializedGuard(g)

let events = await g.getEvents(fromBlock, toBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ method init*(g: StaticGroupManager): Future[void] {.async,gcsafe.} =
if not membersInserted:
raise newException(ValueError, "Failed to insert members into the merkle tree")

g.updateValidRootQueue()
discard g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len() - 1)

Expand All @@ -56,7 +56,7 @@ method register*(g: StaticGroupManager, idCommitment: IDCommitment): Future[void
if not memberInserted:
raise newException(ValueError, "Failed to insert member into the merkle tree")

g.updateValidRootQueue()
discard g.slideRootQueue()

g.latestIndex += 1

Expand All @@ -77,7 +77,7 @@ method registerBatch*(g: StaticGroupManager, idCommitments: seq[IDCommitment]):
memberSeq.add(Membership(idCommitment: idCommitments[i], index: g.latestIndex + MembershipIndex(i)))
await g.registerCb.get()(memberSeq)

g.updateValidRootQueue()
discard g.slideRootQueue()

g.latestIndex += MembershipIndex(idCommitments.len() - 1)

Expand Down
7 changes: 7 additions & 0 deletions waku/v2/protocol/waku_rln_relay/rln/wrappers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ proc removeMember*(rlnInstance: ptr RLN, index: MembershipIndex): bool =
let deletion_success = delete_member(rlnInstance, index)
return deletion_success

proc removeMembers*(rlnInstance: ptr RLN, indices: seq[MembershipIndex]): bool =
for index in indices:
let deletion_success = delete_member(rlnInstance, index)
if not deletion_success:
return false
return true

proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult =
# read the Merkle Tree root after insertion
var
Expand Down

0 comments on commit 00a3812

Please sign in to comment.