From 8be435e5d57aa1f149facdd62e291bd8e020b401 Mon Sep 17 00:00:00 2001 From: Thomas Winant Date: Fri, 21 Aug 2020 11:42:34 +0200 Subject: [PATCH] ChainSyncClient: respond to ControlMessageSTM --- .../MiniProtocol/ChainSync/Client.hs | 5 +- .../MiniProtocol/ChainSync/Client.hs | 54 ++++++++++++------- .../Ouroboros/Consensus/Network/NodeToNode.hs | 5 +- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs index 699c87e3607..03dde3ce722 100644 --- a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -27,10 +27,9 @@ import Control.Monad.IOSim (runSimOrThrow) import Cardano.Crypto.DSIGN.Mock -import Ouroboros.Network.Block (getTipPoint) - import Ouroboros.Network.AnchoredFragment (AnchoredFragment) import qualified Ouroboros.Network.AnchoredFragment as AF +import Ouroboros.Network.Block (getTipPoint) import Ouroboros.Network.Channel import Ouroboros.Network.Driver import Ouroboros.Network.MockChain.Chain (Chain (Genesis)) @@ -38,6 +37,7 @@ import qualified Ouroboros.Network.MockChain.Chain as Chain import Ouroboros.Network.MockChain.ProducerState (chainState, initChainProducerState) import qualified Ouroboros.Network.MockChain.ProducerState as CPS +import Ouroboros.Network.Mux (ControlMessage (..)) import Ouroboros.Network.Protocol.ChainSync.ClientPipelined import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId) import Ouroboros.Network.Protocol.ChainSync.Examples @@ -292,6 +292,7 @@ runChainSync securityParam (ClientUpdates clientUpdates) nodeCfg chainDbView maxBound + (return Continue) -- Set up the server varChainProducerState <- uncheckedNewTVarM $ initChainProducerState Genesis diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 69436ae7bcb..b511d43a2a6 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -55,6 +55,7 @@ import Network.TypedProtocol.Pipelined import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..)) import qualified Ouroboros.Network.AnchoredFragment as AF import Ouroboros.Network.Block (Tip, getTipBlockNo, getTipPoint) +import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.Protocol.ChainSync.ClientPipelined import Ouroboros.Network.Protocol.ChainSync.PipelineDecision @@ -342,6 +343,7 @@ chainSyncClient -> TopLevelConfig blk -> ChainDbView m blk -> NodeToNodeVersion + -> ControlMessageSTM m -> StrictTVar m (AnchoredFragment (Header blk)) -> Consensus ChainSyncClientPipelined blk m chainSyncClient mkPipelineDecision0 tracer cfg @@ -351,8 +353,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg , getOurTip , getIsInvalidBlock } - _version - varCandidate = ChainSyncClientPipelined $ + _version + controlMessageSTM + varCandidate = ChainSyncClientPipelined $ continueWithState () $ initialise where -- | Start ChainSync by looking for an intersection between our current @@ -516,6 +519,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg -- This is the main place we check whether our current chain has changed. -- We also check it in 'rollForward' to make sure we have an up-to-date -- intersection before calling 'getLedgerView'. + -- + -- This is also the place where we checked whether we're asked to terminate + -- by the mux layer. nextStep :: MkPipelineDecision -> Nat n -> Their (Tip blk) @@ -523,22 +529,28 @@ chainSyncClient mkPipelineDecision0 tracer cfg (KnownIntersectionState blk) (ClientPipelinedStIdle n) nextStep mkPipelineDecision n theirTip = Stateful $ \kis -> do - mKis' <- atomically $ intersectsWithCurrentChain kis - case mKis' of - Just kis'@KnownIntersectionState { theirFrag } -> do - -- Our chain (tip) didn't change or if it did, it still intersects - -- with the candidate fragment, so we can continue requesting the - -- next block. - atomically $ writeTVar varCandidate theirFrag - let candTipBlockNo = AF.headBlockNo theirFrag - return $ requestNext kis' mkPipelineDecision n theirTip candTipBlockNo - Nothing -> - -- Our chain (tip) has changed and it no longer intersects with the - -- candidate fragment, so we have to find a new intersection, but - -- first drain the pipe. - continueWithState () - $ drainThePipe n - $ findIntersection NoMoreIntersection + atomically controlMessageSTM >>= \case + -- We have been asked to terminate the client + Terminate -> + terminateAfterDrain n $ AskedToTerminate + _continue -> do + mKis' <- atomically $ intersectsWithCurrentChain kis + case mKis' of + Just kis'@KnownIntersectionState { theirFrag } -> do + -- Our chain (tip) didn't change or if it did, it still intersects + -- with the candidate fragment, so we can continue requesting the + -- next block. + atomically $ writeTVar varCandidate theirFrag + let candTipBlockNo = AF.headBlockNo theirFrag + return $ + requestNext kis' mkPipelineDecision n theirTip candTipBlockNo + Nothing -> + -- Our chain (tip) has changed and it no longer intersects with + -- the candidate fragment, so we have to find a new intersection, + -- but first drain the pipe. + continueWithState () + $ drainThePipe n + $ findIntersection NoMoreIntersection -- | "Drain the pipe": collect and discard all in-flight responses and -- finally execute the given action. @@ -1015,6 +1027,9 @@ data ChainSyncClientResult = (Our (Tip blk)) (Their (Tip blk)) + -- | We were asked to terminate via the 'ControlMessageSTM' + | AskedToTerminate + deriving instance Show ChainSyncClientResult instance Eq ChainSyncClientResult where @@ -1042,6 +1057,9 @@ instance Eq ChainSyncClientResult where Just Refl -> (a, b, c) == (a', b', c') RolledBackPastIntersection{} == _ = False + AskedToTerminate == AskedToTerminate = True + AskedToTerminate == _ = False + {------------------------------------------------------------------------------- Exception -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index 00045fef928..64c07fb8658 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -101,6 +101,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Serialisation data Handlers m peer blk = Handlers { hChainSyncClient :: NodeToNodeVersion + -> ControlMessageSTM m -> StrictTVar m (AnchoredFragment (Header blk)) -> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult -- TODO: we should consider either bundling these context parameters @@ -421,7 +422,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = -> remotePeer -> Channel m bCS -> m ((), Maybe bCS) - aChainSyncClient version _controlMessageSTM them channel = do + aChainSyncClient version controlMessageSTM them channel = do labelThisThread "ChainSyncClient" -- Note that it is crucial that we sync with the fetch client "outside" -- of registering the state for the sync client. This is needed to @@ -444,7 +445,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = (timeLimitsChainSync chainSyncTimeout) channel $ chainSyncClientPeerPipelined - $ hChainSyncClient version varCandidate + $ hChainSyncClient version controlMessageSTM varCandidate return ((), trailing) aChainSyncServer