Skip to content

Commit

Permalink
ChainSyncClient: respond to ControlMessageSTM
Browse files Browse the repository at this point in the history
  • Loading branch information
mrBliss authored and coot committed Aug 21, 2020
1 parent 28cb227 commit bbfc0b7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import qualified Ouroboros.Network.BlockFetch.Client as BFClient
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
Expand Down Expand Up @@ -1180,13 +1181,15 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
-- ^ protocol name
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
Expand All @@ -1198,8 +1201,8 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( fst <$> client app1 version (fromCoreNodeId node2) chan
, fst <$> server app2 version (fromCoreNodeId node1) dualChan
( fst <$> client app1 version (return Continue) (fromCoreNodeId node2) chan
, fst <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan
)

-- NB only 'watcher' ever returns in these tests
Expand Down Expand Up @@ -1239,10 +1242,10 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
wrapMPEE ::
Exception e
=> (e -> MiniProtocolExpectedException)
-> (app -> version -> peer -> chan -> m a)
-> (app -> version -> peer -> chan -> m a)
wrapMPEE f m = \app ver them chan ->
catch (m app ver them chan) $ throwM . f
-> (app -> version -> controlMsg -> peer -> chan -> m a)
-> (app -> version -> controlMsg -> peer -> chan -> m a)
wrapMPEE f m = \app ver controlMsg them chan ->
catch (m app ver controlMsg them chan) $ throwM . f

-- terminates when the vertex starts 'VFalling'
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import Network.TypedProtocol.Pipelined
import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (Tip, getTipBlockNo, getTipPoint)
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision

Expand Down Expand Up @@ -342,6 +343,7 @@ chainSyncClient
-> TopLevelConfig blk
-> ChainDbView m blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> StrictTVar m (AnchoredFragment (Header blk))
-> Consensus ChainSyncClientPipelined blk m
chainSyncClient mkPipelineDecision0 tracer cfg
Expand All @@ -351,8 +353,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg
, getOurTip
, getIsInvalidBlock
}
_version
varCandidate = ChainSyncClientPipelined $
_version
controlMessageSTM
varCandidate = ChainSyncClientPipelined $
continueWithState () $ initialise
where
-- | Start ChainSync by looking for an intersection between our current
Expand Down Expand Up @@ -516,29 +519,41 @@ chainSyncClient mkPipelineDecision0 tracer cfg
-- This is the main place we check whether our current chain has changed.
-- We also check it in 'rollForward' to make sure we have an up-to-date
-- intersection before calling 'getLedgerView'.
--
-- This is also the place where we checked whether we're asked to terminate
-- by the mux layer.
nextStep :: MkPipelineDecision
-> Nat n
-> Their (Tip blk)
-> Stateful m blk
(KnownIntersectionState blk)
(ClientPipelinedStIdle n)
nextStep mkPipelineDecision n theirTip = Stateful $ \kis -> do
mKis' <- atomically $ intersectsWithCurrentChain kis
case mKis' of
Just kis'@KnownIntersectionState { theirFrag } -> do
-- Our chain (tip) didn't change or if it did, it still intersects
-- with the candidate fragment, so we can continue requesting the
-- next block.
atomically $ writeTVar varCandidate theirFrag
let candTipBlockNo = AF.headBlockNo theirFrag
return $ requestNext kis' mkPipelineDecision n theirTip candTipBlockNo
Nothing ->
-- Our chain (tip) has changed and it no longer intersects with the
-- candidate fragment, so we have to find a new intersection, but
-- first drain the pipe.
atomically controlMessageSTM >>= \case
-- We have been asked to terminate the client
Terminate ->
continueWithState ()
$ drainThePipe n
$ findIntersection NoMoreIntersection
$ Stateful $ const $ return $ terminate AskedToTerminate
-- Continue
_ -> do
mKis' <- atomically $ intersectsWithCurrentChain kis
case mKis' of
Just kis'@KnownIntersectionState { theirFrag } -> do
-- Our chain (tip) didn't change or if it did, it still intersects
-- with the candidate fragment, so we can continue requesting the
-- next block.
atomically $ writeTVar varCandidate theirFrag
let candTipBlockNo = AF.headBlockNo theirFrag
return $
requestNext kis' mkPipelineDecision n theirTip candTipBlockNo
Nothing ->
-- Our chain (tip) has changed and it no longer intersects with
-- the candidate fragment, so we have to find a new intersection,
-- but first drain the pipe.
continueWithState ()
$ drainThePipe n
$ findIntersection NoMoreIntersection

-- | "Drain the pipe": collect and discard all in-flight responses and
-- finally execute the given action.
Expand Down Expand Up @@ -1000,6 +1015,9 @@ data ChainSyncClientResult =
(Our (Tip blk))
(Their (Tip blk))

-- | We were asked to terminate via the 'ControlMessageSTM'
| AskedToTerminate

deriving instance Show ChainSyncClientResult

instance Eq ChainSyncClientResult where
Expand All @@ -1021,6 +1039,9 @@ instance Eq ChainSyncClientResult where
Just Refl -> (a, b) == (a', b')
NoMoreIntersection{} == _ = False

AskedToTerminate == AskedToTerminate = True
AskedToTerminate == _ = False

{-------------------------------------------------------------------------------
Exception
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Serialisation
data Handlers m peer blk = Handlers {
hChainSyncClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> StrictTVar m (AnchoredFragment (Header blk))
-> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult
-- TODO: we should consider either bundling these context parameters
Expand Down Expand Up @@ -420,7 +421,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncClient version _controlMessageSTM them channel = do
aChainSyncClient version controlMessageSTM them channel = do
labelThisThread "ChainSyncClient"
-- Note that it is crucial that we sync with the fetch client "outside"
-- of registering the state for the sync client. This is needed to
Expand All @@ -443,7 +444,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
(timeLimitsChainSync chainSyncTimeout)
channel
$ chainSyncClientPeerPipelined
$ hChainSyncClient version varCandidate
$ hChainSyncClient version controlMessageSTM varCandidate
return ((), trailing)

aChainSyncServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ import Control.Monad.IOSim (runSimOrThrow)

import Cardano.Crypto.DSIGN.Mock

import Ouroboros.Network.Block (getTipPoint)

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (getTipPoint)
import Ouroboros.Network.Channel
import Ouroboros.Network.Driver
import Ouroboros.Network.MockChain.Chain (Chain (Genesis))
import qualified Ouroboros.Network.MockChain.Chain as Chain
import Ouroboros.Network.MockChain.ProducerState (chainState,
initChainProducerState)
import qualified Ouroboros.Network.MockChain.ProducerState as CPS
import Ouroboros.Network.Mux (ControlMessage (..))
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
import Ouroboros.Network.Protocol.ChainSync.Examples
Expand Down Expand Up @@ -293,6 +293,7 @@ runChainSync securityParam (ClientUpdates clientUpdates)
nodeCfg
chainDbView
maxBound
(return Continue)

-- Set up the server
varChainProducerState <- uncheckedNewTVarM $ initChainProducerState Genesis
Expand Down

0 comments on commit bbfc0b7

Please sign in to comment.