From ff479a2aafa34a3a7e43ff9fda152a1f8f25d396 Mon Sep 17 00:00:00 2001 From: Anon Ray Date: Thu, 30 Jul 2020 08:04:50 +0530 Subject: [PATCH] server: stop eventing subsystem threads when shutting down (#5479) * server: stop eventing subsystem threads when shutting down * Apply suggestions from code review Co-authored-by: Karthikeyan Chinnakonda Co-authored-by: Phil Freeman Co-authored-by: Phil Freeman Co-authored-by: Karthikeyan Chinnakonda --- server/commit_diff.txt | 16 ------------ server/src-lib/Hasura/App.hs | 47 +++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 36 deletions(-) diff --git a/server/commit_diff.txt b/server/commit_diff.txt index 4078111e9ade7..4f921c97d06e6 100644 --- a/server/commit_diff.txt +++ b/server/commit_diff.txt @@ -1,17 +1 @@ **** Latest commit compared against master - 2c397f9f4f7628d6eb893974e273ecf3884d8a17 - -commit 2c397f9f4f7628d6eb893974e273ecf3884d8a17 (HEAD -> master, upstream/master) -Author: Anon Ray -Date: Thu Jul 30 08:04:50 2020 +0530 - - server: stop eventing subsystem threads when shutting down (#5479) - - * server: stop eventing subsystem threads when shutting down - - * Apply suggestions from code review - - Co-authored-by: Karthikeyan Chinnakonda - - Co-authored-by: Phil Freeman - Co-authored-by: Phil Freeman - Co-authored-by: Karthikeyan Chinnakonda diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index b4d9294654868..6c28e4f941fb6 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -73,8 +73,8 @@ import Hasura.Server.Telemetry import Hasura.Server.Version import Hasura.Session -import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL +import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS import qualified Hasura.Tracing as Tracing data ExitCode @@ -367,14 +367,14 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval logEnvHeaders = soLogHeadersFromEnv - lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx + lockedEventsCtx <- liftIO $ atomically initLockedEventsCtx -- prepare event triggers data prepareEvents _icPgPool logger eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" - _eventQueueThread <- C.forkImmortal "processEventQueue" logger $ + eventQueueThread <- C.forkImmortal "processEventQueue" logger $ processEventQueue logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx @@ -383,35 +383,42 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos asyncActionsProcessor env logger (_scrCache cacheRef) _icPgPool _icHttpManager -- start a background thread to create new cron events - void $ liftIO $ C.forkImmortal "runCronEventsGenerator" logger $ + cronEventsThread <- liftIO $ C.forkImmortal "runCronEventsGenerator" logger $ runCronEventsGenerator logger _icPgPool (getSCFromRef cacheRef) -- prepare scheduled triggers prepareScheduledEvents _icPgPool logger -- start a background thread to deliver the scheduled events - void $ C.forkImmortal "processScheduledTriggers" logger $ + scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $ processScheduledTriggers env logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) lockedEventsCtx -- start a background thread to check for updates updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $ checkForUpdates loggerCtx _icHttpManager - -- startTelemetry logger serveOpts cacheRef initCtx -- start a background thread for telemetry - when soEnableTelemetry $ do - unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice - - (dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $ - (,) <$> getDbId <*> getPgVersion - - void $ C.forkImmortal "runTelemetry" logger $ liftIO $ - runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion - - - - -- events has its own shutdown mechanism, used in 'shutdownHandler' - let immortalThreads = [schemaSyncListenerThread, schemaSyncProcessorThread, updateThread, asyncActionsThread] + telemetryThread <- if soEnableTelemetry + then do + unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice + + (dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $ + (,) <$> getDbId <*> getPgVersion + + telemetryThread <- C.forkImmortal "runTelemetry" logger $ liftIO $ + runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion + return $ Just telemetryThread + else return Nothing + + -- all the immortal threads are collected so that they can be stopped when gracefully shutting down + let immortalThreads = [ schemaSyncListenerThread + , schemaSyncProcessorThread + , updateThread + , asyncActionsThread + , eventQueueThread + , scheduledEventsThread + , cronEventsThread + ] <> maybe [] pure telemetryThread finishTime <- liftIO Clock.getCurrentTime let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime @@ -475,7 +482,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos -> IO () unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do lockedIds <- readTVarIO lockedIdsVar - unless (Set.null lockedIds) do + unless (Set.null lockedIds) $ do result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds) case result of Left err -> logger $ mkGenericStrLog LevelWarn triggerType $