Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rln-relay): missed roots during sync #2015

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions tests/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ suite "Onchain group manager":
manager.initialized
manager.rlnContractDeployedBlockNumber > 0

await manager.stop()

asyncTest "should error on initialization when loaded metadata does not match":
let manager = await setup()
await manager.init()
Expand Down Expand Up @@ -220,12 +222,14 @@ suite "Onchain group manager":

await manager.init()
await manager.startGroupSync()
await manager.stop()

asyncTest "startGroupSync: should guard against uninitialized state":
let manager = await setup()

expect(ValueError):
await manager.startGroupSync()
await manager.stop()

asyncTest "startGroupSync: should sync to the state of the group":
let manager = await setup()
Expand Down Expand Up @@ -262,6 +266,7 @@ suite "Onchain group manager":

check:
merkleRootBefore != merkleRootAfter
await manager.stop()

asyncTest "startGroupSync: should fetch history correctly":
let manager = await setup()
Expand Down Expand Up @@ -303,13 +308,15 @@ suite "Onchain group manager":
check:
merkleRootBefore != merkleRootAfter
manager.validRootBuffer.len() == credentialCount - AcceptableRootWindowSize
await manager.stop()

asyncTest "register: should guard against uninitialized state":
let manager = await setup()
let dummyCommitment = default(IDCommitment)

expect(ValueError):
await manager.register(dummyCommitment)
await manager.stop()

asyncTest "register: should register successfully":
let manager = await setup()
Expand All @@ -329,6 +336,7 @@ suite "Onchain group manager":
check:
merkleRootAfter.inHex() != merkleRootBefore.inHex()
manager.latestIndex == 1
await manager.stop()

asyncTest "register: callback is called":
let manager = await setup()
Expand All @@ -354,13 +362,15 @@ suite "Onchain group manager":

check:
manager.rlnInstance.getMetadata().get().validRoots == manager.validRoots.toSeq()
await manager.stop()

asyncTest "withdraw: should guard against uninitialized state":
let manager = await setup()
let idSecretHash = generateCredentials(manager.rlnInstance).idSecretHash

expect(ValueError):
await manager.withdraw(idSecretHash)
await manager.stop()

asyncTest "validateRoot: should validate good root":
let manager = await setup()
Expand Down Expand Up @@ -402,6 +412,7 @@ suite "Onchain group manager":

check:
validated
await manager.stop()

asyncTest "validateRoot: should reject bad root":
let manager = await setup()
Expand Down Expand Up @@ -432,6 +443,7 @@ suite "Onchain group manager":

check:
validated == false
await manager.stop()

asyncTest "verifyProof: should verify valid proof":
let manager = await setup()
Expand Down Expand Up @@ -474,6 +486,7 @@ suite "Onchain group manager":

check:
verifiedRes.get()
await manager.stop()

asyncTest "verifyProof: should reject invalid proof":
let manager = await setup()
Expand Down Expand Up @@ -510,6 +523,7 @@ suite "Onchain group manager":

check:
verifiedRes.get() == false
await manager.stop()

asyncTest "backfillRootQueue: should backfill roots in event of chain reorg":
let manager = await setup()
Expand Down Expand Up @@ -554,6 +568,7 @@ suite "Onchain group manager":
manager.validRoots.len() == credentialCount - 1
manager.validRootBuffer.len() == 0
manager.validRoots[credentialCount - 2] == expectedLastRoot
await manager.stop()

asyncTest "isReady should return false if ethRpc is none":
var manager = await setup()
Expand All @@ -563,13 +578,15 @@ suite "Onchain group manager":

check:
(await manager.isReady()) == false
await manager.stop()

asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
var manager = await setup()
await manager.init()

check:
(await manager.isReady()) == false
await manager.stop()

asyncTest "isReady should return true if ethRpc is ready":
var manager = await setup()
Expand All @@ -579,6 +596,7 @@ suite "Onchain group manager":

check:
(await manager.isReady()) == true
await manager.stop()


################################
Expand Down
64 changes: 21 additions & 43 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,30 +232,20 @@ proc insert(blockTable: var BlockTable, blockNumber: BlockNumber, member: Member

proc getRawEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[JsonNode] {.async.} =
toBlock: BlockNumber): Future[JsonNode] {.async.} =
initializedGuard(g)

let ethRpc = g.ethRpc.get()
let rlnContract = g.rlnContract.get()

var normalizedToBlock: BlockNumber
if toBlock.isSome():
var value = toBlock.get()
if value == 0:
# set to latest block
value = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
normalizedToBlock = value
else:
normalizedToBlock = fromBlock

let events = await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(normalizedToBlock.blockId()))
toBlock = some(toBlock.blockId()))
return events

proc getBlockTable(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[BlockTable] {.async.} =
fromBlock: BlockNumber,
toBlock: BlockNumber): Future[BlockTable] {.async.} =
initializedGuard(g)

var blockTable = default(BlockTable)
Expand Down Expand Up @@ -311,23 +301,14 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable): Future

proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
toBlock: BlockNumber): Future[void] {.async.} =
initializedGuard(g)
proc getLatestBlockNumber(): BlockNumber =
if toBlock.isSome():
# if toBlock = 0, that implies the latest block
# which is the case when we are syncing block-by-block
# therefore, toBlock = fromBlock + 1
# if toBlock != 0, then we are chunking blocks
# therefore, toBlock = fromBlock + blockChunkSize (which is handled)
return max(fromBlock + 1, toBlock.get())
return fromBlock

let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)

g.latestProcessedBlock = getLatestBlockNumber()
g.latestProcessedBlock = toBlock
let metadataSetRes = g.setMetadata()
if metadataSetRes.isErr():
# this is not a fatal error, hence we don't raise an exception
Expand All @@ -337,11 +318,13 @@ proc getAndHandleEvents(g: OnchainGroupManager,

proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
let latestBlock = blockheader.number.uint
let latestBlock = BlockNumber(blockheader.number)
trace "block received", blockNumber = latestBlock
# get logs from the last block
try:
asyncSpawn g.getAndHandleEvents(latestBlock)
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
return newHeadCallback
Expand All @@ -368,28 +351,23 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
let blockChunkSize = 2_000

var fromBlock = if g.latestProcessedBlock > g.rlnContractDeployedBlockNumber:
info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock
g.latestProcessedBlock + 1
else:
info "starting onchain sync from deployed block number", deployedBlockNumber = g.rlnContractDeployedBlockNumber
g.rlnContractDeployedBlockNumber

let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
try:
# we always want to sync from last processed block => latest
if fromBlock == BlockNumber(0) or
fromBlock + BlockNumber(blockChunkSize) < latestBlock:
# chunk events
while true:
let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber())
let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
info "chunking events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, some(toBlock))
fromBlock = toBlock + 1
if fromBlock >= currentLatestBlock:
break
else:
await g.getAndHandleEvents(fromBlock, some(BlockNumber(0)))
# chunk events
while true:
let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if fromBlock >= currentLatestBlock:
break

let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1

except CatchableError:
raise newException(ValueError, "failed to get the history/reconcile missed blocks: " & getCurrentExceptionMsg())

Expand Down