Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChainSyncClient: respond to ControlMessageSTM #2531

Merged
merged 3 commits into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import qualified Ouroboros.Network.BlockFetch.Client as BFClient
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
Expand Down Expand Up @@ -1180,13 +1181,15 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
-- ^ protocol name
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m ((), trailingBytes)
Expand All @@ -1198,8 +1201,8 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( fst <$> client app1 version (fromCoreNodeId node2) chan
, fst <$> server app2 version (fromCoreNodeId node1) dualChan
( fst <$> client app1 version (return Continue) (fromCoreNodeId node2) chan
, fst <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan
)

-- NB only 'watcher' ever returns in these tests
Expand Down Expand Up @@ -1239,10 +1242,10 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
wrapMPEE ::
Exception e
=> (e -> MiniProtocolExpectedException)
-> (app -> version -> peer -> chan -> m a)
-> (app -> version -> peer -> chan -> m a)
wrapMPEE f m = \app ver them chan ->
catch (m app ver them chan) $ throwM . f
-> (app -> version -> controlMsg -> peer -> chan -> m a)
-> (app -> version -> controlMsg -> peer -> chan -> m a)
wrapMPEE f m = \app ver controlMsg them chan ->
catch (m app ver controlMsg them chan) $ throwM . f

-- terminates when the vertex starts 'VFalling'
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import Network.TypedProtocol.Pipelined
import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (Tip, getTipBlockNo, getTipPoint)
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision

Expand Down Expand Up @@ -342,6 +343,7 @@ chainSyncClient
-> TopLevelConfig blk
-> ChainDbView m blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> StrictTVar m (AnchoredFragment (Header blk))
-> Consensus ChainSyncClientPipelined blk m
chainSyncClient mkPipelineDecision0 tracer cfg
Expand All @@ -351,8 +353,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg
, getOurTip
, getIsInvalidBlock
}
_version
varCandidate = ChainSyncClientPipelined $
_version
controlMessageSTM
varCandidate = ChainSyncClientPipelined $
continueWithState () $ initialise
where
-- | Start ChainSync by looking for an intersection between our current
Expand Down Expand Up @@ -516,29 +519,41 @@ chainSyncClient mkPipelineDecision0 tracer cfg
-- This is the main place we check whether our current chain has changed.
-- We also check it in 'rollForward' to make sure we have an up-to-date
-- intersection before calling 'getLedgerView'.
--
-- This is also the place where we check whether we're asked to terminate
-- by the mux layer.
nextStep :: MkPipelineDecision
-> Nat n
-> Their (Tip blk)
-> Stateful m blk
(KnownIntersectionState blk)
(ClientPipelinedStIdle n)
nextStep mkPipelineDecision n theirTip = Stateful $ \kis -> do
mKis' <- atomically $ intersectsWithCurrentChain kis
case mKis' of
Just kis'@KnownIntersectionState { theirFrag } -> do
-- Our chain (tip) didn't change or if it did, it still intersects
-- with the candidate fragment, so we can continue requesting the
-- next block.
atomically $ writeTVar varCandidate theirFrag
let candTipBlockNo = AF.headBlockNo theirFrag
return $ requestNext kis' mkPipelineDecision n theirTip candTipBlockNo
Nothing ->
-- Our chain (tip) has changed and it no longer intersects with the
-- candidate fragment, so we have to find a new intersection, but
-- first drain the pipe.
atomically controlMessageSTM >>= \case
-- We have been asked to terminate the client
Terminate ->
continueWithState ()
$ drainThePipe n
$ findIntersection NoMoreIntersection
$ Stateful $ const $ return $ terminate AskedToTerminate
-- Continue
_ -> do
mKis' <- atomically $ intersectsWithCurrentChain kis
case mKis' of
Just kis'@KnownIntersectionState { theirFrag } -> do
-- Our chain (tip) didn't change or if it did, it still intersects
-- with the candidate fragment, so we can continue requesting the
-- next block.
atomically $ writeTVar varCandidate theirFrag
let candTipBlockNo = AF.headBlockNo theirFrag
return $
requestNext kis' mkPipelineDecision n theirTip candTipBlockNo
Nothing ->
-- Our chain (tip) has changed and it no longer intersects with
-- the candidate fragment, so we have to find a new intersection,
-- but first drain the pipe.
continueWithState ()
$ drainThePipe n
$ findIntersection NoMoreIntersection

