Skip to content

Commit

Permalink
Merge pull request #156 from kazu-yamamoto/improve-manager
Browse files Browse the repository at this point in the history
Improve manager
  • Loading branch information
kazu-yamamoto authored Nov 28, 2024
2 parents f7c0701 + 3af78ae commit b054b01
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 56 deletions.
89 changes: 57 additions & 32 deletions Network/HTTP2/H2/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Network.HTTP2.H2.Manager (
stopAfter,
forkManaged,
forkManagedUnmask,
withTimeout,
forkManagedTimeout,
KilledByHttp2ThreadManager (..),
waitCounter0,
) where
Expand All @@ -19,8 +19,10 @@ import Control.Concurrent.STM
import Control.Exception
import qualified Control.Exception as E
import Data.Foldable
import Data.Map (Map)
import qualified Data.Map.Strict as Map
import Data.IORef
import Data.IntMap (IntMap)
import qualified Data.IntMap.Strict as Map
import System.Mem.Weak (Weak, deRefWeak)
import qualified System.TimeManager as T

import Imports
Expand All @@ -30,17 +32,15 @@ import Imports
-- | Manager to manage the thread and the timer.
data Manager = Manager T.Manager (TVar ManagedThreads)

type ManagedThreads = Map ThreadId TimeoutHandle
type ManagedThreads = IntMap ManagedThread

----------------------------------------------------------------

data TimeoutHandle
= ThreadWithTimeout T.Handle
| ThreadWithoutTimeout

cancelTimeout :: TimeoutHandle -> IO ()
cancelTimeout (ThreadWithTimeout th) = T.cancel th
cancelTimeout ThreadWithoutTimeout = return ()
-- 'IORef' prevents race between WAI TimeManager (TimeoutThread)
-- and stopAfter (KilledByHttp2ThreadManager).
-- It is initialized with 'False' and turned into 'True' when locked.
-- The winner can throw an asynchronous exception.
data ManagedThread = ManagedThread (Weak ThreadId) (IORef Bool)

----------------------------------------------------------------

Expand Down Expand Up @@ -74,10 +74,10 @@ stopAfter (Manager _timmgr var) action cleanup = do
m0 <- readTVar var
writeTVar var Map.empty
return m0
forM_ (Map.elems m) cancelTimeout
let er = either Just (const Nothing) ma
forM_ (Map.keys m) $ \tid ->
E.throwTo tid $ KilledByHttp2ThreadManager er
let ths = Map.elems m
er = either Just (const Nothing) ma
ex = KilledByHttp2ThreadManager er
forM_ ths $ \(ManagedThread wtid ref) -> lockAndKill wtid ref ex
case ma of
Left err -> cleanup (Just err) >> throwIO err
Right a -> cleanup Nothing >> return a
Expand All @@ -97,18 +97,44 @@ forkManaged mgr label io =
forkManagedUnmask
:: Manager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask (Manager _timmgr var) label io =
-- This is the top level of thread.
-- So, SomeException should be reasonable.
void $ mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do
labelMe label
tid <- myThreadId
atomically $ modifyTVar var $ Map.insert tid ThreadWithoutTimeout
-- We catch the exception and do not rethrow it: we don't want the
-- exception printed to stderr.
io unmask `catch` ignore
atomically $ modifyTVar var $ Map.delete tid
where
ignore (E.SomeException _) = return ()
E.bracket (setup var) (clear var) $ \_ -> io unmask

forkManagedTimeout :: Manager -> String -> (T.Handle -> IO ()) -> IO ()
forkManagedTimeout (Manager timmgr var) label io =
void $ forkIO $ E.handle ignore $ do
labelMe label
E.bracket (setup var) (clear var) $ \(_n, wtid, ref) ->
-- 'TimeoutThread' is ignored by 'withHandle'.
T.withHandle timmgr (lockAndKill wtid ref T.TimeoutThread) io

setup :: TVar (IntMap ManagedThread) -> IO (Int, Weak ThreadId, IORef Bool)
setup var = do
(wtid, n) <- myWeakThradId
ref <- newIORef False
let ent = ManagedThread wtid ref
-- asking to throw KilledByHttp2ThreadManager to me
atomically $ modifyTVar' var $ Map.insert n ent
return (n, wtid, ref)

lockAndKill :: Exception e => Weak ThreadId -> IORef Bool -> e -> IO ()
lockAndKill wtid ref e = do
alreadyLocked <- atomicModifyIORef' ref (\b -> (True, b)) -- try to lock
unless alreadyLocked $ do
mtid <- deRefWeak wtid
case mtid of
Nothing -> return ()
Just tid -> E.throwTo tid e

clear
:: TVar (IntMap ManagedThread)
-> (Map.Key, Weak ThreadId, IORef Bool)
-> IO ()
clear var (n, _, _) = atomically $ modifyTVar' var $ Map.delete n

ignore :: KilledByHttp2ThreadManager -> IO ()
ignore (KilledByHttp2ThreadManager _) = return ()

waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _timmgr var) = atomically $ do
Expand All @@ -117,10 +143,9 @@ waitCounter0 (Manager _timmgr var) = atomically $ do

----------------------------------------------------------------

withTimeout :: Manager -> (T.Handle -> IO ()) -> IO ()
withTimeout (Manager timmgr var) action =
T.withHandleKillThread timmgr (return ()) $ \th -> do
tid <- myThreadId
-- overriding ThreadWithoutTimeout
atomically $ modifyTVar var $ Map.insert tid $ ThreadWithTimeout th
action th
myWeakThradId :: IO (Weak ThreadId, Int)
myWeakThradId = do
tid <- myThreadId
wtid <- mkWeakThreadId tid
let n = read (drop 9 $ show tid) -- drop "ThreadId "
return (wtid, n)
45 changes: 21 additions & 24 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ import Network.HTTP2.H2

runServer :: Config -> Server -> Launch
runServer conf server ctx@Context{..} strm req =
forkManaged threadManager label $
withTimeout threadManager $ \th -> do
-- FIXME: exception
let req' = pauseRequestBody th
aux = Aux th mySockAddr peerSockAddr
request = Request req'
lc <- newLoopCheck strm Nothing
server request aux $ sendResponse conf ctx lc strm request
adjustRxWindow ctx strm
forkManagedTimeout threadManager label $ \th -> do
let req' = pauseRequestBody th
aux = Aux th mySockAddr peerSockAddr
request = Request req'
lc <- newLoopCheck strm Nothing
server request aux $ sendResponse conf ctx lc strm request
adjustRxWindow ctx strm
where
label = "H2 response sender for stream " ++ show (streamNumber strm)
pauseRequestBody th = req{inpObjBody = readBody'}
Expand Down Expand Up @@ -169,21 +167,20 @@ sendStreaming
-> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strm strmbdy = do
tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
forkManaged threadManager label $
withTimeout threadManager $ \th ->
withOutBodyIface tbq id $ \iface -> do
let iface' =
iface
{ outBodyPush = \b -> do
T.pause th
outBodyPush iface b
T.resume th
, outBodyPushFinal = \b -> do
T.pause th
outBodyPushFinal iface b
T.resume th
}
strmbdy iface'
forkManagedTimeout threadManager label $ \th ->
withOutBodyIface tbq id $ \iface -> do
let iface' =
iface
{ outBodyPush = \b -> do
T.pause th
outBodyPush iface b
T.resume th
, outBodyPushFinal = \b -> do
T.pause th
outBodyPushFinal iface b
T.resume th
}
strmbdy iface'
return tbq
where
label = "H2 response streaming sender for " ++ show (streamNumber strm)

0 comments on commit b054b01

Please sign in to comment.