Skip to content

Commit

Permalink
Pass ControlMessageSTM to the NodeToNode apps
Browse files Browse the repository at this point in the history
The control message isn't used yet.
  • Loading branch information
mrBliss committed Aug 28, 2020
1 parent 5452492 commit 29896b1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
7 changes: 5 additions & 2 deletions ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import Ouroboros.Network.MockChain.Chain (Chain (Genesis))
import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
Expand Down Expand Up @@ -1171,13 +1172,15 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
-> (String -> a -> RestartCause)
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m (a, trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> Channel m msg
-> m (a, trailingBytes)
Expand All @@ -1189,8 +1192,8 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( (ret (proto <> ".client") . fst) <$> client app1 version (fromCoreNodeId node2) chan
, (ret (proto <> ".server") . fst) <$> server app2 version (fromCoreNodeId node1) dualChan
( (ret (proto <> ".client") . fst) <$> client app1 version (return Continue) (fromCoreNodeId node2) chan
, (ret (proto <> ".server") . fst) <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan
)

(>>= withAsyncsWaitAny) $
Expand Down
50 changes: 31 additions & 19 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,11 @@ showTracers tr = Tracers {

-- | A node-to-node application
type App m peer bytes a =
NodeToNodeVersion -> peer -> Channel m bytes -> m (a, Maybe bytes)
NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> Channel m bytes
-> m (a, Maybe bytes)

-- | Applications for the node-to-node protocols
--
Expand Down Expand Up @@ -413,10 +417,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
where
aChainSyncClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncClient version them channel = do
aChainSyncClient version _controlMessageSTM them channel = do
labelThisThread "ChainSyncClient"
-- Note that it is crucial that we sync with the fetch client "outside"
-- of registering the state for the sync client. This is needed to
Expand Down Expand Up @@ -444,10 +449,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aChainSyncServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncServer version them channel = do
aChainSyncServer version _controlMessageSTM them channel = do
labelThisThread "ChainSyncServer"
withRegistry $ \registry -> do
chainSyncTimeout <- genChainSyncTimeout
Expand All @@ -462,10 +468,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aBlockFetchClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchClient version them channel = do
aBlockFetchClient version _controlMessageSTM them channel = do
labelThisThread "BlockFetchClient"
bracketFetchClient (getFetchClientRegistry kernel) them $ \clientCtx ->
runPipelinedPeerWithLimits
Expand All @@ -478,10 +485,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aBlockFetchServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchServer version them channel = do
aBlockFetchServer version _controlMessageSTM them channel = do
labelThisThread "BlockFetchServer"
withRegistry $ \registry ->
runPeerWithLimits
Expand All @@ -495,10 +503,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aTxSubmissionClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionClient version them channel = do
aTxSubmissionClient version _controlMessageSTM them channel = do
labelThisThread "TxSubmissionClient"
runPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
Expand All @@ -510,10 +519,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aTxSubmissionServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmissionServer version them channel = do
aTxSubmissionServer version _controlMessageSTM them channel = do
labelThisThread "TxSubmissionServer"
runPipelinedPeerWithLimits
(contramap (TraceLabelPeer them) tTxSubmissionTracer)
Expand All @@ -525,10 +535,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aKeepAliveClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bKA
-> m ((), Maybe bKA)
aKeepAliveClient version them channel = do
aKeepAliveClient version _controlMessageSTM them channel = do
labelThisThread "KeepAliveClient"
let kacApp = case version of
-- Version 1 doesn't support keep alive protocol but Blockfetch
Expand All @@ -550,10 +561,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =

aKeepAliveServer
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> Channel m bKA
-> m ((), Maybe bKA)
aKeepAliveServer _version _them channel = do
aKeepAliveServer _version _controlMessageSTM _them channel = do
labelThisThread "KeepAliveServer"
runPeerWithLimits
nullTracer
Expand Down Expand Up @@ -588,15 +600,15 @@ initiator miniProtocolParameters version Apps {..} =
-- p2p-governor & connection-manager. Then consenus can use peer's ip
-- address & port number, rather than 'ConnectionId' (which is
-- a quadruple uniquely determinaing a connection).
(\them _shouldStopSTM -> NodeToNodeProtocols {
(\them controlMessageSTM -> NodeToNodeProtocols {
chainSyncProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aChainSyncClient version them))),
(InitiatorProtocolOnly (MuxPeerRaw (aChainSyncClient version controlMessageSTM them))),
blockFetchProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version them))),
(InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version controlMessageSTM them))),
txSubmissionProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version them))),
(InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them))),
keepAliveProtocol =
(InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version them)))
(InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version controlMessageSTM them)))
})
version

Expand All @@ -612,14 +624,14 @@ responder
responder miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
miniProtocolParameters
(\them _shouldStopSTM -> NodeToNodeProtocols {
(\them controlMessageSTM -> NodeToNodeProtocols {
chainSyncProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer version them))),
(ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer version controlMessageSTM them))),
blockFetchProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version them))),
(ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version controlMessageSTM them))),
txSubmissionProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them))),
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version controlMessageSTM them))),
keepAliveProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version them)))
(ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version controlMessageSTM them)))
})
version

0 comments on commit 29896b1

Please sign in to comment.