Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement on_recv AppendEntries for Follower #26

Merged
merged 22 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
#### Receiver implementation
- [x] Reply false if term < currentTerm (§5.1)
- [x] Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
- [ ] If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
- [ ] Append any new entries not already in the log
- [ ] If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
- [x] If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
- [x] Append any new entries not already in the log
- [x] If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)

### RequestVote RPC
#### Arguments
Expand Down
14 changes: 13 additions & 1 deletion src/io/testing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::io::{ServerRx, ServerTx};
use crate::{
io::{ServerRx, ServerTx},
rpc::Rpc,
};
use s2n_codec::DecoderBuffer;
use std::task::Poll;

#[derive(Debug)]
Expand Down Expand Up @@ -35,3 +39,11 @@ impl ServerRx for MockIO {
}
}
}

pub fn helper_inspect_sent_rpc(peer_io: &mut MockIO) -> Rpc {
let rpc_bytes = peer_io.send_queue.pop().unwrap();
assert!(peer_io.send_queue.is_empty());
let buffer = DecoderBuffer::new(&rpc_bytes);
let (sent_rpc, _) = buffer.decode::<Rpc>().unwrap();
sent_rpc
}
213 changes: 191 additions & 22 deletions src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,108 @@ pub use term_idx::TermIdx;

#[derive(Debug)]
pub struct Log {
entries: Vec<Entry>,
pub entries: Vec<Entry>,
}

