feat: add AppendEntries and RequestVote RPC #11

Merged
merged 3 commits into from
Jan 1, 2025
37 changes: 23 additions & 14 deletions compliance.md
@@ -17,6 +17,7 @@
- [ ] "Reinitialized after election"
- [ ] `nextIndex[]` for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
- [ ] `matchIndex[]` for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
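
The leader state items above can be sketched as a small Rust type. This is a minimal illustration, not code from this PR: `LeaderState` and `after_election` are hypothetical names, and indices are plain `u64` values rather than the crate's `Idx` type.

```rust
/// Hypothetical container for the volatile leader state (not part of this PR).
#[derive(Debug, PartialEq)]
struct LeaderState {
    /// For each peer, index of the next log entry to send to that peer.
    next_index: Vec<u64>,
    /// For each peer, highest log index known to be replicated on that peer.
    match_index: Vec<u64>,
}

impl LeaderState {
    /// Reinitialize the state after winning an election:
    /// nextIndex starts at leader last log index + 1, matchIndex starts at 0.
    fn after_election(peer_count: usize, leader_last_log_index: u64) -> Self {
        LeaderState {
            next_index: vec![leader_last_log_index + 1; peer_count],
            match_index: vec![0; peer_count],
        }
    }
}
```

Rebuilding both vectors on every election win keeps stale values from a previous leadership term out of the new one.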

### Rules for Servers
#### All Servers
- [ ] If commitIndex > lastApplied: increment lastApplied
@@ -41,33 +42,35 @@
- [ ] If successful: update nextIndex and matchIndex for follower (§5.3)
- [ ] If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
- [ ] If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
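
The commit rule in the last item can be sketched as follows. This is an illustration under stated assumptions, not the implementation in this repository: `advance_commit_index` is a hypothetical function, `match_index` is assumed to include the leader's own last log index, and `log_terms[i]` is assumed to hold the term of the entry at 1-based index `i + 1`.

```rust
/// Returns the new commitIndex: the highest N such that N > commit_index,
/// a majority of match_index values are >= N, and log[N].term == current_term.
/// Falls back to the old commit_index when no such N exists.
fn advance_commit_index(
    commit_index: u64,
    current_term: u64,
    match_index: &[u64],
    log_terms: &[u64],
) -> u64 {
    let majority = match_index.len() / 2 + 1;
    // Scan candidate values of N from the end of the log downwards.
    let mut n = log_terms.len() as u64;
    while n > commit_index {
        let replicated = match_index.iter().filter(|&&m| m >= n).count();
        let from_current_term = log_terms[(n - 1) as usize] == current_term;
        if replicated >= majority && from_current_term {
            return n;
        }
        n -= 1;
    }
    commit_index
}
```

The term check is what prevents a leader from committing an entry from an earlier term purely by counting replicas.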

### AppendEntries RPC
#### Arguments
- [ ] term leader’s term
- [ ] leaderId so follower can redirect clients
- [ ] prevLogIndex index of log entry immediately preceding new ones
- [ ] prevLogTerm term of prevLogIndex entry
- [ ] entries[] log entries to store (empty for heartbeat; may send more than one for efficiency)
- [ ] leaderCommit leader’s commitIndex
- [x] term: leader’s term
- [ ] leaderId: so follower can redirect clients
- [x] prevLogIndex: index of log entry immediately preceding new ones
- [x] prevLogTerm: term of prevLogIndex entry
- [ ] entries[]: log entries to store (empty for heartbeat; may send more than one for efficiency)
- [ ] leaderCommit: leader’s commitIndex
#### Results
- [ ] term currentTerm, for leader to update itself
- [ ] success true if follower contained entry matching prevLogIndex and
- [x] term: currentTerm, for leader to update itself
- [x] success: true if follower contained entry matching prevLogIndex and
prevLogTerm
#### Receiver implementation
- [ ] Reply false if term < currentTerm (§5.1)
- [ ] Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
- [ ] If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
- [ ] Append any new entries not already in the log
- [ ] If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
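
The five receiver steps above can be sketched in Rust. All names here (`Follower`, `AppendEntriesArgs`, `Entry`, `handle_append_entries`) are hypothetical and do not match the types added in `src/rpc.rs`. The sketch assumes 1-based log indices where `prev_log_index == 0` means "no previous entry", and it leaves out updating `current_term`, which the "Rules for Servers" section covers.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    term: u64,
    command: &'static str,
}

struct AppendEntriesArgs {
    term: u64,
    prev_log_index: u64,
    prev_log_term: u64,
    entries: Vec<Entry>,
    leader_commit: u64,
}

struct Follower {
    current_term: u64,
    commit_index: u64,
    /// log[0] holds the entry at Raft index 1.
    log: Vec<Entry>,
}

impl Follower {
    /// Returns the `success` flag of the AppendEntries response.
    fn handle_append_entries(&mut self, args: &AppendEntriesArgs) -> bool {
        // 1. Reply false if term < currentTerm.
        if args.term < self.current_term {
            return false;
        }
        // 2. Reply false if there is no entry at prevLogIndex with prevLogTerm.
        if args.prev_log_index > 0 {
            let prev = self.log.get((args.prev_log_index - 1) as usize);
            if prev.map(|e| e.term) != Some(args.prev_log_term) {
                return false;
            }
        }
        for (offset, new_entry) in args.entries.iter().enumerate() {
            // Zero-based position of the entry at index prevLogIndex + 1 + offset.
            let pos = args.prev_log_index as usize + offset;
            let existing_term = self.log.get(pos).map(|e| e.term);
            match existing_term {
                // Entry already present: nothing to do.
                Some(term) if term == new_entry.term => {}
                // 3. Conflict: delete the existing entry and all that follow it.
                Some(_) => {
                    self.log.truncate(pos);
                    self.log.push(new_entry.clone());
                }
                // 4. Append entries not already in the log.
                None => self.log.push(new_entry.clone()),
            }
        }
        // 5. Advance commitIndex, bounded by the index of the last new entry.
        if args.leader_commit > self.commit_index {
            let last_new = args.prev_log_index + args.entries.len() as u64;
            self.commit_index = args.leader_commit.min(last_new);
        }
        true
    }
}
```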

### RequestVote RPC
#### Arguments
- [ ] term candidate’s term
- [ ] candidateId candidate requesting vote
- [ ] lastLogIndex index of candidate’s last log entry (§5.4)
- [ ] lastLogTerm term of candidate’s last log entry (§5.4)
- [x] term: candidate’s term
- [x] candidateId: candidate requesting vote
- [x] lastLogIndex: index of candidate’s last log entry (§5.4)
- [x] lastLogTerm: term of candidate’s last log entry (§5.4)
#### Results
- [ ] term currentTerm, for candidate to update itself
- [ ] voteGranted true means candidate received vote
- [x] term: currentTerm, for candidate to update itself
- [x] voteGranted: true means candidate received vote
#### Receiver implementation
- [ ] Reply false if term < currentTerm (§5.1)
- [ ] If candidate’s log is at least as up-to-date as receiver’s log
@@ -84,6 +87,7 @@
- **Leader election:** choose a new leader when the existing one fails
- **Log replication:** the leader accepts log entries from clients and replicates them across the cluster, forcing other logs to agree with its own
- **Safety:** (State Machine Safety Property). If a server has applied a log entry to its state machine, then no other server will apply a different entry for the same log index

### 5.1 Raft basics
- [ ] server can be in one of 3 modes: leader, follower, or candidate
- [ ] normal operation: 1 leader and the others are followers
@@ -139,6 +143,7 @@
- [ ] increment its term
- [ ] start a new election by initiating another round of RequestVote
- [ ] Election timeout is chosen randomly between 150-300ms
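
Picking the randomized election timeout could look like the sketch below. It is an assumption-laden illustration: `random_election_timeout` is a hypothetical helper, and it borrows randomness from the standard library's `RandomState` only to stay dependency-free. A real implementation would more likely use a proper random number generator, and the `timeout` module in this PR may work differently.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::time::Duration;

const MIN_TIMEOUT_MS: u64 = 150;
const MAX_TIMEOUT_MS: u64 = 300;

/// Returns a timeout in the inclusive range 150..=300 ms.
fn random_election_timeout() -> Duration {
    // Each RandomState is created with fresh random keys, so the hash of
    // "nothing" differs per call. Good enough for a sketch; the modulo
    // introduces a small bias that does not matter here.
    let raw = RandomState::new().build_hasher().finish();
    let spread = MAX_TIMEOUT_MS - MIN_TIMEOUT_MS + 1;
    Duration::from_millis(MIN_TIMEOUT_MS + raw % spread)
}
```

Randomizing the timeout per server makes it unlikely that several followers time out at the same moment and split the vote.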

### 5.3 Log replication
- [ ] a leader services client requests
- each request contains a command to be executed by the state machine
Expand Down Expand Up @@ -182,9 +187,11 @@
- [ ] follower appends entries from the leader
- [ ] (optional) it's possible to optimize finding the matching `nextIndex` between leader and follower
- [ ] A leader never deletes or overwrites entries in its own log

### 5.4 Safety
- restrict which servers can be elected leaders
- [ ] leader for any term must contain all entries committed in previous terms

#### 5.4.1 Election restriction
- All committed entries from previous terms are present on each new leader when it's elected.
- [ ] log entries only flow from leader to follower.
@@ -198,6 +205,7 @@
- The RequestVote RPC helps ensure the leader's log is `up-to-date`
- [ ] RequestVote includes info about candidate's log
- [ ] voter denies vote if its own log is more `up-to-date`
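
The `up-to-date` comparison used by the voter can be sketched as below. Raft compares the term of the last entry first and falls back to the last index only when the terms are equal. `LastLog` and `log_allows_vote` are hypothetical names; the crate's `TermIdx` type plays a similar role in this PR, but its internals are not shown in the diff, so this sketch does not assume them.

```rust
/// Term and index of the last entry in a log. Field order matters:
/// the derived `Ord` compares `term` first, then `index`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct LastLog {
    term: u64,
    index: u64,
}

/// True if the candidate's log is at least as up-to-date as the voter's,
/// which is the log-related condition for granting a vote.
fn log_allows_vote(candidate: LastLog, voter: LastLog) -> bool {
    candidate >= voter
}
```

This check covers only the log condition; the voter still has to apply the term check and the "already voted" rule separately.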

#### 5.4.2 Committing entries from previous terms
- [ ] a leader knows an entry from its **current term** (not true for previous terms) is committed once it's stored (replicated) on a majority of servers
- [ ] an entry is replicated if the server responds with success to AppendEntries
@@ -206,6 +214,7 @@
- [ ] never commit entries from previous terms by counting replicas
- [ ] only entries from the leader's current term are committed by counting replicas
- [ ] once an entry from the current term is committed, previous entries will indirectly be committed

#### 5.4.3 Safety argument
- [ ] entries are applied/committed in log index order
- this (combined with State Machine Safety Property) ensures that all servers apply the same set of log entries to state machine, in the same order
5 changes: 5 additions & 0 deletions src/lib.rs
@@ -3,6 +3,11 @@
mod error;
mod io;
mod log;
mod rpc;
mod server;
mod timeout;

#[cfg(test)]
mod testing;

fn start() {}
6 changes: 5 additions & 1 deletion src/log.rs
@@ -1,10 +1,14 @@
-use crate::log::{entry::Entry, idx::Idx, term_idx::TermIdx};
+use crate::log::entry::Entry;

mod entry;
mod idx;
mod term;
mod term_idx;

pub use idx::Idx;
pub use term::Term;
pub use term_idx::TermIdx;

#[derive(Debug)]
struct Log {
    entries: Vec<Entry>,
2 changes: 1 addition & 1 deletion src/log/idx.rs
@@ -6,7 +6,7 @@ use s2n_codec::{DecoderBufferResult, DecoderValue, EncoderValue};
pub const INITIAL_IDX: Idx = Idx(0);

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
-pub(crate) struct Idx(u64);
+pub struct Idx(u64);

impl From<u64> for Idx {
    fn from(value: u64) -> Self {
2 changes: 1 addition & 1 deletion src/log/term.rs
@@ -6,7 +6,7 @@ use s2n_codec::{DecoderBufferResult, DecoderValue, EncoderValue};
pub const INITIAL_TERM: Term = Term(0);

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
-pub(crate) struct Term(u64);
+pub struct Term(u64);

impl From<u64> for Term {
    fn from(value: u64) -> Self {
169 changes: 169 additions & 0 deletions src/rpc.rs
@@ -0,0 +1,169 @@
use crate::{
    log::{Term, TermIdx},
    rpc::{
        append_entries::{AppendEntries, RespAppendEntries},
        request_vote::{RequestVote, RespRequestVote},
    },
    server::ServerId,
};
use s2n_codec::{DecoderBuffer, DecoderBufferResult, DecoderError, DecoderValue, EncoderValue};

mod append_entries;
mod request_vote;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Rpc {
    RequestVote(RequestVote),
    RespRequestVote(RespRequestVote),
    AppendEntries(AppendEntries),
    RespAppendEntries(RespAppendEntries),
}

impl Rpc {
    pub fn new_request_vote(term: Term, candidate_id: ServerId, last_log_term_idx: TermIdx) -> Rpc {
        Rpc::RequestVote(RequestVote {
            term,
            candidate_id,
            last_log_term_idx,
        })
    }

    pub fn new_request_vote_resp(term: Term, vote_granted: bool) -> Rpc {
        Rpc::RespRequestVote(RespRequestVote { term, vote_granted })
    }

    pub fn new_append_entry(term: Term, prev_log_term_idx: TermIdx) -> Rpc {
        Rpc::AppendEntries(AppendEntries {
            term,
            prev_log_term_idx,
        })
    }

    pub fn new_append_entry_resp(term: Term, success: bool) -> Rpc {
        Rpc::RespAppendEntries(RespAppendEntries { term, success })
    }

    pub fn term(&self) -> &Term {
        match self {
            Rpc::RequestVote(RequestVote { term, .. }) => term,
            Rpc::RespRequestVote(RespRequestVote { term, .. }) => term,
            Rpc::AppendEntries(AppendEntries { term, .. }) => term,
            Rpc::RespAppendEntries(RespAppendEntries { term, .. }) => term,
        }
    }
}

impl<'a> DecoderValue<'a> for Rpc {
    fn decode(buffer: DecoderBuffer<'a>) -> DecoderBufferResult<'a, Self> {
        let (tag, buffer): (u8, _) = buffer.decode()?;

        match tag {
            RequestVote::TAG => {
                let (rpc, buffer) = buffer.decode()?;
                Ok((Rpc::RequestVote(rpc), buffer))
            }
            RespRequestVote::TAG => {
                let (rpc, buffer) = buffer.decode()?;
                Ok((Rpc::RespRequestVote(rpc), buffer))
            }
            AppendEntries::TAG => {
                let (rpc, buffer) = buffer.decode()?;
                Ok((Rpc::AppendEntries(rpc), buffer))
            }
            RespAppendEntries::TAG => {
                let (rpc, buffer) = buffer.decode()?;
                Ok((Rpc::RespAppendEntries(rpc), buffer))
            }
            _tag => Err(DecoderError::InvariantViolation("received unexpected tag")),
        }
    }
}

impl EncoderValue for Rpc {
    fn encode<E: s2n_codec::Encoder>(&self, encoder: &mut E) {
        match self {
            Rpc::RequestVote(inner) => {
                encoder.write_slice(&[RequestVote::TAG]);
                encoder.encode(inner);
            }
            Rpc::RespRequestVote(inner) => {
                encoder.write_slice(&[RespRequestVote::TAG]);
                encoder.encode(inner);
            }
            Rpc::AppendEntries(inner) => {
                encoder.write_slice(&[AppendEntries::TAG]);
                encoder.encode(inner);
            }
            Rpc::RespAppendEntries(inner) => {
                encoder.write_slice(&[RespAppendEntries::TAG]);
                encoder.encode(inner);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        log::Idx,
        rpc::{Rpc, Term, TermIdx},
        server::ServerId,
    };
    use s2n_codec::{DecoderBuffer, DecoderValue, EncoderBuffer, EncoderValue};

    #[test]
    fn encode_decode_request_vote() {
        let rpc = Rpc::new_request_vote(
            Term::from(1),
            ServerId::new([10; 16]),
            TermIdx::builder()
                .with_term(Term::from(3))
                .with_idx(Idx::from(4)),
        );

        let mut slice = vec![0; 50];
        let mut buf = EncoderBuffer::new(&mut slice);
        rpc.encode(&mut buf);

        let d_buf = DecoderBuffer::new(&slice);
        let (d_rpc, _) = Rpc::decode(d_buf).unwrap();

        assert_eq!(rpc, d_rpc);
    }

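    #[test]
    fn encode_decode_request_vote_res() {
        // Round-trip the RequestVote response (this test previously built an AppendEntries RPC).
        let rpc = Rpc::new_request_vote_resp(Term::from(1), true);

        let mut slice = vec![0; 50];
        let mut buf = EncoderBuffer::new(&mut slice);
        rpc.encode(&mut buf);

        let d_buf = DecoderBuffer::new(&slice);
        let (d_rpc, _) = Rpc::decode(d_buf).unwrap();

        assert_eq!(rpc, d_rpc);
    }

    #[test]
    fn encode_decode_append_entry() {
        // Round-trip the AppendEntries request (this test was previously empty).
        let rpc = Rpc::new_append_entry(
            Term::from(1),
            TermIdx::builder()
                .with_term(Term::from(3))
                .with_idx(Idx::from(4)),
        );

        let mut slice = vec![0; 50];
        let mut buf = EncoderBuffer::new(&mut slice);
        rpc.encode(&mut buf);

        let d_buf = DecoderBuffer::new(&slice);
        let (d_rpc, _) = Rpc::decode(d_buf).unwrap();

        assert_eq!(rpc, d_rpc);
    }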

    #[test]
    fn encode_decode_append_entry_res() {
        let rpc = Rpc::new_append_entry_resp(Term::from(1), true);

        let mut slice = vec![0; 50];
        let mut buf = EncoderBuffer::new(&mut slice);
        rpc.encode(&mut buf);

        let d_buf = DecoderBuffer::new(&slice);
        let (d_rpc, _) = Rpc::decode(d_buf).unwrap();

        assert_eq!(rpc, d_rpc);
    }
}
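
The tag-prefixed encoding that `Rpc` uses (one tag byte, then the payload) can be illustrated without `s2n_codec`. This is a standalone sketch using only the standard library, not the crate's API: `Msg`, `encode`, `decode`, the tag values, and the big-endian 8-byte term layout are all assumptions made for the example.

```rust
/// Hypothetical stand-in for two of the `Rpc` variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Msg {
    VoteResp { term: u64, vote_granted: bool },
    AppendResp { term: u64, success: bool },
}

// Hypothetical tag values; the real `TAG` constants are not shown in the diff.
const TAG_VOTE_RESP: u8 = 2;
const TAG_APPEND_RESP: u8 = 4;
/// 1 tag byte + 8 term bytes + 1 flag byte.
const ENCODED_LEN: usize = 10;

fn encode(msg: &Msg, out: &mut Vec<u8>) {
    let (tag, term, flag) = match *msg {
        Msg::VoteResp { term, vote_granted } => (TAG_VOTE_RESP, term, vote_granted),
        Msg::AppendResp { term, success } => (TAG_APPEND_RESP, term, success),
    };
    out.push(tag);
    out.extend_from_slice(&term.to_be_bytes());
    out.push(flag as u8);
}

/// Decodes one message and returns it together with the unread remainder.
fn decode(bytes: &[u8]) -> Result<(Msg, &[u8]), &'static str> {
    if bytes.len() < ENCODED_LEN {
        return Err("buffer too short");
    }
    let (head, rest) = bytes.split_at(ENCODED_LEN);
    let mut term_bytes = [0u8; 8];
    term_bytes.copy_from_slice(&head[1..9]);
    let term = u64::from_be_bytes(term_bytes);
    let flag = head[9] != 0;
    // Dispatch on the leading tag byte, as the `Rpc` decoder does.
    let msg = match head[0] {
        TAG_VOTE_RESP => Msg::VoteResp { term, vote_granted: flag },
        TAG_APPEND_RESP => Msg::AppendResp { term, success: flag },
        _ => return Err("received unexpected tag"),
    };
    Ok((msg, rest))
}
```

Returning the remaining bytes alongside the decoded value mirrors the `(value, buffer)` pairs in the `DecoderValue` implementation above, so several messages can be read back to back from one buffer.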