From cd3d0f6abe03a630d367148fb47a974594d5cbc0 Mon Sep 17 00:00:00 2001 From: rymnc <43716372+rymnc@users.noreply.github.com> Date: Mon, 24 Jul 2023 17:16:37 +0530 Subject: [PATCH] feat(rln-relay): close db connection appropriately --- vendor/zerokit | 2 +- waku/v2/node/waku_node.nim | 4 ++++ .../group_manager/group_manager_base.nim | 3 +++ .../group_manager/on_chain/group_manager.nim | 24 +++++++++++++++---- .../group_manager/static/group_manager.nim | 7 ++++++ waku/v2/waku_rln_relay/rln/rln_interface.nim | 5 ++++ waku/v2/waku_rln_relay/rln_relay.nim | 7 ++++++ 7 files changed, 47 insertions(+), 5 deletions(-) diff --git a/vendor/zerokit b/vendor/zerokit index c6b7a8c0a4..9d4ed68450 160000 --- a/vendor/zerokit +++ b/vendor/zerokit @@ -1 +1 @@ -Subproject commit c6b7a8c0a401dc9a3f5b0511ebfb8727fc19b53a +Subproject commit 9d4ed68450e20626081e24490ba320278f6d3f7b diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index df8b3b0a5f..57d0d7a848 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -828,4 +828,8 @@ proc stop*(node: WakuNode) {.async.} = await node.switch.stop() node.peerManager.stop() + when defined(rln): + if not node.wakuRlnRelay.isNil(): + await node.wakuRlnRelay.stop() + node.started = false diff --git a/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim b/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim index 31878632d1..0dc1ceb307 100644 --- a/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/v2/waku_rln_relay/group_manager/group_manager_base.nim @@ -89,6 +89,9 @@ method withdrawBatch*(g: GroupManager, identitySecretHashes: seq[IdentitySecretH method atomicBatch*(g: GroupManager, idCommitments: seq[IDCommitment], toRemoveIndices: seq[MembershipIndex]): Future[void] {.base,gcsafe.} = raise newException(CatchableError, "atomicBatch proc for " & $g.type & " is not implemented yet") +method stop*(g: GroupManager): Future[void] {.base,gcsafe.} = + raise newException(CatchableError, "stop proc for " & $g.type & " is not implemented yet") + # This proc is used to set a callback that will be called when an identity commitment is withdrawn # The callback may be called multiple times, and should be used to for any post processing method onWithdraw*(g: GroupManager, cb: OnWithdrawCallback) {.base,gcsafe.} = diff --git a/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim index 8f484eca0c..0cf68f54fd 100644 --- a/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/v2/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -62,7 +62,8 @@ type const DefaultKeyStorePath* = "rlnKeystore.json" const DefaultKeyStorePassword* = "password" -const BlockChunkSize* = 100'u64 +const DecayFactor* = 1.2 +const DefaultChunkSize* = 1000 template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: @@ -320,28 +321,33 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} = let ethRpc = g.ethRpc.get() + # the block chunk size decays exponentially with the number of blocks + # the minimum chunk size is 100 + var blockChunkSize = 1_000_000 + var fromBlock = if g.latestProcessedBlock.isSome(): info "resuming onchain sync from block", fromBlock = g.latestProcessedBlock.get() g.latestProcessedBlock.get() else: info "starting onchain sync from scratch" - # chunk size is 1000 blocks BlockNumber(0) let latestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) try: # we always want to sync from last processed block => latest if fromBlock == BlockNumber(0) or - fromBlock + BlockNumber(BlockChunkSize) < latestBlock: + fromBlock + BlockNumber(blockChunkSize) < latestBlock: # chunk events while true: let currentLatestBlock = cast[BlockNumber](await g.ethRpc.get().provider.eth_blockNumber()) - let toBlock = min(fromBlock + BlockNumber(BlockChunkSize), currentLatestBlock) + let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) info "chunking events", fromBlock = fromBlock, toBlock = toBlock await g.getAndHandleEvents(fromBlock, some(toBlock)) fromBlock = toBlock + 1 if fromBlock >= currentLatestBlock: break + let newChunkSize = float(blockChunkSize) / DecayFactor + blockChunkSize = max(int(newChunkSize), DefaultChunkSize) else: await g.getAndHandleEvents(fromBlock, some(BlockNumber(0))) except CatchableError: @@ -496,3 +502,13 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = error "failed to restart group sync", error = getCurrentExceptionMsg() g.initialized = true + +method stop*(g: OnchainGroupManager): Future[void] {.async.} = + if g.ethRpc.isSome(): + g.ethRpc.get().ondisconnect = nil + await g.ethRpc.get().close() + let flushed = g.rlnInstance.flush() + if not flushed: + error "failed to flush to the tree db" + + g.initialized = false \ No newline at end of file diff --git a/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim b/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim index 3a3d831230..c2ccae8c21 100644 --- a/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim +++ b/waku/v2/waku_rln_relay/group_manager/static/group_manager.nim @@ -102,3 +102,10 @@ method onRegister*(g: StaticGroupManager, cb: OnRegisterCallback) {.gcsafe.} = method onWithdraw*(g: StaticGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = g.withdrawCb = some(cb) + +method stop*(g: StaticGroupManager): Future[void] = + initializedGuard(g) + # No-op + var retFut = newFuture[void]("StaticGroupManager.stop") + retFut.complete() + return retFut diff --git a/waku/v2/waku_rln_relay/rln/rln_interface.nim b/waku/v2/waku_rln_relay/rln/rln_interface.nim index 34def0f289..828b205484 100644 --- a/waku/v2/waku_rln_relay/rln/rln_interface.nim +++ b/waku/v2/waku_rln_relay/rln/rln_interface.nim @@ -188,3 +188,8 @@ proc get_metadata*(ctx: ptr RLN, output_buffer: ptr Buffer): bool {.importc: "ge ## gets the metadata stored by ctx and populates the passed pointer output_buffer with it ## the output_buffer holds the metadata as a byte seq ## the return bool value indicates the success or failure of the operation + +proc flush*(ctx: ptr RLN): bool {.importc: "flush".} +## flushes the write buffer to the database +## the return bool value indicates the success or failure of the operation +## This allows more robust and graceful handling of the database connection \ No newline at end of file diff --git a/waku/v2/waku_rln_relay/rln_relay.nim b/waku/v2/waku_rln_relay/rln_relay.nim index 0c04c18a06..a8deccf917 100644 --- a/waku/v2/waku_rln_relay/rln_relay.nim +++ b/waku/v2/waku_rln_relay/rln_relay.nim @@ -90,6 +90,13 @@ type WakuRLNRelay* = ref object of RootObj groupManager*: GroupManager messageBucket*: Option[TokenBucket] +method stop*(rlnPeer: WakuRLNRelay) {.async.} = + ## stops the rln-relay protocol + ## Throws an error if it cannot stop the rln-relay protocol + + # stop the group sync, and flush data to tree db + await rlnPeer.groupManager.stop() + proc hasDuplicate*(rlnPeer: WakuRLNRelay, proofMetadata: ProofMetadata): RlnRelayResult[bool] = ## returns true if there is another message in the `nullifierLog` of the `rlnPeer` with the same