Port #2525 and #2531 to master #2548

Merged: 5 commits, Aug 28, 2020
113 changes: 35 additions & 78 deletions ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
@@ -25,10 +25,8 @@ module Test.ThreadNet.Network (
, plainTestNodeInitialization
, TracingConstraints
-- * Tracers
, MiniProtocolExpectedException (..)
, MiniProtocolFatalException (..)
, MiniProtocolState (..)
, TraceMiniProtocolRestart (..)
-- * Test Output
, TestOutput (..)
, NodeOutput (..)
@@ -65,13 +63,11 @@ import Ouroboros.Network.MockChain.Chain (Chain (Genesis))
import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import qualified Ouroboros.Network.BlockFetch.Client as BFClient
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
import Ouroboros.Network.Protocol.TxSubmission.Type
import qualified Ouroboros.Network.TxSubmission.Inbound as TxInbound
import qualified Ouroboros.Network.TxSubmission.Outbound as TxOutbound

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.BlockchainTime
@@ -83,7 +79,6 @@ import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Mempool
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.Server as BFServer
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.Network.NodeToNode as NTN
import Ouroboros.Consensus.Node.NetworkProtocolVersion
@@ -1048,22 +1043,19 @@ data CodecError
-- | Cause for an edge to restart
--
data RestartCause
= RestartExn !MiniProtocolExpectedException
-- ^ restart due to an exception in one of the mini protocol instances
--
-- Edges only catch-and-restart on /expected/ exceptions; anything else
-- will tear down the whole hierarchy of test threads. See
-- 'MiniProtocolExpectedException'.
| RestartNode
-- ^ restart because at least one of the two nodes is 'VFalling'
= RestartScheduled
-- ^ restart because at least one of the two nodes set its status to
-- 'VFalling' because of a scheduled restart in 'tnaRestarts'
| RestartChainSyncTerminated
-- ^ restart because the ChainSync client terminated the mini protocol
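
-- An illustrative sketch (not part of this change) of how the restart
-- loop in 'directedEdge' below reacts to each cause; 'onRestartCause',
-- 'blockUntilNextSlot' and 'restartEdge' are hypothetical names:
--
-- > onRestartCause RestartScheduled           = pure ()  -- leave the edge down
-- > onRestartCause RestartChainSyncTerminated = blockUntilNextSlot >> restartEdge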

-- | Fork two directed edges, one in each direction between the two vertices
--
forkBothEdges
:: (IOLike m, RunNode blk, HasCallStack)
=> ResourceRegistry m
-> OracularClock m
-> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException)
-> Tracer m (SlotNo, MiniProtocolState)
-> (NodeToNodeVersion, BlockNodeToNodeVersion blk)
-> (CodecConfig blk, CalcMessageDelay blk)
-> Map CoreNodeId (VertexStatusVar m blk)
@@ -1111,7 +1103,7 @@ forkBothEdges sharedRegistry clock tr version cfg vertexStatusVars (node1, node2
directedEdge ::
forall m blk. (IOLike m, RunNode blk)
=> ResourceRegistry m
-> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException)
-> Tracer m (SlotNo, MiniProtocolState)
-> (NodeToNodeVersion, BlockNodeToNodeVersion blk)
-> (CodecConfig blk, CalcMessageDelay blk)
-> OracularClock m
@@ -1124,18 +1116,17 @@ directedEdge registry tr version cfg clock edgeStatusVar client server =
where
loop = do
restart <- directedEdgeInner registry clock version cfg edgeStatusVar client server
`catch` (pure . RestartExn)
`catch` hUnexpected
atomically $ writeTVar edgeStatusVar EDown
case restart of
RestartNode -> pure ()
RestartExn e -> do
-- "error policy": restart at beginning of next slot
RestartScheduled -> pure ()
RestartChainSyncTerminated -> do
-- "error" policy: restart at beginning of next slot
s <- OracularClock.getCurrentSlot clock
let s' = succ s
traceWith tr (s, MiniProtocolDelayed, e)
traceWith tr (s, MiniProtocolDelayed)
void $ OracularClock.blockUntilSlot clock s'
traceWith tr (s', MiniProtocolRestarting, e)
traceWith tr (s', MiniProtocolRestarting)
loop
where
-- Wrap synchronous exceptions in 'MiniProtocolFatalException'
@@ -1178,51 +1169,55 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
let miniProtocol ::
String
-- ^ protocol name
-> (String -> a -> RestartCause)
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
-> m (a, trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
-> m (a, trailingBytes)
)
-- ^ server action to run on node2
-> (msg -> m ())
-> m (m (), m ())
miniProtocol proto client server middle = do
-> m (m RestartCause, m RestartCause)
miniProtocol proto ret client server middle = do
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( fst <$> client app1 version (fromCoreNodeId node2) chan
, fst <$> server app2 version (fromCoreNodeId node1) dualChan
( (ret (proto <> ".client") . fst) <$> client app1 version (return Continue) (fromCoreNodeId node2) chan
, (ret (proto <> ".server") . fst) <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan
)
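
-- 'ret' converts each action's result into a 'RestartCause'; the
-- proto <> ".client" / proto <> ".server" label lets 'neverReturns'
-- (defined below) name the offending protocol if an action that is
-- expected never to terminate somehow returns.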

-- NB only 'watcher' ever returns in these tests
fmap (\() -> RestartNode) $
(>>= withAsyncsWaitAny) $
(>>= withAsyncsWaitAny) $
fmap flattenPairs $
sequence $
pure (watcher vertexStatusVar1, watcher vertexStatusVar2)
NE.:|
[ miniProtocol "ChainSync"
(wrapMPEE MPEEChainSyncClient NTN.aChainSyncClient)
(\_s () -> RestartChainSyncTerminated)
NTN.aChainSyncClient
NTN.aChainSyncServer
chainSyncMiddle
-- TODO do not swallow exceptions from these protocols
, miniProtocol "BlockFetch"
neverReturns
NTN.aBlockFetchClient
NTN.aBlockFetchServer
(\_ -> pure ())
, miniProtocol "TxSubmission"
neverReturns
NTN.aTxSubmissionClient
NTN.aTxSubmissionServer
(\_ -> pure ())
, miniProtocol "KeepAlive"
neverReturns
NTN.aKeepAliveClient
NTN.aKeepAliveServer
(\_ -> pure ())
@@ -1235,23 +1230,18 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
flattenPairs :: forall a. NE.NonEmpty (a, a) -> NE.NonEmpty a
flattenPairs = uncurry (<>) . NE.unzip
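
-- For illustration (a hypothetical example, not in the original source):
--
-- > flattenPairs ((c1, s1) NE.:| [(c2, s2)]) == c1 NE.:| [c2, s1, s2]
--
-- i.e. 'NE.unzip' separates the client actions from the server actions
-- and '(<>)' appends the two halves.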

-- TODO only wrap actually expected exceptions
wrapMPEE ::
Exception e
=> (e -> MiniProtocolExpectedException)
-> (app -> version -> peer -> chan -> m a)
-> (app -> version -> peer -> chan -> m a)
wrapMPEE f m = \app ver them chan ->
catch (m app ver them chan) $ throwM . f
neverReturns :: String -> () -> void
neverReturns s () = error $ s <> " never returns!"
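
-- The '()' is only matched once the mini protocol action returns; in
-- these tests BlockFetch, TxSubmission and KeepAlive are expected never
-- to terminate, so this 'error' acts as a loud failure if one does.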

-- terminates when the vertex starts 'VFalling'
-- terminates (by returning, not via exception) when the vertex starts
-- 'VFalling'
--
-- because of 'withAsyncsWaitAny' used above, this brings down the whole
-- edge
watcher :: VertexStatusVar m blk -> m ()
watcher :: VertexStatusVar m blk -> m RestartCause
watcher v = do
atomically $ readTVar v >>= \case
VFalling -> pure ()
VFalling -> pure RestartScheduled
_ -> retry
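-- 'retry' blocks the transaction until the status var changes, so
-- 'watcher' returns precisely when the vertex reaches 'VFalling'.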

-- introduce a delay for 'CS.MsgRollForward'
@@ -1570,43 +1560,10 @@ type LimitedApp' m peer blk =
Tracing
-------------------------------------------------------------------------------}

-- | Non-fatal exceptions expected from the threads of a 'directedEdge'
--
data MiniProtocolExpectedException
= MPEEChainSyncClient CSClient.ChainSyncClientException
-- ^ see "Ouroboros.Consensus.MiniProtocol.ChainSync.Client"
--
-- NOTE: the second type in 'ChainSyncClientException' denotes the 'tip'.
-- If it does not agree with the consensus client & server, 'Dynamic chain
-- generation' tests will fail, since they will not catch the right
-- exception.
| MPEEBlockFetchClient BFClient.BlockFetchProtocolFailure
-- ^ see "Ouroboros.Network.BlockFetch.Client"
| MPEEBlockFetchServer BFServer.BlockFetchServerException
-- ^ see "Ouroboros.Consensus.MiniProtocol.BlockFetch.Server"
| MPEETxSubmissionClient TxOutbound.TxSubmissionProtocolError
-- ^ see "Ouroboros.Network.TxSubmission.Outbound"
| MPEETxSubmissionServer TxInbound.TxSubmissionProtocolError
-- ^ see "Ouroboros.Network.TxSubmission.Inbound"
deriving (Show)

instance Exception MiniProtocolExpectedException

data MiniProtocolState = MiniProtocolDelayed | MiniProtocolRestarting
deriving (Show)

data TraceMiniProtocolRestart peer
= TraceMiniProtocolRestart
peer peer
SlotNo
MiniProtocolState
MiniProtocolExpectedException
-- ^ us them when-start-blocking state reason
deriving (Show)

-- | Any synchronous exception from a 'directedEdge' that was not handled as a
-- 'MiniProtocolExpectedException'
--
-- | Any synchronous exception from a 'directedEdge'
data MiniProtocolFatalException = MiniProtocolFatalException
{ mpfeType :: !Typeable.TypeRep
-- ^ Including the type explicitly makes it easier for a human to debug