diff --git a/src/Control/Concurrent/Supervisor/Types.hs b/src/Control/Concurrent/Supervisor/Types.hs index 6a67014..b415525 100644 --- a/src/Control/Concurrent/Supervisor/Types.hs +++ b/src/Control/Concurrent/Supervisor/Types.hs @@ -15,7 +15,7 @@ module Control.Concurrent.Supervisor.Types , Supervisor , QueueLike(..) , Child_ - , DeadLetter + , Message(..) , RestartAction , SupervisionEvent(..) , RestartStrategy(..) @@ -52,7 +52,11 @@ import Data.Time import System.Clock (Clock(Monotonic), TimeSpec, getTime) -------------------------------------------------------------------------------- -type Mailbox = TChan DeadLetter +data Message = + DeadLetter !LetterEpoch !ThreadId !SomeException + | Shutdown + +type Mailbox = TChan Message -------------------------------------------------------------------------------- data SupervisionCtx q = SupervisionCtx { @@ -88,9 +92,6 @@ instance QueueLike TBQueue where isFull <- isFullTBQueue q unless isFull $ writeTBQueue q e --------------------------------------------------------------------------------- -data DeadLetter = DeadLetter !LetterEpoch !ThreadId !SomeException - -------------------------------------------------------------------------------- type Epoch = TimeSpec newtype LetterEpoch = LetterEpoch Epoch deriving Show @@ -209,17 +210,8 @@ activeChildren Supervisor{_sp_ctx} = do -- be killed as well. To do so, we explore the children tree, killing workers as we go, -- and recursively calling `shutdownSupervisor` in case we hit a monitored `Supervisor`. shutdownSupervisor :: QueueLike q => Supervisor q -> IO () -shutdownSupervisor (Supervisor tid ctx) = do - chMap <- readIORef (_sc_children ctx) - processChildren (Map.toList chMap) - killThread tid - where - processChildren [] = return () - processChildren (x:xs) = do - case x of - (workerTid, Worker{}) -> killThread workerTid - (_, Supvsr _ _ _ s) -> shutdownSupervisor s - processChildren xs +shutdownSupervisor (Supervisor _ ctx) = do + atomically $ writeTChan (_sc_mailbox ctx) Shutdown -- $fork @@ -325,9 +317,8 @@ restartOneForOne :: QueueLike q restartOneForOne = restartChild -------------------------------------------------------------------------------- -handleEvents :: QueueLike q => SupervisionCtx q -> IO () -handleEvents ctx@SupervisionCtx{..} = do - (DeadLetter epoch newDeath ex) <- atomically $ readTChan _sc_mailbox +handleMessage :: QueueLike q => SupervisionCtx q -> Message -> IO Bool +handleMessage ctx@SupervisionCtx{..} (DeadLetter epoch newDeath ex) = do now <- getCurrentTime atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now) -- If we catch an `AsyncException`, we have nothing but good @@ -337,7 +328,7 @@ handleEvents ctx@SupervisionCtx{..} = do -- Remove the `Child_` from the map, log what happenend. atomicModifyIORef' _sc_children $ \chMap -> (Map.delete newDeath chMap, ()) atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now) - handleEvents ctx + return True Nothing -> do restartResult <- case _sc_strategy of OneForOne -> restartOneForOne ctx epoch now newDeath @@ -349,7 +340,26 @@ handleEvents ctx@SupervisionCtx{..} = do writeQueue _sc_eventStream reason Restarted oldId newId rStatus tm -> writeQueue _sc_eventStream (ChildRestarted oldId newId rStatus tm) - handleEvents ctx + return True +handleMessage SupervisionCtx{..} Shutdown = do + chMap <- readIORef _sc_children + processChildren (Map.toList chMap) + return False + where + processChildren [] = return () + processChildren (x:xs) = do + case x of + (workerTid, Worker{}) -> killThread workerTid + (_, Supvsr _ _ _ s) -> shutdownSupervisor s + processChildren xs + +handleEvents :: QueueLike q => SupervisionCtx q -> IO () +handleEvents ctx@SupervisionCtx{..} = do + msg <- atomically $ readTChan _sc_mailbox + continue <- handleMessage ctx msg + case continue of + True -> handleEvents ctx + False -> return () -- $monitor