fix(rln-relay): graceful shutdown with non-zero exit code
rymnc committed Feb 14, 2024
1 parent d037705 commit c1ca0f4
Showing 11 changed files with 89 additions and 55 deletions.
15 changes: 8 additions & 7 deletions apps/wakunode2/app.nim
@@ -460,8 +460,13 @@ proc setupProtocols(node: WakuNode,
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

if conf.rlnRelay:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

if conf.rlnRelay:
when defined(rln_v2):
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
@@ -472,6 +477,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
onErrAction: onErrAction,
)
else:
let rlnConf = WakuRlnConfig(
@@ -482,6 +488,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
onErrAction: onErrAction,
)

try:
@@ -490,12 +497,6 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
2 changes: 2 additions & 0 deletions waku/common/error_handling.nim
@@ -0,0 +1,2 @@
type
OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
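
The new OnFatalErrorHandler type is the contract that every onErrAction introduced in this commit satisfies. Below is a minimal, self-contained sketch of defining and invoking such a handler; the fatalHandler name, the stderr logging, and the sample message are illustrative only (the real handler in apps/wakunode2/app.nim logs through chronicles, as shown above), and the type is duplicated locally instead of importing waku/common/error_handling so the snippet runs on its own.

    type
      OnFatalErrorHandler = proc(errMsg: string) {.gcsafe, closure, raises: [].}

    let fatalHandler: OnFatalErrorHandler = proc(errMsg: string) {.gcsafe, closure, raises: [].} =
      # Report the problem and shut the process down with a non-zero exit code,
      # mirroring the onErrAction closure added to apps/wakunode2/app.nim.
      try:
        stderr.writeLine("Unrecoverable error occurred: " & errMsg)
      except IOError:
        discard
      quit(QuitFailure)

    # A component holding this callback invokes it instead of raising, so the node
    # terminates with QuitFailure rather than exiting cleanly after a fatal error.
    fatalHandler("connection with the database was lost and not recovered")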
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
@@ -979,7 +979,7 @@ proc mountRlnRelay*(node: WakuNode,
raise newException(CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay")

let rlnRelayRes = waitFor WakuRlnRelay.new(rlnConf,
registrationHandler)
registrationHandler)
if rlnRelayRes.isErr():
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
let rlnRelay = rlnRelayRes.get()
2 changes: 1 addition & 1 deletion waku/waku_archive/driver.nim
@@ -9,14 +9,14 @@ import
chronos
import
../waku_core,
../common/error_handling,
./common

const DefaultPageSize*: uint = 25

type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}

type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)

3 changes: 2 additions & 1 deletion waku/waku_archive/driver/builder.nim
@@ -12,6 +12,7 @@ import
../driver,
../../common/databases/dburl,
../../common/databases/db_sqlite,
../../common/error_handling,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./queue_driver
@@ -29,7 +30,7 @@ proc new*(T: type ArchiveDriver,
vacuum: bool,
migrate: bool,
maxNumConn: int,
onErrAction: OnErrHandler):
onErrAction: OnFatalErrorHandler):
Result[T, string] =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
3 changes: 2 additions & 1 deletion waku/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -11,6 +11,7 @@ import
chronos,
chronicles
import
../../../common/error_handling,
../../../waku_core,
../../common,
../../driver,
@@ -89,7 +90,7 @@ const DefaultMaxNumConns = 50
proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections = DefaultMaxNumConns,
onErrAction: OnErrHandler = nil):
onErrAction: OnFatalErrorHandler = nil):
ArchiveDriverResult[T] =

## Very simplistic split of max connections
@@ -8,7 +8,8 @@ import
stew/results
import
../../driver,
../../../common/databases/db_postgres
../../../common/databases/db_postgres,
../../../common/error_handling

## Simple query to validate that the postgres is working and attending requests
const HealthCheckQuery = "SELECT version();"
@@ -17,7 +18,7 @@ const MaxNumTrials = 20
const TrialInterval = 1.seconds

proc checkConnectivity*(connPool: PgAsyncPool,
onErrAction: OnErrHandler) {.async.} =
onErrAction: OnFatalErrorHandler) {.async.} =

while true:

2 changes: 2 additions & 0 deletions waku/waku_rln_relay/group_manager/group_manager_base.nim
@@ -1,4 +1,5 @@
import
../../common/error_handling,
../protocol_types,
../protocol_metrics,
../constants,
@@ -44,6 +45,7 @@ type
initialized*: bool
latestIndex*: MembershipIndex
validRoots*: Deque[MerkleNode]
onErrAction*: OnFatalErrorHandler
when defined(rln_v2):
userMessageLimit*: Option[UserMessageLimit]

58 changes: 33 additions & 25 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
@@ -113,6 +113,10 @@ template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(CatchableError, "OnchainGroupManager is not initialized")

template retryWrapper(g: OnchainGroupManager, res: auto, errStr: string, body: untyped): auto =
retryWrapper(res, RetryStrategy.new(), errStr, g.onErrAction):
body


proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
try:
@@ -234,19 +238,19 @@ when defined(rln_v2):
let membershipFee = g.membershipFee.get()

var gasPrice: int
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
g.retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = identityCredential.idCommitment.toUInt256()

var txHash: TxHash
let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex, userMessageLimit = userMessageLimit
retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
g.retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
await registryContract.register(storageIndex, idCommitment, u256(userMessageLimit)).send(gasPrice = gasPrice)

# wait for the transaction to be mined
var tsReceipt: ReceiptObject
retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
g.retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash)
@@ -282,19 +286,19 @@ else:
let membershipFee = g.membershipFee.get()

var gasPrice: int
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
g.retryWrapper(gasPrice, "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = credentials.idCommitment.toUInt256()

var txHash: TxHash
let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex
retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
g.retryWrapper(txHash, "Failed to register the member"):
await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice)

# wait for the transaction to be mined
var tsReceipt: ReceiptObject
retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
g.retryWrapper(tsReceipt, "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash)
@@ -393,7 +397,7 @@ proc getRawEvents(g: OnchainGroupManager,
let rlnContract = g.rlnContract.get()

var events: JsonNode
retryWrapper(events, RetryStrategy.new(), "Failed to get the events"):
g.retryWrapper(events, "Failed to get the events"):
await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(toBlock.blockId()))
@@ -486,26 +490,29 @@ proc getAndHandleEvents(g: OnchainGroupManager,

return true

proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration) =
g.blockFetchingActive = false

proc runIntervalLoop() {.async, gcsafe.} =
g.blockFetchingActive = true

while g.blockFetchingActive:
var retCb: bool
retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
g.retryWrapper(retCb, "Failed to run the interval block fetching loop"):
await cb()
await sleepAsync(interval)

# using asyncSpawn is OK here since
# we make use of the error handling provided by
# OnFatalErrorHandler
asyncSpawn runIntervalLoop()


proc getNewBlockCallback(g: OnchainGroupManager): proc =
let ethRpc = g.ethRpc.get()
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
var latestBlock: BlockNumber
retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
g.retryWrapper(latestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())

if latestBlock <= g.latestProcessedBlock:
@@ -514,7 +521,7 @@ proc getNewBlockCallback(g: OnchainGroupManager): proc =
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
g.retryWrapper(handleBlockRes, "Failed to handle new block"):
await g.getAndHandleEvents(fromBlock, latestBlock)
return true
return wrappedCb
@@ -548,15 +555,15 @@ proc startOnchainSync(g: OnchainGroupManager):
# chunk events
while true:
var currentLatestBlock: BlockNumber
retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
g.retryWrapper(currentLatestBlock, "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if fromBlock >= currentLatestBlock:
break

let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
g.retryWrapper(handleBlockRes, "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1

@@ -588,11 +595,11 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
method init*(g: OnchainGroupManager): Future[void] {.async.} =
var ethRpc: Web3
# check if the Ethereum client is reachable
retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"):
g.retryWrapper(ethRpc, "Failed to connect to the Ethereum client"):
await newWeb3(g.ethClientUrl)
# Set the chain id
var chainId: Quantity
retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"):
g.retryWrapper(chainId, "Failed to get the chain id"):
await ethRpc.provider.eth_chainId()
g.chainId = some(chainId)

@@ -609,12 +616,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =

# get the current storage index
var usingStorageIndex: Uint16
retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"):
g.retryWrapper(usingStorageIndex, "Failed to get the storage index"):
await registryContract.usingStorageIndex().call()

g.usingStorageIndex = some(usingStorageIndex)
var rlnContractAddress: Address
retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"):
g.retryWrapper(rlnContractAddress, "Failed to get the rln contract address"):
await registryContract.storages(usingStorageIndex).call()
let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress)

@@ -670,12 +677,12 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =

# check if the contract exists by calling a static function
var membershipFee: Uint256
retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"):
g.retryWrapper(membershipFee, "Failed to get the membership deposit"):
await rlnContract.MEMBERSHIP_DEPOSIT().call()
g.membershipFee = some(membershipFee)

var deployedBlockNumber: Uint256
retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"):
g.retryWrapper(deployedBlockNumber, "Failed to get the deployed block number"):
await rlnContract.deployedBlockNumber().call()
debug "using rln storage", deployedBlockNumber, rlnContractAddress
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
@@ -686,15 +693,16 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
var newEthRpc: Web3
retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"):
g.retryWrapper(newEthRpc, "Failed to reconnect with the Ethereum client"):
await newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)

try:
asyncSpawn g.startOnchainSync()
except CatchableError:
error "failed to restart group sync", error = getCurrentExceptionMsg()
await g.startOnchainSync()
except CatchableError, Exception:
g.onErrAction("failed to restart group sync" & ": " & getCurrentExceptionMsg())


ethRpc.ondisconnect = proc() =
asyncSpawn onDisconnect()
@@ -719,7 +727,7 @@ proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
let ethRpc = g.ethRpc.get()

var syncing: JsonNode
retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"):
g.retryWrapper(syncing, "Failed to get the syncing status"):
await ethRpc.provider.eth_syncing()
return syncing.getBool()

@@ -731,7 +739,7 @@ method isReady*(g: OnchainGroupManager):
return false

var currentBlock: BlockNumber
retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"):
g.retryWrapper(currentBlock, "Failed to get the current block number"):
cast[BlockNumber](await g.ethRpc
.get()
.provider
10 changes: 8 additions & 2 deletions waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim
@@ -1,6 +1,7 @@
import
../../../common/error_handling
import
chronos


type RetryStrategy* = object
shouldRetry*: bool
@@ -18,6 +19,7 @@ proc new*(T: type RetryStrategy): RetryStrategy =
template retryWrapper*(res: auto,
retryStrategy: RetryStrategy,
errStr: string,
errCallback: OnFatalErrorHandler = nil,
body: untyped): auto =
var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry
@@ -32,4 +34,8 @@ template retryWrapper*(res: auto,
exceptionMessage = getCurrentExceptionMsg()
await sleepAsync(retryStrategy.retryDelay)
if shouldRetry:
raise newException(CatchableError, errStr & ": " & exceptionMessage)
if errCallback == nil:
raise newException(CatchableError, errStr & ": " & exceptionMessage)
else:
errCallback(errStr & ": " & exceptionMessage)
return
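
With the new optional errCallback parameter, a caller can route retry exhaustion into the fatal-error path instead of raising, which is what the g.retryWrapper forwarding template in the on-chain group manager does with g.onErrAction. A rough usage sketch follows; syncOnce, fetchLatestBlock, and the onFatal parameter are illustrative placeholders rather than code from this commit, and retryWrapper, RetryStrategy, and OnFatalErrorHandler are assumed to be imported from the modules shown above.

    import chronos

    proc fetchLatestBlock(): Future[int] {.async.} =
      # Placeholder for an Ethereum RPC call such as eth_blockNumber().
      return 42

    proc syncOnce(onFatal: OnFatalErrorHandler) {.async.} =
      var latestBlock: int

      # Without a callback: once retries are exhausted the template raises CatchableError.
      retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
        await fetchLatestBlock()

      # With a callback: the error is handed to onFatal (in the node this ends up in
      # app.nim's onErrAction, which logs and quits) and the surrounding proc returns early.
      retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number", onFatal):
        await fetchLatestBlock()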