impl Log {
pub fn new() -> Self {
Log { entries: vec![] }
}

fn push(&mut self, entries: Vec<Entry>) {
pub fn push(&mut self, entries: Vec<Entry>) {
for entry in entries.into_iter() {
self.entries.push(entry);
}
}

pub(super) fn prev_idx(&self) -> Idx {
Idx::from(self.entries.len() as u64)
}

pub fn next_idx(&self) -> Idx {
let next_idx = self.entries.len() + 1;
Idx::from(next_idx as u64)
self.prev_idx() + 1
}

pub fn last_term(&self) -> Term {
self.entries.last().map_or(Term::initial(), |e| e.term)
}

pub fn term_at_idx(&self, idx: &Idx) -> Option<Term> {
assert!(!idx.is_initial(), "log is empty");
self.find_entry_at(idx).map(|e| e.term)
}

// Attempt to match the leader's log.
pub fn match_leaders_log(&mut self, entry: Entry, entry_idx: Idx) -> MatchOutcome {
assert!(!entry_idx.is_initial());
let entry_term_idx = TermIdx::builder().with_term(entry.term).with_idx(entry_idx);

match self.entry_matches(entry_term_idx) {
outcome @ MatchOutcome::NoMatch => {
//% Compliance:
//% If an existing entry conflicts with a new one (same index but different terms),
//% delete the existing entry and all that follow it (§5.3)
self.entries.truncate(entry_idx.as_log_idx());
//% Compliance:
//% Append any new entries not already in the log
self.entries.push(entry);

outcome
}
outcome @ MatchOutcome::DoesntExist => {
// Confirm that atleast entry_idx - 1 are present.
//
// To maintain the monotonically increasing property for Idx, confirm that
// either the log is empty or entries[entry_idx - 1] exists.
let fn_entry_min_1_exists =
|| self.entries.get((entry_idx - 1).as_log_idx()).is_some();
assert!(self.entries.is_empty() || fn_entry_min_1_exists());

self.entries.push(entry);
outcome
}
outcome @ MatchOutcome::Match => outcome,
}
}

//% Compliance:
//% if two entries in different logs have the same index/term, they store the same command
pub fn entry_matches(&self, term_idx: TermIdx) -> bool {
pub fn entry_matches(&self, term_idx: TermIdx) -> MatchOutcome {
// TermIdx::initial indicates that both logs are empty
if term_idx == TermIdx::initial() {
return self.entries.is_empty();
if term_idx.is_initial() && self.entries.is_empty() {
return MatchOutcome::Match;
}

let entry = self.find_entry_by(term_idx.idx);
entry.is_some_and(|entry| entry.term == term_idx.term)
if let Some(entry) = self.find_entry_at(&term_idx.idx) {
if entry.term == term_idx.term {
MatchOutcome::Match
} else {
MatchOutcome::NoMatch
}
} else {
MatchOutcome::DoesntExist
}
// entry.is_some_and(|entry| entry.term == term_idx.term)
}

fn find_entry_by(&self, idx: Idx) -> Option<&Entry> {
fn find_entry_at(&self, idx: &Idx) -> Option<&Entry> {
//% Compliance:
//% `log[]` log entries; each entry contains command for state machine, and term when entry
//% was received by leader (first index is 1)
if idx == Idx::initial() {
if *idx == Idx::initial() {
return None;
}
let idx = (idx.0 - 1) as usize;

self.entries.get(idx)
self.entries.get(idx.as_log_idx())
}
}

#[must_use]
pub enum MatchOutcome {
Match,
NoMatch,
DoesntExist,
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -71,39 +125,154 @@ mod tests {
};

// Empty log
assert!(log.find_entry_by(Idx::from(1)).is_none());
assert!(log.find_entry_at(&Idx::from(1)).is_none());

log.push(vec![entry.clone()]);

// Find Idx::initial
assert!(log.find_entry_by(Idx::initial()).is_none());
assert!(log.find_entry_at(&Idx::initial()).is_none());

// Find existing entry
assert_eq!(*log.find_entry_by(Idx::from(1)).unwrap(), entry);
assert_eq!(*log.find_entry_at(&Idx::from(1)).unwrap(), entry);
}

#[test]
fn test_log_matches_at_idx() {
fn test_entry_matches() {
let mut log = Log::new();
let term = Term::from(1);
let entry = Entry { term, command: 8 };

// Empty log
assert!(log.entry_matches(TermIdx::initial()));
assert!(matches!(
log.entry_matches(TermIdx::initial()),
MatchOutcome::Match
));

// Non-empty log
log.push(vec![entry.clone()]);
assert!(!log.entry_matches(TermIdx::initial()));
assert!(matches!(
log.entry_matches(TermIdx::initial()),
MatchOutcome::DoesntExist
));

// Log entry match
let term_idx = TermIdx::builder().with_term(term).with_idx(Idx::from(1));
assert!(log.entry_matches(term_idx));
assert!(matches!(log.entry_matches(term_idx), MatchOutcome::Match));

// Log entry mismatch
let mis_match_term_idx = TermIdx {
term: Term::from(2),
idx: Idx::from(1),
};
assert!(!log.entry_matches(mis_match_term_idx));
assert!(matches!(
log.entry_matches(mis_match_term_idx),
MatchOutcome::NoMatch
));

// Non-existing entry
let non_existing_term_idx = TermIdx {
term: Term::from(2),
idx: Idx::from(2),
};
assert!(matches!(
log.entry_matches(non_existing_term_idx),
MatchOutcome::DoesntExist
));
}

#[should_panic]
#[test]
fn invalid_term_at_idx() {
let log = Log::new();
log.term_at_idx(&Idx::initial());
}

#[test]
fn valid_term_at_idx() {
let mut log = Log::new();
let expected_term = Term::from(1);
let entry = Entry {
term: expected_term,
command: 8,
};
log.push(vec![entry]);

let term = log.term_at_idx(&Idx::from(1)).unwrap();
assert_eq!(expected_term, term);
}

#[test]
pub fn match_leaders_log_for_empty_logs() {
let mut log = Log::new();

let outcome = log.match_leaders_log(
Entry {
term: Term::from(1),
command: 8,
},
Idx::from(1),
);
assert!(matches!(outcome, MatchOutcome::DoesntExist));

let outcome = log.match_leaders_log(
Entry {
term: Term::from(1),
command: 8,
},
Idx::from(2),
);
assert!(matches!(outcome, MatchOutcome::DoesntExist));
}

#[test]
pub fn test_match_leaders_log() {
let mut log = Log::new();
log.push(vec![
// Idx 1
Entry {
term: Term::from(2),
command: 8,
},
// Idx 2
Entry {
term: Term::from(2),
command: 8,
},
// Idx 3
Entry {
term: Term::from(4),
command: 8,
},
]);

// matches
let match_outcome = log.match_leaders_log(
Entry {
term: Term::from(2),
command: 8,
},
Idx::from(1),
);
assert!(matches!(match_outcome, MatchOutcome::Match));

// doesnt match
let no_match_outcome = log.match_leaders_log(
Entry {
term: Term::from(3),
command: 8,
},
Idx::from(3),
);
assert!(matches!(no_match_outcome, MatchOutcome::NoMatch));

// doesnt exist
let doesnt_exist_outcome = log.match_leaders_log(
Entry {
term: Term::from(4),
command: 8,
},
Idx::from(4),
);
assert!(matches!(doesnt_exist_outcome, MatchOutcome::DoesntExist));
}
}
36 changes: 34 additions & 2 deletions src/log/idx.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use s2n_codec::{DecoderBufferResult, DecoderValue, EncoderValue};
use std::ops::{Add, AddAssign, Sub};

//% Compliance:
//% `commitIndex` index of highest log entry known to be committed (initialized to 0, increases
Expand All @@ -12,12 +13,43 @@ impl Idx {
pub const fn initial() -> Self {
INITIAL_IDX
}

pub fn is_initial(&self) -> bool {
*self == INITIAL_IDX
}

// Idx represented as an index into the Log.entries array.
pub fn as_log_idx(&self) -> usize {
// Idx is 1 indexed while the Log.entries is 0 indexed.
self.0 as usize - 1
}
}

impl Add<u64> for Idx {
type Output = Self;

fn add(self, rhs: u64) -> Self::Output {
Idx(self.0 + rhs)
}
}

impl Sub<u64> for Idx {
type Output = Self;

fn sub(self, rhs: u64) -> Self::Output {
assert!(self.0 > 0, "value overflowed on subtraction");
Idx(self.0 - rhs)
}
}

impl AddAssign<u64> for Idx {
fn add_assign(&mut self, rhs: u64) {
*self = Idx(self.0 + rhs);
}
}

impl From<u64> for Idx {
fn from(value: u64) -> Self {
// index values should be greater than 0
debug_assert!(value > 0);
Idx(value)
}
}
Expand Down
Loading
Loading