Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proof of concept: shutdown supervisor via mailbox command #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions src/Control/Concurrent/Supervisor/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ module Control.Concurrent.Supervisor.Types
, Supervisor
, QueueLike(..)
, Child_
, DeadLetter
, Message(..)
, RestartAction
, SupervisionEvent(..)
, RestartStrategy(..)
Expand Down Expand Up @@ -52,7 +52,11 @@ import Data.Time
import System.Clock (Clock(Monotonic), TimeSpec, getTime)

--------------------------------------------------------------------------------
type Mailbox = TChan DeadLetter
data Message =
DeadLetter !LetterEpoch !ThreadId !SomeException
| Shutdown

type Mailbox = TChan Message

--------------------------------------------------------------------------------
data SupervisionCtx q = SupervisionCtx {
Expand Down Expand Up @@ -88,9 +92,6 @@ instance QueueLike TBQueue where
isFull <- isFullTBQueue q
unless isFull $ writeTBQueue q e

--------------------------------------------------------------------------------
data DeadLetter = DeadLetter !LetterEpoch !ThreadId !SomeException

--------------------------------------------------------------------------------
type Epoch = TimeSpec
newtype LetterEpoch = LetterEpoch Epoch deriving Show
Expand Down Expand Up @@ -209,17 +210,8 @@ activeChildren Supervisor{_sp_ctx} = do
-- be killed as well. To do so, we explore the children tree, killing workers as we go,
-- and recursively calling `shutdownSupervisor` in case we hit a monitored `Supervisor`.
shutdownSupervisor :: QueueLike q => Supervisor q -> IO ()
shutdownSupervisor (Supervisor tid ctx) = do
chMap <- readIORef (_sc_children ctx)
processChildren (Map.toList chMap)
killThread tid
where
processChildren [] = return ()
processChildren (x:xs) = do
case x of
(workerTid, Worker{}) -> killThread workerTid
(_, Supvsr _ _ _ s) -> shutdownSupervisor s
processChildren xs
shutdownSupervisor (Supervisor _ ctx) = do
atomically $ writeTChan (_sc_mailbox ctx) Shutdown

-- $fork

Expand Down Expand Up @@ -325,9 +317,8 @@ restartOneForOne :: QueueLike q
restartOneForOne = restartChild

--------------------------------------------------------------------------------
handleEvents :: QueueLike q => SupervisionCtx q -> IO ()
handleEvents ctx@SupervisionCtx{..} = do
(DeadLetter epoch newDeath ex) <- atomically $ readTChan _sc_mailbox
handleMessage :: QueueLike q => SupervisionCtx q -> Message -> IO Bool
handleMessage ctx@SupervisionCtx{..} (DeadLetter epoch newDeath ex) = do
now <- getCurrentTime
atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now)
-- If we catch an `AsyncException`, we have nothing but good
Expand All @@ -337,7 +328,7 @@ handleEvents ctx@SupervisionCtx{..} = do
-- Remove the `Child_` from the map, log what happenend.
atomicModifyIORef' _sc_children $ \chMap -> (Map.delete newDeath chMap, ())
atomically $ writeQueue _sc_eventStream (ChildDied newDeath ex now)
handleEvents ctx
return True
Nothing -> do
restartResult <- case _sc_strategy of
OneForOne -> restartOneForOne ctx epoch now newDeath
Expand All @@ -349,7 +340,26 @@ handleEvents ctx@SupervisionCtx{..} = do
writeQueue _sc_eventStream reason
Restarted oldId newId rStatus tm ->
writeQueue _sc_eventStream (ChildRestarted oldId newId rStatus tm)
handleEvents ctx
return True
handleMessage SupervisionCtx{..} Shutdown = do
chMap <- readIORef _sc_children
processChildren (Map.toList chMap)
return False
where
processChildren [] = return ()
processChildren (x:xs) = do
case x of
(workerTid, Worker{}) -> killThread workerTid
(_, Supvsr _ _ _ s) -> shutdownSupervisor s
processChildren xs

handleEvents :: QueueLike q => SupervisionCtx q -> IO ()
handleEvents ctx@SupervisionCtx{..} = do
msg <- atomically $ readTChan _sc_mailbox
continue <- handleMessage ctx msg
case continue of
True -> handleEvents ctx
False -> return ()

-- $monitor

Expand Down