Skip to content

Commit

Permalink
chore(rln-relay): add isReady check (#1989)
Browse files Browse the repository at this point in the history
* chore(rln-relay): add isReady check

* fix(rln-relay): multiple parameters for checking if node is in sync

* fix: set latesthead in newHeadCallback

* fix: explicit rpc call

* fix: unhandled exception
  • Loading branch information
rymnc authored Sep 6, 2023
1 parent 483f40c commit 5638bd0
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 7 deletions.
25 changes: 25 additions & 0 deletions tests/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,31 @@ suite "Onchain group manager":
manager.validRootBuffer.len() == 0
manager.validRoots[credentialCount - 2] == expectedLastRoot

asyncTest "isReady should return false if ethRpc is none":
var manager = await setup()
await manager.init()

manager.ethRpc = none(Web3)

check:
(await manager.isReady()) == false

asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
var manager = await setup()
await manager.init()

check:
(await manager.isReady()) == false

asyncTest "isReady should return true if ethRpc is ready":
var manager = await setup()
await manager.init()
# node can only be ready after group sync is done
await manager.startGroupSync()

check:
(await manager.isReady()) == true


################################
## Terminating/removing Ganache
Expand Down
12 changes: 10 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,8 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message
when defined(rln):
proc mountRlnRelay*(node: WakuNode,
rlnConf: WakuRlnConfig,
spamHandler: Option[SpamHandler] = none(SpamHandler),
registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)) {.async.} =
spamHandler = none(SpamHandler),
registrationHandler = none(RegistrationHandler)) {.async.} =
info "mounting rln relay"

if node.wakuRelay.isNil():
Expand Down Expand Up @@ -903,3 +903,11 @@ proc stop*(node: WakuNode) {.async.} =
await node.wakuRlnRelay.stop()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async.} =
when defined(rln):
if node.wakuRlnRelay == nil:
return false
return await node.wakuRlnRelay.isReady()
## TODO: add other protocol `isReady` checks
return true
3 changes: 3 additions & 0 deletions waku/waku_rln_relay/group_manager/group_manager_base.nim
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,6 @@ method generateProof*(g: GroupManager,
if proofGenRes.isErr():
return err("proof generation failed: " & $proofGenRes.error())
return ok(proofGenRes.value())

method isReady*(g: GroupManager): Future[bool] {.base,gcsafe.} =
raise newException(CatchableError, "isReady proc for " & $g.type & " is not implemented yet")
49 changes: 44 additions & 5 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,21 @@ proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: Option[BlockNumber] = none(BlockNumber)): Future[void] {.async.} =
initializedGuard(g)

let blockTable = await g.getBlockTable(fromBlock, toBlock)
proc getLatestBlockNumber(): BlockNumber =
if toBlock.isSome():
# if toBlock = 0, that implies the latest block
# which is the case when we are syncing block-by-block
# therefore, toBlock = fromBlock + 1
# if toBlock != 0, then we are chunking blocks
# therefore, toBlock = fromBlock + blockChunkSize (which is handled)
return max(fromBlock + 1, toBlock.get())
return fromBlock

let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)

g.latestProcessedBlock = if toBlock.isSome(): toBlock.get()
else: fromBlock
g.latestProcessedBlock = getLatestBlockNumber()
let metadataSetRes = g.setMetadata()
if metadataSetRes.isErr():
# this is not a fatal error, hence we don't raise an exception
Expand Down Expand Up @@ -473,7 +481,6 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
let metadataGetRes = g.rlnInstance.getMetadata()
if metadataGetRes.isErr():
warn "could not initialize with persisted rln metadata"
g.latestProcessedBlock = BlockNumber(0)
else:
let metadata = metadataGetRes.get()
if metadata.chainId != uint64(g.chainId.get()):
Expand All @@ -500,6 +507,7 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
raise newException(ValueError,
"could not get the deployed block number: " & getCurrentExceptionMsg())
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)

ethRpc.ondisconnect = proc() =
error "Ethereum client disconnected"
Expand Down Expand Up @@ -528,3 +536,34 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} =
error "failed to flush to the tree db"

g.initialized = false

proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
let ethRpc = g.ethRpc.get()

try:
let syncing = await ethRpc.provider.eth_syncing()
return syncing.getBool()
except CatchableError:
error "failed to get the syncing status", error = getCurrentExceptionMsg()
return false

method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
initializedGuard(g)

if g.ethRpc.isNone():
return false

var currentBlock: BlockNumber
try:
currentBlock = cast[BlockNumber](await g.ethRpc
.get()
.provider
.eth_blockNumber())
except CatchableError:
error "failed to get the current block number", error = getCurrentExceptionMsg()
return false

if g.latestProcessedBlock < currentBlock:
return false

return not (await g.isSyncing())
6 changes: 6 additions & 0 deletions waku/waku_rln_relay/group_manager/static/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,9 @@ method stop*(g: StaticGroupManager): Future[void] =
var retFut = newFuture[void]("StaticGroupManager.stop")
retFut.complete()
return retFut

method isReady*(g: StaticGroupManager): Future[bool] {.gcsafe.} =
initializedGuard(g)
var retFut = newFuture[bool]("StaticGroupManager.isReady")
retFut.complete(true)
return retFut
12 changes: 12 additions & 0 deletions waku/waku_rln_relay/rln_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,18 @@ proc mount(conf: WakuRlnConfig,

return WakuRLNRelay(groupManager: groupManager)

proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async.} =
## returns true if the rln-relay protocol is ready to relay messages
## returns false otherwise

# could be nil during startup
if rlnPeer.groupManager == nil:
return false
try:
return await rlnPeer.groupManager.isReady()
except CatchableError:
error "could not check if the rln-relay protocol is ready", err = getCurrentExceptionMsg()
return false

proc new*(T: type WakuRlnRelay,
conf: WakuRlnConfig,
Expand Down

0 comments on commit 5638bd0

Please sign in to comment.