Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle exiting threads in diffusion layer #2696

Merged
merged 1 commit into from
Oct 23, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,32 @@ runDataDiffusion tracers
Async.withAsync (cleanNetworkMutableState networkLocalState) $ \cleanLocalNetworkStateThread ->

-- fork server for local clients
Async.withAsync (runLocalServer iocp networkLocalState) $ \_ ->
Async.withAsync (runLocalServer iocp networkLocalState) $ \localServerThread ->

-- fork ip subscription
Async.withAsync (runIpSubscriptionWorker snocket networkState lias) $ \_ ->
Async.withAsync (runIpSubscriptionWorker snocket networkState lias) $ \ipSubThread ->

-- fork dns subscriptions
withAsyncs (runDnsSubscriptionWorker snocket networkState lias <$> daDnsProducers) $ \_ ->
withAsyncs (runDnsSubscriptionWorker snocket networkState lias <$> daDnsProducers) $ \dnsSubThreads ->

case daDiffusionMode of
InitiatorAndResponderDiffusionMode ->
-- fork servers for remote peers
withAsyncs (runServer snocket networkState . fmap Socket.addrAddress <$> addresses) $ \_ ->
-- If any other threads throws 'cleanNetowrkStateThread' and
-- 'cleanLocalNetworkStateThread' threads will will finish.
Async.waitEither_ cleanNetworkStateThread cleanLocalNetworkStateThread
withAsyncs (runServer snocket networkState . fmap Socket.addrAddress <$> addresses) $ \serverThreads -> do
void $ Async.waitAnyCancel $
[ cleanNetworkStateThread
, cleanLocalNetworkStateThread
, localServerThread
, ipSubThread
] ++ dnsSubThreads ++ serverThreads

InitiatorOnlyDiffusionMode ->
Async.waitEither_ cleanNetworkStateThread cleanLocalNetworkStateThread
void $ Async.waitAnyCancel $
[ cleanNetworkStateThread
, cleanLocalNetworkStateThread
, localServerThread
, ipSubThread
] ++ dnsSubThreads

where
DiffusionTracers { dtIpSubscriptionTracer
Expand Down Expand Up @@ -276,7 +284,7 @@ runDataDiffusion tracers

runLocalServer :: IOManager
-> NetworkMutableState LocalAddress
-> IO Void
-> IO ()
runLocalServer iocp networkLocalState =
bracket
(
Expand Down Expand Up @@ -308,7 +316,7 @@ runDataDiffusion tracers
Snocket.bind sn sd $ Snocket.localAddressFromPath a
Snocket.listen sn sd

NodeToClient.withServer
void $ NodeToClient.withServer
sn
(NetworkServerTracers
dtMuxLocalTracer
Expand All @@ -321,7 +329,7 @@ runDataDiffusion tracers
localErrorPolicy
)

runServer :: SocketSnocket -> NetworkMutableState SockAddr -> Either Socket.Socket SockAddr -> IO Void
runServer :: SocketSnocket -> NetworkMutableState SockAddr -> Either Socket.Socket SockAddr -> IO ()
runServer sn networkState address =
bracket
(
Expand All @@ -338,7 +346,7 @@ runDataDiffusion tracers
Snocket.bind sn sd a
Snocket.listen sn sd

NodeToNode.withServer
void $ NodeToNode.withServer
sn
(NetworkServerTracers
dtMuxTracer
Expand All @@ -354,8 +362,8 @@ runDataDiffusion tracers
runIpSubscriptionWorker :: SocketSnocket
-> NetworkMutableState SockAddr
-> LocalAddresses SockAddr
-> IO Void
runIpSubscriptionWorker sn networkState la = NodeToNode.ipSubscriptionWorker
-> IO ()
runIpSubscriptionWorker sn networkState la = void $ NodeToNode.ipSubscriptionWorker
sn
(NetworkSubscriptionTracers
dtMuxTracer
Expand All @@ -375,8 +383,8 @@ runDataDiffusion tracers
-> NetworkMutableState SockAddr
-> LocalAddresses SockAddr
-> DnsSubscriptionTarget
-> IO Void
runDnsSubscriptionWorker sn networkState la dnsProducer = NodeToNode.dnsSubscriptionWorker
-> IO ()
runDnsSubscriptionWorker sn networkState la dnsProducer = void $ NodeToNode.dnsSubscriptionWorker
sn
(NetworkDNSSubscriptionTracers
dtMuxTracer
Expand Down