Skip to content

Commit

Permalink
ChainSyncClient: respond to ControlMessageSTM
Browse files Browse the repository at this point in the history
  • Loading branch information
mrBliss committed Aug 28, 2020
1 parent 29896b1 commit 8be435e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import Control.Monad.IOSim (runSimOrThrow)

import Cardano.Crypto.DSIGN.Mock

import Ouroboros.Network.Block (getTipPoint)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (getTipPoint)
import Ouroboros.Network.Channel
import Ouroboros.Network.Driver
import Ouroboros.Network.MockChain.Chain (Chain (Genesis))
import qualified Ouroboros.Network.MockChain.Chain as Chain
import Ouroboros.Network.MockChain.ProducerState (chainState,
initChainProducerState)
import qualified Ouroboros.Network.MockChain.ProducerState as CPS
import Ouroboros.Network.Mux (ControlMessage (..))
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
import Ouroboros.Network.Protocol.ChainSync.Examples
Expand Down Expand Up @@ -292,6 +292,7 @@ runChainSync securityParam (ClientUpdates clientUpdates)
nodeCfg
chainDbView
maxBound
(return Continue)

-- Set up the server
varChainProducerState <- uncheckedNewTVarM $ initChainProducerState Genesis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import Network.TypedProtocol.Pipelined
import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (Tip, getTipBlockNo, getTipPoint)
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision

Expand Down Expand Up @@ -342,6 +343,7 @@ chainSyncClient
-> TopLevelConfig blk
-> ChainDbView m blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> StrictTVar m (AnchoredFragment (Header blk))
-> Consensus ChainSyncClientPipelined blk m
chainSyncClient mkPipelineDecision0 tracer cfg
Expand All @@ -351,8 +353,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg
, getOurTip
, getIsInvalidBlock
}
_version
varCandidate = ChainSyncClientPipelined $
_version
controlMessageSTM
varCandidate = ChainSyncClientPipelined $
continueWithState () $ initialise
where
-- | Start ChainSync by looking for an intersection between our current
Expand Down Expand Up @@ -516,29 +519,38 @@ chainSyncClient mkPipelineDecision0 tracer cfg
-- This is the main place we check whether our current chain has changed.
-- We also check it in 'rollForward' to make sure we have an up-to-date
-- intersection before calling 'getLedgerView'.
--
-- This is also the place where we checked whether we're asked to terminate
-- by the mux layer.
nextStep :: MkPipelineDecision
-> Nat n
-> Their (Tip blk)
-> Stateful m blk
(KnownIntersectionState blk)
(ClientPipelinedStIdle n)
nextStep mkPipelineDecision n theirTip = Stateful $ \kis -> do
mKis' <- atomically $ intersectsWithCurrentChain kis
case mKis' of
Just kis'@KnownIntersectionState { theirFrag } -> do
-- Our chain (tip) didn't change or if it did, it still intersects
-- with the candidate fragment, so we can continue requesting the
-- next block.
atomically $ writeTVar varCandidate theirFrag
let candTipBlockNo = AF.headBlockNo theirFrag
return $ requestNext kis' mkPipelineDecision n theirTip candTipBlockNo
Nothing ->
-- Our chain (tip) has changed and it no longer intersects with the
-- candidate fragment, so we have to find a new intersection, but
-- first drain the pipe.
continueWithState ()
$ drainThePipe n
$ findIntersection NoMoreIntersection
atomically controlMessageSTM >>= \case
-- We have been asked to terminate the client
Terminate ->
terminateAfterDrain n $ AskedToTerminate
_continue -> do
mKis' <- atomically $ intersectsWithCurrentChain kis
case mKis' of
Just kis'@KnownIntersectionState { theirFrag } -> do
-- Our chain (tip) didn't change or if it did, it still intersects
-- with the candidate fragment, so we can continue requesting the
-- next block.
atomically $ writeTVar varCandidate theirFrag
let candTipBlockNo = AF.headBlockNo theirFrag
return $
requestNext kis' mkPipelineDecision n theirTip candTipBlockNo
Nothing ->
-- Our chain (tip) has changed and it no longer intersects with
-- the candidate fragment, so we have to find a new intersection,
-- but first drain the pipe.
continueWithState ()
$ drainThePipe n
$ findIntersection NoMoreIntersection

-- | "Drain the pipe": collect and discard all in-flight responses and
-- finally execute the given action.
Expand Down Expand Up @@ -1015,6 +1027,9 @@ data ChainSyncClientResult =
(Our (Tip blk))
(Their (Tip blk))

-- | We were asked to terminate via the 'ControlMessageSTM'
| AskedToTerminate

deriving instance Show ChainSyncClientResult

instance Eq ChainSyncClientResult where
Expand Down Expand Up @@ -1042,6 +1057,9 @@ instance Eq ChainSyncClientResult where
Just Refl -> (a, b, c) == (a', b', c')
RolledBackPastIntersection{} == _ = False

AskedToTerminate == AskedToTerminate = True
AskedToTerminate == _ = False

{-------------------------------------------------------------------------------
Exception
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Serialisation
data Handlers m peer blk = Handlers {
hChainSyncClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> StrictTVar m (AnchoredFragment (Header blk))
-> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult
-- TODO: we should consider either bundling these context parameters
Expand Down Expand Up @@ -421,7 +422,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncClient version _controlMessageSTM them channel = do
aChainSyncClient version controlMessageSTM them channel = do
labelThisThread "ChainSyncClient"
-- Note that it is crucial that we sync with the fetch client "outside"
-- of registering the state for the sync client. This is needed to
Expand All @@ -444,7 +445,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
(timeLimitsChainSync chainSyncTimeout)
channel
$ chainSyncClientPeerPipelined
$ hChainSyncClient version varCandidate
$ hChainSyncClient version controlMessageSTM varCandidate
return ((), trailing)

aChainSyncServer
Expand Down

0 comments on commit 8be435e

Please sign in to comment.