diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 6738ae4596..2fbe197a5b 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -460,8 +460,13 @@ proc setupProtocols(node: WakuNode, except CatchableError: return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) - if conf.rlnRelay: + var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = + ## Action to be taken when an internal error occurs during the node run. + ## e.g. the connection with the database is lost and not recovered. + error "Unrecoverable error occurred", error = msg + quit(QuitFailure) + if conf.rlnRelay: when defined(rln_v2): let rlnConf = WakuRlnConfig( rlnRelayDynamic: conf.rlnRelayDynamic, @@ -472,6 +477,7 @@ proc setupProtocols(node: WakuNode, rlnRelayCredPassword: conf.rlnRelayCredPassword, rlnRelayTreePath: conf.rlnRelayTreePath, rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit, + onFatalErrorAction: onFatalErrorAction, ) else: let rlnConf = WakuRlnConfig( @@ -482,6 +488,7 @@ proc setupProtocols(node: WakuNode, rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayCredPassword: conf.rlnRelayCredPassword, rlnRelayTreePath: conf.rlnRelayTreePath, + onFatalErrorAction: onFatalErrorAction, ) try: @@ -490,18 +497,12 @@ proc setupProtocols(node: WakuNode, return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) if conf.store: - var onErrAction = proc(msg: string) {.gcsafe, closure.} = - ## Action to be taken when an internal error occurs during the node run. - ## e.g. the connection with the database is lost and not recovered. - error "Unrecoverable error occurred", error = msg - quit(QuitFailure) - # Archive setup let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration, conf.storeMaxNumDbConnections, - onErrAction) + onFatalErrorAction) if archiveDriverRes.isErr(): return err("failed to setup archive driver: " & archiveDriverRes.error) diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 2a444a78df..3fa1dd92ff 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -62,7 +62,7 @@ proc configureStore(node: WakuNode, Future[Result[void, string]] {.async.} = ## This snippet is extracted/duplicated from the app.nim file - var onErrAction = proc(msg: string) {.gcsafe, closure.} = + var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = ## Action to be taken when an internal error occurs during the node run. ## e.g. the connection with the database is lost and not recovered. # error "Unrecoverable error occurred", error = msg @@ -74,7 +74,7 @@ proc configureStore(node: WakuNode, storeVacuum, storeDbMigration, storeMaxNumDbConnections, - onErrAction) + onFatalErrorAction) if archiveDriverRes.isErr(): return err("failed to setup archive driver: " & archiveDriverRes.error) diff --git a/waku/common/error_handling.nim b/waku/common/error_handling.nim new file mode 100644 index 0000000000..8aa36c80e0 --- /dev/null +++ b/waku/common/error_handling.nim @@ -0,0 +1,2 @@ +type + OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].} \ No newline at end of file diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 01d4613f91..db30f202ea 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -979,7 +979,7 @@ proc mountRlnRelay*(node: WakuNode, raise newException(CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay") let rlnRelayRes = waitFor WakuRlnRelay.new(rlnConf, - registrationHandler) + registrationHandler) if rlnRelayRes.isErr(): raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error) let rlnRelay = rlnRelayRes.get() diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 3a9262f482..a315ea0b52 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -9,6 +9,7 @@ import chronos import ../waku_core, + ../common/error_handling, ./common const DefaultPageSize*: uint = 25 @@ -16,7 +17,6 @@ const DefaultPageSize*: uint = 25 type ArchiveDriverResult*[T] = Result[T, string] ArchiveDriver* = ref object of RootObj - OnErrHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].} type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp) diff --git a/waku/waku_archive/driver/builder.nim b/waku/waku_archive/driver/builder.nim index 976769c875..e853b63a62 100644 --- a/waku/waku_archive/driver/builder.nim +++ b/waku/waku_archive/driver/builder.nim @@ -12,6 +12,7 @@ import ../driver, ../../common/databases/dburl, ../../common/databases/db_sqlite, + ../../common/error_handling, ./sqlite_driver, ./sqlite_driver/migrations as archive_driver_sqlite_migrations, ./queue_driver @@ -29,13 +30,13 @@ proc new*(T: type ArchiveDriver, vacuum: bool, migrate: bool, maxNumConn: int, - onErrAction: OnErrHandler): + onFatalErrorAction: OnFatalErrorHandler): Result[T, string] = ## url - string that defines the database ## vacuum - if true, a cleanup operation will be applied to the database ## migrate - if true, the database schema will be updated ## maxNumConn - defines the maximum number of connections to handle simultaneously (Postgres) - ## onErrAction - called if, e.g., the connection with db got lost + ## onFatalErrorAction - called if, e.g., the connection with db got lost let dbUrlValidationRes = dburl.validateDbUrl(url) if dbUrlValidationRes.isErr(): @@ -85,7 +86,7 @@ proc new*(T: type ArchiveDriver, when defined(postgres): let res = PostgresDriver.new(dbUrl = url, maxConnections = maxNumConn, - onErrAction = onErrAction) + onFatalErrorAction = onFatalErrorAction) if res.isErr(): return err("failed to init postgres archive driver: " & res.error) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a2450c6ce5..f5adbfb2c4 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -11,6 +11,7 @@ import chronos, chronicles import + ../../../common/error_handling, ../../../waku_core, ../../common, ../../driver, @@ -89,7 +90,7 @@ const DefaultMaxNumConns = 50 proc new*(T: type PostgresDriver, dbUrl: string, maxConnections = DefaultMaxNumConns, - onErrAction: OnErrHandler = nil): + onFatalErrorAction: OnFatalErrorHandler = nil): ArchiveDriverResult[T] = ## Very simplistic split of max connections @@ -101,11 +102,11 @@ proc new*(T: type PostgresDriver, let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr: return err("error creating write conn pool PgAsyncPool") - if not isNil(onErrAction): - asyncSpawn checkConnectivity(readConnPool, onErrAction) + if not isNil(onFatalErrorAction): + asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction) - if not isNil(onErrAction): - asyncSpawn checkConnectivity(writeConnPool, onErrAction) + if not isNil(onFatalErrorAction): + asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction) return ok(PostgresDriver(writeConnPool: writeConnPool, readConnPool: readConnPool)) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim b/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim index e2df5f473f..1559db2a5a 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim @@ -8,7 +8,8 @@ import stew/results import ../../driver, - ../../../common/databases/db_postgres + ../../../common/databases/db_postgres, + ../../../common/error_handling ## Simple query to validate that the postgres is working and attending requests const HealthCheckQuery = "SELECT version();" @@ -17,7 +18,7 @@ const MaxNumTrials = 20 const TrialInterval = 1.seconds proc checkConnectivity*(connPool: PgAsyncPool, - onErrAction: OnErrHandler) {.async.} = + onFatalErrorAction: OnFatalErrorHandler) {.async.} = while true: @@ -29,7 +30,7 @@ proc checkConnectivity*(connPool: PgAsyncPool, block errorBlock: ## Force close all the opened connections. No need to close gracefully. (await connPool.resetConnPool()).isOkOr: - onErrAction("checkConnectivity resetConnPool error: " & error) + onFatalErrorAction("checkConnectivity resetConnPool error: " & error) var numTrial = 0 while numTrial < MaxNumTrials: @@ -42,6 +43,6 @@ proc checkConnectivity*(connPool: PgAsyncPool, numTrial.inc() ## The connection couldn't be resumed. Let's inform the upper layers. - onErrAction("postgres health check error: " & error) + onFatalErrorAction("postgres health check error: " & error) await sleepAsync(CheckConnectivityInterval) diff --git a/waku/waku_rln_relay/group_manager/group_manager_base.nim b/waku/waku_rln_relay/group_manager/group_manager_base.nim index 1c5cd8e5c6..a8f7ca55cc 100644 --- a/waku/waku_rln_relay/group_manager/group_manager_base.nim +++ b/waku/waku_rln_relay/group_manager/group_manager_base.nim @@ -1,4 +1,5 @@ import + ../../common/error_handling, ../protocol_types, ../protocol_metrics, ../constants, @@ -44,6 +45,7 @@ type initialized*: bool latestIndex*: MembershipIndex validRoots*: Deque[MerkleNode] + onFatalErrorAction*: OnFatalErrorHandler when defined(rln_v2): userMessageLimit*: Option[UserMessageLimit] diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index d393b4ec13..22b392967a 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -113,6 +113,10 @@ template initializedGuard(g: OnchainGroupManager): untyped = if not g.initialized: raise newException(CatchableError, "OnchainGroupManager is not initialized") +template retryWrapper(g: OnchainGroupManager, res: auto, errStr: string, body: untyped): auto = + retryWrapper(res, RetryStrategy.new(), errStr, g.onFatalErrorAction): + body + proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] = try: @@ -234,19 +238,19 @@ when defined(rln_v2): let membershipFee = g.membershipFee.get() var gasPrice: int - retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"): + g.retryWrapper(gasPrice, "Failed to get gas price"): int(await ethRpc.provider.eth_gasPrice()) * 2 let idCommitment = identityCredential.idCommitment.toUInt256() var txHash: TxHash let storageIndex = g.usingStorageIndex.get() debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex, userMessageLimit = userMessageLimit - retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"): + g.retryWrapper(txHash, "Failed to register the member"): await registryContract.register(storageIndex, idCommitment, u256(userMessageLimit)).send(gasPrice = gasPrice) # wait for the transaction to be mined var tsReceipt: ReceiptObject - retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"): + g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): await ethRpc.getMinedTransactionReceipt(txHash) debug "registration transaction mined", txHash = txHash g.registrationTxHash = some(txHash) @@ -282,19 +286,19 @@ else: let membershipFee = g.membershipFee.get() var gasPrice: int - retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"): + g.retryWrapper(gasPrice, "Failed to get gas price"): int(await ethRpc.provider.eth_gasPrice()) * 2 let idCommitment = credentials.idCommitment.toUInt256() var txHash: TxHash let storageIndex = g.usingStorageIndex.get() debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex - retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"): + g.retryWrapper(txHash, "Failed to register the member"): await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice) # wait for the transaction to be mined var tsReceipt: ReceiptObject - retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"): + g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"): await ethRpc.getMinedTransactionReceipt(txHash) debug "registration transaction mined", txHash = txHash g.registrationTxHash = some(txHash) @@ -393,7 +397,7 @@ proc getRawEvents(g: OnchainGroupManager, let rlnContract = g.rlnContract.get() var events: JsonNode - retryWrapper(events, RetryStrategy.new(), "Failed to get the events"): + g.retryWrapper(events, "Failed to get the events"): await rlnContract.getJsonLogs(MemberRegistered, fromBlock = some(fromBlock.blockId()), toBlock = some(toBlock.blockId())) @@ -486,7 +490,7 @@ proc getAndHandleEvents(g: OnchainGroupManager, return true -proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void = +proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) = g.blockFetchingActive = false proc runIntervalLoop() {.async, gcsafe.} = @@ -494,10 +498,13 @@ proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void = while g.blockFetchingActive: var retCb: bool - retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"): + g.retryWrapper(retCb, "Failed to run the interval block fetching loop"): await cb() await sleepAsync(interval) + # using asyncSpawn is OK here since + # we make use of the error handling provided by + # OnFatalErrorHandler asyncSpawn runIntervalLoop() @@ -505,7 +512,7 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc = let ethRpc = g.ethRpc.get() proc wrappedCb(): Future[bool] {.async, gcsafe.} = var latestBlock: BlockNumber - retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"): + g.retryWrapper(latestBlock, "Failed to get the latest block number"): cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) if latestBlock <= g.latestProcessedBlock: @@ -514,7 +521,7 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc = # inc by 1 to prevent double processing let fromBlock = g.latestProcessedBlock + 1 var handleBlockRes: bool - retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"): + g.retryWrapper(handleBlockRes, "Failed to handle new block"): await g.getAndHandleEvents(fromBlock, latestBlock) return true return wrappedCb @@ -548,7 +555,7 @@ proc startOnchainSync(g: OnchainGroupManager): # chunk events while true: var currentLatestBlock: BlockNumber - retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"): + g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"): cast[BlockNumber](await ethRpc.provider.eth_blockNumber()) if fromBlock >= currentLatestBlock: break @@ -556,7 +563,7 @@ proc startOnchainSync(g: OnchainGroupManager): let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock) debug "fetching events", fromBlock = fromBlock, toBlock = toBlock var handleBlockRes: bool - retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"): + g.retryWrapper(handleBlockRes, "Failed to handle old blocks"): await g.getAndHandleEvents(fromBlock, toBlock) fromBlock = toBlock + 1 @@ -588,11 +595,11 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = method init*(g: OnchainGroupManager): Future[void] {.async.} = var ethRpc: Web3 # check if the Ethereum client is reachable - retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"): + g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"): await newWeb3(g.ethClientUrl) # Set the chain id var chainId: Quantity - retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"): + g.retryWrapper(chainId, "Failed to get the chain id"): await ethRpc.provider.eth_chainId() g.chainId = some(chainId) @@ -609,12 +616,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = # get the current storage index var usingStorageIndex: Uint16 - retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"): + g.retryWrapper(usingStorageIndex, "Failed to get the storage index"): await registryContract.usingStorageIndex().call() g.usingStorageIndex = some(usingStorageIndex) var rlnContractAddress: Address - retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"): + g.retryWrapper(rlnContractAddress, "Failed to get the rln contract address"): await registryContract.storages(usingStorageIndex).call() let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress) @@ -670,12 +677,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = # check if the contract exists by calling a static function var membershipFee: Uint256 - retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"): + g.retryWrapper(membershipFee, "Failed to get the membership deposit"): await rlnContract.MEMBERSHIP_DEPOSIT().call() g.membershipFee = some(membershipFee) var deployedBlockNumber: Uint256 - retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"): + g.retryWrapper(deployedBlockNumber, "Failed to get the deployed block number"): await rlnContract.deployedBlockNumber().call() debug "using rln storage", deployedBlockNumber, rlnContractAddress g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber) @@ -686,15 +693,16 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} = let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber) info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock var newEthRpc: Web3 - retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"): + g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"): await newWeb3(g.ethClientUrl) newEthRpc.ondisconnect = ethRpc.ondisconnect g.ethRpc = some(newEthRpc) try: - asyncSpawn g.startOnchainSync() - except CatchableError: - error "failed to restart group sync", error = getCurrentExceptionMsg() + await g.startOnchainSync() + except CatchableError, Exception: + g.onFatalErrorAction("failed to restart group sync" & ": " & getCurrentExceptionMsg()) + ethRpc.ondisconnect = proc() = asyncSpawn onDisconnect() @@ -719,7 +727,7 @@ proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} = let ethRpc = g.ethRpc.get() var syncing: JsonNode - retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"): + g.retryWrapper(syncing, "Failed to get the syncing status"): await ethRpc.provider.eth_syncing() return syncing.getBool() @@ -731,7 +739,7 @@ method isReady*(g: OnchainGroupManager): return false var currentBlock: BlockNumber - retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"): + g.retryWrapper(currentBlock, "Failed to get the current block number"): cast[BlockNumber](await g.ethRpc .get() .provider diff --git a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim index d0a621dc50..5a1c5ec4f1 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim @@ -1,6 +1,7 @@ +import + ../../../common/error_handling import chronos - type RetryStrategy* = object shouldRetry*: bool @@ -18,6 +19,7 @@ proc new*(T: type RetryStrategy): RetryStrategy = template retryWrapper*(res: auto, retryStrategy: RetryStrategy, errStr: string, + errCallback: OnFatalErrorHandler = nil, body: untyped): auto = var retryCount = retryStrategy.retryCount var shouldRetry = retryStrategy.shouldRetry @@ -32,4 +34,8 @@ template retryWrapper*(res: auto, exceptionMessage = getCurrentExceptionMsg() await sleepAsync(retryStrategy.retryDelay) if shouldRetry: - raise newException(CatchableError, errStr & ": " & exceptionMessage) + if errCallback == nil: + raise newException(CatchableError, errStr & " errCallback == nil: " & exceptionMessage) + else: + errCallback(errStr & ": " & exceptionMessage) + return diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 174bf6e34f..98765120b9 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -25,6 +25,7 @@ when defined(rln_v2): import ./nonce_manager import + ../common/error_handling, ../waku_relay, # for WakuRelayHandler ../waku_core, ../waku_keystore, @@ -33,16 +34,18 @@ import logScope: topics = "waku rln_relay" -type WakuRlnConfig* = object - rlnRelayDynamic*: bool - rlnRelayCredIndex*: Option[uint] - rlnRelayEthContractAddress*: string - rlnRelayEthClientAddress*: string - rlnRelayCredPath*: string - rlnRelayCredPassword*: string - rlnRelayTreePath*: string - when defined(rln_v2): - rlnRelayUserMessageLimit*: uint64 +type + WakuRlnConfig* = object + rlnRelayDynamic*: bool + rlnRelayCredIndex*: Option[uint] + rlnRelayEthContractAddress*: string + rlnRelayEthClientAddress*: string + rlnRelayCredPath*: string + rlnRelayCredPassword*: string + rlnRelayTreePath*: string + onFatalErrorAction*: OnFatalErrorHandler + when defined(rln_v2): + rlnRelayUserMessageLimit*: uint64 proc createMembershipList*(rln: ptr RLN, n: int): RlnRelayResult[( seq[RawMembershipCredentials], string @@ -84,10 +87,11 @@ type WakuRLNRelay* = ref object of RootObj nullifierLog*: OrderedTable[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message groupManager*: GroupManager + onFatalErrorAction*: OnFatalErrorHandler when defined(rln_v2): nonceManager: NonceManager -method stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} = +proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} = ## stops the rln-relay protocol ## Throws an error if it cannot stop the rln-relay protocol @@ -370,6 +374,7 @@ proc mount(conf: WakuRlnConfig, ): Future[WakuRlnRelay] {.async: (raises: [Exception]).} = var groupManager: GroupManager + wakuRlnRelay: WakuRLNRelay # create an RLN instance let rlnInstanceRes = createRLNInstance(tree_path = conf.rlnRelayTreePath) if rlnInstanceRes.isErr(): @@ -383,7 +388,8 @@ proc mount(conf: WakuRlnConfig, groupManager = StaticGroupManager(groupSize: StaticGroupSize, groupKeys: parsedGroupKeysRes.get(), membershipIndex: conf.rlnRelayCredIndex, - rlnInstance: rlnInstance) + rlnInstance: rlnInstance, + onFatalErrorAction: conf.onFatalErrorAction) # we don't persist credentials in static mode since they exist in ./constants.nim else: # dynamic setup @@ -398,7 +404,9 @@ proc mount(conf: WakuRlnConfig, registrationHandler: registrationHandler, keystorePath: rlnRelayCredPath, keystorePassword: rlnRelayCredPassword, - membershipIndex: conf.rlnRelayCredIndex) + membershipIndex: conf.rlnRelayCredIndex, + onFatalErrorAction: conf.onFatalErrorAction) + # Initialize the groupManager await groupManager.init() # Start the group sync @@ -406,9 +414,12 @@ proc mount(conf: WakuRlnConfig, when defined(rln_v2): return WakuRLNRelay(groupManager: groupManager, - nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit)) + nonceManager: NonceManager.init(conf.rlnRelayUserMessageLimit), + onFatalErrorAction: conf.onFatalErrorAction) else: - return WakuRLNRelay(groupManager: groupManager) + return WakuRLNRelay(groupManager: groupManager, + onFatalErrorAction: conf.onFatalErrorAction) + proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} = ## returns true if the rln-relay protocol is ready to relay messages