Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rln-relay): graceful shutdown with non-zero exit code #2429

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,13 @@ proc setupProtocols(node: WakuNode,
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

if conf.rlnRelay:
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
##
## `msg` is the description of the failure supplied by the component that
## detected it. The handler reports it through `error` and then ends the
## process with the `QuitFailure` exit code.
error "Unrecoverable error occurred", error = msg
# Terminate with a non-zero exit status so the failure is visible to the caller of the process.
quit(QuitFailure)

if conf.rlnRelay:
when defined(rln_v2):
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
Expand All @@ -472,6 +477,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
onFatalErrorAction: onFatalErrorAction,
)
else:
let rlnConf = WakuRlnConfig(
Expand All @@ -482,6 +488,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
onFatalErrorAction: onFatalErrorAction,
)

try:
Expand All @@ -490,18 +497,12 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections,
onErrAction)
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ proc configureStore(node: WakuNode,
Future[Result[void, string]] {.async.} =
## This snippet is extracted/duplicated from the app.nim file

var onErrAction = proc(msg: string) {.gcsafe, closure.} =
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
# error "Unrecoverable error occurred", error = msg
Expand All @@ -74,7 +74,7 @@ proc configureStore(node: WakuNode,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onErrAction)
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

Expand Down
2 changes: 2 additions & 0 deletions waku/common/error_handling.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
type
OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
## Callback type for reporting a fatal error.
## `errMsg` carries the textual description of the error.
## Implementations must be `gcsafe` closures and, per `raises: []`,
## are not allowed to raise any exception.
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ proc mountRlnRelay*(node: WakuNode,
raise newException(CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay")

let rlnRelayRes = waitFor WakuRlnRelay.new(rlnConf,
registrationHandler)
registrationHandler)
if rlnRelayRes.isErr():
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
let rlnRelay = rlnRelayRes.get()
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import
chronos
import
../waku_core,
../common/error_handling,
./common

const DefaultPageSize*: uint = 25

type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}

type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)

Expand Down
7 changes: 4 additions & 3 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
../driver,
../../common/databases/dburl,
../../common/databases/db_sqlite,
../../common/error_handling,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./queue_driver
Expand All @@ -29,13 +30,13 @@ proc new*(T: type ArchiveDriver,
vacuum: bool,
migrate: bool,
maxNumConn: int,
onErrAction: OnErrHandler):
onFatalErrorAction: OnFatalErrorHandler):
Result[T, string] =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
## migrate - if true, the database schema will be updated
## maxNumConn - defines the maximum number of connections to handle simultaneously (Postgres)
## onErrAction - called if, e.g., the connection with db got lost
## onFatalErrorAction - called if, e.g., the connection with db got lost

let dbUrlValidationRes = dburl.validateDbUrl(url)
if dbUrlValidationRes.isErr():
Expand Down Expand Up @@ -85,7 +86,7 @@ proc new*(T: type ArchiveDriver,
when defined(postgres):
let res = PostgresDriver.new(dbUrl = url,
maxConnections = maxNumConn,
onErrAction = onErrAction)
onFatalErrorAction = onFatalErrorAction)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

Expand Down
11 changes: 6 additions & 5 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import
chronos,
chronicles
import
../../../common/error_handling,
../../../waku_core,
../../common,
../../driver,
Expand Down Expand Up @@ -89,7 +90,7 @@ const DefaultMaxNumConns = 50
proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections = DefaultMaxNumConns,
onErrAction: OnErrHandler = nil):
onFatalErrorAction: OnFatalErrorHandler = nil):
ArchiveDriverResult[T] =

## Very simplistic split of max connections
Expand All @@ -101,11 +102,11 @@ proc new*(T: type PostgresDriver,
let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
return err("error creating write conn pool PgAsyncPool")

if not isNil(onErrAction):
asyncSpawn checkConnectivity(readConnPool, onErrAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)

if not isNil(onErrAction):
asyncSpawn checkConnectivity(writeConnPool, onErrAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)

return ok(PostgresDriver(writeConnPool: writeConnPool,
readConnPool: readConnPool))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import
stew/results
import
../../driver,
../../../common/databases/db_postgres
../../../common/databases/db_postgres,
../../../common/error_handling

## Simple query to validate that postgres is up and serving requests
const HealthCheckQuery = "SELECT version();"
Expand All @@ -17,7 +18,7 @@ const MaxNumTrials = 20
const TrialInterval = 1.seconds

proc checkConnectivity*(connPool: PgAsyncPool,
onErrAction: OnErrHandler) {.async.} =
onFatalErrorAction: OnFatalErrorHandler) {.async.} =

while true:

Expand All @@ -29,7 +30,7 @@ proc checkConnectivity*(connPool: PgAsyncPool,
block errorBlock:
## Force close all the opened connections. No need to close gracefully.
(await connPool.resetConnPool()).isOkOr:
onErrAction("checkConnectivity resetConnPool error: " & error)
onFatalErrorAction("checkConnectivity resetConnPool error: " & error)

var numTrial = 0
while numTrial < MaxNumTrials:
Expand All @@ -42,6 +43,6 @@ proc checkConnectivity*(connPool: PgAsyncPool,
numTrial.inc()

## The connection couldn't be resumed. Let's inform the upper layers.
onErrAction("postgres health check error: " & error)
onFatalErrorAction("postgres health check error: " & error)

await sleepAsync(CheckConnectivityInterval)
2 changes: 2 additions & 0 deletions waku/waku_rln_relay/group_manager/group_manager_base.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import
../../common/error_handling,
../protocol_types,
../protocol_metrics,
../constants,
Expand Down Expand Up @@ -44,6 +45,7 @@ type
initialized*: bool
latestIndex*: MembershipIndex
validRoots*: Deque[MerkleNode]
onFatalErrorAction*: OnFatalErrorHandler
when defined(rln_v2):
userMessageLimit*: Option[UserMessageLimit]

Expand Down
Loading
Loading