Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit index implemented #9

Merged
merged 7 commits into from
Dec 13, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ cabal-dev
.dist-buildwrapper
.project
.settings
.cabal-sandbox
cabal.sandbox.config
TAGS
36 changes: 25 additions & 11 deletions bin/udp.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE OverloadedStrings,
ScopedTypeVariables,
GADTs #-}
GADTs,
MultiWayIf #-}
module Main (main) where

import Control.Monad (foldM, forM, void)
Expand All @@ -14,7 +15,6 @@ import qualified Data.Set as Set

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Concurrent.STM.TMVar

import qualified Data.ByteString.Char8 as BS8
import qualified Data.ByteString.Lazy as LBS
Expand All @@ -24,23 +24,22 @@ import qualified Data.ByteString.Lazy.Builder as Builder
import System.Environment (getArgs)
import System.Random (randomRIO)

import Network.Socket (SockAddr(SockAddrInet), PortNumber(PortNum), inet_addr)
import Network.Socket (SockAddr(SockAddrInet), inet_addr)

import Control.Lens
import Control.Lens hiding (Index)

import Data.Binary (Binary)
import qualified Data.Binary as B

import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Network.UDP as CNU

import Network.Kontiki.Raft
( Command(..), Config(..), Entry(..), Event(..), MonadLog(..),
Message, NodeId, SomeState, unIndex)
( Command(..), Config(..), Entry(..), Event(..),
Message, NodeId, SomeState, Index, index0, succIndex, unIndex)
import qualified Network.Kontiki.Raft as Raft

import Data.Kontiki.MemLog (Log, MemLog, runMemLog)
import Data.Kontiki.MemLog (Log, runMemLog)
import qualified Data.Kontiki.MemLog as MemLog

import Data.STM.RollingQueue (RollingQueue)
Expand Down Expand Up @@ -71,6 +70,7 @@ data PlumbingState = PS { psElectionTimer :: Timer
, psMessages :: RollingQueue (Event Value)
, psChannels :: Map NodeId (RollingQueue (Message Value))
, psLog :: Log Value
, psCommitIndex :: Index
}

queueSize :: Int
Expand All @@ -87,6 +87,7 @@ newPlumbingState channels = do
, psMessages = q
, psChannels = channels
, psLog = MemLog.empty
, psCommitIndex = index0
}

