From 049c98940b54880605fcf60cd2f8cf7f2d742ec7 Mon Sep 17 00:00:00 2001 From: Karl Knutsson Date: Wed, 21 Oct 2020 11:54:16 +0200 Subject: [PATCH] Handle exiting threads in diffusion layer If a thread spawned by diffusion exits for any reason we kill all other threads and return. This is done to ensure that the node isn't left running in a semi-functional state, for example with the client side working but the server has exited due to a configuration error. --- .../src/Ouroboros/Network/Diffusion.hs | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs index 79a11d62d13..947ec35fd1a 100644 --- a/ouroboros-network/src/Ouroboros/Network/Diffusion.hs +++ b/ouroboros-network/src/Ouroboros/Network/Diffusion.hs @@ -171,24 +171,32 @@ runDataDiffusion tracers Async.withAsync (cleanNetworkMutableState networkLocalState) $ \cleanLocalNetworkStateThread -> -- fork server for local clients - Async.withAsync (runLocalServer iocp networkLocalState) $ \_ -> + Async.withAsync (runLocalServer iocp networkLocalState) $ \localServerThread -> -- fork ip subscription - Async.withAsync (runIpSubscriptionWorker snocket networkState lias) $ \_ -> + Async.withAsync (runIpSubscriptionWorker snocket networkState lias) $ \ipSubThread -> -- fork dns subscriptions - withAsyncs (runDnsSubscriptionWorker snocket networkState lias <$> daDnsProducers) $ \_ -> + withAsyncs (runDnsSubscriptionWorker snocket networkState lias <$> daDnsProducers) $ \dnsSubThreads -> case daDiffusionMode of InitiatorAndResponderDiffusionMode -> -- fork servers for remote peers - withAsyncs (runServer snocket networkState . fmap Socket.addrAddress <$> addresses) $ \_ -> - -- If any other threads throws 'cleanNetowrkStateThread' and - -- 'cleanLocalNetworkStateThread' threads will will finish. - Async.waitEither_ cleanNetworkStateThread cleanLocalNetworkStateThread + withAsyncs (runServer snocket networkState . fmap Socket.addrAddress <$> addresses) $ \serverThreads -> do + void $ Async.waitAnyCancel $ + [ cleanNetworkStateThread + , cleanLocalNetworkStateThread + , localServerThread + , ipSubThread + ] ++ dnsSubThreads ++ serverThreads InitiatorOnlyDiffusionMode -> - Async.waitEither_ cleanNetworkStateThread cleanLocalNetworkStateThread + void $ Async.waitAnyCancel $ + [ cleanNetworkStateThread + , cleanLocalNetworkStateThread + , localServerThread + , ipSubThread + ] ++ dnsSubThreads where DiffusionTracers { dtIpSubscriptionTracer @@ -276,7 +284,7 @@ runDataDiffusion tracers runLocalServer :: IOManager -> NetworkMutableState LocalAddress - -> IO Void + -> IO () runLocalServer iocp networkLocalState = bracket ( @@ -308,7 +316,7 @@ runDataDiffusion tracers Snocket.bind sn sd $ Snocket.localAddressFromPath a Snocket.listen sn sd - NodeToClient.withServer + void $ NodeToClient.withServer sn (NetworkServerTracers dtMuxLocalTracer @@ -321,7 +329,7 @@ runDataDiffusion tracers localErrorPolicy ) - runServer :: SocketSnocket -> NetworkMutableState SockAddr -> Either Socket.Socket SockAddr -> IO Void + runServer :: SocketSnocket -> NetworkMutableState SockAddr -> Either Socket.Socket SockAddr -> IO () runServer sn networkState address = bracket ( @@ -338,7 +346,7 @@ runDataDiffusion tracers Snocket.bind sn sd a Snocket.listen sn sd - NodeToNode.withServer + void $ NodeToNode.withServer sn (NetworkServerTracers dtMuxTracer @@ -354,8 +362,8 @@ runDataDiffusion tracers runIpSubscriptionWorker :: SocketSnocket -> NetworkMutableState SockAddr -> LocalAddresses SockAddr - -> IO Void - runIpSubscriptionWorker sn networkState la = NodeToNode.ipSubscriptionWorker + -> IO () + runIpSubscriptionWorker sn networkState la = void $ NodeToNode.ipSubscriptionWorker sn (NetworkSubscriptionTracers dtMuxTracer @@ -375,8 +383,8 @@ runDataDiffusion tracers -> NetworkMutableState SockAddr -> LocalAddresses SockAddr -> DnsSubscriptionTarget - -> IO Void - runDnsSubscriptionWorker sn networkState la dnsProducer = NodeToNode.dnsSubscriptionWorker + -> IO () + runDnsSubscriptionWorker sn networkState la dnsProducer = void $ NodeToNode.dnsSubscriptionWorker sn (NetworkDNSSubscriptionTracers dtMuxTracer