From 24c06f4d8ace18d6322a99a54e7ca218cb34ccad Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Sun, 1 Dec 2013 22:49:57 +0400 Subject: [PATCH 1/7] Added cabal sandbox files + TAGS to .gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 61454c0..b52ac36 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ cabal-dev .dist-buildwrapper .project .settings +.cabal-sandbox +cabal.sandbox.config +TAGS \ No newline at end of file From 0192cd1c2420490801a331e4364dd494777f58ea Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Sun, 1 Dec 2013 22:52:19 +0400 Subject: [PATCH 2/7] Added commit index to types and to Follower code --- src/Network/Kontiki/Monad.hs | 4 ++++ src/Network/Kontiki/Raft.hs | 15 +++++++++------ src/Network/Kontiki/Raft/Candidate.hs | 19 +++++++++++++------ src/Network/Kontiki/Raft/Follower.hs | 15 +++++++++++---- src/Network/Kontiki/Raft/Leader.hs | 22 ++++++++++++++-------- src/Network/Kontiki/Raft/Utils.hs | 7 ++++--- src/Network/Kontiki/Types.hs | 17 ++++++++++++----- 7 files changed, 67 insertions(+), 32 deletions(-) diff --git a/src/Network/Kontiki/Monad.hs b/src/Network/Kontiki/Monad.hs index 20102bd..c9647e4 100644 --- a/src/Network/Kontiki/Monad.hs +++ b/src/Network/Kontiki/Monad.hs @@ -88,6 +88,10 @@ truncateLog i = tell [CTruncateLog i] logEntries :: Monad m => [Entry a] -> TransitionT a f m () logEntries es = tell [CLogEntries es] +-- | Sets new commit `Index' `i' +setCommitIndex :: Monad m => Index -> TransitionT a f m () +setCommitIndex i = tell [CSetCommitIndex i] + -- | Handler of events. type Handler a s m = Event a -- ^ `Event' to handle diff --git a/src/Network/Kontiki/Raft.hs b/src/Network/Kontiki/Raft.hs index 94f38ea..ca59bf4 100644 --- a/src/Network/Kontiki/Raft.hs +++ b/src/Network/Kontiki/Raft.hs @@ -54,7 +54,9 @@ handle config state event = case state of -- | Initial state of all nodes. initialState :: SomeState -initialState = wrap FollowerState {_fCurrentTerm = term0, _fVotedFor = Nothing} +initialState = wrap FollowerState {_fCurrentTerm = term0 + , _fCommitIndex = index0 + , _fVotedFor = Nothing} -- | Restores the node to initial (`Follower') mode -- and resets the election timeout. This function is useful @@ -70,12 +72,13 @@ restore :: Config -- ^ configuration of the cluster -> (SomeState, [Command a]) -- ^ new state and list of commands restore cfg s = case s of WrapState(Follower _) -> (s, commands) - WrapState(Candidate s') -> (toFollower (s' ^. cCurrentTerm), commands) - WrapState(Leader s') -> (toFollower (s' ^. lCurrentTerm), commands) + WrapState(Candidate s') -> (toFollower (s' ^. cCurrentTerm) (s' ^. cCommitIndex), commands) + WrapState(Leader s') -> (toFollower (s' ^. lCurrentTerm) (s' ^. lCommitIndex), commands) where - toFollower t = wrap FollowerState { _fCurrentTerm = t - , _fVotedFor = Just nodeId - } + toFollower t i = wrap FollowerState { _fCurrentTerm = t + , _fCommitIndex = i + , _fVotedFor = Just nodeId + } nodeId = cfg ^. configNodeId et = cfg ^. configElectionTimeout commands :: forall a. [Command a] diff --git a/src/Network/Kontiki/Raft/Candidate.hs b/src/Network/Kontiki/Raft/Candidate.hs index 38cd14a..9681ece 100644 --- a/src/Network/Kontiki/Raft/Candidate.hs +++ b/src/Network/Kontiki/Raft/Candidate.hs @@ -18,7 +18,7 @@ import qualified Data.Set as Set import Data.ByteString.Char8 () -import Control.Lens +import Control.Lens (use, view, (%=)) import Network.Kontiki.Log import Network.Kontiki.Types @@ -30,9 +30,10 @@ import qualified Network.Kontiki.Raft.Leader as Leader handleRequestVote :: (Functor m, Monad m) => MessageHandler RequestVote a Candidate m handleRequestVote sender RequestVote{..} = do currentTerm <- use cCurrentTerm + commitIndex <- use cCommitIndex if rvTerm > currentTerm - then stepDown sender rvTerm + then stepDown sender rvTerm commitIndex else do logS "Not granting vote" send sender $ RequestVoteResponse { rvrTerm = currentTerm @@ -45,12 +46,13 @@ handleRequestVoteResponse :: (Functor m, Monad m, MonadLog m a) => MessageHandler RequestVoteResponse a Candidate m handleRequestVoteResponse sender RequestVoteResponse{..} = do currentTerm <- use cCurrentTerm + commitIndex <- use cCommitIndex votes <- use cVotes if | rvrTerm < currentTerm -> do logS "Ignoring RequestVoteResponse for old term" currentState - | rvrTerm > currentTerm -> stepDown sender rvrTerm + | rvrTerm > currentTerm -> stepDown sender rvrTerm commitIndex | not rvrVoteGranted -> do logS "Ignoring RequestVoteResponse since vote wasn't granted" currentState @@ -69,18 +71,19 @@ handleRequestVoteResponse sender RequestVoteResponse{..} = do currentState else do logS "Reached a majority, becoming Leader" - Leader.stepUp currentTerm + Leader.stepUp currentTerm commitIndex -- | Handles `AppendEntries'. handleAppendEntries :: (Functor m, Monad m) => MessageHandler (AppendEntries a) a Candidate m handleAppendEntries sender AppendEntries{..} = do currentTerm <- use cCurrentTerm + commitIndex <- use cCommitIndex if currentTerm <= aeTerm then do logS "Received AppendEntries for current or newer term" - stepDown sender aeTerm + stepDown sender aeTerm commitIndex else do logS "Ignoring AppendEntries for old term" currentState @@ -102,6 +105,7 @@ handleElectionTimeout = do nodeId <- view configNodeId nextTerm <- succTerm `fmap` use cCurrentTerm + commitIndex <- use cCommitIndex e <- logLastEntry let lastIndex = maybe index0 eIndex e @@ -114,6 +118,7 @@ handleElectionTimeout = do } return $ wrap CandidateState { _cCurrentTerm = nextTerm + , _cCommitIndex = commitIndex , _cVotes = Set.singleton nodeId } @@ -139,8 +144,9 @@ handle = handleGeneric -- and resets the election timer. stepUp :: (Functor m, Monad m, MonadLog m a) => Term + -> Index -> TransitionT a s m SomeState -stepUp term = do +stepUp term commitIndex = do logS "Becoming candidate" resetElectionTimeout @@ -158,5 +164,6 @@ stepUp term = do } return $ wrap CandidateState { _cCurrentTerm = term + , _cCommitIndex = commitIndex , _cVotes = Set.singleton nodeId } diff --git a/src/Network/Kontiki/Raft/Follower.hs b/src/Network/Kontiki/Raft/Follower.hs index ca11626..21160a6 100644 --- a/src/Network/Kontiki/Raft/Follower.hs +++ b/src/Network/Kontiki/Raft/Follower.hs @@ -20,6 +20,8 @@ import Data.ByteString.Char8 () import Control.Lens +import Control.Monad (when) + import Network.Kontiki.Log import Network.Kontiki.Types import Network.Kontiki.Monad @@ -87,9 +89,10 @@ handleRequestVoteResponse :: (Functor m, Monad m) => MessageHandler RequestVoteResponse a Follower m handleRequestVoteResponse sender RequestVoteResponse{..} = do currentTerm <- use fCurrentTerm + commitIndex <- use fCommitIndex if rvrTerm > currentTerm - then stepDown sender rvrTerm + then stepDown sender rvrTerm commitIndex else currentState -- | Handles `AppendEntries'. @@ -97,10 +100,11 @@ handleAppendEntries :: (Functor m, Monad m, MonadLog m a) => MessageHandler (AppendEntries a) a Follower m handleAppendEntries sender AppendEntries{..} = do currentTerm <- use fCurrentTerm + commitIndex <- use fCommitIndex e <- logLastEntry let lastIndex = maybe index0 eIndex e - if | aeTerm > currentTerm -> stepDown sender aeTerm + if | aeTerm > currentTerm -> stepDown sender aeTerm commitIndex | aeTerm < currentTerm -> do send sender $ AppendEntriesResponse { aerTerm = currentTerm , aerSuccess = False @@ -130,6 +134,8 @@ handleAppendEntries sender AppendEntries{..} = do return $ eIndex $ last es else return lastIndex + when (commitIndex /= aeCommitIndex) $ setCommitIndex aeCommitIndex + send sender $ AppendEntriesResponse { aerTerm = aeTerm , aerSuccess = True , aerLastIndex = lastIndex' @@ -175,14 +181,15 @@ handleElectionTimeout = do currentTerm <- use fCurrentTerm let nextTerm = succTerm currentTerm + commitIndex <- use fCommitIndex fCurrentTerm .= nextTerm quorum <- quorumSize if quorum == 1 - then Leader.stepUp nextTerm - else Candidate.stepUp nextTerm + then Leader.stepUp nextTerm commitIndex + else Candidate.stepUp nextTerm commitIndex -- | Handles `HeartbeatTimeout'. handleHeartbeatTimeout :: (Functor m, Monad m) diff --git a/src/Network/Kontiki/Raft/Leader.hs b/src/Network/Kontiki/Raft/Leader.hs index c01217c..0495e06 100644 --- a/src/Network/Kontiki/Raft/Leader.hs +++ b/src/Network/Kontiki/Raft/Leader.hs @@ -36,9 +36,10 @@ handleRequestVote :: (Functor m, Monad m) => MessageHandler RequestVote a Leader m handleRequestVote sender RequestVote{..} = do currentTerm <- use lCurrentTerm + commitIndex <- use lCommitIndex if rvTerm > currentTerm - then stepDown sender rvTerm + then stepDown sender rvTerm commitIndex else do logS "Not granting vote" send sender $ RequestVoteResponse { rvrTerm = currentTerm @@ -51,9 +52,10 @@ handleRequestVoteResponse :: (Functor m, Monad m) => MessageHandler RequestVoteResponse a Leader m handleRequestVoteResponse sender RequestVoteResponse{..} = do currentTerm <- use lCurrentTerm + commitIndex <- use lCommitIndex if rvrTerm > currentTerm - then stepDown sender rvrTerm + then stepDown sender rvrTerm commitIndex else currentState -- | Handles `AppendEntries'. @@ -61,9 +63,10 @@ handleAppendEntries :: (Functor m, Monad m) => MessageHandler (AppendEntries a) a Leader m handleAppendEntries sender AppendEntries{..} = do currentTerm <- use lCurrentTerm + commitIndex <- use lCommitIndex if aeTerm > currentTerm - then stepDown sender aeTerm + then stepDown sender aeTerm commitIndex else currentState -- | Handles `AppendEntriesResponse'. @@ -71,11 +74,12 @@ handleAppendEntriesResponse :: (Functor m, Monad m) => MessageHandler AppendEntriesResponse a Leader m handleAppendEntriesResponse sender AppendEntriesResponse{..} = do currentTerm <- use lCurrentTerm + commitIndex <- use lCommitIndex if | aerTerm < currentTerm -> do logS "Ignoring old AppendEntriesResponse" currentState - | aerTerm > currentTerm -> stepDown sender aerTerm + | aerTerm > currentTerm -> stepDown sender aerTerm commitIndex | not aerSuccess -> do lNextIndex %= Map.alter (\i -> Just $ maybe index0 prevIndex i) sender currentState @@ -184,9 +188,10 @@ handle = handleGeneric -- | Transitions into `MLeader' mode by broadcasting heartbeat `AppendEntries' -- to all nodes and changing state to `LeaderState'. stepUp :: (Functor m, Monad m, MonadLog m a) - => Term -- ^ `Term' of the `Leader' + => Term -- ^ `Term' of the `Leader' + -> Index -- ^ commit `Index' -> TransitionT a f m SomeState -stepUp t = do +stepUp term commitIndex = do logS "Becoming leader" resetHeartbeatTimeout @@ -197,7 +202,7 @@ stepUp t = do nodeId <- view configNodeId - broadcast $ AppendEntries { aeTerm = t + broadcast $ AppendEntries { aeTerm = term , aeLeaderId = nodeId , aePrevLogIndex = lastIndex , aePrevLogTerm = lastTerm @@ -209,7 +214,8 @@ stepUp t = do let ni = Map.fromList $ map (\n -> (n, succIndex lastIndex)) (Set.toList nodes) li = Map.fromList $ map (\n -> (n, index0)) (Set.toList nodes) - return $ wrap $ LeaderState { _lCurrentTerm = t + return $ wrap $ LeaderState { _lCurrentTerm = term + , _lCommitIndex = commitIndex , _lNextIndex = ni , _lLastIndex = li } diff --git a/src/Network/Kontiki/Raft/Utils.hs b/src/Network/Kontiki/Raft/Utils.hs index 51601e5..25400c7 100644 --- a/src/Network/Kontiki/Raft/Utils.hs +++ b/src/Network/Kontiki/Raft/Utils.hs @@ -21,7 +21,7 @@ import Data.ByteString.Char8 () import Control.Monad.State.Class (get) -import Control.Lens +import Control.Lens (view) import Network.Kontiki.Types import Network.Kontiki.Monad @@ -84,8 +84,8 @@ isMajority votes = do -- `MFollower' mode. -- -- Can't have this in Follower due to recursive imports, bummer -stepDown :: Monad m => NodeId -> Term -> TransitionT a f m SomeState -stepDown sender term = do +stepDown :: Monad m => NodeId -> Term -> Index -> TransitionT a f m SomeState +stepDown sender term commitIndex = do logS "Stepping down to Follower state" resetElectionTimeout @@ -95,5 +95,6 @@ stepDown sender term = do } return $ wrap FollowerState { _fCurrentTerm = term + , _fCommitIndex = commitIndex , _fVotedFor = Just sender } diff --git a/src/Network/Kontiki/Types.hs b/src/Network/Kontiki/Types.hs index 93ce02a..895f679 100644 --- a/src/Network/Kontiki/Types.hs +++ b/src/Network/Kontiki/Types.hs @@ -33,9 +33,9 @@ module Network.Kontiki.Types ( , Config(..), configNodeId, configNodes, configElectionTimeout, configHeartbeatTimeout -- * Node states - , FollowerState(..), fCurrentTerm, fVotedFor - , CandidateState(..), cCurrentTerm, cVotes - , LeaderState(..), lCurrentTerm, lNextIndex, lLastIndex + , FollowerState(..), fCurrentTerm, fCommitIndex, fVotedFor + , CandidateState(..), cCurrentTerm, cCommitIndex, cVotes + , LeaderState(..), lCurrentTerm, lCommitIndex, lNextIndex, lLastIndex , Mode(..), mode , Follower, Candidate, Leader , State(..), SomeState(..), InternalState @@ -189,6 +189,7 @@ arbitraryBS = BS8.pack `fmap` listOf1 arbitrary -- | State kept when in `Follower' mode. data FollowerState = FollowerState { _fCurrentTerm :: Term + , _fCommitIndex :: Index , _fVotedFor :: Maybe NodeId } deriving (Show, Eq, Generic) @@ -199,10 +200,11 @@ instance Binary FollowerState instance Arbitrary FollowerState where arbitrary = do n <- arbitraryBS - FollowerState <$> arbitrary <*> elements [Nothing, Just n] + FollowerState <$> arbitrary <*> arbitrary <*> elements [Nothing, Just n] -- | State kept when in `Candidate' mode. data CandidateState = CandidateState { _cCurrentTerm :: Term + , _cCommitIndex :: Index , _cVotes :: NodeSet } deriving (Show, Eq, Generic) @@ -213,10 +215,11 @@ instance Binary CandidateState instance Arbitrary CandidateState where arbitrary = do v <- Set.fromList `fmap` listOf1 arbitraryBS - CandidateState <$> arbitrary <*> pure v + CandidateState <$> arbitrary <*> arbitrary <*> pure v -- | State kept when in `Leader' mode. data LeaderState = LeaderState { _lCurrentTerm :: Term + , _lCommitIndex :: Index , _lNextIndex :: Map NodeId Index , _lLastIndex :: Map NodeId Index } @@ -227,6 +230,7 @@ instance Binary LeaderState instance Arbitrary LeaderState where arbitrary = LeaderState <$> arbitrary + <*> arbitrary <*> (Map.fromList `fmap` (listOf1 $ (,) <$> arbitraryBS <*> arbitrary)) <*> (Map.fromList `fmap` (listOf1 $ (,) <$> arbitraryBS <*> arbitrary)) -- | Running modes. @@ -381,6 +385,7 @@ data Command a = CBroadcast (Message a) -- ^ Broadcast a `Message' to all | CLog Builder -- ^ Log a message | CTruncateLog Index -- ^ Truncate the log to given `Index' | CLogEntries [Entry a] -- ^ Append some entries to the log + | CSetCommitIndex Index -- ^ Set new commit `Index' {-| Manually created `Show' instance for `Command'. @@ -410,6 +415,8 @@ instance Show a => Show (Command a) where . showsPrec 11 i CLogEntries es -> showString "CLogEntries " . showsPrec 11 es + CSetCommitIndex i -> showString "CSetCommitIndex " + . showsPrec 11 i instance Arbitrary a => Arbitrary (Command a) where arbitrary = do From 92f86b286fdee4d105c1dfcbf43115dcd178666c Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Sun, 1 Dec 2013 23:18:38 +0400 Subject: [PATCH 3/7] CSetCommitIndex generation for leader --- src/Network/Kontiki/Raft/Leader.hs | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/Network/Kontiki/Raft/Leader.hs b/src/Network/Kontiki/Raft/Leader.hs index 0495e06..39cc5c6 100644 --- a/src/Network/Kontiki/Raft/Leader.hs +++ b/src/Network/Kontiki/Raft/Leader.hs @@ -70,7 +70,7 @@ handleAppendEntries sender AppendEntries{..} = do else currentState -- | Handles `AppendEntriesResponse'. -handleAppendEntriesResponse :: (Functor m, Monad m) +handleAppendEntriesResponse :: (Functor m, Monad m, MonadLog m a) => MessageHandler AppendEntriesResponse a Leader m handleAppendEntriesResponse sender AppendEntriesResponse{..} = do currentTerm <- use lCurrentTerm @@ -90,8 +90,19 @@ handleAppendEntriesResponse sender AppendEntriesResponse{..} = do when (aerLastIndex >= li) $ do lLastIndex %= Map.insert sender aerLastIndex lNextIndex %= Map.insert sender aerLastIndex + newQuorumIndex <- quorumIndex + when (newQuorumIndex > commitIndex) $ setCommitIndex newQuorumIndex currentState +-- | Calculates current quorum `Index' from nodes' latest indices +quorumIndex :: (Functor m, Monad m, MonadLog m a) + => TransitionT a LeaderState m Index +quorumIndex = do + lastIndices <- Map.elems `fmap` use lLastIndex + let sorted = sortBy (\a b -> compare b a) lastIndices + quorum <- quorumSize + return $ sorted !! (quorum - 1) + -- | Handles `ElectionTimeout'. handleElectionTimeout :: (Functor m, Monad m) => TimeoutHandler ElectionTimeout a Leader m @@ -103,27 +114,13 @@ handleHeartbeatTimeout :: (Functor m, Monad m, MonadLog m a) handleHeartbeatTimeout = do resetHeartbeatTimeout - currentTerm <- use lCurrentTerm + commitIndex <- use lCommitIndex lastEntry <- logLastEntry nodeId <- view configNodeId lLastIndex %= Map.insert nodeId (maybe index0 eIndex lastEntry) - lastIndices <- Map.elems `fmap` use lLastIndex - let sorted = sortBy (\a b -> compare b a) lastIndices - quorum <- quorumSize - let quorumIndex = sorted !! (quorum - 1) - - -- TODO Check paper. CommitIndex can only be in current term if there's - -- a prior accepted item in the same term? - - e <- logEntry quorumIndex - let commitIndex = - if maybe term0 eTerm e >= currentTerm - then quorumIndex - else index0 - nodes <- view configNodes let otherNodes = filter (/= nodeId) (Set.toList nodes) mapM_ (sendAppendEntries lastEntry commitIndex) otherNodes From 682d6f06a6a0b55dc11211514813bdfc72578007 Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Sun, 1 Dec 2013 23:25:42 +0400 Subject: [PATCH 4/7] Some cleanup of 'udp' example --- bin/udp.hs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/bin/udp.hs b/bin/udp.hs index 1d75cf3..06faa62 100644 --- a/bin/udp.hs +++ b/bin/udp.hs @@ -14,7 +14,6 @@ import qualified Data.Set as Set import Control.Concurrent (forkIO) import Control.Concurrent.STM -import Control.Concurrent.STM.TMVar import qualified Data.ByteString.Char8 as BS8 import qualified Data.ByteString.Lazy as LBS @@ -24,11 +23,10 @@ import qualified Data.ByteString.Lazy.Builder as Builder import System.Environment (getArgs) import System.Random (randomRIO) -import Network.Socket (SockAddr(SockAddrInet), PortNumber(PortNum), inet_addr) +import Network.Socket (SockAddr(SockAddrInet), inet_addr) import Control.Lens -import Data.Binary (Binary) import qualified Data.Binary as B import Data.Conduit @@ -36,11 +34,11 @@ import qualified Data.Conduit.List as CL import qualified Data.Conduit.Network.UDP as CNU import Network.Kontiki.Raft - ( Command(..), Config(..), Entry(..), Event(..), MonadLog(..), + ( Command(..), Config(..), Entry(..), Event(..), Message, NodeId, SomeState, unIndex) import qualified Network.Kontiki.Raft as Raft -import Data.Kontiki.MemLog (Log, MemLog, runMemLog) +import Data.Kontiki.MemLog (Log, runMemLog) import qualified Data.Kontiki.MemLog as MemLog import Data.STM.RollingQueue (RollingQueue) @@ -146,8 +144,8 @@ run config' s ps = do ps'' <- case s' of Raft.WrapState (Raft.Leader ls) -> do let l = psLog ps' - s = IntMap.size l - (_, m) = if s /= 0 + size = IntMap.size l + (_, m) = if size /= 0 then IntMap.findMax l else (0, Entry { eTerm = Raft.term0 , eIndex = Raft.index0 @@ -155,7 +153,7 @@ run config' s ps = do }) e = Entry { eTerm = ls ^. Raft.lCurrentTerm , eIndex = Raft.succIndex (eIndex m) - , eValue = (config' ^. Raft.configNodeId, s) + , eValue = (config' ^. Raft.configNodeId, size) } l' = IntMap.insert (fromIntegral $ unIndex $ eIndex e) e l return $ ps' { psLog = l' } From 5f8da1b8ae55fb677d1b11e0d4489726ee40e5e1 Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Mon, 2 Dec 2013 01:55:59 +0400 Subject: [PATCH 5/7] Forgot to update commit index in node state --- src/Network/Kontiki/Raft/Follower.hs | 4 +++- src/Network/Kontiki/Raft/Leader.hs | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Network/Kontiki/Raft/Follower.hs b/src/Network/Kontiki/Raft/Follower.hs index 21160a6..0838838 100644 --- a/src/Network/Kontiki/Raft/Follower.hs +++ b/src/Network/Kontiki/Raft/Follower.hs @@ -134,7 +134,9 @@ handleAppendEntries sender AppendEntries{..} = do return $ eIndex $ last es else return lastIndex - when (commitIndex /= aeCommitIndex) $ setCommitIndex aeCommitIndex + when (commitIndex /= aeCommitIndex) $ do + fCommitIndex .= aeCommitIndex + setCommitIndex aeCommitIndex send sender $ AppendEntriesResponse { aerTerm = aeTerm , aerSuccess = True diff --git a/src/Network/Kontiki/Raft/Leader.hs b/src/Network/Kontiki/Raft/Leader.hs index 39cc5c6..41e684c 100644 --- a/src/Network/Kontiki/Raft/Leader.hs +++ b/src/Network/Kontiki/Raft/Leader.hs @@ -31,6 +31,7 @@ import Network.Kontiki.Types import Network.Kontiki.Monad import Network.Kontiki.Raft.Utils + -- | Handles `RequestVote'. handleRequestVote :: (Functor m, Monad m) => MessageHandler RequestVote a Leader m @@ -91,7 +92,9 @@ handleAppendEntriesResponse sender AppendEntriesResponse{..} = do lLastIndex %= Map.insert sender aerLastIndex lNextIndex %= Map.insert sender aerLastIndex newQuorumIndex <- quorumIndex - when (newQuorumIndex > commitIndex) $ setCommitIndex newQuorumIndex + when (newQuorumIndex > commitIndex) $ do + lCommitIndex .= newQuorumIndex + setCommitIndex newQuorumIndex currentState -- | Calculates current quorum `Index' from nodes' latest indices From 289448b45a1302b708c3f0a4e06938f15830b33d Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Mon, 2 Dec 2013 01:57:02 +0400 Subject: [PATCH 6/7] Updated 'udp' example to use CSetCommitIndex --- bin/udp.hs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/bin/udp.hs b/bin/udp.hs index 06faa62..61701b5 100644 --- a/bin/udp.hs +++ b/bin/udp.hs @@ -1,6 +1,7 @@ {-# LANGUAGE OverloadedStrings, ScopedTypeVariables, - GADTs #-} + GADTs, + MultiWayIf #-} module Main (main) where import Control.Monad (foldM, forM, void) @@ -25,7 +26,7 @@ import System.Random (randomRIO) import Network.Socket (SockAddr(SockAddrInet), inet_addr) -import Control.Lens +import Control.Lens hiding (Index) import qualified Data.Binary as B @@ -35,7 +36,7 @@ import qualified Data.Conduit.Network.UDP as CNU import Network.Kontiki.Raft ( Command(..), Config(..), Entry(..), Event(..), - Message, NodeId, SomeState, unIndex) + Message, NodeId, SomeState, Index, index0, succIndex, unIndex) import qualified Network.Kontiki.Raft as Raft import Data.Kontiki.MemLog (Log, runMemLog) @@ -69,6 +70,7 @@ data PlumbingState = PS { psElectionTimer :: Timer , psMessages :: RollingQueue (Event Value) , psChannels :: Map NodeId (RollingQueue (Message Value)) , psLog :: Log Value + , psCommitIndex :: Index } queueSize :: Int @@ -85,6 +87,7 @@ newPlumbingState channels = do , psMessages = q , psChannels = channels , psLog = MemLog.empty + , psCommitIndex = index0 } handleCommand :: PlumbingState -> Command Value -> IO PlumbingState @@ -123,6 +126,19 @@ handleCommand s c = case c of let l = psLog s l' = foldr (\e -> MemLog.insert (fromIntegral $ unIndex $ eIndex e) e) l es return $ s { psLog = l' } + CSetCommitIndex i' -> do + let i = psCommitIndex s + putStrLn $ "New commit index, to commit: " ++ entriesToCommit i i' + return $ s { psCommitIndex = i' } + +entriesToCommit :: Index -> Index -> String +entriesToCommit prev new = + if | new < prev -> error "Committed entries could not be reverted" + | new == prev -> "nothing" + | new == next -> "entry " ++ show new + | otherwise -> "entries " ++ show next ++ " to " ++ show new + where + next = succIndex prev handleCommands :: PlumbingState -> [Command Value] -> IO PlumbingState handleCommands = foldM handleCommand From ae1451fba8222695d406f66a844ca2957121f315 Mon Sep 17 00:00:00 2001 From: Kirilll Zaborsky Date: Wed, 11 Dec 2013 09:31:34 +0400 Subject: [PATCH 7/7] Minor cleanup: some formatting and redundant constraint removed --- src/Network/Kontiki/Raft.hs | 5 +++-- src/Network/Kontiki/Raft/Leader.hs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Network/Kontiki/Raft.hs b/src/Network/Kontiki/Raft.hs index ca59bf4..f40cca6 100644 --- a/src/Network/Kontiki/Raft.hs +++ b/src/Network/Kontiki/Raft.hs @@ -54,9 +54,10 @@ handle config state event = case state of -- | Initial state of all nodes. initialState :: SomeState -initialState = wrap FollowerState {_fCurrentTerm = term0 +initialState = wrap FollowerState { _fCurrentTerm = term0 , _fCommitIndex = index0 - , _fVotedFor = Nothing} + , _fVotedFor = Nothing + } -- | Restores the node to initial (`Follower') mode -- and resets the election timeout. This function is useful diff --git a/src/Network/Kontiki/Raft/Leader.hs b/src/Network/Kontiki/Raft/Leader.hs index 41e684c..476cd25 100644 --- a/src/Network/Kontiki/Raft/Leader.hs +++ b/src/Network/Kontiki/Raft/Leader.hs @@ -71,7 +71,7 @@ handleAppendEntries sender AppendEntries{..} = do else currentState -- | Handles `AppendEntriesResponse'. -handleAppendEntriesResponse :: (Functor m, Monad m, MonadLog m a) +handleAppendEntriesResponse :: (Functor m, Monad m) => MessageHandler AppendEntriesResponse a Leader m handleAppendEntriesResponse sender AppendEntriesResponse{..} = do currentTerm <- use lCurrentTerm @@ -98,7 +98,7 @@ handleAppendEntriesResponse sender AppendEntriesResponse{..} = do currentState -- | Calculates current quorum `Index' from nodes' latest indices -quorumIndex :: (Functor m, Monad m, MonadLog m a) +quorumIndex :: (Functor m, Monad m) => TransitionT a LeaderState m Index quorumIndex = do lastIndices <- Map.elems `fmap` use lLastIndex