-- | "Drain the pipe": collect and discard all in-flight responses and
-- finally execute the given action.
Expand Down Expand Up @@ -1000,6 +1015,9 @@ data ChainSyncClientResult =
(Our (Tip blk))
(Their (Tip blk))

-- | We were asked to terminate via the 'ControlMessageSTM'
| AskedToTerminate

deriving instance Show ChainSyncClientResult

instance Eq ChainSyncClientResult where
Expand All @@ -1021,6 +1039,9 @@ instance Eq ChainSyncClientResult where
Just Refl -> (a, b) == (a', b')
NoMoreIntersection{} == _ = False

AskedToTerminate == AskedToTerminate = True
AskedToTerminate == _ = False

{-------------------------------------------------------------------------------
Exception
-------------------------------------------------------------------------------}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module Ouroboros.Consensus.Network.NodeToClient (
, nullTracers
, showTracers
-- * Applications
, App
, Apps (..)
, mkApps
-- ** Projections
Expand Down Expand Up @@ -297,18 +298,21 @@ showTracers tr = Tracers {
Applications
-------------------------------------------------------------------------------}

-- | A node-to-client application
type App m peer bytes a = peer -> Channel m bytes -> m (a, Maybe bytes)

-- | Applications for the node-to-client (i.e., local) protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer bCS bTX bSQ a = Apps {
-- | Start a local chain sync server.
aChainSyncServer :: peer -> Channel m bCS -> m (a, Maybe bCS)
aChainSyncServer :: App m peer bCS a

-- | Start a local transaction submission server.
, aTxSubmissionServer :: peer -> Channel m bTX -> m (a, Maybe bTX)
, aTxSubmissionServer :: App m peer bTX a

-- | Start a local state query server.
, aStateQueryServer :: peer -> Channel m bSQ -> m (a, Maybe bSQ)
, aStateQueryServer :: App m peer bSQ a
}

-- | Construct the 'NetworkApplication' for the node-to-client protocols
Expand Down
70 changes: 44 additions & 26 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module Ouroboros.Consensus.Network.NodeToNode (
, nullTracers
, showTracers
-- * Applications
, App
, Apps (..)
, mkApps
-- ** 'OuroborosBundle'
Expand Down Expand Up @@ -99,6 +100,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Serialisation
data Handlers m peer blk = Handlers {
hChainSyncClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> StrictTVar m (AnchoredFragment (Header blk))
-> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult
-- TODO: we should consider either bundling these context parameters
Expand Down Expand Up @@ -351,36 +353,44 @@ showTracers tr = Tracers {
Applications
-------------------------------------------------------------------------------}

-- | A node-to-node application
type App m peer bytes a =
NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> Channel m bytes
-> m (a, Maybe bytes)

-- | Applications for the node-to-node protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer bCS bBF bTX bKA a = Apps {
-- | Start a chain sync client that communicates with the given upstream
-- node.
aChainSyncClient :: NodeToNodeVersion -> peer -> Channel m bCS -> m (a, Maybe bCS)
aChainSyncClient :: App m peer bCS a

-- | Start a chain sync server.
, aChainSyncServer :: NodeToNodeVersion -> peer -> Channel m bCS -> m (a, Maybe bCS)
, aChainSyncServer :: App m peer bCS a

-- | Start a block fetch client that communicates with the given
-- upstream node.
, aBlockFetchClient :: NodeToNodeVersion -> peer -> Channel m bBF -> m (a, Maybe bBF)
, aBlockFetchClient :: App m peer bBF a

-- | Start a block fetch server.
, aBlockFetchServer :: NodeToNodeVersion -> peer -> Channel m bBF -> m (a, Maybe bBF)
, aBlockFetchServer :: App m peer bBF a

-- | Start a transaction submission client that communicates with the
-- given upstream node.
, aTxSubmissionClient :: NodeToNodeVersion -> peer -> Channel m bTX -> m (a, Maybe bTX)
, aTxSubmissionClient :: App m peer bTX a

-- | Start a transaction submission server.
, aTxSubmissionServer :: NodeToNodeVersion -> peer -> Channel m bTX -> m (a, Maybe bTX)
, aTxSubmissionServer :: App m peer bTX a

-- | Start a keep-alive client.
, aKeepAliveClient :: NodeToNodeVersion -> peer -> Channel m bKA -> m (a, Maybe bKA)
, aKeepAliveClient :: App m peer bKA a

-- | Start a keep-alive server.
, aKeepAliveServer :: NodeToNodeVersion -> peer -> Channel m bKA -> m (a, Maybe bKA)
, aKeepAliveServer :: App m peer bKA a
}

-- | Construct the 'NetworkApplication' for the node-to-node protocols
Expand All @@ -407,10 +417,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
where
aChainSyncClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncClient version them channel = do
aChainSyncClient version controlMessageSTM them channel = do
labelThisThread "ChainSyncClient"
-- Note that it is crucial that we sync with the fetch client "outside"
-- of registering the state for the sync client. This is needed to
Expand All @@ -433,15 +444,16 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
(timeLimitsChainSync chainSyncTimeout)
channel
$ chainSyncClientPeerPipelined
$ hChainSyncClient version varCandidate
$ hChainSyncClient version controlMessageSTM varCandidate
return ((), trailing)

aChainSyncServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncServer version them channel = do
aChainSyncServer version _controlMessageSTM them channel = do
labelThisThread "ChainSyncServer"
withRegistry $ \registry -> do
chainSyncTimeout <- genChainSyncTimeout
Expand All @@ -456,10 +468,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aBlockFetchClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchClient version them channel = do
aBlockFetchClient version _controlMessageSTM them channel = do
labelThisThread "BlockFetchClient"
bracketFetchClient (getFetchClientRegistry kernel) them $ \clientCtx ->
runPipelinedPeerWithLimits
Expand All @@ -472,10 +485,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aBlockFetchServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchServer version them channel = do
aBlockFetchServer version _controlMessageSTM them channel = do
labelThisThread "BlockFetchServer"
withRegistry $ \registry ->
runPeerWithLimits
Expand All @@ -489,10 +503,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aTxSubmissionClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionClient version them channel = do
aTxSubmissionClient version _controlMessageSTM them channel = do
labelThisThread "TxSubmissionClient"
runPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
Expand All @@ -504,10 +519,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aTxSubmissionServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionServer version them channel = do
aTxSubmissionServer version _controlMessageSTM them channel = do
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
Expand All @@ -519,10 +535,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aKeepAliveClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bKA
-> m ((), Maybe bKA)
aKeepAliveClient version them channel = do
aKeepAliveClient version _controlMessageSTM them channel = do
labelThisThread "KeepAliveClient"
let kacApp = case version of
-- Version 1 doesn't support keep alive protocol but Blockfetch
Expand All @@ -544,10 +561,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aKeepAliveServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bKA
-> m ((), Maybe bKA)
aKeepAliveServer _version _them channel = do
aKeepAliveServer _version _controlMessageSTM _them channel = do
labelThisThread "KeepAliveServer"
runPeerWithLimits
nullTracer
Expand Down Expand Up @@ -581,22 +599,22 @@ initiatorAndResponder miniProtocolParameters version Apps {..} =
-- p2p-governor & connection-manager. Then consensus can use peer's ip
-- address & port number, rather than 'ConnectionId' (which is
-- a quadruple uniquely determining a connection).
(\them _controlMessageSTM -> NodeToNodeProtocols {
(\them controlMessageSTM -> NodeToNodeProtocols {
chainSyncProtocol =
(InitiatorAndResponderProtocol
(MuxPeerRaw (aChainSyncClient version them))
(MuxPeerRaw (aChainSyncServer version them))),
(MuxPeerRaw (aChainSyncClient version controlMessageSTM them))
(MuxPeerRaw (aChainSyncServer version controlMessageSTM them))),
blockFetchProtocol =
(InitiatorAndResponderProtocol
(MuxPeerRaw (aBlockFetchClient version them))
(MuxPeerRaw (aBlockFetchServer version them))),
(MuxPeerRaw (aBlockFetchClient version controlMessageSTM them))
(MuxPeerRaw (aBlockFetchServer version controlMessageSTM them))),
txSubmissionProtocol =
(InitiatorAndResponderProtocol
(MuxPeerRaw (aTxSubmissionClient version them))
(MuxPeerRaw (aTxSubmissionServer version them))),
(MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them))
(MuxPeerRaw (aTxSubmissionServer version controlMessageSTM them))),
keepAliveProtocol =
(InitiatorAndResponderProtocol
(MuxPeerRaw (aKeepAliveClient version them))
(MuxPeerRaw (aKeepAliveServer version them)))
(MuxPeerRaw (aKeepAliveClient version controlMessageSTM them))
(MuxPeerRaw (aKeepAliveServer version controlMessageSTM them)))
})
version
Loading