From 05837c533836d5f1760ffad1ae3c626ded92bf9f Mon Sep 17 00:00:00 2001
From: Thomas Winant
Date: Thu, 20 Aug 2020 10:01:37 +0200
Subject: [PATCH 1/5] ChainSyncClient: gracefully terminate if the chain is
 uninteresting

Fixes #2499 for the ChainSyncClient.
---
 .../MiniProtocol/ChainSync/Client.hs          | 108 +++----
 .../MiniProtocol/ChainSync/Client.hs          | 278 +++++++++++-------
 .../Ouroboros/Consensus/Network/NodeToNode.hs |   2 +-
 .../Ouroboros/Consensus/Node/ErrorPolicy.hs   |   4 -
 4 files changed, 228 insertions(+), 164 deletions(-)

diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs
index 7b90c8dd93f..699c87e3607 100644
--- a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs
+++ b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs
@@ -94,44 +94,45 @@ updatesToGenerate = 100
 prop_chainSync :: ChainSyncClientSetup -> Property
 prop_chainSync ChainSyncClientSetup {..} = counterexample
-    ("Client chain: " <> ppChain clientChain <> "\n" <>
-     "Server chain: " <> ppChain serverChain <> "\n" <>
-     "Synched fragment: " <> ppFragment synchedFragment <> "\n" <>
-     "Trace:\n" <> unlines (map ppTraceEvent events)) $
+    ("Client chain: " <> ppChain finalClientChain <> "\n" <>
+     "Server chain: " <> ppChain finalServerChain <> "\n" <>
+     "Synced fragment: " <> ppFragment syncedFragment <> "\n" <>
+     "Trace:\n" <> unlines (map ppTraceEvent traceEvents)) $
     -- If an exception has been thrown, we check that it was right to throw
    -- it, but not the other way around: we don't check whether a situation
    -- has occurred where an exception should have been thrown, but wasn't.
-    case mbEx of
-      Just (ForkTooDeep intersection _ _) ->
+    case mbResult of
+      Just (Right (ForkTooDeep intersection _ _)) ->
         label "ForkTooDeep" $
         counterexample ("ForkTooDeep intersection: " <> ppPoint intersection) $
         not (withinFragmentBounds intersection clientFragment)
-      Just (InvalidRollBack intersection _ _) ->
-        label "InvalidRollBack" $
-        counterexample ("InvalidRollBack intersection: " <> ppPoint intersection) $
-        not (withinFragmentBounds intersection synchedFragment)
-      Just (NoMoreIntersection (Our ourTip) (Their theirTip)) ->
+      Just (Right (NoMoreIntersection (Our ourTip) (Their theirTip))) ->
         label "NoMoreIntersection" $
         counterexample ("NoMoreIntersection ourHead: " <> ppPoint (getTipPoint ourTip) <>
                         ", theirHead: " <> ppPoint (getTipPoint theirTip)) $
-        not (clientFragment `forksWithinK` synchedFragment)
-      Just e ->
-        counterexample ("Exception: " ++ displayException e) False
+        not (clientFragment `forksWithinK` syncedFragment)
+      Just (Right (RolledBackPastIntersection intersection _ _)) ->
+        label "RolledBackPastIntersection" $
+        counterexample ("RolledBackPastIntersection intersection: " <> ppPoint intersection) $
+        not (withinFragmentBounds intersection syncedFragment)
+      Just (Right result) ->
+        counterexample ("Terminated with result: " ++ show result) False
+      Just (Left ex) ->
+        counterexample ("Exception: " ++ displayException ex) False
       Nothing ->
         counterexample "Synced fragment not a suffix of the server chain"
-        (synchedFragment `isSuffixOf` serverChain) .&&.
+        (syncedFragment `isSuffixOf` finalServerChain) .&&.
         counterexample "Synced fragment doesn't intersect with the client chain"
-        (clientFragment `forksWithinK` synchedFragment) .&&.
+        (clientFragment `forksWithinK` syncedFragment) .&&.
counterexample "Synced fragment doesn't have the same anchor as the client fragment" - (AF.anchorPoint clientFragment === AF.anchorPoint synchedFragment) + (AF.anchorPoint clientFragment === AF.anchorPoint syncedFragment) where k = maxRollbacks securityParam - (clientChain, serverChain, synchedFragment, mbEx, events) = runSimOrThrow $ - runChainSync securityParam clientUpdates serverUpdates - startTick + ChainSyncOutcome {..} = runSimOrThrow $ + runChainSync securityParam clientUpdates serverUpdates startTick - clientFragment = AF.anchorNewest k $ Chain.toAnchoredFragment clientChain + clientFragment = AF.anchorNewest k $ Chain.toAnchoredFragment finalClientChain forksWithinK :: AnchoredFragment TestBlock -- ^ Our chain @@ -210,6 +211,14 @@ type TraceEvent = (Tick, Either (TraceChainSyncClientEvent TestBlock) (TraceSendRecv (ChainSync (Header TestBlock) (Tip TestBlock)))) +data ChainSyncOutcome = ChainSyncOutcome { + finalClientChain :: Chain TestBlock + , finalServerChain :: Chain TestBlock + , syncedFragment :: AnchoredFragment TestBlock + , mbResult :: Maybe (Either ChainSyncClientException ChainSyncClientResult) + , traceEvents :: [TraceEvent] + } + -- | We have a client and a server chain that both start at genesis. At -- certain times, we apply updates to both of these chains to simulate changes -- to the chains. @@ -225,7 +234,7 @@ type TraceEvent = (Tick, Either -- sync client will keep the candidate fragment in sync with the updating -- server chain. -- --- At the end, we return the final chains, the synched candidate fragment, and +-- At the end, we return the final chains, the synced candidate fragment, and -- any exception thrown by the chain sync client. The candidate fragment can -- then be compared to the actual server chain. If an exception was thrown, no -- more chain updates are applied so the state at the time of the exception is @@ -239,21 +248,16 @@ runChainSync -> ClientUpdates -> ServerUpdates -> Tick -- ^ Start chain syncing at this time - -> m (Chain TestBlock, Chain TestBlock, - AnchoredFragment TestBlock, Maybe ChainSyncClientException, - [TraceEvent]) - -- ^ (The final client chain, the final server chain, the synced - -- candidate fragment, exception thrown by the chain sync client, - -- the traced ChainSync and protocol events) + -> m ChainSyncOutcome runChainSync securityParam (ClientUpdates clientUpdates) (ServerUpdates serverUpdates) startSyncingAt = withRegistry $ \registry -> do clock <- LogicalClock.new registry numTicks -- Set up the client - varCandidates <- uncheckedNewTVarM Map.empty - varClientState <- uncheckedNewTVarM (Genesis, testInitExtLedger) - varClientException <- uncheckedNewTVarM Nothing + varCandidates <- uncheckedNewTVarM Map.empty + varClientState <- uncheckedNewTVarM (Genesis, testInitExtLedger) + varClientResult <- uncheckedNewTVarM Nothing -- Candidates are removed from the candidates map when disconnecting, so -- we lose access to them. Therefore, store the candidate 'TVar's in a -- separate map too, one that isn't emptied. We can use this map to look @@ -298,9 +302,10 @@ runChainSync securityParam (ClientUpdates clientUpdates) varLastUpdate <- uncheckedNewTVarM 0 void $ LogicalClock.onEachTick registry clock "scheduled updates" $ \tick -> do -- Stop updating the client and server chains when the chain sync client - -- has thrown an exception, so that at the end, we can read the chains - -- in the states they were in when the exception was thrown. 
- stop <- fmap isJust $ atomically $ readTVar varClientException + -- has thrown an exception or has gracefully terminated, so that at the + -- end, we can read the chains in the states they were in when the + -- exception was thrown. + stop <- fmap isJust $ atomically $ readTVar varClientResult unless stop $ do -- Client whenJust (Map.lookup tick clientUpdates) $ \chainUpdates -> @@ -345,13 +350,16 @@ runChainSync securityParam (ClientUpdates clientUpdates) serverId $ \varCandidate -> do atomically $ modifyTVar varFinalCandidates $ Map.insert serverId varCandidate - runPipelinedPeer protocolTracer codecChainSyncId clientChannel $ - chainSyncClientPeerPipelined $ client varCandidate - `catch` \(e :: ChainSyncClientException) -> do + (result, _) <- + runPipelinedPeer protocolTracer codecChainSyncId clientChannel $ + chainSyncClientPeerPipelined $ client varCandidate + atomically $ writeTVar varClientResult (Just (Right result)) + return () + `catch` \(ex :: ChainSyncClientException) -> do -- TODO: Is this necessary? Wouldn't the Async's internal MVar do? - atomically $ writeTVar varClientException (Just e) + atomically $ writeTVar varClientResult (Just (Left ex)) -- Rethrow, but it will be ignored anyway. - throwM e + throwM ex void $ forkLinkedThread registry "ChainSyncServer" $ runPeer nullTracer codecChainSyncId serverChannel (chainSyncServerPeer server) @@ -361,20 +369,18 @@ runChainSync securityParam (ClientUpdates clientUpdates) -- to finish threadDelay 2000 - trace <- getTrace + traceEvents <- getTrace -- Collect the return values atomically $ do - clientChain <- fst <$> readTVar varClientState - serverChain <- chainState <$> readTVar varChainProducerState + finalClientChain <- fst <$> readTVar varClientState + finalServerChain <- chainState <$> readTVar varChainProducerState candidateFragment <- readTVar varFinalCandidates >>= readTVar . (Map.! serverId) - clientException <- readTVar varClientException - return ( - clientChain - , fmap testHeader serverChain - , AF.mapAnchoredFragment testHeader candidateFragment - , clientException - , trace - ) + mbResult <- readTVar varClientResult + return ChainSyncOutcome { + finalServerChain = testHeader <$> finalServerChain + , syncedFragment = AF.mapAnchoredFragment testHeader candidateFragment + , .. + } where k = maxRollbacks securityParam @@ -541,7 +547,7 @@ instance Show ChainSyncClientSetup where -- If we don't do this, the client's chain might no longer intersect with the -- synced candidate. This is because the ChainSync protocol won't have had a -- chance to update the candidate fragment, as the code to handle this case --- (our chain has changed such that it no longer intersects with the synched +-- (our chain has changed such that it no longer intersects with the synced -- candidate -> initiate the \"find a new intersection\" part of the protocol) -- is run when we receive new messages (roll forward/backward) from the -- server. 
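The harness above records the client's fate in varClientResult: a normal return of runPipelinedPeer is stored as Right, an exception is stored as Left, and the update scheduler polls that TVar to freeze both chains once it is filled. The same pattern in isolation, as a minimal self-contained sketch (recordOutcome is a made-up name for illustration; the real test additionally rethrows the exception so the thread registry still sees it):

import Control.Concurrent.STM (TVar, atomically, newTVarIO, writeTVar)
import Control.Exception (SomeException, try)

-- Run an action and record its outcome, 'Right' for a normal return and
-- 'Left' for an exception, in a TVar that other threads (here: the
-- scheduled chain updates) can poll to decide whether to stop.
recordOutcome :: IO a -> IO (TVar (Maybe (Either SomeException a)))
recordOutcome act = do
    var     <- newTVarIO Nothing
    outcome <- try act
    atomically $ writeTVar var (Just outcome)
    return var
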
diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 7cfb7bd0736..69436ae7bcb 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -6,6 +6,7 @@ {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -24,6 +25,7 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client ( Consensus , chainSyncClient , bracketChainSyncClient + , ChainSyncClientResult (..) , ChainSyncClientException (..) , ChainDbView (..) , defaultChainDbView @@ -43,7 +45,6 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Proxy import Data.Typeable -import Data.Void (Void) import Data.Word (Word64) import GHC.Generics (Generic) import GHC.Stack (HasCallStack) @@ -77,7 +78,7 @@ import Ouroboros.Consensus.Storage.ChainDB (ChainDB, import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB type Consensus (client :: Type -> Type -> (Type -> Type) -> Type -> Type) blk m = - client (Header blk) (Tip blk) m Void + client (Header blk) (Tip blk) m ChainSyncClientResult -- | Abstract over the ChainDB data ChainDbView m blk = ChainDbView @@ -365,10 +366,10 @@ chainSyncClient mkPipelineDecision0 tracer cfg -- intersect, disconnect by throwing the exception obtained by calling the -- passed function. findIntersection - :: (Our (Tip blk) -> Their (Tip blk) -> ChainSyncClientException) + :: (Our (Tip blk) -> Their (Tip blk) -> ChainSyncClientResult) -- ^ Exception to throw when no intersection is found. -> Stateful m blk () (ClientPipelinedStIdle 'Z) - findIntersection mkEx = Stateful $ \() -> do + findIntersection mkResult = Stateful $ \() -> do (ourFrag, ourHeaderState, ourTip) <- atomically $ (,,) <$> getCurrentChain <*> (headerState <$> getCurrentLedger) @@ -390,8 +391,8 @@ chainSyncClient mkPipelineDecision0 tracer cfg { recvMsgIntersectFound = \i theirTip' -> continueWithState uis $ intersectFound (castPoint i) (Their theirTip') - , recvMsgIntersectNotFound = \theirTip' -> traceException $ - disconnect $ mkEx ourTip (Their theirTip') + , recvMsgIntersectNotFound = \theirTip' -> + return $ terminate $ mkResult ourTip (Their theirTip') } -- | One of the points we sent intersected our chain. This intersection @@ -624,27 +625,33 @@ chainSyncClient mkPipelineDecision0 tracer cfg -- Get the ledger view required to validate the header -- NOTE: This will block if we are too far behind. - mbKisAndLedgerView <- atomically $ do + intersectCheck <- atomically $ do -- Before obtaining a 'LedgerView', we must find the most recent -- intersection with the current chain. Note that this is cheap when -- the chain and candidate haven't changed. 
mKis' <- intersectsWithCurrentChain kis case mKis' of - Nothing -> return Nothing - Just kis'@KnownIntersectionState { ourTip, mostRecentIntersection } -> do - ledgerView <- - getLedgerView hdr mostRecentIntersection ourTip theirTip - return $ Just (kis', ledgerView) - - case mbKisAndLedgerView of - Nothing -> + Nothing -> return NoLongerIntersects + Just kis'@KnownIntersectionState { ourTip, mostRecentIntersection } -> + getLedgerView hdr mostRecentIntersection ourTip theirTip >>= \case + Left result -> return $ Uninteresting result + Right ledgerView -> return $ Intersects kis' ledgerView + + case intersectCheck of + NoLongerIntersects -> -- Our chain (tip) has changed and it no longer intersects with the -- candidate fragment, so we have to find a new intersection, but -- first drain the pipe. continueWithState () $ drainThePipe n $ findIntersection NoMoreIntersection - Just (kis', ledgerView) -> do + + Uninteresting result -> + -- The upstream chain is no longer interesting to us, gracefully + -- terminate with the given result + terminateAfterDrain n result + + Intersects kis' ledgerView -> do -- Our chain still intersects with the candidate fragment and we -- have obtained a 'LedgerView' that we can use to validate @hdr@. @@ -700,11 +707,13 @@ chainSyncClient mkPipelineDecision0 tracer cfg -- outdated) view of our chain, i.e. 'ourFrag', we /only/ use -- @curLedger@ to validate /their/ header, even in the special case -- discussed below. - getLedgerView :: Header blk - -> Point blk -- ^ Intersection between our and their chain - -> Our (Tip blk) -- ^ Only to produce an error message - -> Their (Tip blk) -- ^ Only to produce an error message - -> STM m (Ticked (LedgerView (BlockProtocol blk))) + getLedgerView :: + Header blk + -> Point blk -- ^ Intersection between our and their chain + -> Our (Tip blk) -- ^ Only to produce an error message + -> Their (Tip blk) -- ^ Only to produce an error message + -> STM m (Either ChainSyncClientResult + (Ticked (LedgerView (BlockProtocol blk)))) getLedgerView hdr intersection ourTip theirTip = do curLedger <- ledgerState <$> getCurrentLedger @@ -721,14 +730,14 @@ chainSyncClient mkPipelineDecision0 tracer cfg curLedger (pointSlot intersection) of Nothing -> -- Case (2) - disconnect $ + return $ Left $ InvalidRollForward (realPointToPoint hdrPoint) ourTip theirTip Just forecast -> case runExcept $ forecastFor forecast (realPointSlot hdrPoint) of Left OutsideForecastRange{} -> -- Case (1) retry Right lv -> - return lv + return $ Right lv where hdrPoint = headerRealPoint hdr @@ -739,7 +748,7 @@ chainSyncClient mkPipelineDecision0 tracer cfg -> Stateful m blk (KnownIntersectionState blk) (ClientPipelinedStIdle n) - rollBackward mkPipelineDecision n intersection + rollBackward mkPipelineDecision n rollBackPoint theirTip = Stateful $ \KnownIntersectionState { theirFrag @@ -748,9 +757,7 @@ chainSyncClient mkPipelineDecision0 tracer cfg , ourTip , mostRecentIntersection } -> traceException $ do - (theirFrag', theirHeaderState') <- do - case attemptRollback cfg intersection (theirFrag, theirHeaderState) of - Just (c, d) -> return (c,d) + case attemptRollback cfg rollBackPoint (theirFrag, theirHeaderState) of -- Remember that we use our current chain fragment as the starting -- point for the candidate's chain. Our fragment contained @k@ -- headers. At this point, the candidate fragment might have grown to @@ -773,31 +780,45 @@ chainSyncClient mkPipelineDecision0 tracer cfg -- forward @s@ new headers where @s >= r@. -- -- Thus, @k - r + s >= k@. 
- Nothing -> disconnect $ - InvalidRollBack intersection ourTip theirTip - - -- We just rolled back to @intersection@, either our most recent - -- intersection was after or at @intersection@, in which case - -- @intersection@ becomes the new most recent intersection. - -- - -- But if the most recent intersection was /before/ @intersection@, - -- then the most recent intersection doesn't change. - let mostRecentIntersection' - | AF.withinFragmentBounds (castPoint intersection) ourFrag - = intersection - | otherwise - = mostRecentIntersection - kis' = assertKnownIntersectionInvariants (configConsensus cfg) $ - KnownIntersectionState - { theirFrag = theirFrag' - , theirHeaderState = theirHeaderState' - , ourFrag = ourFrag - , ourTip = ourTip - , mostRecentIntersection = mostRecentIntersection' - } - atomically $ writeTVar varCandidate theirFrag' - - continueWithState kis' $ nextStep mkPipelineDecision n theirTip + Nothing -> + terminateAfterDrain n $ + RolledBackPastIntersection rollBackPoint ourTip theirTip + + Just (theirFrag', theirHeaderState') -> do + -- We just rolled back to @intersection@, either our most recent + -- intersection was after or at @intersection@, in which case + -- @intersection@ becomes the new most recent intersection. + -- + -- But if the most recent intersection was /before/ @intersection@, + -- then the most recent intersection doesn't change. + let mostRecentIntersection' + | AF.withinFragmentBounds (castPoint rollBackPoint) ourFrag + = rollBackPoint + | otherwise + = mostRecentIntersection + kis' = assertKnownIntersectionInvariants (configConsensus cfg) $ + KnownIntersectionState + { theirFrag = theirFrag' + , theirHeaderState = theirHeaderState' + , ourFrag = ourFrag + , ourTip = ourTip + , mostRecentIntersection = mostRecentIntersection' + } + atomically $ writeTVar varCandidate theirFrag' + + continueWithState kis' $ nextStep mkPipelineDecision n theirTip + + -- | Gracefully terminate the connection with the upstream node with the + -- given result. + terminate :: ChainSyncClientResult -> Consensus (ClientPipelinedStIdle 'Z) blk m + terminate = SendMsgDone + + -- | Same as 'terminate', but first 'drainThePipe'. + terminateAfterDrain :: Nat n -> ChainSyncClientResult -> m (Consensus (ClientPipelinedStIdle n) blk m) + terminateAfterDrain n result = + continueWithState () + $ drainThePipe n + $ Stateful $ const $ return $ terminate result -- | Disconnect from the upstream node by throwing the given exception. -- The cleanup is handled in 'bracketChainSyncClient'. 
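terminateAfterDrain defers to drainThePipe, which this patch does not show. A sketch of the underlying idea, written against the pipelined chain sync client types from Ouroboros.Network.Protocol.ChainSync.ClientPipelined (constructor and field names per that module as of this patch; drainAndStop is an invented name, and the module's actual drainThePipe is phrased with 'Stateful', but the recursion is the same): collect each of the n in-flight responses, discard it, and only send MsgDone once nothing is outstanding.

{-# LANGUAGE GADTs #-}
import Network.TypedProtocol.Pipelined (Nat (..))
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
          (ClientPipelinedStIdle (..), ClientStNext (..))

-- Assumes 'Header', 'Tip' and 'ChainSyncClientResult' from the
-- surrounding module. Sketch only.
drainAndStop ::
     Applicative m
  => Nat n
  -> ChainSyncClientResult
  -> ClientPipelinedStIdle n (Header blk) (Tip blk) m ChainSyncClientResult
drainAndStop n result = case n of
    Zero    -> SendMsgDone result          -- nothing in flight: stop now
    Succ n' -> CollectResponse Nothing $   -- wait for one in-flight response
      ClientStNext
        { recvMsgRollForward  = \_hdr _tip -> pure (drainAndStop n' result)
        , recvMsgRollBackward = \_pt  _tip -> pure (drainAndStop n' result)
        }
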
@@ -839,17 +860,18 @@
     k :: Word64
     k = maxRollbacks $ configSecurityParam cfg
 
-attemptRollback :: ( BlockSupportsProtocol blk
-                   , Serialise (HeaderHash blk)
-                   , HasAnnTip blk
-                   )
-                => TopLevelConfig blk
-                -> Point blk
-                -> (AnchoredFragment (Header blk), HeaderState blk)
-                -> Maybe (AnchoredFragment (Header blk), HeaderState blk)
-attemptRollback cfg intersection (frag, state) = do
-    frag' <- AF.rollback (castPoint intersection) frag
-    state' <- rewindHeaderState cfg intersection state
+attemptRollback ::
+     ( BlockSupportsProtocol blk
+     , Serialise (HeaderHash blk)
+     , HasAnnTip blk
+     )
+  => TopLevelConfig blk
+  -> Point blk
+  -> (AnchoredFragment (Header blk), HeaderState blk)
+  -> Maybe (AnchoredFragment (Header blk), HeaderState blk)
+attemptRollback cfg rollBackPoint (frag, state) = do
+    frag' <- AF.rollback (castPoint rollBackPoint) frag
+    state' <- rewindHeaderState cfg rollBackPoint state
     return (frag', state')
 
 -- | Watch the invalid block checker function for changes (using its
@@ -905,6 +927,21 @@ rejectInvalidBlocks tracer registry getIsInvalidBlock getCandidate =
           traceWith tracer $ TraceException ex
           throwM ex
 
+-- | Auxiliary data type used as an intermediary result in 'rollForward'.
+data IntersectCheck blk =
+    -- | The upstream chain no longer intersects with our current chain because
+    -- our current chain changed in the background.
+    NoLongerIntersects
+    -- | The upstream chain still intersects with our current chain, but is no
+    -- longer interesting. Gracefully terminate with the given result.
+  | Uninteresting ChainSyncClientResult
+    -- | The upstream chain still intersects with our chain, return the
+    -- resulting 'KnownIntersectionState' and the 'LedgerView' corresponding to
+    -- the header 'rollForward' received.
+  | Intersects
+      (KnownIntersectionState blk)
+      (Ticked (LedgerView (BlockProtocol blk)))
+
 {-------------------------------------------------------------------------------
   Explicit state
 -------------------------------------------------------------------------------}
@@ -925,10 +962,21 @@ continueWithState !s (Stateful f) =
     checkInvariant (unsafeNoUnexpectedThunks s) $ f s
 
 {-------------------------------------------------------------------------------
-  Exception
+  Return value
 -------------------------------------------------------------------------------}
 
-data ChainSyncClientException =
+-- | The chain sync client only _gracefully_ terminates when the upstream node's
+-- chain is not interesting (e.g., forked off too far in the past). By
+-- gracefully terminating, the network layer can keep the other mini-protocols
+-- connected to the same upstream node running.
+--
+-- For example, a relay node will often receive connections from nodes syncing
+-- from scratch or an old chain. Since these nodes have a chain that is shorter
+-- than the relay node's chain, it's useless for the relay node to run the
+-- client-side of the chain sync protocol. However, the other direction of the
+-- protocol, and, e.g., the transaction submission protocol, should keep
+-- running.
+data ChainSyncClientResult =
     -- | The server we're connecting to forked more than @k@ blocks ago.
     forall blk. BlockSupportsProtocol blk =>
       ForkTooDeep
        (Point blk)
       (Our (Tip blk))
       (Their (Tip blk))
 
-    -- | Header validation threw an error.
-  | forall blk. 
(BlockSupportsProtocol blk, ValidateEnvelope blk) => - HeaderError - (Point blk) -- ^ Invalid header - (HeaderError blk) - (Our (Tip blk)) - (Their (Tip blk)) - -- | We were unable to get a ledger view for the intersection point -- between the candidate's chain and our chain. -- @@ -958,10 +998,64 @@ data ChainSyncClientException = (Our (Tip blk)) (Their (Tip blk)) - -- | The upstream node rolled back more than @k@ blocks. + -- | Our chain changed such that it no longer intersects with the + -- candidate's fragment, and asking for a new intersection did not yield + -- one. + | forall blk. BlockSupportsProtocol blk => + NoMoreIntersection + (Our (Tip blk)) + (Their (Tip blk)) + + -- | We were asked to roll back past the anchor point of the candidate's + -- fragment. This means the candidate chain no longer forks off within + -- @k@, making it impossible to switch to. | forall blk. BlockSupportsProtocol blk => - InvalidRollBack - (Point blk) -- ^ Roll back to this header + RolledBackPastIntersection + (Point blk) -- ^ Point asked to roll back to + (Our (Tip blk)) + (Their (Tip blk)) + +deriving instance Show ChainSyncClientResult + +instance Eq ChainSyncClientResult where + ForkTooDeep (a :: Point blk) b c == ForkTooDeep (a' :: Point blk') b' c' = + case eqT @blk @blk' of + Nothing -> False + Just Refl -> (a, b, c) == (a', b', c') + ForkTooDeep{} == _ = False + + InvalidRollForward (a :: Point blk) b c == InvalidRollForward (a' :: Point blk') b' c' = + case eqT @blk @blk' of + Nothing -> False + Just Refl -> (a, b, c) == (a', b', c') + InvalidRollForward{} == _ = False + + NoMoreIntersection (a :: Our (Tip blk)) b == NoMoreIntersection (a' :: Our (Tip blk')) b' = + case eqT @blk @blk' of + Nothing -> False + Just Refl -> (a, b) == (a', b') + NoMoreIntersection{} == _ = False + + RolledBackPastIntersection (a :: Point blk) b c == RolledBackPastIntersection (a' :: Point blk') b' c' = + case eqT @blk @blk' of + Nothing -> False + Just Refl -> (a, b, c) == (a', b', c') + RolledBackPastIntersection{} == _ = False + +{------------------------------------------------------------------------------- + Exception +-------------------------------------------------------------------------------} + +-- | When the upstream node violates the protocol or exhibits malicious +-- behaviour, e.g., serving an invalid header or a header corresponding to a +-- known invalid block, we throw an exception to disconnect. This will bring +-- down all miniprotocols in both directions with that node. +data ChainSyncClientException = + -- | Header validation threw an error. + forall blk. (BlockSupportsProtocol blk, ValidateEnvelope blk) => + HeaderError + (Point blk) -- ^ Invalid header + (HeaderError blk) (Our (Tip blk)) (Their (Tip blk)) @@ -976,14 +1070,6 @@ data ChainSyncClientException = (Our (Tip blk)) (Their (Tip blk)) - -- | Our chain changed such that it no longer intersects with the - -- candidate's fragment, and asking for a new intersection did not yield - -- one. - | forall blk. BlockSupportsProtocol blk => - NoMoreIntersection - (Our (Tip blk)) - (Their (Tip blk)) - -- | The received header to roll forward doesn't fit onto the previous -- one. 
-- @@ -1005,42 +1091,18 @@ data ChainSyncClientException = deriving instance Show ChainSyncClientException instance Eq ChainSyncClientException where - ForkTooDeep (a :: Point blk) b c == ForkTooDeep (a' :: Point blk') b' c' = - case eqT @blk @blk' of - Nothing -> False - Just Refl -> (a, b, c) == (a', b', c') - ForkTooDeep{} == _ = False - HeaderError (a :: Point blk) b c d == HeaderError (a' :: Point blk') b' c' d' = case eqT @blk @blk' of Nothing -> False Just Refl -> (a, b, c, d) == (a', b', c', d') HeaderError{} == _ = False - InvalidRollForward (a :: Point blk) b c == InvalidRollForward (a' :: Point blk') b' c' = - case eqT @blk @blk' of - Nothing -> False - Just Refl -> (a, b, c) == (a', b', c') - InvalidRollForward{} == _ = False - - InvalidRollBack (a :: Point blk) b c == InvalidRollBack (a' :: Point blk') b' c' = - case eqT @blk @blk' of - Nothing -> False - Just Refl -> (a, b, c) == (a', b', c') - InvalidRollBack{} == _ = False - InvalidIntersection (a :: Point blk) b c == InvalidIntersection (a' :: Point blk') b' c' = case eqT @blk @blk' of Nothing -> False Just Refl -> (a, b, c) == (a', b', c') InvalidIntersection{} == _ = False - NoMoreIntersection (a :: Our (Tip blk)) b == NoMoreIntersection (a' :: Our (Tip blk')) b' = - case eqT @blk @blk' of - Nothing -> False - Just Refl -> (a, b) == (a', b') - NoMoreIntersection{} == _ = False - DoesntFit (a :: ChainHash blk) b c d == DoesntFit (a' :: ChainHash blk') b' c' d' = case eqT @blk @blk' of Nothing -> False diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index 71d49a9e299..05d379f298a 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -101,7 +101,7 @@ data Handlers m peer blk = Handlers { hChainSyncClient :: NodeToNodeVersion -> StrictTVar m (AnchoredFragment (Header blk)) - -> ChainSyncClientPipelined (Header blk) (Tip blk) m Void + -> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult -- TODO: we should consider either bundling these context parameters -- into a record, or extending the protocol handler representation -- to support bracket-style initialisation so that we could have the diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Node/ErrorPolicy.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Node/ErrorPolicy.hs index a0be3bdbe57..a8c5f1f1106 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Node/ErrorPolicy.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Node/ErrorPolicy.hs @@ -98,12 +98,8 @@ consensusErrorPolicy = ErrorPolicies { -- because we have diverged too much. , ErrorPolicy $ \(e :: ChainSyncClientException) -> case e of - ForkTooDeep{} -> Just distantPeer HeaderError{} -> Just theyBuggyOrEvil - InvalidRollForward{} -> Just distantPeer - InvalidRollBack{} -> Just theyBuggyOrEvil InvalidIntersection{} -> Just theyBuggyOrEvil - NoMoreIntersection{} -> Just distantPeer DoesntFit{} -> Just theyBuggyOrEvil -- A block so far in the future that it exceeds the max clock -- skew is also considered to be invalid From 336cf1e8c2056d79bae61ff84989b4093b2fbee9 Mon Sep 17 00:00:00 2001 From: Nicolas Frisby Date: Thu, 27 Aug 2020 13:03:31 -0700 Subject: [PATCH 2/5] test-infra: simplify mini-protocol restart logic Now a directed edge in the topology (ie a bundle of mini-protocol instances) restarts for two reasons. 1) The ChainSync client and/or server terminated *without* an exception. 
It restarts at the next slot onset. 2) The node was scheduled to restart at this slot onset. It restarts immediately. --- .../src/Test/ThreadNet/Network.hs | 110 +++++------------- 1 file changed, 32 insertions(+), 78 deletions(-) diff --git a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs index d965435852e..a943bb6e6c5 100644 --- a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs @@ -25,10 +25,8 @@ module Test.ThreadNet.Network ( , plainTestNodeInitialization , TracingConstraints -- * Tracers - , MiniProtocolExpectedException (..) , MiniProtocolFatalException (..) , MiniProtocolState (..) - , TraceMiniProtocolRestart (..) -- * Test Output , TestOutput (..) , NodeOutput (..) @@ -65,13 +63,10 @@ import Ouroboros.Network.MockChain.Chain (Chain (Genesis)) import Ouroboros.Network.Point (WithOrigin (..)) import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS -import qualified Ouroboros.Network.BlockFetch.Client as BFClient import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..)) import Ouroboros.Network.Protocol.KeepAlive.Type import Ouroboros.Network.Protocol.Limits (waitForever) import Ouroboros.Network.Protocol.TxSubmission.Type -import qualified Ouroboros.Network.TxSubmission.Inbound as TxInbound -import qualified Ouroboros.Network.TxSubmission.Outbound as TxOutbound import Ouroboros.Consensus.Block import Ouroboros.Consensus.BlockchainTime @@ -83,7 +78,6 @@ import Ouroboros.Consensus.Ledger.Inspect import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Ledger.SupportsProtocol import Ouroboros.Consensus.Mempool -import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.Server as BFServer import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.NetworkProtocolVersion @@ -1048,14 +1042,11 @@ data CodecError -- | Cause for an edge to restart -- data RestartCause - = RestartExn !MiniProtocolExpectedException - -- ^ restart due to an exception in one of the mini protocol instances - -- - -- Edges only catch-and-restart on /expected/ exceptions; anything else - -- will tear down the whole hierarchy of test threads. See - -- 'MiniProtocolExpectedException'. - | RestartNode - -- ^ restart because at least one of the two nodes is 'VFalling' + = RestartScheduled + -- ^ restart because at least one of the two nodes set its status to + -- 'VFalling' because of a scheduled restart in 'tnaRestarts' + | RestartChainSyncTerminated + -- ^ restart because the ChainSync client terminated the mini protocol -- | Fork two directed edges, one in each direction between the two vertices -- @@ -1063,7 +1054,7 @@ forkBothEdges :: (IOLike m, RunNode blk, HasCallStack) => ResourceRegistry m -> OracularClock m - -> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException) + -> Tracer m (SlotNo, MiniProtocolState) -> (NodeToNodeVersion, BlockNodeToNodeVersion blk) -> (CodecConfig blk, CalcMessageDelay blk) -> Map CoreNodeId (VertexStatusVar m blk) @@ -1111,7 +1102,7 @@ forkBothEdges sharedRegistry clock tr version cfg vertexStatusVars (node1, node2 directedEdge :: forall m blk. 
(IOLike m, RunNode blk) => ResourceRegistry m - -> Tracer m (SlotNo, MiniProtocolState, MiniProtocolExpectedException) + -> Tracer m (SlotNo, MiniProtocolState) -> (NodeToNodeVersion, BlockNodeToNodeVersion blk) -> (CodecConfig blk, CalcMessageDelay blk) -> OracularClock m @@ -1124,18 +1115,17 @@ directedEdge registry tr version cfg clock edgeStatusVar client server = where loop = do restart <- directedEdgeInner registry clock version cfg edgeStatusVar client server - `catch` (pure . RestartExn) `catch` hUnexpected atomically $ writeTVar edgeStatusVar EDown case restart of - RestartNode -> pure () - RestartExn e -> do - -- "error policy": restart at beginning of next slot + RestartScheduled -> pure () + RestartChainSyncTerminated -> do + -- "error" policy: restart at beginning of next slot s <- OracularClock.getCurrentSlot clock let s' = succ s - traceWith tr (s, MiniProtocolDelayed, e) + traceWith tr (s, MiniProtocolDelayed) void $ OracularClock.blockUntilSlot clock s' - traceWith tr (s', MiniProtocolRestarting, e) + traceWith tr (s', MiniProtocolRestarting) loop where -- Wrap synchronous exceptions in 'MiniProtocolFatalException' @@ -1178,51 +1168,53 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) let miniProtocol :: String -- ^ protocol name + -> (String -> a -> RestartCause) -> ( LimitedApp' m NodeId blk -> NodeToNodeVersion -> NodeId -> Channel m msg - -> m ((), trailingBytes) + -> m (a, trailingBytes) ) -- ^ client action to run on node1 -> ( LimitedApp' m NodeId blk -> NodeToNodeVersion -> NodeId -> Channel m msg - -> m ((), trailingBytes) + -> m (a, trailingBytes) ) -- ^ server action to run on node2 -> (msg -> m ()) - -> m (m (), m ()) - miniProtocol proto client server middle = do + -> m (m RestartCause, m RestartCause) + miniProtocol proto ret client server middle = do (chan, dualChan) <- createConnectedChannelsWithDelay registry (node1, node2, proto) middle pure - ( fst <$> client app1 version (fromCoreNodeId node2) chan - , fst <$> server app2 version (fromCoreNodeId node1) dualChan + ( (ret (proto <> ".client") . fst) <$> client app1 version (fromCoreNodeId node2) chan + , (ret (proto <> ".server") . fst) <$> server app2 version (fromCoreNodeId node1) dualChan ) - -- NB only 'watcher' ever returns in these tests - fmap (\() -> RestartNode) $ - (>>= withAsyncsWaitAny) $ + (>>= withAsyncsWaitAny) $ fmap flattenPairs $ sequence $ pure (watcher vertexStatusVar1, watcher vertexStatusVar2) NE.:| [ miniProtocol "ChainSync" - (wrapMPEE MPEEChainSyncClient NTN.aChainSyncClient) + (\_s () -> RestartChainSyncTerminated) + NTN.aChainSyncClient NTN.aChainSyncServer chainSyncMiddle - -- TODO do not swallow exceptions from these protocols , miniProtocol "BlockFetch" + neverReturns NTN.aBlockFetchClient NTN.aBlockFetchServer (\_ -> pure ()) , miniProtocol "TxSubmission" + neverReturns NTN.aTxSubmissionClient NTN.aTxSubmissionServer (\_ -> pure ()) , miniProtocol "KeepAlive" + neverReturns NTN.aKeepAliveClient NTN.aKeepAliveServer (\_ -> pure ()) @@ -1235,23 +1227,18 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) flattenPairs :: forall a. NE.NonEmpty (a, a) -> NE.NonEmpty a flattenPairs = uncurry (<>) . NE.unzip - -- TODO only wrap actually expected exceptions - wrapMPEE :: - Exception e - => (e -> MiniProtocolExpectedException) - -> (app -> version -> peer -> chan -> m a) - -> (app -> version -> peer -> chan -> m a) - wrapMPEE f m = \app ver them chan -> - catch (m app ver them chan) $ throwM . 
f + neverReturns :: String -> () -> void + neverReturns s () = error $ s <> " never returns!" - -- terminates when the vertex starts 'VFalling' + -- terminates (by returning, not via exception) when the vertex starts + -- 'VFalling' -- -- because of 'withAsyncsWaitAny' used above, this brings down the whole -- edge - watcher :: VertexStatusVar m blk -> m () + watcher :: VertexStatusVar m blk -> m RestartCause watcher v = do atomically $ readTVar v >>= \case - VFalling -> pure () + VFalling -> pure RestartScheduled _ -> retry -- introduce a delay for 'CS.MsgRollForward' @@ -1570,43 +1557,10 @@ type LimitedApp' m peer blk = Tracing -------------------------------------------------------------------------------} --- | Non-fatal exceptions expected from the threads of a 'directedEdge' --- -data MiniProtocolExpectedException - = MPEEChainSyncClient CSClient.ChainSyncClientException - -- ^ see "Ouroboros.Consensus.MiniProtocol.ChainSync.Client" - -- - -- NOTE: the second type in 'ChainSyncClientException' denotes the 'tip'. - -- If it does not agree with the consensus client & server, 'Dynamic chain - -- generation' tests will fail, since they will not catch the right - -- exception. - | MPEEBlockFetchClient BFClient.BlockFetchProtocolFailure - -- ^ see "Ouroboros.Network.BlockFetch.Client" - | MPEEBlockFetchServer BFServer.BlockFetchServerException - -- ^ see "Ouroboros.Consensus.MiniProtocol.BlockFetch.Server" - | MPEETxSubmissionClient TxOutbound.TxSubmissionProtocolError - -- ^ see "Ouroboros.Network.TxSubmission.Outbound" - | MPEETxSubmissionServer TxInbound.TxSubmissionProtocolError - -- ^ see "Ouroboros.Network.TxSubmission.Inbound" - deriving (Show) - -instance Exception MiniProtocolExpectedException - data MiniProtocolState = MiniProtocolDelayed | MiniProtocolRestarting deriving (Show) -data TraceMiniProtocolRestart peer - = TraceMiniProtocolRestart - peer peer - SlotNo - MiniProtocolState - MiniProtocolExpectedException - -- ^ us them when-start-blocking state reason - deriving (Show) - --- | Any synchronous exception from a 'directedEdge' that was not handled as a --- 'MiniProtocolExpectedException' --- +-- | Any synchronous exception from a 'directedEdge' data MiniProtocolFatalException = MiniProtocolFatalException { mpfeType :: !Typeable.TypeRep -- ^ Including the type explicitly makes it easier for a human to debug From 5452492cf68763f3fa08bba4d4e9db92b086d789 Mon Sep 17 00:00:00 2001 From: Thomas Winant Date: Fri, 21 Aug 2020 10:52:56 +0200 Subject: [PATCH 3/5] Introduce the App type synonym for Apps This allows us to later easily add arguments to an `App`, e.g., `NodeToClientVersion` or `ControlMessageSTM m`. --- .../Consensus/Network/NodeToClient.hs | 10 ++++++--- .../Ouroboros/Consensus/Network/NodeToNode.hs | 21 ++++++++++++------- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs index b891f494af3..e0bc3c7fe33 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToClient.hs @@ -26,6 +26,7 @@ module Ouroboros.Consensus.Network.NodeToClient ( , nullTracers , showTracers -- * Applications + , App , Apps (..) 
, mkApps -- ** Projections @@ -296,18 +297,21 @@ showTracers tr = Tracers { Applications -------------------------------------------------------------------------------} +-- | A node-to-client application +type App m peer bytes a = peer -> Channel m bytes -> m (a, Maybe bytes) + -- | Applications for the node-to-client (i.e., local) protocols -- -- See 'Network.Mux.Types.MuxApplication' data Apps m peer bCS bTX bSQ a = Apps { -- | Start a local chain sync server. - aChainSyncServer :: peer -> Channel m bCS -> m (a, Maybe bCS) + aChainSyncServer :: App m peer bCS a -- | Start a local transaction submission server. - , aTxSubmissionServer :: peer -> Channel m bTX -> m (a, Maybe bTX) + , aTxSubmissionServer :: App m peer bTX a -- | Start a local state query server. - , aStateQueryServer :: peer -> Channel m bSQ -> m (a, Maybe bSQ) + , aStateQueryServer :: App m peer bSQ a } -- | Construct the 'NetworkApplication' for the node-to-client protocols diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index 05d379f298a..afee9cd9949 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -21,6 +21,7 @@ module Ouroboros.Consensus.Network.NodeToNode ( , nullTracers , showTracers -- * Applications + , App , Apps (..) , mkApps -- ** Projections @@ -352,36 +353,40 @@ showTracers tr = Tracers { Applications -------------------------------------------------------------------------------} +-- | A node-to-node application +type App m peer bytes a = + NodeToNodeVersion -> peer -> Channel m bytes -> m (a, Maybe bytes) + -- | Applications for the node-to-node protocols -- -- See 'Network.Mux.Types.MuxApplication' data Apps m peer bCS bBF bTX bKA a = Apps { -- | Start a chain sync client that communicates with the given upstream -- node. - aChainSyncClient :: NodeToNodeVersion -> peer -> Channel m bCS -> m (a, Maybe bCS) + aChainSyncClient :: App m peer bCS a -- | Start a chain sync server. - , aChainSyncServer :: NodeToNodeVersion -> peer -> Channel m bCS -> m (a, Maybe bCS) + , aChainSyncServer :: App m peer bCS a -- | Start a block fetch client that communicates with the given -- upstream node. - , aBlockFetchClient :: NodeToNodeVersion -> peer -> Channel m bBF -> m (a, Maybe bBF) + , aBlockFetchClient :: App m peer bBF a -- | Start a block fetch server. - , aBlockFetchServer :: NodeToNodeVersion -> peer -> Channel m bBF -> m (a, Maybe bBF) + , aBlockFetchServer :: App m peer bBF a -- | Start a transaction submission client that communicates with the -- given upstream node. - , aTxSubmissionClient :: NodeToNodeVersion -> peer -> Channel m bTX -> m (a, Maybe bTX) + , aTxSubmissionClient :: App m peer bTX a -- | Start a transaction submission server. - , aTxSubmissionServer :: NodeToNodeVersion -> peer -> Channel m bTX -> m (a, Maybe bTX) + , aTxSubmissionServer :: App m peer bTX a -- | Start a keep-alive client. - , aKeepAliveClient :: NodeToNodeVersion -> peer -> Channel m bKA -> m (a, Maybe bKA) + , aKeepAliveClient :: App m peer bKA a -- | Start a keep-alive server. 
- , aKeepAliveServer :: NodeToNodeVersion -> peer -> Channel m bKA -> m (a, Maybe bKA) + , aKeepAliveServer :: App m peer bKA a } -- | Construct the 'NetworkApplication' for the node-to-node protocols From 29896b11a9b2f158241818838dfc53d5a89c54b2 Mon Sep 17 00:00:00 2001 From: Thomas Winant Date: Fri, 21 Aug 2020 10:59:11 +0200 Subject: [PATCH 4/5] Pass ControlMessageSTM to the NodeToNode apps The control message isn't used yet. --- .../src/Test/ThreadNet/Network.hs | 7 ++- .../Ouroboros/Consensus/Network/NodeToNode.hs | 50 ++++++++++++------- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs index a943bb6e6c5..ceca66b200b 100644 --- a/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-test/src/Test/ThreadNet/Network.hs @@ -63,6 +63,7 @@ import Ouroboros.Network.MockChain.Chain (Chain (Genesis)) import Ouroboros.Network.Point (WithOrigin (..)) import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS +import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM) import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..)) import Ouroboros.Network.Protocol.KeepAlive.Type import Ouroboros.Network.Protocol.Limits (waitForever) @@ -1171,6 +1172,7 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) -> (String -> a -> RestartCause) -> ( LimitedApp' m NodeId blk -> NodeToNodeVersion + -> ControlMessageSTM m -> NodeId -> Channel m msg -> m (a, trailingBytes) @@ -1178,6 +1180,7 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) -- ^ client action to run on node1 -> ( LimitedApp' m NodeId blk -> NodeToNodeVersion + -> ControlMessageSTM m -> NodeId -> Channel m msg -> m (a, trailingBytes) @@ -1189,8 +1192,8 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay) (chan, dualChan) <- createConnectedChannelsWithDelay registry (node1, node2, proto) middle pure - ( (ret (proto <> ".client") . fst) <$> client app1 version (fromCoreNodeId node2) chan - , (ret (proto <> ".server") . fst) <$> server app2 version (fromCoreNodeId node1) dualChan + ( (ret (proto <> ".client") . fst) <$> client app1 version (return Continue) (fromCoreNodeId node2) chan + , (ret (proto <> ".server") . 
fst) <$> server app2 version (return Continue) (fromCoreNodeId node1) dualChan ) (>>= withAsyncsWaitAny) $ diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index afee9cd9949..00045fef928 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -355,7 +355,11 @@ showTracers tr = Tracers { -- | A node-to-node application type App m peer bytes a = - NodeToNodeVersion -> peer -> Channel m bytes -> m (a, Maybe bytes) + NodeToNodeVersion + -> ControlMessageSTM m + -> peer + -> Channel m bytes + -> m (a, Maybe bytes) -- | Applications for the node-to-node protocols -- @@ -413,10 +417,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = where aChainSyncClient :: NodeToNodeVersion + -> ControlMessageSTM m -> remotePeer -> Channel m bCS -> m ((), Maybe bCS) - aChainSyncClient version them channel = do + aChainSyncClient version _controlMessageSTM them channel = do labelThisThread "ChainSyncClient" -- Note that it is crucial that we sync with the fetch client "outside" -- of registering the state for the sync client. This is needed to @@ -444,10 +449,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = aChainSyncServer :: NodeToNodeVersion + -> ControlMessageSTM m -> remotePeer -> Channel m bCS -> m ((), Maybe bCS) - aChainSyncServer version them channel = do + aChainSyncServer version _controlMessageSTM them channel = do labelThisThread "ChainSyncServer" withRegistry $ \registry -> do chainSyncTimeout <- genChainSyncTimeout @@ -462,10 +468,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = aBlockFetchClient :: NodeToNodeVersion + -> ControlMessageSTM m -> remotePeer -> Channel m bBF -> m ((), Maybe bBF) - aBlockFetchClient version them channel = do + aBlockFetchClient version _controlMessageSTM them channel = do labelThisThread "BlockFetchClient" bracketFetchClient (getFetchClientRegistry kernel) them $ \clientCtx -> runPipelinedPeerWithLimits @@ -478,10 +485,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = aBlockFetchServer :: NodeToNodeVersion + -> ControlMessageSTM m -> remotePeer -> Channel m bBF -> m ((), Maybe bBF) - aBlockFetchServer version them channel = do + aBlockFetchServer version _controlMessageSTM them channel = do labelThisThread "BlockFetchServer" withRegistry $ \registry -> runPeerWithLimits @@ -495,10 +503,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = aTxSubmissionClient :: NodeToNodeVersion + -> ControlMessageSTM m -> remotePeer -> Channel m bTX -> m ((), Maybe bTX) - aTxSubmissionClient version them channel = do + aTxSubmissionClient version _controlMessageSTM them channel = do labelThisThread "TxSubmissionClient" runPeerWithLimits (contramap (TraceLabelPeer them) tTxSubmissionTracer) @@ -510,10 +519,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = aTxSubmissionServer :: NodeToNodeVersion + -> ControlMessageSTM m -> remotePeer -> Channel m bTX -> m ((), Maybe bTX) - aTxSubmissionServer version them channel = do + aTxSubmissionServer version _controlMessageSTM them channel = do labelThisThread "TxSubmissionServer" runPipelinedPeerWithLimits (contramap (TraceLabelPeer them) tTxSubmissionTracer) @@ -525,10 +535,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = aKeepAliveClient 
:: NodeToNodeVersion
+      -> ControlMessageSTM m
       -> remotePeer
       -> Channel m bKA
       -> m ((), Maybe bKA)
-    aKeepAliveClient version them channel = do
+    aKeepAliveClient version _controlMessageSTM them channel = do
       labelThisThread "KeepAliveClient"
       let kacApp = case version of
             -- Version 1 doesn't support keep alive protocol but Blockfetch
@@ -550,10 +561,11 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} =
 
     aKeepAliveServer
       :: NodeToNodeVersion
+      -> ControlMessageSTM m
       -> remotePeer
       -> Channel m bKA
       -> m ((), Maybe bKA)
-    aKeepAliveServer _version _them channel = do
+    aKeepAliveServer _version _controlMessageSTM _them channel = do
       labelThisThread "KeepAliveServer"
       runPeerWithLimits
         nullTracer
@@ -588,15 +600,15 @@ initiator miniProtocolParameters version Apps {..} =
       -- p2p-governor & connection-manager. Then consensus can use peer's ip
      -- address & port number, rather than 'ConnectionId' (which is
      -- a quadruple uniquely determining a connection).
-      (\them _shouldStopSTM -> NodeToNodeProtocols {
+      (\them controlMessageSTM -> NodeToNodeProtocols {
          chainSyncProtocol =
-          (InitiatorProtocolOnly (MuxPeerRaw (aChainSyncClient version them))),
+          (InitiatorProtocolOnly (MuxPeerRaw (aChainSyncClient version controlMessageSTM them))),
          blockFetchProtocol =
-          (InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version them))),
+          (InitiatorProtocolOnly (MuxPeerRaw (aBlockFetchClient version controlMessageSTM them))),
          txSubmissionProtocol =
-          (InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version them))),
+          (InitiatorProtocolOnly (MuxPeerRaw (aTxSubmissionClient version controlMessageSTM them))),
          keepAliveProtocol =
-          (InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version them)))
+          (InitiatorProtocolOnly (MuxPeerRaw (aKeepAliveClient version controlMessageSTM them)))
       })
       version
@@ -612,14 +624,14 @@ responder
 responder miniProtocolParameters version Apps {..} =
   nodeToNodeProtocols
     miniProtocolParameters
-    (\them _shouldStopSTM -> NodeToNodeProtocols {
+    (\them controlMessageSTM -> NodeToNodeProtocols {
          chainSyncProtocol =
-          (ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer version them))),
+          (ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer version controlMessageSTM them))),
          blockFetchProtocol =
-          (ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version them))),
+          (ResponderProtocolOnly (MuxPeerRaw (aBlockFetchServer version controlMessageSTM them))),
          txSubmissionProtocol =
-          (ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version them))),
+          (ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer version controlMessageSTM them))),
          keepAliveProtocol =
-          (ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version them)))
+          (ResponderProtocolOnly (MuxPeerRaw (aKeepAliveServer version controlMessageSTM them)))
       })
      version

From 8be435e5d57aa1f149facdd62e291bd8e020b401 Mon Sep 17 00:00:00 2001
From: Thomas Winant
Date: Fri, 21 Aug 2020 11:42:34 +0200
Subject: [PATCH 5/5] ChainSyncClient: respond to ControlMessageSTM

---
 .../MiniProtocol/ChainSync/Client.hs          |  5 +-
 .../MiniProtocol/ChainSync/Client.hs          | 54 ++++++++++++-------
 .../Ouroboros/Consensus/Network/NodeToNode.hs |  5 +-
 3 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs
index 699c87e3607..03dde3ce722 100644
--- a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs
+++ 
b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/ChainSync/Client.hs
@@ -27,10 +27,9 @@
 
 import Control.Monad.IOSim (runSimOrThrow)
 
 import Cardano.Crypto.DSIGN.Mock
 
-import Ouroboros.Network.Block (getTipPoint)
-
 import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
 import qualified Ouroboros.Network.AnchoredFragment as AF
+import Ouroboros.Network.Block (getTipPoint)
 import Ouroboros.Network.Channel
 import Ouroboros.Network.Driver
 import Ouroboros.Network.MockChain.Chain (Chain (Genesis))
@@ -38,6 +37,7 @@ import qualified Ouroboros.Network.MockChain.Chain as Chain
 import Ouroboros.Network.MockChain.ProducerState (chainState,
                      initChainProducerState)
 import qualified Ouroboros.Network.MockChain.ProducerState as CPS
+import Ouroboros.Network.Mux (ControlMessage (..))
 import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
 import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
 import Ouroboros.Network.Protocol.ChainSync.Examples
@@ -292,6 +292,7 @@ runChainSync securityParam (ClientUpdates clientUpdates)
             nodeCfg
             chainDbView
             maxBound
+            (return Continue)
 
     -- Set up the server
     varChainProducerState <- uncheckedNewTVarM $ initChainProducerState Genesis
diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs
index 69436ae7bcb..b511d43a2a6 100644
--- a/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs
+++ b/ouroboros-consensus/src/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs
@@ -55,6 +55,7 @@ import Network.TypedProtocol.Pipelined
 import Ouroboros.Network.AnchoredFragment (AnchoredFragment (..))
 import qualified Ouroboros.Network.AnchoredFragment as AF
 import Ouroboros.Network.Block (Tip, getTipBlockNo, getTipPoint)
+import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
 import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
 import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
@@ -342,6 +343,7 @@ chainSyncClient
   -> TopLevelConfig blk
   -> ChainDbView m blk
   -> NodeToNodeVersion
+  -> ControlMessageSTM m
   -> StrictTVar m (AnchoredFragment (Header blk))
   -> Consensus ChainSyncClientPipelined blk m
 chainSyncClient mkPipelineDecision0 tracer cfg
@@ -351,8 +353,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg
     , getOurTip
     , getIsInvalidBlock
     }
-           _version
-           varCandidate = ChainSyncClientPipelined $
+           _version
+           controlMessageSTM
+           varCandidate = ChainSyncClientPipelined $
     continueWithState () $ initialise
@@ -516,6 +519,9 @@ chainSyncClient mkPipelineDecision0 tracer cfg
     -- This is the main place we check whether our current chain has changed.
     -- We also check it in 'rollForward' to make sure we have an up-to-date
     -- intersection before calling 'getLedgerView'.
+    --
+    -- This is also the place where we check whether we're asked to terminate
+    -- by the mux layer.
     nextStep :: MkPipelineDecision
              -> Nat n
              -> Their (Tip blk)
@@ -523,22 +529,28 @@ chainSyncClient mkPipelineDecision0 tracer cfg
              (KnownIntersectionState blk)
              (ClientPipelinedStIdle n)
     nextStep mkPipelineDecision n theirTip = Stateful $ \kis -> do
-      mKis' <- atomically $ intersectsWithCurrentChain kis
-      case mKis' of
-        Just kis'@KnownIntersectionState { theirFrag } -> do
-          -- Our chain (tip) didn't change or if it did, it still intersects
-          -- with the candidate fragment, so we can continue requesting the
-          -- next block.
- atomically $ writeTVar varCandidate theirFrag - let candTipBlockNo = AF.headBlockNo theirFrag - return $ requestNext kis' mkPipelineDecision n theirTip candTipBlockNo - Nothing -> - -- Our chain (tip) has changed and it no longer intersects with the - -- candidate fragment, so we have to find a new intersection, but - -- first drain the pipe. - continueWithState () - $ drainThePipe n - $ findIntersection NoMoreIntersection + atomically controlMessageSTM >>= \case + -- We have been asked to terminate the client + Terminate -> + terminateAfterDrain n $ AskedToTerminate + _continue -> do + mKis' <- atomically $ intersectsWithCurrentChain kis + case mKis' of + Just kis'@KnownIntersectionState { theirFrag } -> do + -- Our chain (tip) didn't change or if it did, it still intersects + -- with the candidate fragment, so we can continue requesting the + -- next block. + atomically $ writeTVar varCandidate theirFrag + let candTipBlockNo = AF.headBlockNo theirFrag + return $ + requestNext kis' mkPipelineDecision n theirTip candTipBlockNo + Nothing -> + -- Our chain (tip) has changed and it no longer intersects with + -- the candidate fragment, so we have to find a new intersection, + -- but first drain the pipe. + continueWithState () + $ drainThePipe n + $ findIntersection NoMoreIntersection -- | "Drain the pipe": collect and discard all in-flight responses and -- finally execute the given action. @@ -1015,6 +1027,9 @@ data ChainSyncClientResult = (Our (Tip blk)) (Their (Tip blk)) + -- | We were asked to terminate via the 'ControlMessageSTM' + | AskedToTerminate + deriving instance Show ChainSyncClientResult instance Eq ChainSyncClientResult where @@ -1042,6 +1057,9 @@ instance Eq ChainSyncClientResult where Just Refl -> (a, b, c) == (a', b', c') RolledBackPastIntersection{} == _ = False + AskedToTerminate == AskedToTerminate = True + AskedToTerminate == _ = False + {------------------------------------------------------------------------------- Exception -------------------------------------------------------------------------------} diff --git a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs index 00045fef928..64c07fb8658 100644 --- a/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs @@ -101,6 +101,7 @@ import Ouroboros.Consensus.Storage.ChainDB.Serialisation data Handlers m peer blk = Handlers { hChainSyncClient :: NodeToNodeVersion + -> ControlMessageSTM m -> StrictTVar m (AnchoredFragment (Header blk)) -> ChainSyncClientPipelined (Header blk) (Tip blk) m ChainSyncClientResult -- TODO: we should consider either bundling these context parameters @@ -421,7 +422,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = -> remotePeer -> Channel m bCS -> m ((), Maybe bCS) - aChainSyncClient version _controlMessageSTM them channel = do + aChainSyncClient version controlMessageSTM them channel = do labelThisThread "ChainSyncClient" -- Note that it is crucial that we sync with the fetch client "outside" -- of registering the state for the sync client. This is needed to @@ -444,7 +445,7 @@ mkApps kernel Tracers {..} Codecs {..} genChainSyncTimeout Handlers {..} = (timeLimitsChainSync chainSyncTimeout) channel $ chainSyncClientPeerPipelined - $ hChainSyncClient version varCandidate + $ hChainSyncClient version controlMessageSTM varCandidate return ((), trailing) aChainSyncServer
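With the final patch applied, termination can be driven from outside the client. A hypothetical caller-side sketch (newControlMessageVar and requestTermination are invented names; ControlMessage and ControlMessageSTM are the Ouroboros.Network.Mux types imported above): share a TVar initialised to Continue, hand 'readTVar var' to the client as its ControlMessageSTM, and write Terminate when the connection should wind down; nextStep then drains the pipe and returns AskedToTerminate instead of requesting the next header.

import Control.Concurrent.STM (TVar, atomically, newTVarIO, readTVar, writeTVar)
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)

-- Allocate the shared control-message cell and the STM action the chain
-- sync client polls in 'nextStep'.
newControlMessageVar :: IO (TVar ControlMessage, ControlMessageSTM IO)
newControlMessageVar = do
    var <- newTVarIO Continue
    return (var, readTVar var)

-- Ask the client to stop gracefully; it will drain any pipelined
-- requests and terminate with 'AskedToTerminate'.
requestTermination :: TVar ControlMessage -> IO ()
requestTermination var = atomically (writeTVar var Terminate)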