Skip to content

Commit

Permalink
BlockFetch pipelining, KeepAlive protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
amesgen committed Jul 9, 2024
1 parent ba4ecb7 commit a381b12
Showing 1 changed file with 79 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Cardano.Tools.N2NPG.Run (

import qualified Cardano.Tools.DBAnalyser.Block.Cardano as Cardano
import Cardano.Tools.DBAnalyser.HasAnalysis (mkProtocolInfo)
import Control.Monad (when)
import Control.Monad.Class.MonadSay (MonadSay (..))
import Control.Monad.Cont
import Control.Monad.Trans (MonadTrans (..))
Expand All @@ -26,9 +27,9 @@ import Data.Traversable (for)
import Data.Void (Void)
import Data.Word (Word64)
import qualified Network.Socket as Socket
import Network.TypedProtocol (PeerHasAgency (..), PeerPipelined (..),
PeerRole (..), PeerSender (..))
import Network.TypedProtocol.Pipelined (N (..))
import Network.TypedProtocol (N (..), Nat (..), PeerHasAgency (..),
PeerPipelined (..), PeerReceiver (..), PeerRole (..),
PeerSender (..), natToInt)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
import Ouroboros.Consensus.Config.SupportsNode
Expand All @@ -47,18 +48,22 @@ import Ouroboros.Consensus.Util.Args
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Network.Block (Tip)
import Ouroboros.Network.Diffusion.Configuration (PeerSharing (..))
import Ouroboros.Network.Diffusion.Configuration
(MiniProtocolParameters (..), PeerSharing (..),
defaultMiniProtocolParameters)
import Ouroboros.Network.IOManager (withIOManager)
import Ouroboros.Network.Magic (NetworkMagic)
import Ouroboros.Network.Mux (MiniProtocol (..),
MiniProtocolLimits (..), MuxMode (..),
import Ouroboros.Network.Mux (MiniProtocol (..), MuxMode (..),
OuroborosApplication (..),
OuroborosApplicationWithMinimalCtx, RunMiniProtocol (..),
mkMiniProtocolCbFromPeer,
mkMiniProtocolCbFromPeerPipelined)
import Ouroboros.Network.NodeToNode (DiffusionMode (..),
NetworkConnectTracers (..), NodeToNodeVersionData,
Versions (..), blockFetchMiniProtocolNum,
chainSyncMiniProtocolNum, connectTo)
blockFetchProtocolLimits, chainSyncMiniProtocolNum,
chainSyncProtocolLimits, connectTo,
keepAliveMiniProtocolNum, keepAliveProtocolLimits)
import Ouroboros.Network.PeerSelection.PeerSharing.Codec
(decodeRemoteAddress, encodeRemoteAddress)
import Ouroboros.Network.Protocol.BlockFetch.Type (BlockFetch,
Expand All @@ -67,6 +72,10 @@ import qualified Ouroboros.Network.Protocol.BlockFetch.Type as BlockFetch
import Ouroboros.Network.Protocol.ChainSync.Type (ChainSync)
import qualified Ouroboros.Network.Protocol.ChainSync.Type as ChainSync
import Ouroboros.Network.Protocol.Handshake.Version (Version (..))
import Ouroboros.Network.Protocol.KeepAlive.Client
(KeepAliveClient (..), KeepAliveClientSt (..),
keepAliveClientPeer)
import Ouroboros.Network.Protocol.KeepAlive.Type (Cookie (..))
import Ouroboros.Network.Snocket (SocketSnocket)
import qualified Ouroboros.Network.Snocket as Snocket
import System.FS.API (SomeHasFS (..))
Expand Down Expand Up @@ -143,7 +152,7 @@ TODOs if we want this as a proper tool:
- Think more about tracing/logging (no MonadSay)
- Proper exceptions instead of MonadFail
- protocol pipelining?
- ChainSync protocol pipelining?
- handle rollbacks in a way other than ignoring?
- BlockFetch batching?
Expand Down Expand Up @@ -227,31 +236,56 @@ simpleBlockFetch ::
-- ^ Invoked when we are done.
-> PeerPipelined (BlockFetch' blk) AsClient BlockFetch.BFIdle m ()
simpleBlockFetch getNextPt signalDone =
PeerPipelined go
PeerPipelined $ go Zero
where
go :: forall c. PeerSender (BlockFetch' blk) AsClient BlockFetch.BFIdle Z c m ()
go = SenderEffect $ getNextPt <&> \case
Nothing ->
SenderYield (ClientAgency BlockFetch.TokIdle) BlockFetch.MsgClientDone
$ SenderEffect $ do
signalDone
pure $ SenderDone BlockFetch.TokDone ()
go ::
Nat n
-> PeerSender (BlockFetch' blk) AsClient BlockFetch.BFIdle n () m ()
go outstanding = SenderEffect $ getNextPt <&> \case
Nothing -> drain outstanding
Just pt ->
SenderYield (ClientAgency BlockFetch.TokIdle)
SenderPipeline (ClientAgency BlockFetch.TokIdle)
(BlockFetch.MsgRequestRange (ChainRange pt pt))
$ SenderAwait (ServerAgency BlockFetch.TokBusy) $ \case
BlockFetch.MsgNoBlocks -> SenderEffect $
fail $ "Server doesn't have block corresponding to header: " <> show pt
BlockFetch.MsgStartBatch ->
SenderAwait (ServerAgency BlockFetch.TokStreaming) $ \case
BlockFetch.MsgBatchDone -> SenderEffect $
fail "Server sent an empty batch"
BlockFetch.MsgBlock blk -> SenderEffect $ do
say $ "Received block " <> show (blockPoint blk)
pure $ SenderAwait (ServerAgency BlockFetch.TokStreaming) $ \case
BlockFetch.MsgBlock {} -> SenderEffect $
fail "Server sent too many blocks in a batch"
BlockFetch.MsgBatchDone -> go
(receiver pt)
$ SenderCollect continue
$ \() -> go outstanding
where
continue
| natToInt outstanding >= pipeliningDepth = Nothing
| otherwise = Just (go (Succ outstanding))

pipeliningDepth =
fromIntegral $ blockFetchPipeliningMax defaultMiniProtocolParameters

receiver ::
Point blk
-> PeerReceiver (BlockFetch' blk) AsClient BlockFetch.BFBusy BlockFetch.BFIdle m ()
receiver pt = ReceiverAwait (ServerAgency BlockFetch.TokBusy) $ \case
BlockFetch.MsgNoBlocks -> ReceiverEffect $
fail $ "Server doesn't have block corresponding to header: " <> show pt
BlockFetch.MsgStartBatch ->
ReceiverAwait (ServerAgency BlockFetch.TokStreaming) $ \case
BlockFetch.MsgBatchDone -> ReceiverEffect $
fail "Server sent an empty batch"
BlockFetch.MsgBlock blk -> ReceiverEffect $ do
when (blockPoint blk /= pt) $
fail $ "Server sent incorrect block: expected " <> show pt
<> ", got " <> show (blockPoint blk)
say $ "Received block " <> show (blockPoint blk)
pure $ ReceiverAwait (ServerAgency BlockFetch.TokStreaming) $ \case
BlockFetch.MsgBlock {} -> ReceiverEffect $
fail "Server sent too many blocks in a batch"
BlockFetch.MsgBatchDone -> ReceiverDone ()

drain ::
Nat n
-> PeerSender (BlockFetch' blk) AsClient BlockFetch.BFIdle n () m ()
drain = \case
Zero -> SenderYield (ClientAgency BlockFetch.TokIdle) BlockFetch.MsgClientDone
$ SenderEffect $ do
signalDone
pure $ SenderDone BlockFetch.TokDone ()
Succ n -> SenderCollect Nothing $ \() -> drain n

{-------------------------------------------------------------------------------
Invoke Networking layer
Expand Down Expand Up @@ -319,30 +353,38 @@ mkApplication codecCfg networkMagic chainSync blockFetch =
-> BlockNodeToNodeVersion blk
-> OuroborosApplicationWithMinimalCtx InitiatorMode addr BL.ByteString m a Void
application version blockVersion = OuroborosApplication
[ mkMiniProtocol chainSyncMiniProtocolNum $
[ mkMiniProtocol chainSyncMiniProtocolNum chainSyncProtocolLimits $
mkMiniProtocolCbFromPeerPipelined $ \_ ->
(nullTracer, cChainSyncCodec, chainSync)
, mkMiniProtocol blockFetchMiniProtocolNum $
, mkMiniProtocol blockFetchMiniProtocolNum blockFetchProtocolLimits $
mkMiniProtocolCbFromPeerPipelined $ \_ ->
(nullTracer, cBlockFetchCodec, blockFetch)
, mkMiniProtocol keepAliveMiniProtocolNum keepAliveProtocolLimits $
mkMiniProtocolCbFromPeer $ \_ ->
(nullTracer, cKeepAliveCodec, keepAlive)
]
where
Codecs {cChainSyncCodec, cBlockFetchCodec} =
Codecs {cChainSyncCodec, cBlockFetchCodec, cKeepAliveCodec} =
defaultCodecs
codecCfg
blockVersion
encodeRemoteAddress
decodeRemoteAddress
version

mkMiniProtocol miniProtocolNum muxPeer = MiniProtocol {
mkMiniProtocol miniProtocolNum mkLimits muxPeer = MiniProtocol {
miniProtocolNum
, miniProtocolRun = InitiatorProtocolOnly muxPeer
, miniProtocolLimits = MiniProtocolLimits {
maximumIngressQueue = 1_000_000 -- in bytes
}
, miniProtocolLimits = mkLimits defaultMiniProtocolParameters
}

-- Trivial KeepAlive client
keepAlive = keepAliveClientPeer $ KeepAliveClient go
where
go = do
threadDelay 10 -- seconds
pure $ SendMsgKeepAlive (Cookie 42) go

{-------------------------------------------------------------------------------
Util
-------------------------------------------------------------------------------}
Expand Down

0 comments on commit a381b12

Please sign in to comment.