From bbfc0b787b9f017677be3b66eaf409d2f322b680 Mon Sep 17 00:00:00 2001 From: Thomas Winant Date: Fri, 21 Aug 2020 11:42:34 +0200 Subject: [PATCH] ChainSyncClient: respond to ControlMessageSTM --- .../src/Test/ThreadNet/Network.hs | 15 +++--- .../MiniProtocol/ChainSync/Client.hs | 53 +++++++++++++------ .../Ouroboros/Consensus/Network/NodeToNode.hs | 5 +- .../MiniProtocol/ChainSync/Client.hs | 5 +- 4 files changed, 52 insertions(+), 26 deletions(-) diff --git a/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs b/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs index 0fb24627d22..b897aec8781 100644 --- a/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus/ouroboros-consensus-test-infra/src/Test/ThreadNet/Network.hs @@ -67,6 +67,7 @@ import Ouroboros.Network.Point (WithOrigin (..)) import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS import qualified Ouroboros.Network.BlockFetch.Client as BFClient +import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..)) import Ouroboros.Network.Protocol.KeepAlive.Type import Ouroboros.Network.Protocol.Limits (waitForever) @@ -1180,6 +1181,7 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) -- ^ protocol name -> ( LimitedApp' m NodeId blk -> NodeToNodeVersion + -> ControlMessageSTM m -> NodeId -> Channel m msg -> m ((), trailingBytes) @@ -1187,6 +1189,7 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) -- ^ client action to run on node1 -> ( LimitedApp' m NodeId blk -> NodeToNodeVersion + -> ControlMessageSTM m -> NodeId -> Channel m msg -> m ((), trailingBytes) @@ -1198,8 +1201,8 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) (chan, dualChan) <- createConnectedChannelsWithDelay registry (node1, node2, proto) middle pure - ( fst <$> client app1 version (fromCoreNodeId node2) chan - , fst <$> server app2 version (fromCoreNodeId node1) dualChan + ( fst <$> client app1 version (return Continue) (fromCoreNodeId node2) chan + , fst <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan ) -- NB only 'watcher' ever returns in these tests @@ -1239,10 +1242,10 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) wrapMPEE :: Exception e => (e -> MiniProtocolExpectedException) - -> (app -> version -> peer -> chan -> m a) - -> (app -> version -> peer -> chan -> m a) - wrapMPEE f m = \app ver them chan -> - catch (m app ver them chan) $ throwM . f + -> (app -> version -> controlMsg -> peer -> chan -> m a) + -> (app -> version -> controlMsg -> peer -> chan -> m a) + wrapMPEE f m = \app ver controlMsg them chan -> + catch (m app ver controlMsg them chan) $ throwM . f -- terminates when the vertex starts 'VFalling' -- diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index dbab02dcd17..23c1d5ecc31 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -54,6 +54,7 @@ import Network.TypedProtocol.Pipelined import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..)) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (Tip, getTipBlockNo, getTipPoint) +import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.Protocol.ChainSync.ClientPipelined import Ouroboros.Network.Protocol.ChainSync.PipelineDecision @@ -342,6 +343,7 @@ chainSyncClient -> TopLevelConfig blk -> ChainDbView m blk -> NodeToNodeVersion + -> ControlMessageSTM m -> StrictTVar m (AnchoredFragment (Header blk)) -> Consensus ChainSyncClientPipelined blk m chainSyncClient mkPipelineDecision0 tracer cfg @@ -351,8 +353,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg , getOurTip , getIsInvalidBlock } - _version - varCandidate = ChainSyncClientPipelined $ + _version + controlMessageSTM + varCandidate = ChainSyncClientPipelined $ continueWithState () $ initialise where -- | Start ChainSync by looking for an intersection between our current @@ -516,6 +519,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg -- This is the main place we check whether our current chain has changed. -- We also check it in 'rollForward' to make sure we have an up-to-date -- intersection before calling 'getLedgerView'. + -- + -- This is also the place where we checked whether we're asked to terminate + -- by the mux layer. nextStep :: MkPipelineDecision -> Nat n -> Their (Tip blk) @@ -523,22 +529,31 @@ chainSyncClient mkPipelineDecision0 tracer cfg (KnownIntersectionState blk) (ClientPipelinedStIdle n) nextStep mkPipelineDecision n theirTip = Stateful $ \kis -> do - mKis' <- atomically $ intersectsWithCurrentChain kis - case mKis' of - Just kis'@KnownIntersectionState { theirFrag } -> do - -- Our chain (tip) didn't change or if it did, it still intersects - -- with the candidate fragment, so we can continue requesting the - -- next block. - atomically $ writeTVar varCandidate theirFrag - let candTipBlockNo = AF.headBlockNo theirFrag - return $ requestNext kis' mkPipelineDecision n theirTip candTipBlockNo - Nothing -> - -- Our chain (tip) has changed and it no longer intersects with the - -- candidate fragment, so we have to find a new intersection, but - -- first drain the pipe. + atomically controlMessageSTM >>= \case + -- We have been asked to terminate the client + Terminate -> continueWithState () $ drainThePipe n - $ findIntersection NoMoreIntersection + $ Stateful $ const $ return $ terminate AskedToTerminate + -- Continue + _ -> do + mKis' <- atomically $ intersectsWithCurrentChain kis + case mKis' of + Just kis'@KnownIntersectionState { theirFrag } -> do + -- Our chain (tip) didn't change or if it did, it still intersects + -- with the candidate fragment, so we can continue requesting the + -- next block. + atomically $ writeTVar varCandidate theirFrag + let candTipBlockNo = AF.headBlockNo theirFrag + return $ + requestNext kis' mkPipelineDecision n theirTip candTipBlockNo + Nothing -> + -- Our chain (tip) has changed and it no longer intersects with + -- the candidate fragment, so we have to find a new intersection, + -- but first drain the pipe. + continueWithState () + $ drainThePipe n + $ findIntersection NoMoreIntersection -- | "Drain the pipe": collect and discard all in-flight responses and -- finally execute the given action. @@ -1000,6 +1015,9 @@ data ChainSyncClientResult = (Our (Tip blk)) (Their (Tip blk)) + -- | We were asked to terminate via the 'ControlMessageSTM' + | AskedToTerminate + deriving instance Show ChainSyncClientResult instance Eq ChainSyncClientResult where @@ -1021,6 +1039,9 @@ instance Eq ChainSyncClientResult where Just Refl -> (a, b) == (a', b') NoMoreIntersection{} == _ = False + AskedToTerminate == AskedToTerminate = True + AskedToTerminate == _ = False + {------------------------------------------------------------------------------- Exception -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index f067462bfe7..aec5b78dc49 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -100,6 +100,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Serialisation data Handlers m peer blk = Handlers { hChainSyncClient :: NodeToNodeVersion + -> ControlMessageSTM m -> StrictTVar m (AnchoredFragment (Header blk)) -> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult -- TODO: we should consider either bundling these context parameters @@ -420,7 +421,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = -> remotePeer -> Channel m bCS -> m ((), Maybe bCS) - aChainSyncClient version _controlMessageSTM them channel = do + aChainSyncClient version controlMessageSTM them channel = do labelThisThread "ChainSyncClient" -- Note that it is crucial that we sync with the fetch client "outside" -- of registering the state for the sync client. This is needed to @@ -443,7 +444,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = (timeLimitsChainSync chainSyncTimeout) channel $ chainSyncClientPeerPipelined - $ hChainSyncClient version varCandidate + $ hChainSyncClient version controlMessageSTM varCandidate return ((), trailing) aChainSyncServer diff --git a/ouroboros-consensus/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs index 6b52e56c84f..e36d159f519 100644 --- a/ouroboros-consensus/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -28,10 +28,9 @@ import Control.Monad.IOSim (runSimOrThrow) import Cardano.Crypto.DSIGN.Mock -import Ouroboros.Network.Block (getTipPoint) - import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import qualified Ouroboros.Network.AnchoredFragment as AF +import Ouroboros.Network.Block (getTipPoint) import Ouroboros.Network.Channel import Ouroboros.Network.Driver import Ouroboros.Network.MockChain.Chain (Chain (Genesis)) @@ -39,6 +38,7 @@ import qualified Ouroboros.Network.MockChain.Chain as Chain import Ouroboros.Network.MockChain.ProducerState (chainState, initChainProducerState) import qualified Ouroboros.Network.MockChain.ProducerState as CPS +import Ouroboros.Network.Mux (ControlMessage (..)) import Ouroboros.Network.Protocol.ChainSync.ClientPipelined import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId) import Ouroboros.Network.Protocol.ChainSync.Examples @@ -293,6 +293,7 @@ runChainSync securityParam (ClientUpdates clientUpdates) nodeCfg chainDbView maxBound + (return Continue) -- Set up the server varChainProducerState <- uncheckedNewTVarM $ initChainProducerState Genesis