Skip to content

Commit

Permalink
Merge #3532
Browse files Browse the repository at this point in the history
3532: Added prop_timeouts_enforced r=bolt12 a=bolt12



Co-authored-by: Armando Santos <armando@well-typed.com>
  • Loading branch information
iohk-bors[bot] and bolt12 authored Feb 3, 2022
2 parents 9a19754 + 826f2db commit a3eafb5
Show file tree
Hide file tree
Showing 6 changed files with 438 additions and 105 deletions.
54 changes: 21 additions & 33 deletions network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ module Network.Mux.Bearer.AttenuatedChannel
import Prelude hiding (read)

import Control.Monad (when)
import qualified Control.Monad.Class.MonadSTM as LazySTM
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadThrow
import Control.Monad.Class.MonadTime
Expand All @@ -29,9 +28,7 @@ import Control.Tracer (Tracer, traceWith)
import GHC.IO.Exception

import qualified Data.ByteString.Lazy as BL
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Maybe (isJust)

import Network.Mux.Codec
import Network.Mux.Time
Expand Down Expand Up @@ -85,16 +82,23 @@ readQueueChannel QueueChannel { qcRead } =

writeQueueChannel :: MonadSTM m
=> QueueChannel m -> Message -> m Bool
writeQueueChannel QueueChannel { qcWrite } msg =
writeQueueChannel QueueChannel { qcWrite, qcRead } msg =
atomically $ do
mq <- readTVar qcWrite
-- Match SO_LINGER set with 0 interval, by not writing MsgClose
-- and closing this end without any waiting for any ack. It is
-- simulating a lost message, so if the receiver does not get the packet,
-- it will get an error the next time it tries to send a packet, closing
-- its end.
case mq of
Nothing -> return False
Just q -> writeTQueue q msg
>> case msg of
MsgClose -> writeTVar qcWrite Nothing
_ -> return ()
>> return True
Nothing -> do
writeTVar qcRead Nothing
return False
Just q -> do
case msg of
MsgClose -> writeTVar qcWrite Nothing
_ -> writeTQueue q msg
return True


newConnectedQueueChannelPair :: ( MonadSTM m
Expand Down Expand Up @@ -213,32 +217,17 @@ newAttenuatedChannel tr Attenuation { aReadAttenuation,
when (not sent) $
throwIO (resourceVanishedIOError "AttenuatedChannel.write" "")

-- closing is a 3-way handshake.
-- acClose simulates SO_LINGER TCP option with interval set to 0.
--
-- It is assumed that the MsgClose is lost, where in this case
-- we only close the local end. When the remote end gets
-- used it will be closed.
--
acClose :: m ()
acClose = do
-- send 'MsgClose' and close the underlying channel
sent <- writeQueueChannel qc MsgClose
traceWith tr (AttChannClosing sent)

-- await for a reply, unless the read channel is already closed.
--
-- TODO: switch to timeout once it's fixed.
d <- registerDelay 120
res <-
atomically $
(LazySTM.readTVar d >>= \b -> check b $> Nothing)
`orElse`
(fmap Just $ do
msg <- readTVar (qcRead qc)
>>= traverse readTQueue
case msg of
Nothing -> return ()
Just MsgClose -> return ()
-- some other message; let the appliction read it first.
Just _ -> retry)

traceWith tr (AttChannClosed (isJust res))
traceWith tr (AttChannLocalClose sent)


-- | Create a pair of connected 'AttenuatedChannel's.
Expand Down Expand Up @@ -315,8 +304,7 @@ attenuationChannelAsMuxBearer sduSize sduTimeout muxTracer chan =
--

data AttenuatedChannelTrace =
AttChannClosing Bool
| AttChannClosed Bool
AttChannLocalClose Bool
| AttChannRemoteClose
deriving Show

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ test-suite test
, quickcheck-instances
, tasty
, tasty-quickcheck
, these

, cardano-prelude
, contra-tracer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import Data.Proxy (Proxy (..))
import Data.Typeable (Typeable)
import GHC.Stack (CallStack, HasCallStack, callStack)

import Data.Map (Map, traverseWithKey)
import Data.Map (Map)
import qualified Data.Map as Map
import qualified Data.Set as Set

Expand Down Expand Up @@ -652,11 +652,18 @@ withConnectionManager ConnectionManagerArguments {
}

k connectionManager
`finally` do
-- Since this exception handler is blocking it might receive exceptions
-- during its execution, which we want to avoid, so we wrap it around
-- uninterruptibleMask_.
`finally` uninterruptibleMask_ (do
traceWith tracer TrShutdown

state <- atomically $ readTMVar stateVar
void $ traverseWithKey
-- Spawning one thread for each connection cleanup avoids spending time
-- waiting for locks and cleanup logic that could delay closing the
-- connections and making us not respecting certain timeouts.
asyncs <- Map.elems
<$> Map.traverseMaybeWithKey
(\peerAddr MutableConnState { connVar } -> do
-- cleanup handler for that thread will close socket associated
-- with the thread. We put each connection in 'TerminatedState' to
Expand All @@ -679,11 +686,20 @@ withConnectionManager ConnectionManagerArguments {

when shouldTrace $
traceWith trTracer tr
-- using 'cancel' here, since we want to block until connection
-- handler thread terminates.
traverse_ cancel (getConnThread connState)
)
state
-- using 'throwTo' here, since we want to block only until connection
-- handler thread receives an exception so as to not take up extra
-- time and making us go above timeout schedules.
traverse
(\thread -> do
throwTo (asyncThreadId thread) AsyncCancelled
pure thread
)
(getConnThread connState)
) state

atomically $ runLastToFinishM
$ foldMap (LastToFinishM . void <$> waitCatchSTM) asyncs
)
where
traceCounters :: StrictTMVar m (ConnectionManagerState peerAddr handle handleError version m) -> m ()
traceCounters stateVar = do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ data DataFlow
-- | Boolean like type which indicates if the timeout on 'OutboundStateDuplex'
-- has expired.
data TimeoutExpired = Expired | Ticking
deriving (Eq, Show)
deriving (Eq, Ord, Show)



Expand Down Expand Up @@ -650,7 +650,7 @@ data AbstractState
| WaitRemoteIdleSt
| TerminatingSt
| TerminatedSt
deriving (Eq, Show, Typeable)
deriving (Eq, Ord, Show, Typeable)


-- | Counters for tracing and analysis purposes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ inboundGovernor trTracer tracer serverControlChannel inboundIdleTimeout
(\(e :: SomeException) -> do
state <- atomically $ readTVar st
_ <- Map.traverseWithKey
(\connId _ ->
(\connId _ -> do
-- Remove the connection from the state so
-- mkRemoteTransitionTrace can create the correct state
-- transition to Nothing value.
let state' = unregisterConnection connId state
traceWith trTracer
(mkRemoteTransitionTrace connId state emptyState)
(mkRemoteTransitionTrace connId state state')
)
(igsConnections state)
traceWith tracer (TrInboundGovernorError e)
Expand Down
Loading

0 comments on commit a3eafb5

Please sign in to comment.