handleCommand :: PlumbingState -> Command Value -> IO PlumbingState
Expand Down Expand Up @@ -125,6 +126,19 @@ handleCommand s c = case c of
let l = psLog s
l' = foldr (\e -> MemLog.insert (fromIntegral $ unIndex $ eIndex e) e) l es
return $ s { psLog = l' }
CSetCommitIndex i' -> do
let i = psCommitIndex s
putStrLn $ "New commit index, to commit: " ++ entriesToCommit i i'
return $ s { psCommitIndex = i' }

entriesToCommit :: Index -> Index -> String
entriesToCommit prev new =
if | new < prev -> error "Committed entries could not be reverted"
| new == prev -> "nothing"
| new == next -> "entry " ++ show new
| otherwise -> "entries " ++ show next ++ " to " ++ show new
where
next = succIndex prev

handleCommands :: PlumbingState -> [Command Value] -> IO PlumbingState
handleCommands = foldM handleCommand
Expand All @@ -146,16 +160,16 @@ run config' s ps = do
ps'' <- case s' of
Raft.WrapState (Raft.Leader ls) -> do
let l = psLog ps'
s = IntMap.size l
(_, m) = if s /= 0
size = IntMap.size l
(_, m) = if size /= 0
then IntMap.findMax l
else (0, Entry { eTerm = Raft.term0
, eIndex = Raft.index0
, eValue = (config' ^. Raft.configNodeId, 0)
})
e = Entry { eTerm = ls ^. Raft.lCurrentTerm
, eIndex = Raft.succIndex (eIndex m)
, eValue = (config' ^. Raft.configNodeId, s)
, eValue = (config' ^. Raft.configNodeId, size)
}
l' = IntMap.insert (fromIntegral $ unIndex $ eIndex e) e l
return $ ps' { psLog = l' }
Expand Down
4 changes: 4 additions & 0 deletions src/Network/Kontiki/Monad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ truncateLog i = tell [CTruncateLog i]
logEntries :: Monad m => [Entry a] -> TransitionT a f m ()
logEntries es = tell [CLogEntries es]

-- | Sets new commit `Index' `i'
setCommitIndex :: Monad m => Index -> TransitionT a f m ()
setCommitIndex i = tell [CSetCommitIndex i]

-- | Handler of events.
type Handler a s m =
Event a -- ^ `Event' to handle
Expand Down
16 changes: 10 additions & 6 deletions src/Network/Kontiki/Raft.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ handle config state event = case state of

-- | Initial state of all nodes.
initialState :: SomeState
initialState = wrap FollowerState {_fCurrentTerm = term0, _fVotedFor = Nothing}
initialState = wrap FollowerState { _fCurrentTerm = term0
, _fCommitIndex = index0
, _fVotedFor = Nothing
}

-- | Restores the node to initial (`Follower') mode
-- and resets the election timeout. This function is useful
Expand All @@ -70,12 +73,13 @@ restore :: Config -- ^ configuration of the cluster
-> (SomeState, [Command a]) -- ^ new state and list of commands
restore cfg s = case s of
WrapState(Follower _) -> (s, commands)
WrapState(Candidate s') -> (toFollower (s' ^. cCurrentTerm), commands)
WrapState(Leader s') -> (toFollower (s' ^. lCurrentTerm), commands)
WrapState(Candidate s') -> (toFollower (s' ^. cCurrentTerm) (s' ^. cCommitIndex), commands)
WrapState(Leader s') -> (toFollower (s' ^. lCurrentTerm) (s' ^. lCommitIndex), commands)
where
toFollower t = wrap FollowerState { _fCurrentTerm = t
, _fVotedFor = Just nodeId
}
toFollower t i = wrap FollowerState { _fCurrentTerm = t
, _fCommitIndex = i
, _fVotedFor = Just nodeId
}
nodeId = cfg ^. configNodeId
et = cfg ^. configElectionTimeout
commands :: forall a. [Command a]
Expand Down
19 changes: 13 additions & 6 deletions src/Network/Kontiki/Raft/Candidate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import qualified Data.Set as Set

import Data.ByteString.Char8 ()

import Control.Lens
import Control.Lens (use, view, (%=))

import Network.Kontiki.Log
import Network.Kontiki.Types
Expand All @@ -30,9 +30,10 @@ import qualified Network.Kontiki.Raft.Leader as Leader
handleRequestVote :: (Functor m, Monad m) => MessageHandler RequestVote a Candidate m
handleRequestVote sender RequestVote{..} = do
currentTerm <- use cCurrentTerm
commitIndex <- use cCommitIndex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be for another issue, but looking at the go-raft implementation (which might or might not be correct), we potentially should be checking if the node that is requesting the vote has a stale log and replying accordingly, see https://github.com/goraft/raft/blob/master/server.go#L946-L976. I checked, and it looks like we do it in the follower.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the paper and it says:
Receiver implementation (for RequestVote):

  1. Reply false if term < currentTerm (§5.1)
  2. If votedFor is null or candidateId, and candidate's log is at
    least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you are right but as you say it's for another issue :)


if rvTerm > currentTerm
then stepDown sender rvTerm
then stepDown sender rvTerm commitIndex
else do
logS "Not granting vote"
send sender $ RequestVoteResponse { rvrTerm = currentTerm
Expand All @@ -45,12 +46,13 @@ handleRequestVoteResponse :: (Functor m, Monad m, MonadLog m a)
=> MessageHandler RequestVoteResponse a Candidate m
handleRequestVoteResponse sender RequestVoteResponse{..} = do
currentTerm <- use cCurrentTerm
commitIndex <- use cCommitIndex
votes <- use cVotes

if | rvrTerm < currentTerm -> do
logS "Ignoring RequestVoteResponse for old term"
currentState
| rvrTerm > currentTerm -> stepDown sender rvrTerm
| rvrTerm > currentTerm -> stepDown sender rvrTerm commitIndex
| not rvrVoteGranted -> do
logS "Ignoring RequestVoteResponse since vote wasn't granted"
currentState
Expand All @@ -69,18 +71,19 @@ handleRequestVoteResponse sender RequestVoteResponse{..} = do
currentState
else do
logS "Reached a majority, becoming Leader"
Leader.stepUp currentTerm
Leader.stepUp currentTerm commitIndex

-- | Handles `AppendEntries'.
handleAppendEntries :: (Functor m, Monad m)
=> MessageHandler (AppendEntries a) a Candidate m
handleAppendEntries sender AppendEntries{..} = do
currentTerm <- use cCurrentTerm
commitIndex <- use cCommitIndex

if currentTerm <= aeTerm
then do
logS "Received AppendEntries for current or newer term"
stepDown sender aeTerm
stepDown sender aeTerm commitIndex
else do
logS "Ignoring AppendEntries for old term"
currentState
Expand All @@ -102,6 +105,7 @@ handleElectionTimeout = do

nodeId <- view configNodeId
nextTerm <- succTerm `fmap` use cCurrentTerm
commitIndex <- use cCommitIndex

e <- logLastEntry
let lastIndex = maybe index0 eIndex e
Expand All @@ -114,6 +118,7 @@ handleElectionTimeout = do
}

return $ wrap CandidateState { _cCurrentTerm = nextTerm
, _cCommitIndex = commitIndex
, _cVotes = Set.singleton nodeId
}

Expand All @@ -139,8 +144,9 @@ handle = handleGeneric
-- and resets the election timer.
stepUp :: (Functor m, Monad m, MonadLog m a)
=> Term
-> Index
-> TransitionT a s m SomeState
stepUp term = do
stepUp term commitIndex = do
logS "Becoming candidate"

resetElectionTimeout
Expand All @@ -158,5 +164,6 @@ stepUp term = do
}

return $ wrap CandidateState { _cCurrentTerm = term
, _cCommitIndex = commitIndex
, _cVotes = Set.singleton nodeId
}
17 changes: 13 additions & 4 deletions src/Network/Kontiki/Raft/Follower.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import Data.ByteString.Char8 ()

import Control.Lens

import Control.Monad (when)

import Network.Kontiki.Log
import Network.Kontiki.Types
import Network.Kontiki.Monad
Expand Down Expand Up @@ -87,20 +89,22 @@ handleRequestVoteResponse :: (Functor m, Monad m)
=> MessageHandler RequestVoteResponse a Follower m
handleRequestVoteResponse sender RequestVoteResponse{..} = do
currentTerm <- use fCurrentTerm
commitIndex <- use fCommitIndex

if rvrTerm > currentTerm
then stepDown sender rvrTerm
then stepDown sender rvrTerm commitIndex
else currentState

-- | Handles `AppendEntries'.
handleAppendEntries :: (Functor m, Monad m, MonadLog m a)
=> MessageHandler (AppendEntries a) a Follower m
handleAppendEntries sender AppendEntries{..} = do
currentTerm <- use fCurrentTerm
commitIndex <- use fCommitIndex
e <- logLastEntry
let lastIndex = maybe index0 eIndex e

if | aeTerm > currentTerm -> stepDown sender aeTerm
if | aeTerm > currentTerm -> stepDown sender aeTerm commitIndex
| aeTerm < currentTerm -> do
send sender $ AppendEntriesResponse { aerTerm = currentTerm
, aerSuccess = False
Expand Down Expand Up @@ -130,6 +134,10 @@ handleAppendEntries sender AppendEntries{..} = do
return $ eIndex $ last es
else return lastIndex

when (commitIndex /= aeCommitIndex) $ do
fCommitIndex .= aeCommitIndex
setCommitIndex aeCommitIndex

send sender $ AppendEntriesResponse { aerTerm = aeTerm
, aerSuccess = True
, aerLastIndex = lastIndex'
Expand Down Expand Up @@ -175,14 +183,15 @@ handleElectionTimeout = do

currentTerm <- use fCurrentTerm
let nextTerm = succTerm currentTerm
commitIndex <- use fCommitIndex

fCurrentTerm .= nextTerm

quorum <- quorumSize

if quorum == 1
then Leader.stepUp nextTerm
else Candidate.stepUp nextTerm
then Leader.stepUp nextTerm commitIndex
else Candidate.stepUp nextTerm commitIndex

-- | Handles `HeartbeatTimeout'.
handleHeartbeatTimeout :: (Functor m, Monad m)
Expand Down
Loading