From c7b607ea8c9e4cc26c32dcc7daca1c2a1fe73269 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Mon, 25 Nov 2024 12:15:32 +0900 Subject: [PATCH 1/6] using Weak ThreadId to prevent thread leak --- Network/HTTP2/H2/Manager.hs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/Network/HTTP2/H2/Manager.hs b/Network/HTTP2/H2/Manager.hs index 017665c7..a54eea99 100644 --- a/Network/HTTP2/H2/Manager.hs +++ b/Network/HTTP2/H2/Manager.hs @@ -19,8 +19,9 @@ import Control.Concurrent.STM import Control.Exception import qualified Control.Exception as E import Data.Foldable -import Data.Map (Map) -import qualified Data.Map.Strict as Map +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as Map +import System.Mem.Weak (Weak, deRefWeak) import qualified System.TimeManager as T import Imports @@ -30,7 +31,7 @@ import Imports -- | Manager to manage the thread and the timer. data Manager = Manager T.Manager (TVar ManagedThreads) -type ManagedThreads = Map ThreadId TimeoutHandle +type ManagedThreads = IntMap (Weak ThreadId, TimeoutHandle) ---------------------------------------------------------------- @@ -74,10 +75,14 @@ stopAfter (Manager _timmgr var) action cleanup = do m0 <- readTVar var writeTVar var Map.empty return m0 - forM_ (Map.elems m) cancelTimeout + let ths = Map.elems m + forM_ (map snd ths) cancelTimeout let er = either Just (const Nothing) ma - forM_ (Map.keys m) $ \tid -> - E.throwTo tid $ KilledByHttp2ThreadManager er + forM_ (map fst ths) $ \wtid -> do + mtid <- deRefWeak wtid + case mtid of + Nothing -> return () + Just tid -> E.throwTo tid $ KilledByHttp2ThreadManager er case ma of Left err -> cleanup (Just err) >> throwIO err Right a -> cleanup Nothing >> return a @@ -101,12 +106,12 @@ forkManagedUnmask (Manager _timmgr var) label io = -- So, SomeException should be reasonable. void $ mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do labelMe label - tid <- myThreadId - atomically $ modifyTVar var $ Map.insert tid ThreadWithoutTimeout + (wtid, n) <- myWeakThradId + atomically $ modifyTVar var $ Map.insert n (wtid, ThreadWithoutTimeout) -- We catch the exception and do not rethrow it: we don't want the -- exception printed to stderr. io unmask `catch` ignore - atomically $ modifyTVar var $ Map.delete tid + atomically $ modifyTVar var $ Map.delete n where ignore (E.SomeException _) = return () @@ -120,7 +125,14 @@ waitCounter0 (Manager _timmgr var) = atomically $ do withTimeout :: Manager -> (T.Handle -> IO ()) -> IO () withTimeout (Manager timmgr var) action = T.withHandleKillThread timmgr (return ()) $ \th -> do - tid <- myThreadId + (wtid, n) <- myWeakThradId -- overriding ThreadWithoutTimeout - atomically $ modifyTVar var $ Map.insert tid $ ThreadWithTimeout th + atomically $ modifyTVar var $ Map.insert n $ (wtid, ThreadWithTimeout th) action th + +myWeakThradId :: IO (Weak ThreadId, Int) +myWeakThradId = do + tid <- myThreadId + wtid <- mkWeakThreadId tid + let n = read (drop 9 $ show tid) -- drop "ThreadId " + return (wtid, n) From b1b2177d24de049552ee3a5285a3b219b1f207de Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Mon, 25 Nov 2024 12:32:07 +0900 Subject: [PATCH 2/6] making the semantics of forkManagedUnmask clear - use "bracket" - ignore "KilledByHttp2ThreadManager" only as "KilledByHttp2ThreadManager" is thrown here --- Network/HTTP2/H2/Manager.hs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/Network/HTTP2/H2/Manager.hs b/Network/HTTP2/H2/Manager.hs index a54eea99..38cdb97d 100644 --- a/Network/HTTP2/H2/Manager.hs +++ b/Network/HTTP2/H2/Manager.hs @@ -102,18 +102,17 @@ forkManaged mgr label io = forkManagedUnmask :: Manager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO () forkManagedUnmask (Manager _timmgr var) label io = - -- This is the top level of thread. - -- So, SomeException should be reasonable. void $ mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do labelMe label + E.bracket setup clear $ \_ -> io unmask + where + setup = do (wtid, n) <- myWeakThradId + -- asking to throw KilledByHttp2ThreadManager to me atomically $ modifyTVar var $ Map.insert n (wtid, ThreadWithoutTimeout) - -- We catch the exception and do not rethrow it: we don't want the - -- exception printed to stderr. - io unmask `catch` ignore - atomically $ modifyTVar var $ Map.delete n - where - ignore (E.SomeException _) = return () + return n + clear n = atomically $ modifyTVar var $ Map.delete n + ignore (KilledByHttp2ThreadManager _) = return () waitCounter0 :: Manager -> IO () waitCounter0 (Manager _timmgr var) = atomically $ do From 5027d40abfaf3c41ab39a9ee38517b33013c50de Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Mon, 25 Nov 2024 14:26:19 +0900 Subject: [PATCH 3/6] comment --- Network/HTTP2/H2/Manager.hs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Network/HTTP2/H2/Manager.hs b/Network/HTTP2/H2/Manager.hs index 38cdb97d..e6d078be 100644 --- a/Network/HTTP2/H2/Manager.hs +++ b/Network/HTTP2/H2/Manager.hs @@ -76,6 +76,10 @@ stopAfter (Manager _timmgr var) action cleanup = do writeTVar var Map.empty return m0 let ths = Map.elems m + -- Managed threads may receive 'TimeoutThread' and + -- 'KilledByHttp2ThreadManager'. Before throwing to + -- 'KilledByHttp2ThreadManager' to the tagets, + -- let's cancel 'TimeoutThread' to avoid race. forM_ (map snd ths) cancelTimeout let er = either Just (const Nothing) ma forM_ (map fst ths) $ \wtid -> do From 568af806bc71cbbeb7dbe19897609c9644759816 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Mon, 25 Nov 2024 15:39:04 +0900 Subject: [PATCH 4/6] refactoring --- Network/HTTP2/H2/Manager.hs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Network/HTTP2/H2/Manager.hs b/Network/HTTP2/H2/Manager.hs index e6d078be..b5d1f9ba 100644 --- a/Network/HTTP2/H2/Manager.hs +++ b/Network/HTTP2/H2/Manager.hs @@ -31,17 +31,15 @@ import Imports -- | Manager to manage the thread and the timer. data Manager = Manager T.Manager (TVar ManagedThreads) -type ManagedThreads = IntMap (Weak ThreadId, TimeoutHandle) +type ManagedThreads = IntMap ManagedThread ---------------------------------------------------------------- -data TimeoutHandle - = ThreadWithTimeout T.Handle - | ThreadWithoutTimeout +data ManagedThread = ManagedThread (Weak ThreadId) (Maybe T.Handle) -cancelTimeout :: TimeoutHandle -> IO () -cancelTimeout (ThreadWithTimeout th) = T.cancel th -cancelTimeout ThreadWithoutTimeout = return () +cancelTimeout :: ManagedThread -> IO () +cancelTimeout (ManagedThread _ (Just th)) = T.cancel th +cancelTimeout _ = return () ---------------------------------------------------------------- @@ -80,9 +78,9 @@ stopAfter (Manager _timmgr var) action cleanup = do -- 'KilledByHttp2ThreadManager'. Before throwing to -- 'KilledByHttp2ThreadManager' to the tagets, -- let's cancel 'TimeoutThread' to avoid race. - forM_ (map snd ths) cancelTimeout + forM_ ths cancelTimeout let er = either Just (const Nothing) ma - forM_ (map fst ths) $ \wtid -> do + forM_ ths $ \(ManagedThread wtid _) -> do mtid <- deRefWeak wtid case mtid of Nothing -> return () @@ -113,7 +111,8 @@ forkManagedUnmask (Manager _timmgr var) label io = setup = do (wtid, n) <- myWeakThradId -- asking to throw KilledByHttp2ThreadManager to me - atomically $ modifyTVar var $ Map.insert n (wtid, ThreadWithoutTimeout) + let ent = ManagedThread wtid Nothing + atomically $ modifyTVar var $ Map.insert n ent return n clear n = atomically $ modifyTVar var $ Map.delete n ignore (KilledByHttp2ThreadManager _) = return () @@ -129,8 +128,9 @@ withTimeout :: Manager -> (T.Handle -> IO ()) -> IO () withTimeout (Manager timmgr var) action = T.withHandleKillThread timmgr (return ()) $ \th -> do (wtid, n) <- myWeakThradId - -- overriding ThreadWithoutTimeout - atomically $ modifyTVar var $ Map.insert n $ (wtid, ThreadWithTimeout th) + -- overriding Nothing to Just if already exist + let ent = ManagedThread wtid $ Just th + atomically $ modifyTVar var $ Map.insert n ent action th myWeakThradId :: IO (Weak ThreadId, Int) From 940e4f73bc5a0698889216b6bef65ba37556aa99 Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Tue, 26 Nov 2024 08:04:41 +0900 Subject: [PATCH 5/6] using modifyTVar' instead of modifyTVar. --- Network/HTTP2/H2/Manager.hs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Network/HTTP2/H2/Manager.hs b/Network/HTTP2/H2/Manager.hs index b5d1f9ba..f2523604 100644 --- a/Network/HTTP2/H2/Manager.hs +++ b/Network/HTTP2/H2/Manager.hs @@ -112,9 +112,9 @@ forkManagedUnmask (Manager _timmgr var) label io = (wtid, n) <- myWeakThradId -- asking to throw KilledByHttp2ThreadManager to me let ent = ManagedThread wtid Nothing - atomically $ modifyTVar var $ Map.insert n ent + atomically $ modifyTVar' var $ Map.insert n ent return n - clear n = atomically $ modifyTVar var $ Map.delete n + clear n = atomically $ modifyTVar' var $ Map.delete n ignore (KilledByHttp2ThreadManager _) = return () waitCounter0 :: Manager -> IO () @@ -130,7 +130,7 @@ withTimeout (Manager timmgr var) action = (wtid, n) <- myWeakThradId -- overriding Nothing to Just if already exist let ent = ManagedThread wtid $ Just th - atomically $ modifyTVar var $ Map.insert n ent + atomically $ modifyTVar' var $ Map.insert n ent action th myWeakThradId :: IO (Weak ThreadId, Int) From 3af78ae9699ad25a1cdda66161d790418e0e715e Mon Sep 17 00:00:00 2001 From: Kazu Yamamoto Date: Tue, 26 Nov 2024 09:40:31 +0900 Subject: [PATCH 6/6] providing forkManagedTimeout ensuring that only one asynchronous exception is thrown --- Network/HTTP2/H2/Manager.hs | 82 +++++++++++++++++++--------------- Network/HTTP2/Server/Worker.hs | 45 +++++++++---------- 2 files changed, 67 insertions(+), 60 deletions(-) diff --git a/Network/HTTP2/H2/Manager.hs b/Network/HTTP2/H2/Manager.hs index f2523604..959adf69 100644 --- a/Network/HTTP2/H2/Manager.hs +++ b/Network/HTTP2/H2/Manager.hs @@ -9,7 +9,7 @@ module Network.HTTP2.H2.Manager ( stopAfter, forkManaged, forkManagedUnmask, - withTimeout, + forkManagedTimeout, KilledByHttp2ThreadManager (..), waitCounter0, ) where @@ -19,6 +19,7 @@ import Control.Concurrent.STM import Control.Exception import qualified Control.Exception as E import Data.Foldable +import Data.IORef import Data.IntMap (IntMap) import qualified Data.IntMap.Strict as Map import System.Mem.Weak (Weak, deRefWeak) @@ -35,11 +36,11 @@ type ManagedThreads = IntMap ManagedThread ---------------------------------------------------------------- -data ManagedThread = ManagedThread (Weak ThreadId) (Maybe T.Handle) - -cancelTimeout :: ManagedThread -> IO () -cancelTimeout (ManagedThread _ (Just th)) = T.cancel th -cancelTimeout _ = return () +-- 'IORef' prevents race between WAI TimeManager (TimeoutThread) +-- and stopAfter (KilledByHttp2ThreadManager). +-- It is initialized with 'False' and turned into 'True' when locked. +-- The winner can throw an asynchronous exception. +data ManagedThread = ManagedThread (Weak ThreadId) (IORef Bool) ---------------------------------------------------------------- @@ -74,17 +75,9 @@ stopAfter (Manager _timmgr var) action cleanup = do writeTVar var Map.empty return m0 let ths = Map.elems m - -- Managed threads may receive 'TimeoutThread' and - -- 'KilledByHttp2ThreadManager'. Before throwing to - -- 'KilledByHttp2ThreadManager' to the tagets, - -- let's cancel 'TimeoutThread' to avoid race. - forM_ ths cancelTimeout - let er = either Just (const Nothing) ma - forM_ ths $ \(ManagedThread wtid _) -> do - mtid <- deRefWeak wtid - case mtid of - Nothing -> return () - Just tid -> E.throwTo tid $ KilledByHttp2ThreadManager er + er = either Just (const Nothing) ma + ex = KilledByHttp2ThreadManager er + forM_ ths $ \(ManagedThread wtid ref) -> lockAndKill wtid ref ex case ma of Left err -> cleanup (Just err) >> throwIO err Right a -> cleanup Nothing >> return a @@ -106,16 +99,42 @@ forkManagedUnmask forkManagedUnmask (Manager _timmgr var) label io = void $ mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do labelMe label - E.bracket setup clear $ \_ -> io unmask - where - setup = do - (wtid, n) <- myWeakThradId - -- asking to throw KilledByHttp2ThreadManager to me - let ent = ManagedThread wtid Nothing - atomically $ modifyTVar' var $ Map.insert n ent - return n - clear n = atomically $ modifyTVar' var $ Map.delete n - ignore (KilledByHttp2ThreadManager _) = return () + E.bracket (setup var) (clear var) $ \_ -> io unmask + +forkManagedTimeout :: Manager -> String -> (T.Handle -> IO ()) -> IO () +forkManagedTimeout (Manager timmgr var) label io = + void $ forkIO $ E.handle ignore $ do + labelMe label + E.bracket (setup var) (clear var) $ \(_n, wtid, ref) -> + -- 'TimeoutThread' is ignored by 'withHandle'. + T.withHandle timmgr (lockAndKill wtid ref T.TimeoutThread) io + +setup :: TVar (IntMap ManagedThread) -> IO (Int, Weak ThreadId, IORef Bool) +setup var = do + (wtid, n) <- myWeakThradId + ref <- newIORef False + let ent = ManagedThread wtid ref + -- asking to throw KilledByHttp2ThreadManager to me + atomically $ modifyTVar' var $ Map.insert n ent + return (n, wtid, ref) + +lockAndKill :: Exception e => Weak ThreadId -> IORef Bool -> e -> IO () +lockAndKill wtid ref e = do + alreadyLocked <- atomicModifyIORef' ref (\b -> (True, b)) -- try to lock + unless alreadyLocked $ do + mtid <- deRefWeak wtid + case mtid of + Nothing -> return () + Just tid -> E.throwTo tid e + +clear + :: TVar (IntMap ManagedThread) + -> (Map.Key, Weak ThreadId, IORef Bool) + -> IO () +clear var (n, _, _) = atomically $ modifyTVar' var $ Map.delete n + +ignore :: KilledByHttp2ThreadManager -> IO () +ignore (KilledByHttp2ThreadManager _) = return () waitCounter0 :: Manager -> IO () waitCounter0 (Manager _timmgr var) = atomically $ do @@ -124,15 +143,6 @@ waitCounter0 (Manager _timmgr var) = atomically $ do ---------------------------------------------------------------- -withTimeout :: Manager -> (T.Handle -> IO ()) -> IO () -withTimeout (Manager timmgr var) action = - T.withHandleKillThread timmgr (return ()) $ \th -> do - (wtid, n) <- myWeakThradId - -- overriding Nothing to Just if already exist - let ent = ManagedThread wtid $ Just th - atomically $ modifyTVar' var $ Map.insert n ent - action th - myWeakThradId :: IO (Weak ThreadId, Int) myWeakThradId = do tid <- myThreadId diff --git a/Network/HTTP2/Server/Worker.hs b/Network/HTTP2/Server/Worker.hs index d18d7135..a3fd1425 100644 --- a/Network/HTTP2/Server/Worker.hs +++ b/Network/HTTP2/Server/Worker.hs @@ -23,15 +23,13 @@ import Network.HTTP2.H2 runServer :: Config -> Server -> Launch runServer conf server ctx@Context{..} strm req = - forkManaged threadManager label $ - withTimeout threadManager $ \th -> do - -- FIXME: exception - let req' = pauseRequestBody th - aux = Aux th mySockAddr peerSockAddr - request = Request req' - lc <- newLoopCheck strm Nothing - server request aux $ sendResponse conf ctx lc strm request - adjustRxWindow ctx strm + forkManagedTimeout threadManager label $ \th -> do + let req' = pauseRequestBody th + aux = Aux th mySockAddr peerSockAddr + request = Request req' + lc <- newLoopCheck strm Nothing + server request aux $ sendResponse conf ctx lc strm request + adjustRxWindow ctx strm where label = "H2 response sender for stream " ++ show (streamNumber strm) pauseRequestBody th = req{inpObjBody = readBody'} @@ -169,21 +167,20 @@ sendStreaming -> IO (TBQueue StreamingChunk) sendStreaming Context{..} strm strmbdy = do tbq <- newTBQueueIO 10 -- fixme: hard coding: 10 - forkManaged threadManager label $ - withTimeout threadManager $ \th -> - withOutBodyIface tbq id $ \iface -> do - let iface' = - iface - { outBodyPush = \b -> do - T.pause th - outBodyPush iface b - T.resume th - , outBodyPushFinal = \b -> do - T.pause th - outBodyPushFinal iface b - T.resume th - } - strmbdy iface' + forkManagedTimeout threadManager label $ \th -> + withOutBodyIface tbq id $ \iface -> do + let iface' = + iface + { outBodyPush = \b -> do + T.pause th + outBodyPush iface b + T.resume th + , outBodyPushFinal = \b -> do + T.pause th + outBodyPushFinal iface b + T.resume th + } + strmbdy iface' return tbq where label = "H2 response streaming sender for " ++ show (streamNumber strm)