Skip to content

Commit

Permalink
server: stop eventing subsystem threads when shutting down (hasura#5479)
Browse files Browse the repository at this point in the history
* server: stop eventing subsystem threads when shutting down

* Apply suggestions from code review

Co-authored-by: Karthikeyan Chinnakonda <chkarthikeyan95@gmail.com>

Co-authored-by: Phil Freeman <phil@hasura.io>
Co-authored-by: Phil Freeman <paf31@cantab.net>
Co-authored-by: Karthikeyan Chinnakonda <chkarthikeyan95@gmail.com>
  • Loading branch information
4 people committed Aug 3, 2020
1 parent 3140eb3 commit ff479a2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 36 deletions.
16 changes: 0 additions & 16 deletions server/commit_diff.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1 @@
**** Latest commit compared against master - 2c397f9f4f7628d6eb893974e273ecf3884d8a17

commit 2c397f9f4f7628d6eb893974e273ecf3884d8a17 (HEAD -> master, upstream/master)
Author: Anon Ray <ecthiender@users.noreply.github.com>
Date: Thu Jul 30 08:04:50 2020 +0530

server: stop eventing subsystem threads when shutting down (#5479)

* server: stop eventing subsystem threads when shutting down

* Apply suggestions from code review

Co-authored-by: Karthikeyan Chinnakonda <chkarthikeyan95@gmail.com>

Co-authored-by: Phil Freeman <phil@hasura.io>
Co-authored-by: Phil Freeman <paf31@cantab.net>
Co-authored-by: Karthikeyan Chinnakonda <chkarthikeyan95@gmail.com>
47 changes: 27 additions & 20 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ import Hasura.Server.Telemetry
import Hasura.Server.Version
import Hasura.Session

import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.Tracing as Tracing

data ExitCode
Expand Down Expand Up @@ -367,14 +367,14 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval
logEnvHeaders = soLogHeadersFromEnv

lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx
lockedEventsCtx <- liftIO $ atomically initLockedEventsCtx

-- prepare event triggers data
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"

_eventQueueThread <- C.forkImmortal "processEventQueue" logger $
eventQueueThread <- C.forkImmortal "processEventQueue" logger $
processEventQueue logger logEnvHeaders
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx

Expand All @@ -383,35 +383,42 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
asyncActionsProcessor env logger (_scrCache cacheRef) _icPgPool _icHttpManager

-- start a background thread to create new cron events
void $ liftIO $ C.forkImmortal "runCronEventsGenerator" logger $
cronEventsThread <- liftIO $ C.forkImmortal "runCronEventsGenerator" logger $
runCronEventsGenerator logger _icPgPool (getSCFromRef cacheRef)

-- prepare scheduled triggers
prepareScheduledEvents _icPgPool logger

-- start a background thread to deliver the scheduled events
void $ C.forkImmortal "processScheduledTriggers" logger $
scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $
processScheduledTriggers env logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) lockedEventsCtx

-- start a background thread to check for updates
updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
checkForUpdates loggerCtx _icHttpManager

-- startTelemetry logger serveOpts cacheRef initCtx
-- start a background thread for telemetry
when soEnableTelemetry $ do
unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice

(dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $
(,) <$> getDbId <*> getPgVersion

void $ C.forkImmortal "runTelemetry" logger $ liftIO $
runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion



-- events has its own shutdown mechanism, used in 'shutdownHandler'
let immortalThreads = [schemaSyncListenerThread, schemaSyncProcessorThread, updateThread, asyncActionsThread]
telemetryThread <- if soEnableTelemetry
then do
unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice

(dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $
(,) <$> getDbId <*> getPgVersion

telemetryThread <- C.forkImmortal "runTelemetry" logger $ liftIO $
runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion
return $ Just telemetryThread
else return Nothing

-- all the immortal threads are collected so that they can be stopped when gracefully shutting down
let immortalThreads = [ schemaSyncListenerThread
, schemaSyncProcessorThread
, updateThread
, asyncActionsThread
, eventQueueThread
, scheduledEventsThread
, cronEventsThread
] <> maybe [] pure telemetryThread

finishTime <- liftIO Clock.getCurrentTime
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
Expand Down Expand Up @@ -475,7 +482,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
-> IO ()
unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do
lockedIds <- readTVarIO lockedIdsVar
unless (Set.null lockedIds) do
unless (Set.null lockedIds) $ do
result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds)
case result of
Left err -> logger $ mkGenericStrLog LevelWarn triggerType $
Expand Down

0 comments on commit ff479a2

Please sign in to comment.