From d3f6d0c8a633f2bd175ed46aaf2b3edaab28983f Mon Sep 17 00:00:00 2001 From: Alexander Esgen Date: Thu, 4 Jul 2024 18:15:18 +0200 Subject: [PATCH] BlockFetch pipelining, KeepAlive protocol --- .../Cardano/Tools/N2NPG/Run.hs | 116 ++++++++++++------ 1 file changed, 79 insertions(+), 37 deletions(-) diff --git a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/N2NPG/Run.hs b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/N2NPG/Run.hs index b592a7de48..69f42094df 100644 --- a/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/N2NPG/Run.hs +++ b/ouroboros-consensus-cardano/src/unstable-cardano-tools/Cardano/Tools/N2NPG/Run.hs @@ -14,6 +14,7 @@ module Cardano.Tools.N2NPG.Run ( import qualified Cardano.Tools.DBAnalyser.Block.Cardano as Cardano import Cardano.Tools.DBAnalyser.HasAnalysis (mkProtocolInfo) +import Control.Monad (when) import Control.Monad.Class.MonadSay (MonadSay (..)) import Control.Monad.Cont import Control.Monad.Trans (MonadTrans (..)) @@ -26,9 +27,9 @@ import Data.Traversable (for) import Data.Void (Void) import Data.Word (Word64) import qualified Network.Socket as Socket -import Network.TypedProtocol (PeerHasAgency (..), PeerPipelined (..), - PeerRole (..), PeerSender (..)) -import Network.TypedProtocol.Pipelined (N (..)) +import Network.TypedProtocol (N (..), Nat (..), PeerHasAgency (..), + PeerPipelined (..), PeerReceiver (..), PeerRole (..), + PeerSender (..), natToInt) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.Config.SupportsNode @@ -47,18 +48,22 @@ import Ouroboros.Consensus.Util.Args import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.ResourceRegistry import Ouroboros.Network.Block (Tip) -import Ouroboros.Network.Diffusion.Configuration (PeerSharing (..)) +import Ouroboros.Network.Diffusion.Configuration + (MiniProtocolParameters (..), PeerSharing (..), + defaultMiniProtocolParameters) import Ouroboros.Network.IOManager (withIOManager) import Ouroboros.Network.Magic (NetworkMagic) -import Ouroboros.Network.Mux (MiniProtocol (..), - MiniProtocolLimits (..), MuxMode (..), +import Ouroboros.Network.Mux (MiniProtocol (..), MuxMode (..), OuroborosApplication (..), OuroborosApplicationWithMinimalCtx, RunMiniProtocol (..), + mkMiniProtocolCbFromPeer, mkMiniProtocolCbFromPeerPipelined) import Ouroboros.Network.NodeToNode (DiffusionMode (..), NetworkConnectTracers (..), NodeToNodeVersionData, Versions (..), blockFetchMiniProtocolNum, - chainSyncMiniProtocolNum, connectTo) + blockFetchProtocolLimits, chainSyncMiniProtocolNum, + chainSyncProtocolLimits, connectTo, + keepAliveMiniProtocolNum, keepAliveProtocolLimits) import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress, encodeRemoteAddress) import Ouroboros.Network.Protocol.BlockFetch.Type (BlockFetch, @@ -67,6 +72,10 @@ import qualified Ouroboros.Network.Protocol.BlockFetch.Type as BlockFetch import Ouroboros.Network.Protocol.ChainSync.Type (ChainSync) import qualified Ouroboros.Network.Protocol.ChainSync.Type as ChainSync import Ouroboros.Network.Protocol.Handshake.Version (Version (..)) +import Ouroboros.Network.Protocol.KeepAlive.Client + (KeepAliveClient (..), KeepAliveClientSt (..), + keepAliveClientPeer) +import Ouroboros.Network.Protocol.KeepAlive.Type (Cookie (..)) import Ouroboros.Network.Snocket (SocketSnocket) import qualified Ouroboros.Network.Snocket as Snocket import System.FS.API (SomeHasFS (..)) @@ -143,7 +152,7 @@ TODOs if we want this as a proper tool: - Think more about tracing/logging (no MonadSay) - Proper exceptions instead of MonadFail - - protocol pipelining? + - ChainSync protocol pipelining? - handle rollbacks in a way other than ignoring? - BlockFetch batching? @@ -227,31 +236,56 @@ simpleBlockFetch :: -- ^ Invoked when we are done. -> PeerPipelined (BlockFetch' blk) AsClient BlockFetch.BFIdle m () simpleBlockFetch getNextPt signalDone = - PeerPipelined go + PeerPipelined $ go Zero where - go :: forall c. PeerSender (BlockFetch' blk) AsClient BlockFetch.BFIdle Z c m () - go = SenderEffect $ getNextPt <&> \case - Nothing -> - SenderYield (ClientAgency BlockFetch.TokIdle) BlockFetch.MsgClientDone - $ SenderEffect $ do - signalDone - pure $ SenderDone BlockFetch.TokDone () + go :: + Nat n + -> PeerSender (BlockFetch' blk) AsClient BlockFetch.BFIdle n () m () + go outstanding = SenderEffect $ getNextPt <&> \case + Nothing -> drain outstanding Just pt -> - SenderYield (ClientAgency BlockFetch.TokIdle) + SenderPipeline (ClientAgency BlockFetch.TokIdle) (BlockFetch.MsgRequestRange (ChainRange pt pt)) - $ SenderAwait (ServerAgency BlockFetch.TokBusy) $ \case - BlockFetch.MsgNoBlocks -> SenderEffect $ - fail $ "Server doesn't have block corresponding to header: " <> show pt - BlockFetch.MsgStartBatch -> - SenderAwait (ServerAgency BlockFetch.TokStreaming) $ \case - BlockFetch.MsgBatchDone -> SenderEffect $ - fail "Server sent an empty batch" - BlockFetch.MsgBlock blk -> SenderEffect $ do - say $ "Received block " <> show (blockPoint blk) - pure $ SenderAwait (ServerAgency BlockFetch.TokStreaming) $ \case - BlockFetch.MsgBlock {} -> SenderEffect $ - fail "Server sent too many blocks in a batch" - BlockFetch.MsgBatchDone -> go + (receiver pt) + $ SenderCollect continue + $ \() -> go outstanding + where + continue + | natToInt outstanding >= pipeliningDepth = Nothing + | otherwise = Just (go (Succ outstanding)) + + pipeliningDepth = + fromIntegral $ blockFetchPipeliningMax defaultMiniProtocolParameters + + receiver :: + Point blk + -> PeerReceiver (BlockFetch' blk) AsClient BlockFetch.BFBusy BlockFetch.BFIdle m () + receiver pt = ReceiverAwait (ServerAgency BlockFetch.TokBusy) $ \case + BlockFetch.MsgNoBlocks -> ReceiverEffect $ + fail $ "Server doesn't have block corresponding to header: " <> show pt + BlockFetch.MsgStartBatch -> + ReceiverAwait (ServerAgency BlockFetch.TokStreaming) $ \case + BlockFetch.MsgBatchDone -> ReceiverEffect $ + fail "Server sent an empty batch" + BlockFetch.MsgBlock blk -> ReceiverEffect $ do + when (blockPoint blk /= pt) $ + fail $ "Server sent incorrect block: expected " <> show pt + <> ", got " <> show (blockPoint blk) + say $ "Received block " <> show (blockPoint blk) + pure $ ReceiverAwait (ServerAgency BlockFetch.TokStreaming) $ \case + BlockFetch.MsgBlock {} -> ReceiverEffect $ + fail "Server sent too many blocks in a batch" + BlockFetch.MsgBatchDone -> ReceiverDone () + + drain :: + Nat n + -> PeerSender (BlockFetch' blk) AsClient BlockFetch.BFIdle n () m () + drain = \case + Zero -> SenderYield (ClientAgency BlockFetch.TokIdle) BlockFetch.MsgClientDone + $ SenderEffect $ do + signalDone + pure $ SenderDone BlockFetch.TokDone () + Succ n -> SenderCollect Nothing $ \() -> drain n {------------------------------------------------------------------------------- Invoke Networking layer @@ -319,15 +353,18 @@ mkApplication codecCfg networkMagic chainSync blockFetch = -> BlockNodeToNodeVersion blk -> OuroborosApplicationWithMinimalCtx InitiatorMode addr BL.ByteString m a Void application version blockVersion = OuroborosApplication - [ mkMiniProtocol chainSyncMiniProtocolNum $ + [ mkMiniProtocol chainSyncMiniProtocolNum chainSyncProtocolLimits $ mkMiniProtocolCbFromPeerPipelined $ \_ -> (nullTracer, cChainSyncCodec, chainSync) - , mkMiniProtocol blockFetchMiniProtocolNum $ + , mkMiniProtocol blockFetchMiniProtocolNum blockFetchProtocolLimits $ mkMiniProtocolCbFromPeerPipelined $ \_ -> (nullTracer, cBlockFetchCodec, blockFetch) + , mkMiniProtocol keepAliveMiniProtocolNum keepAliveProtocolLimits $ + mkMiniProtocolCbFromPeer $ \_ -> + (nullTracer, cKeepAliveCodec, keepAlive) ] where - Codecs {cChainSyncCodec, cBlockFetchCodec} = + Codecs {cChainSyncCodec, cBlockFetchCodec, cKeepAliveCodec} = defaultCodecs codecCfg blockVersion @@ -335,14 +372,19 @@ mkApplication codecCfg networkMagic chainSync blockFetch = decodeRemoteAddress version - mkMiniProtocol miniProtocolNum muxPeer = MiniProtocol { + mkMiniProtocol miniProtocolNum mkLimits muxPeer = MiniProtocol { miniProtocolNum , miniProtocolRun = InitiatorProtocolOnly muxPeer - , miniProtocolLimits = MiniProtocolLimits { - maximumIngressQueue = 1_000_000 -- in bytes - } + , miniProtocolLimits = mkLimits defaultMiniProtocolParameters } + -- Trivial KeepAlive client + keepAlive = keepAliveClientPeer $ KeepAliveClient go + where + go = do + threadDelay 10 -- seconds + pure $ SendMsgKeepAlive (Cookie 42) go + {------------------------------------------------------------------------------- Util -------------------------------------------------------------------------------}