diff --git a/compliance.md b/compliance.md index 6c4af4f..c7dddb4 100644 --- a/compliance.md +++ b/compliance.md @@ -17,6 +17,7 @@ - [ ] "Reinitialized after election" - [ ] `nextIndex[]` for each server, index of the next log entry to send to that server (initialized to leader last log index + 1) - [ ] `matchIndex[]` for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically) + ### Rules for Servers #### All Servers - [ ] If commitIndex > lastApplied: increment lastApplied @@ -41,17 +42,18 @@ - [ ] If successful: update nextIndex and matchIndex for follower (§5.3) - [ ] If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3) - [ ] If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4). + ### AppendEntries RPC #### Arguments -- [ ] term leader’s term -- [ ] leaderId so follower can redirect clients -- [ ] prevLogIndex index of log entry immediately preceding new ones -- [ ] prevLogTerm term of prevLogIndex entry -- [ ] entries[] log entries to store (empty for heartbeat; may send more than one for efficiency) -- [ ] leaderCommit leader’s commitIndex +- [x] term: leader’s term +- [ ] leaderId: so follower can redirect clients +- [x] prevLogIndex: index of log entry immediately preceding new ones +- [x] prevLogTerm: term of prevLogIndex entry +- [ ] entries[]: log entries to store (empty for heartbeat; may send more than one for efficiency) +- [ ] leaderCommit: leader’s commitIndex #### Results -- [ ] term currentTerm, for leader to update itself -- [ ] success true if follower contained entry matching prevLogIndex and +- [x] term: currentTerm, for leader to update itself +- [x] success: true if follower contained entry matching prevLogIndex and prevLogTerm #### Receiver implementation - [ ] Reply false if term < currentTerm (§5.1) @@ -59,15 +61,16 @@ - [ ] If an existing entry 
conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3) - [ ] Append any new entries not already in the log - [ ] If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry) + ### RequestVote RPC #### Arguments -- [ ] term candidate’s term -- [ ] candidateId candidate requesting vote -- [ ] lastLogIndex index of candidate’s last log entry (§5.4) -- [ ] lastLogTerm term of candidate’s last log entry (§5.4) +- [x] term: candidate’s term +- [x] candidateId: candidate requesting vote +- [x] lastLogIndex: index of candidate’s last log entry (§5.4) +- [x] lastLogTerm: term of candidate’s last log entry (§5.4) #### Results -- [ ] term currentTerm, for candidate to update itself -- [ ] voteGranted true means candidate received vote +- [x] term: currentTerm, for candidate to update itself +- [x] voteGranted: true means candidate received vote #### Receiver implementation - [ ] Reply false if term < currentTerm (§5.1) - [ ] If candidate’s log is at least as up-to-date as receiver’s log @@ -84,6 +87,7 @@ - **Leader election:** choose a new leader when the existing one fails - **Log replication:** leader accepts log entries from client and replicate them across cluster, forcing other logs to agree with its own - **Safety:** (State Machine Safety Property). 
If a server has applied a log entry to state machine, then no other server will apply a different entry to the same log index + ### 5.1 Raft basics - [ ] server can be in one of 3 modes: leader, follower candidate - [ ] normal operation: 1 leader and other are followers @@ -139,6 +143,7 @@ - [ ] increment its term - [ ] start a new election by initiating another round of RequestVote - [ ] Election timeout is chosen randomly between 150-300ms + ### 5.3 Log replication - [ ] a leader services client requests - each request contains a command to be executed by the state machine @@ -182,9 +187,11 @@ - [ ] follower appends entries from the leader - [ ] (optional) its possible to optimize finding the matching `nextIndex` between leader and follower - [ ] A leader never deletes of overwrites its own logs + ### 5.4 Safety - restrict which servers can be elected leaders - [ ] leader for any term must contain all entries committed in previous terms + #### 5.4.1 Election restriction - All committed entries from previous terms are present on each new leader when its elected. - [ ] log entries only flow from leader to follower. 
@@ -198,6 +205,7 @@ - The RequestVote RPC helps ensure the leader's log is `up-to-date` - [ ] RequestVote includes info about candidate's log - [ ] voter denies vote if its own log is more `up-to-date` + #### 5.4.2 Committing entries from previous terms - [ ] a leader knows an entry from its **current term** (not true for previous terms) is committed, once its stored (replicated) on a majority of servers - [ ] an entry is replicated if the server responds with success to AppendEntries @@ -206,6 +214,7 @@ - [ ] never commit entries from previous terms by counting replicas - [ ] only entries from the leader's current term are committed by counting replicas - [ ] once an entry from the current term is committed, previous entries will indirectly be committed + #### 5.4.3 Safety argument - [ ] entries are applied/committed in log index order - this (combined with State Machine Safety Property) ensures that all servers apply the same set of log entries to state machine, in the same order diff --git a/src/lib.rs b/src/lib.rs index bf5230e..1ab6cd0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,11 @@ mod error; mod io; mod log; +mod rpc; +mod server; mod timeout; +#[cfg(test)] +mod testing; + fn start() {} diff --git a/src/log.rs b/src/log.rs index a1f43dc..267e91f 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,10 +1,14 @@ -use crate::log::{entry::Entry, idx::Idx, term_idx::TermIdx}; +use crate::log::entry::Entry; mod entry; mod idx; mod term; mod term_idx; +pub use idx::Idx; +pub use term::Term; +pub use term_idx::TermIdx; + #[derive(Debug)] struct Log { entries: Vec, diff --git a/src/log/idx.rs b/src/log/idx.rs index b466b5a..f68545d 100644 --- a/src/log/idx.rs +++ b/src/log/idx.rs @@ -6,7 +6,7 @@ use s2n_codec::{DecoderBufferResult, DecoderValue, EncoderValue}; pub const INITIAL_IDX: Idx = Idx(0); #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] -pub(crate) struct Idx(u64); +pub struct Idx(u64); impl From for Idx { fn from(value: u64) -> Self { diff 
--git a/src/log/term.rs b/src/log/term.rs index bc47b4d..f26faff 100644 --- a/src/log/term.rs +++ b/src/log/term.rs @@ -6,7 +6,7 @@ use s2n_codec::{DecoderBufferResult, DecoderValue, EncoderValue}; pub const INITIAL_TERM: Term = Term(0); #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] -pub(crate) struct Term(u64); +pub struct Term(u64); impl From for Term { fn from(value: u64) -> Self { diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 0000000..aab81fc --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,169 @@ +use crate::{ + log::{Term, TermIdx}, + rpc::{ + append_entries::{AppendEntries, RespAppendEntries}, + request_vote::{RequestVote, RespRequestVote}, + }, + server::ServerId, +}; +use s2n_codec::{DecoderBuffer, DecoderBufferResult, DecoderError, DecoderValue, EncoderValue}; + +mod append_entries; +mod request_vote; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Rpc { + RequestVote(RequestVote), + RespRequestVote(RespRequestVote), + AppendEntries(AppendEntries), + RespAppendEntries(RespAppendEntries), +} + +impl Rpc { + pub fn new_request_vote(term: Term, candidate_id: ServerId, last_log_term_idx: TermIdx) -> Rpc { + Rpc::RequestVote(RequestVote { + term, + candidate_id, + last_log_term_idx, + }) + } + + pub fn new_request_vote_resp(term: Term, vote_granted: bool) -> Rpc { + Rpc::RespRequestVote(RespRequestVote { term, vote_granted }) + } + + pub fn new_append_entry(term: Term, prev_log_term_idx: TermIdx) -> Rpc { + Rpc::AppendEntries(AppendEntries { + term, + prev_log_term_idx, + }) + } + + pub fn new_append_entry_resp(term: Term, success: bool) -> Rpc { + Rpc::RespAppendEntries(RespAppendEntries { term, success }) + } + + pub fn term(&self) -> &Term { + match self { + Rpc::RequestVote(RequestVote { term, .. }) => term, + Rpc::RespRequestVote(RespRequestVote { term, .. }) => term, + Rpc::AppendEntries(AppendEntries { term, .. }) => term, + Rpc::RespAppendEntries(RespAppendEntries { term, .. 
}) => term, + } + } +} + +impl<'a> DecoderValue<'a> for Rpc { + fn decode(buffer: DecoderBuffer<'a>) -> DecoderBufferResult<'a, Self> { + let (tag, buffer): (u8, _) = buffer.decode()?; + + match tag { + RequestVote::TAG => { + let (rpc, buffer) = buffer.decode()?; + Ok((Rpc::RequestVote(rpc), buffer)) + } + RespRequestVote::TAG => { + let (rpc, buffer) = buffer.decode()?; + Ok((Rpc::RespRequestVote(rpc), buffer)) + } + AppendEntries::TAG => { + let (rpc, buffer) = buffer.decode()?; + Ok((Rpc::AppendEntries(rpc), buffer)) + } + RespAppendEntries::TAG => { + let (rpc, buffer) = buffer.decode()?; + Ok((Rpc::RespAppendEntries(rpc), buffer)) + } + _tag => Err(DecoderError::InvariantViolation("received unexpected tag")), + } + } +} + +impl EncoderValue for Rpc { + fn encode(&self, encoder: &mut E) { + match self { + Rpc::RequestVote(inner) => { + encoder.write_slice(&[RequestVote::TAG]); + encoder.encode(inner); + } + Rpc::RespRequestVote(inner) => { + encoder.write_slice(&[RespRequestVote::TAG]); + encoder.encode(inner); + } + Rpc::AppendEntries(inner) => { + encoder.write_slice(&[AppendEntries::TAG]); + encoder.encode(inner); + } + Rpc::RespAppendEntries(inner) => { + encoder.write_slice(&[RespAppendEntries::TAG]); + encoder.encode(inner); + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ + log::Idx, + rpc::{Rpc, Term, TermIdx}, + server::ServerId, + }; + use s2n_codec::{DecoderBuffer, DecoderValue, EncoderBuffer, EncoderValue}; + + #[test] + fn encode_decode_request_vote() { + let rpc = Rpc::new_request_vote( + Term::from(1), + ServerId::new([10; 16]), + TermIdx::builder() + .with_term(Term::from(3)) + .with_idx(Idx::from(4)), + ); + + let mut slice = vec![0; 50]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = Rpc::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } + + #[test] + fn encode_decode_request_vote_res() { + let rpc = Rpc::new_append_entry( + Term::from(1), 
+ TermIdx::builder() + .with_term(Term::from(3)) + .with_idx(Idx::from(4)), + ); + + let mut slice = vec![0; 50]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = Rpc::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } + + #[test] + fn encode_decode_append_entry() {} + + #[test] + fn encode_decode_append_entry_res() { + let rpc = Rpc::new_append_entry_resp(Term::from(1), true); + + let mut slice = vec![0; 50]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = Rpc::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } +} diff --git a/src/rpc/append_entries.rs b/src/rpc/append_entries.rs new file mode 100644 index 0000000..43b96fa --- /dev/null +++ b/src/rpc/append_entries.rs @@ -0,0 +1,129 @@ +use crate::log::{Term, TermIdx}; +use s2n_codec::{DecoderValue, EncoderValue}; + +// Add entries +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AppendEntries { + //% Compliance: + //% term: leader’s term + pub term: Term, + + //% Compliance: + //% leaderId: so follower can redirect clients + // TODO + + //% Compliance: + //% prevLogIndex: index of log entry immediately preceding new ones + //% prevLogTerm: term of prevLogIndex entry + pub prev_log_term_idx: TermIdx, + // + //% Compliance: + // TODO entries[]: log entries to store (empty for heartbeat; may send more than one for efficiency) + // + //% Compliance: + // TODO leaderCommit: leader’s commitIndex +} + +impl AppendEntries { + pub const TAG: u8 = 3; + + pub fn term(&self) -> Term { + self.term + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RespAppendEntries { + //% Compliance: + //% term: currentTerm, for leader to update itself + pub term: Term, + + //% Compliance: + //% success: true if follower contained entry matching prevLogIndex and prevLogTerm + pub success: bool, +} + +impl RespAppendEntries { + pub const 
TAG: u8 = 4; +} + +impl<'a> DecoderValue<'a> for AppendEntries { + fn decode(buffer: s2n_codec::DecoderBuffer<'a>) -> s2n_codec::DecoderBufferResult<'a, Self> { + let (term, buffer) = buffer.decode()?; + let (prev_log_term_idx, buffer) = buffer.decode()?; + + let rpc = AppendEntries { + term, + prev_log_term_idx, + }; + Ok((rpc, buffer)) + } +} + +impl EncoderValue for AppendEntries { + fn encode(&self, encoder: &mut E) { + encoder.encode(&self.term); + encoder.encode(&self.prev_log_term_idx); + } +} + +impl<'a> DecoderValue<'a> for RespAppendEntries { + fn decode(buffer: s2n_codec::DecoderBuffer<'a>) -> s2n_codec::DecoderBufferResult<'a, Self> { + let (term, buffer) = buffer.decode()?; + let (success, buffer): (u8, _) = buffer.decode()?; + let success = success != 0; + + let rpc = RespAppendEntries { term, success }; + Ok((rpc, buffer)) + } +} + +impl EncoderValue for RespAppendEntries { + fn encode(&self, encoder: &mut E) { + encoder.encode(&self.term); + encoder.write_slice(&(self.success as u8).to_be_bytes()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::log::Idx; + use s2n_codec::{DecoderBuffer, EncoderBuffer}; + + #[test] + fn encode_decode_rpc() { + let rpc = AppendEntries { + term: Term::from(2), + prev_log_term_idx: TermIdx::builder() + .with_term(Term::from(3)) + .with_idx(Idx::from(4)), + }; + + let mut slice = vec![0; 30]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = AppendEntries::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } + + #[test] + fn encode_decode_rpc_resp() { + let rpc = RespAppendEntries { + term: Term::from(2), + success: true, + }; + + let mut slice = vec![0; 30]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = RespAppendEntries::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } +} diff --git a/src/rpc/request_vote.rs 
b/src/rpc/request_vote.rs new file mode 100644 index 0000000..2462c1f --- /dev/null +++ b/src/rpc/request_vote.rs @@ -0,0 +1,125 @@ +use crate::{ + log::{Term, TermIdx}, + server::ServerId, +}; +use s2n_codec::{DecoderValue, EncoderValue}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RequestVote { + //% Compliance: + //% term: candidate’s term + pub term: Term, + + //% Compliance: + //% candidateId: candidate requesting vote + pub candidate_id: ServerId, + + //% Compliance: + //% lastLogIndex: index of candidate’s last log entry (§5.4) + //% lastLogTerm: term of candidate’s last log entry (§5.4) + pub last_log_term_idx: TermIdx, +} + +impl RequestVote { + pub const TAG: u8 = 1; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RespRequestVote { + //% Compliance: + //% term: currentTerm, for candidate to update itself + pub term: Term, + + //% Compliance: + //% voteGranted: true means candidate received vote + pub vote_granted: bool, +} + +impl RespRequestVote { + pub const TAG: u8 = 2; +} + +impl<'a> DecoderValue<'a> for RequestVote { + fn decode(buffer: s2n_codec::DecoderBuffer<'a>) -> s2n_codec::DecoderBufferResult<'a, Self> { + let (term, buffer) = buffer.decode()?; + let (candidate_id, buffer) = buffer.decode()?; + let (last_log_term_idx, buffer) = buffer.decode()?; + + let rpc = RequestVote { + term, + candidate_id, + last_log_term_idx, + }; + Ok((rpc, buffer)) + } +} + +impl EncoderValue for RequestVote { + fn encode(&self, encoder: &mut E) { + encoder.encode(&self.term); + encoder.encode(&self.candidate_id); + encoder.encode(&self.last_log_term_idx); + } +} + +impl<'a> DecoderValue<'a> for RespRequestVote { + fn decode(buffer: s2n_codec::DecoderBuffer<'a>) -> s2n_codec::DecoderBufferResult<'a, Self> { + let (term, buffer) = buffer.decode()?; + let (vote_granted, buffer): (u8, _) = buffer.decode()?; + let vote_granted = vote_granted != 0; + + let rpc = RespRequestVote { term, vote_granted }; + Ok((rpc, buffer)) + } +} + +impl 
EncoderValue for RespRequestVote { + fn encode(&self, encoder: &mut E) { + encoder.encode(&self.term); + encoder.write_slice(&(self.vote_granted as u8).to_be_bytes()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::log::Idx; + use s2n_codec::{DecoderBuffer, EncoderBuffer}; + + #[test] + fn encode_decode_rpc() { + let rpc = RequestVote { + term: Term::from(2), + candidate_id: ServerId::new([10; 16]), + last_log_term_idx: TermIdx::builder() + .with_term(Term::from(3)) + .with_idx(Idx::from(4)), + }; + + let mut slice = vec![0; 40]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = RequestVote::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } + + #[test] + fn encode_decode_rpc_resp() { + let rpc = RespRequestVote { + term: Term::from(2), + vote_granted: true, + }; + + let mut slice = vec![0; 30]; + let mut buf = EncoderBuffer::new(&mut slice); + rpc.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_rpc, _) = RespRequestVote::decode(d_buf).unwrap(); + + assert_eq!(rpc, d_rpc); + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..f88111c --- /dev/null +++ b/src/server.rs @@ -0,0 +1,58 @@ +use s2n_codec::{DecoderValue, EncoderValue}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ServerId([u8; 16]); + +impl ServerId { + pub fn new(id: [u8; 16]) -> Self { + ServerId(id) + } +} + +impl<'a> DecoderValue<'a> for ServerId { + fn decode(buffer: s2n_codec::DecoderBuffer<'a>) -> s2n_codec::DecoderBufferResult<'a, Self> { + let (candidate_id, buffer) = buffer.decode_slice(16)?; + let candidate_id = candidate_id + .into_less_safe_slice() + .try_into() + .expect("failed to decode ServerId"); + Ok((ServerId(candidate_id), buffer)) + } +} + +impl EncoderValue for ServerId { + fn encode(&self, encoder: &mut E) { + encoder.write_slice(&self.0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use 
s2n_codec::{DecoderBuffer, EncoderBuffer}; + + #[test] + fn encode_decode() { + let id = ServerId([5; 16]); + + let mut slice = vec![0; 20]; + let mut buf = EncoderBuffer::new(&mut slice); + id.encode(&mut buf); + + let d_buf = DecoderBuffer::new(&slice); + let (d_id, _) = ServerId::decode(d_buf).unwrap(); + + assert_eq!(id, d_id); + } + + #[test] + fn cmp_test() { + let id1 = ServerId([10; 16]); + let mut id2 = ServerId([10; 16]); + + assert_eq!(id1, id2); + + id2.0[1] = 11; + assert_ne!(id1, id2); + } +} diff --git a/src/testing.rs b/src/testing.rs new file mode 100644 index 0000000..b10cf3d --- /dev/null +++ b/src/testing.rs @@ -0,0 +1,13 @@ +#![allow(unused)] +macro_rules! cast_unsafe { + ($target: expr, $pat: path) => {{ + if let $pat(a) = $target { + // #1 + a + } else { + panic!("mismatch variant when cast to {}", stringify!($pat)); // #2 + } + }}; +} + +pub(crate) use cast_unsafe;