Merge #2548
2548: Port #2525 and #2531 to master r=mrBliss a=mrBliss

#2525 and #2531 were merged into the https://github.com/input-output-hk/ouroboros-network/tree/coot/connection-manager branch. However, that branch won't be merged for another few weeks, and in the meantime merge conflicts with the changes made in those two PRs will accumulate. For example, I know that #2546 will cause a merge conflict.

Now that @coot has kindly merged #2541 (which those PRs depended on) into master, we can port #2525 and #2531 to master and avoid the upcoming merge conflicts. After this PR is merged, the `coot/connection-manager` branch should be rebased onto master.

Co-authored-by: Thomas Winant <thomas@well-typed.com>
Co-authored-by: Nicolas Frisby <nick.frisby@iohk.io>
3 people authored Aug 28, 2020
2 parents 267c6c3 + 8be435e commit 9480613
Showing 6 changed files with 354 additions and 292 deletions.
113 changes: 35 additions & 78 deletions ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
@@ -25,10 +25,8 @@ module Test.ThreadNet.Network (
, plainTestNodeInitialization
, TracingConstraints
-- * Tracers
, MiniProtocolExpectedException (..)
, MiniProtocolFatalException (..)
, MiniProtocolState (..)
, TraceMiniProtocolRestart (..)
-- * Test Output
, TestOutput (..)
, NodeOutput (..)
@@ -65,13 +63,11 @@ import Ouroboros.Network.MockChain.Chain (Chain (Genesis))
import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import qualified Ouroboros.Network.BlockFetch.Client as BFClient
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
import Ouroboros.Network.Protocol.TxSubmission.Type
import qualified Ouroboros.Network.TxSubmission.Inbound as TxInbound
import qualified Ouroboros.Network.TxSubmission.Outbound as TxOutbound

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.BlockchainTime
@@ -83,7 +79,6 @@ import Ouroboros.Consensus.Ledger.Inspect
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Ledger.SupportsProtocol
import Ouroboros.Consensus.Mempool
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.Server as BFServer
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient
import qualified Ouroboros.Consensus.Network.NodeToNode as NTN
import Ouroboros.Consensus.Node.NetworkProtocolVersion
@@ -1048,22 +1043,19 @@ data CodecError
-- | Cause for an edge to restart
--
data RestartCause
= RestartExn !MiniProtocolExpectedException
-- ^ restart due to an exception in one of the mini protocol instances
--
-- Edges only catch-and-restart on /expected/ exceptions; anything else
-- will tear down the whole hierarchy of test threads. See
-- 'MiniProtocolExpectedException'.
| RestartNode
-- ^ restart because at least one of the two nodes is 'VFalling'
= RestartScheduled
-- ^ restart because at least one of the two nodes set its status to
-- 'VFalling' because of a scheduled restart in 'tnaRestarts'
| RestartChainSyncTerminated
-- ^ restart because the ChainSync client terminated the mini protocol
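
Pieced together from the added lines (the flattened diff interleaves the removed and added constructors), the new type reads in one piece as:

    data RestartCause
      = RestartScheduled
        -- ^ restart because at least one of the two nodes set its status to
        -- 'VFalling' because of a scheduled restart in 'tnaRestarts'
      | RestartChainSyncTerminated
        -- ^ restart because the ChainSync client terminated the mini protocol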

-- | Fork two directed edges, one in each direction between the two vertices
--
forkBothEdges
:: (IOLike m, RunNode blk, HasCallStack)
=> ResourceRegistry m
-> OracularClock m
-> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException)
-> Tracer m (SlotNo, MiniProtocolState)
-> (NodeToNodeVersion, BlockNodeToNodeVersion blk)
-> (CodecConfig blk, CalcMessageDelay blk)
-> Map CoreNodeId (VertexStatusVar m blk)
@@ -1111,7 +1103,7 @@ forkBothEdges sharedRegistry clock tr version cfg vertexStatusVars (node1, node2
directedEdge ::
forall m blk. (IOLike m, RunNode blk)
=> ResourceRegistry m
-> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException)
-> Tracer m (SlotNo, MiniProtocolState)
-> (NodeToNodeVersion, BlockNodeToNodeVersion blk)
-> (CodecConfig blk, CalcMessageDelay blk)
-> OracularClock m
@@ -1124,18 +1116,17 @@ directedEdge registry tr version cfg clock edgeStatusVar client server =
where
loop = do
restart <- directedEdgeInner registry clock version cfg edgeStatusVar client server
`catch` (pure . RestartExn)
`catch` hUnexpected
atomically $ writeTVar edgeStatusVar EDown
case restart of
RestartNode -> pure ()
RestartExn e -> do
-- "error policy": restart at beginning of next slot
RestartScheduled -> pure ()
RestartChainSyncTerminated -> do
-- "error" policy: restart at beginning of next slot
s <- OracularClock.getCurrentSlot clock
let s' = succ s
traceWith tr (s, MiniProtocolDelayed, e)
traceWith tr (s, MiniProtocolDelayed)
void $ OracularClock.blockUntilSlot clock s'
traceWith tr (s', MiniProtocolRestarting, e)
traceWith tr (s', MiniProtocolRestarting)
loop
where
-- Wrap synchronous exceptions in 'MiniProtocolFatalException'
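
Pieced together from the added lines of this hunk, the new restart loop reads as follows. Restarts are now driven by the returned RestartCause rather than by catching a MiniProtocolExpectedException; per the comment above, the hUnexpected handler (hidden by the fold) wraps any synchronous exception in MiniProtocolFatalException instead:

    loop = do
      restart <- directedEdgeInner registry clock version cfg edgeStatusVar client server
        `catch` hUnexpected
      atomically $ writeTVar edgeStatusVar EDown
      case restart of
        RestartScheduled           -> pure ()
        RestartChainSyncTerminated -> do
          -- "error" policy: restart at beginning of next slot
          s <- OracularClock.getCurrentSlot clock
          let s' = succ s
          traceWith tr (s, MiniProtocolDelayed)
          void $ OracularClock.blockUntilSlot clock s'
          traceWith tr (s', MiniProtocolRestarting)
          loop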
@@ -1178,51 +1169,55 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
let miniProtocol ::
String
-- ^ protocol name
-> (String -> a -> RestartCause)
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
-> m (a, trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
-> m (a, trailingBytes)
)
-- ^ server action to run on node2
-> (msg -> m ())
-> m (m (), m ())
miniProtocol proto client server middle = do
-> m (m RestartCause, m RestartCause)
miniProtocol proto ret client server middle = do
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( fst <$> client app1 version (fromCoreNodeId node2) chan
, fst <$> server app2 version (fromCoreNodeId node1) dualChan
( (ret (proto <> ".client") . fst) <$> client app1 version (return Continue) (fromCoreNodeId node2) chan
, (ret (proto <> ".server") . fst) <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan
)
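
Pieced together from the added lines, the new miniProtocol differs from the old one in three ways: it takes an extra ret function mapping each protocol's result to a RestartCause (tagged with the protocol name plus ".client" or ".server"), the client and server actions take a ControlMessageSTM argument (always return Continue in these tests), and both returned actions now yield an m RestartCause instead of m ():

    miniProtocol proto ret client server middle = do
      (chan, dualChan) <-
        createConnectedChannelsWithDelay registry (node1, node2, proto) middle
      pure
        ( (ret (proto <> ".client") . fst) <$>
            client app1 version (return Continue) (fromCoreNodeId node2) chan
        , (ret (proto <> ".server") . fst) <$>
            server app2 version (return Continue) (fromCoreNodeId node1) dualChan
        )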

-- NB only 'watcher' ever returns in these tests
fmap (\() -> RestartNode) $
(>>= withAsyncsWaitAny) $
(>>= withAsyncsWaitAny) $
fmap flattenPairs $
sequence $
pure (watcher vertexStatusVar1, watcher vertexStatusVar2)
NE.:|
[ miniProtocol "ChainSync"
(wrapMPEE MPEEChainSyncClient NTN.aChainSyncClient)
(\_s () -> RestartChainSyncTerminated)
NTN.aChainSyncClient
NTN.aChainSyncServer
chainSyncMiddle
-- TODO do not swallow exceptions from these protocols
, miniProtocol "BlockFetch"
neverReturns
NTN.aBlockFetchClient
NTN.aBlockFetchServer
(\_ -> pure ())
, miniProtocol "TxSubmission"
neverReturns
NTN.aTxSubmissionClient
NTN.aTxSubmissionServer
(\_ -> pure ())
, miniProtocol "KeepAlive"
neverReturns
NTN.aKeepAliveClient
NTN.aKeepAliveServer
(\_ -> pure ())
@@ -1235,23 +1230,18 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
flattenPairs :: forall a. NE.NonEmpty (a, a) -> NE.NonEmpty a
flattenPairs = uncurry (<>) . NE.unzip

-- TODO only wrap actually expected exceptions
wrapMPEE ::
Exception e
=> (e -> MiniProtocolExpectedException)
-> (app -> version -> peer -> chan -> m a)
-> (app -> version -> peer -> chan -> m a)
wrapMPEE f m = \app ver them chan ->
catch (m app ver them chan) $ throwM . f
neverReturns :: String -> () -> void
neverReturns s () = error $ s <> " never returns!"

-- terminates when the vertex starts 'VFalling'
-- terminates (by returning, not via exception) when the vertex starts
-- 'VFalling'
--
-- because of 'withAsyncsWaitAny' used above, this brings down the whole
-- edge
watcher :: VertexStatusVar m blk -> m ()
watcher :: VertexStatusVar m blk -> m RestartCause
watcher v = do
atomically $ readTVar v >>= \case
VFalling -> pure ()
VFalling -> pure RestartScheduled
_ -> retry

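The definition of withAsyncsWaitAny is outside this diff. As a sketch only, a plausible definition with the shape the comments above rely on (race all actions, return the first result, cancel the rest) could look like this, assuming the withAsync and waitAny combinators of the MonadAsync class implied by IOLike:

    -- Sketch, not part of this diff: race a non-empty list of actions,
    -- return whichever result arrives first, cancel the rest on the way out.
    withAsyncsWaitAny :: forall m a. IOLike m => NE.NonEmpty (m a) -> m a
    withAsyncsWaitAny = go [] . NE.toList
      where
        go acc = \case
          []     -> snd <$> waitAny acc
          m : ms -> withAsync m $ \h -> go (h : acc) ms
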
-- introduce a delay for 'CS.MsgRollForward'
@@ -1570,43 +1560,10 @@ type LimitedApp' m peer blk =
Tracing
-------------------------------------------------------------------------------}

-- | Non-fatal exceptions expected from the threads of a 'directedEdge'
--
data MiniProtocolExpectedException
= MPEEChainSyncClient CSClient.ChainSyncClientException
-- ^ see "Ouroboros.Consensus.MiniProtocol.ChainSync.Client"
--
-- NOTE: the second type in 'ChainSyncClientException' denotes the 'tip'.
-- If it does not agree with the consensus client & server, 'Dynamic chain
-- generation' tests will fail, since they will not catch the right
-- exception.
| MPEEBlockFetchClient BFClient.BlockFetchProtocolFailure
-- ^ see "Ouroboros.Network.BlockFetch.Client"
| MPEEBlockFetchServer BFServer.BlockFetchServerException
-- ^ see "Ouroboros.Consensus.MiniProtocol.BlockFetch.Server"
| MPEETxSubmissionClient TxOutbound.TxSubmissionProtocolError
-- ^ see "Ouroboros.Network.TxSubmission.Outbound"
| MPEETxSubmissionServer TxInbound.TxSubmissionProtocolError
-- ^ see "Ouroboros.Network.TxSubmission.Inbound"
deriving (Show)

instance Exception MiniProtocolExpectedException

data MiniProtocolState = MiniProtocolDelayed | MiniProtocolRestarting
deriving (Show)

data TraceMiniProtocolRestart peer
= TraceMiniProtocolRestart
peer peer
SlotNo
MiniProtocolState
MiniProtocolExpectedException
-- ^ us them when-start-blocking state reason
deriving (Show)

-- | Any synchronous exception from a 'directedEdge' that was not handled as a
-- 'MiniProtocolExpectedException'
--
-- | Any synchronous exception from a 'directedEdge'
data MiniProtocolFatalException = MiniProtocolFatalException
{ mpfeType :: !Typeable.TypeRep
-- ^ Including the type explicitly makes it easier for a human to debug