Skip to content

Commit

Permalink
Remove Progress.is_learner (#119)
Browse files Browse the repository at this point in the history
* Remove Progress.is_learner and depend on ProgressSet Configuration.
  • Loading branch information
Hoverbear authored Oct 22, 2018
1 parent 6f88eb6 commit dcba27f
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 66 deletions.
146 changes: 127 additions & 19 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl ProgressSet {
}

/// Adds a voter node
pub fn insert_voter(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
// If the progress exists already this is in error.
if self.progress.contains_key(&id) {
// Determine the correct error to return.
Expand All @@ -162,15 +162,14 @@ impl ProgressSet {
}
return Err(Error::Exists(id, "voters"));
}
pr.is_learner = false;
self.configuration.voters.insert(id);
self.progress.insert(id, pr);
self.assert_progress_and_configuration_consistent();
Ok(())
}

/// Adds a learner to the cluster
pub fn insert_learner(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
// If the progress exists already this is in error.
if self.progress.contains_key(&id) {
// Determine the correct error to return.
Expand All @@ -179,7 +178,6 @@ impl ProgressSet {
}
return Err(Error::Exists(id, "voters"));
}
pr.is_learner = true;
self.configuration.learners.insert(id);
self.progress.insert(id, pr);
self.assert_progress_and_configuration_consistent();
Expand All @@ -197,15 +195,13 @@ impl ProgressSet {

/// Promote a learner to a peer.
pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> {
match self.progress.get_mut(&id) {
Some(progress) => if !progress.is_learner {
Err(Error::Exists(id, "voters"))?;
} else {
progress.is_learner = false;
self.configuration.voters.insert(id);
self.configuration.learners.remove(&id);
},
None => Err(Error::NotExists(id, "learners"))?,
if !self.configuration.learners.remove(&id) {
// Wasn't already a learner. We can't promote what doesn't exist.
return Err(Error::NotExists(id, "learners"));
}
if !self.configuration.voters.insert(id) {
// Already existed, the caller should know this was a noop.
return Err(Error::Exists(id, "voters"));
}
self.assert_progress_and_configuration_consistent();
Ok(())
Expand Down Expand Up @@ -233,7 +229,7 @@ impl ProgressSet {
}

/// The progress of catching up from a restart.
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Progress {
/// How much state is matched.
pub matched: u64,
Expand Down Expand Up @@ -272,18 +268,14 @@ pub struct Progress {
/// When a leader receives a reply, the previous inflights should
/// be freed by calling inflights.freeTo.
pub ins: Inflights,

/// Indicates the Progress is a learner or not.
pub is_learner: bool,
}

impl Progress {
/// Creates a new progress with the given settings.
pub fn new(next_idx: u64, ins_size: usize, is_learner: bool) -> Self {
pub fn new(next_idx: u64, ins_size: usize) -> Self {
Progress {
next_idx,
ins: Inflights::new(ins_size),
is_learner,
..Default::default()
}
}
Expand Down Expand Up @@ -630,3 +622,119 @@ mod test {
assert_eq!(inflight, wantin);
}
}

// TODO: Reorganize this whole file into separate files.
// See https://github.com/pingcap/raft-rs/issues/125
#[cfg(test)]
mod test_progress_set {
use Result;
use {Progress, ProgressSet};

const CANARY: u64 = 123;

#[test]
fn test_insert_redundant_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
"Should return an error on redundant insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_voter` that returned error."
);
Ok(())
}

#[test]
fn test_insert_redundant_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_learner(1, default_progress.clone())?;
assert!(
set.insert_learner(1, canary_progress).is_err(),
"Should return an error on redundant insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_learner` that returned error."
);
Ok(())
}

#[test]
fn test_insert_learner_that_is_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_learner(1, canary_progress).is_err(),
"Should return an error on invalid learner insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_learner` that returned error."
);
Ok(())
}

#[test]
fn test_insert_voter_that_is_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_learner(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
"Should return an error on invalid voter insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_voter` that returned error."
);
Ok(())
}

#[test]
fn test_promote_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
set.insert_voter(1, default_progress)?;
let pre = set.get(1).expect("Should have been inserted").clone();
assert!(
set.promote_learner(1).is_err(),
"Should return an error on promote_learner on a peer that is a voter."
);
// Not yet added.
assert!(
set.promote_learner(2).is_err(),
"Should return an error on promote_learner on a non-existing peer.."
);
assert_eq!(
pre,
*set.get(1).expect("Peer should not have been promoted")
);
Ok(())
}
}
64 changes: 32 additions & 32 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<T: Storage> Raft<T> {
tag: c.tag.to_owned(),
};
for p in peers {
let pr = Progress::new(1, r.max_inflight, false);
let pr = Progress::new(1, r.max_inflight);
if let Err(e) = r.mut_prs().insert_voter(*p, pr) {
panic!("{}", e);
}
}
for p in learners {
let pr = Progress::new(1, r.max_inflight, true);
let pr = Progress::new(1, r.max_inflight);
if let Err(e) = r.mut_prs().insert_learner(*p, pr) {
panic!("{}", e);
};
Expand Down Expand Up @@ -616,7 +616,7 @@ impl<T: Storage> Raft<T> {
let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let self_id = self.id;
for (&id, pr) in self.mut_prs().iter_mut() {
*pr = Progress::new(last_index + 1, max_inflight, pr.is_learner);
*pr = Progress::new(last_index + 1, max_inflight);
if id == self_id {
pr.matched = last_index;
}
Expand Down Expand Up @@ -1843,38 +1843,25 @@ impl<T: Storage> Raft<T> {
self.prs().voter_ids().contains(&self.id)
}

fn add_voter_or_learner(&mut self, id: u64, learner: bool) {
fn add_voter_or_learner(&mut self, id: u64, learner: bool) -> Result<()> {
debug!(
"Adding node (learner: {}) with ID {} to peers.",
learner, id
);

// Ignore redundant inserts.
// TODO: Remove these and have this function and related functions return errors.
if let Some(progress) = self.prs().get(id) {
// If progress.is_learner == learner, then it's already inserted as what it should be, return early to avoid error.
if progress.is_learner == learner {
info!("{} Ignoring redundant insert of ID {}.", self.tag, id);
return;
}
// If progress.is_learner == false, and learner == true, then it's a demotion, return early to avoid an error.
if !progress.is_learner && learner {
info!("{} Ignoring voter demotion of ID {}.", self.tag, id);
return;
}
};

let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight, learner);
let result = if learner {
let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight);
self.mut_prs().insert_learner(id, progress)
} else if self.prs().learner_ids().contains(&id) {
self.mut_prs().promote_learner(id)
} else {
let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight);
self.mut_prs().insert_voter(id, progress)
};

if let Err(e) = result {
panic!("{}", e)
error!("{}", e);
return Err(e);
}
if self.id == id {
self.is_learner = learner
Expand All @@ -1883,16 +1870,19 @@ impl<T: Storage> Raft<T> {
// Otherwise, check_quorum may cause us to step down if it is invoked
// before the added node has a chance to commuicate with us.
self.mut_prs().get_mut(id).unwrap().recent_active = true;
result
}

/// Adds a new node to the cluster.
// TODO: Return an error on a redundant insert.
pub fn add_node(&mut self, id: u64) {
self.add_voter_or_learner(id, false);
self.add_voter_or_learner(id, false).ok();
}

/// Adds a learner node.
// TODO: Return an error on a redundant insert.
pub fn add_learner(&mut self, id: u64) {
self.add_voter_or_learner(id, true);
self.add_voter_or_learner(id, true).ok();
}

/// Removes a node from the raft.
Expand All @@ -1917,7 +1907,7 @@ impl<T: Storage> Raft<T> {

/// Updates the progress of the learner or voter.
pub fn set_progress(&mut self, id: u64, matched: u64, next_idx: u64, is_learner: bool) {
let mut p = Progress::new(next_idx, self.max_inflight, is_learner);
let mut p = Progress::new(next_idx, self.max_inflight);
p.matched = matched;
if is_learner {
if let Err(e) = self.mut_prs().insert_learner(id, p) {
Expand Down Expand Up @@ -1993,16 +1983,26 @@ impl<T: Storage> Raft<T> {
fn check_quorum_active(&mut self) -> bool {
let self_id = self.id;
let mut act = 0;
for (&id, pr) in self.mut_prs().iter_mut() {
if id == self_id {
act += 1;
continue;

self.prs = if let Some(mut prs) = self.prs.take() {
for (&id, pr) in prs.voters_mut() {
if id == self_id {
act += 1;
continue;
}
if pr.recent_active {
act += 1;
}
pr.recent_active = false;
}
if !pr.is_learner && pr.recent_active {
act += 1;
for (&_id, pr) in prs.learners_mut() {
pr.recent_active = false;
}
pr.recent_active = false;
}
Some(prs)
} else {
unreachable!("Invariant: ProgressSet is `None` and should always be `Some(prs)`.");
};

act >= self.quorum()
}

Expand Down
24 changes: 20 additions & 4 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3670,12 +3670,12 @@ fn test_restore_with_learner() {

for &node in s.get_metadata().get_conf_state().get_nodes() {
assert!(sm.prs().get(node).is_some());
assert!(!sm.prs().get(node).unwrap().is_learner);
assert!(!sm.prs().learner_ids().contains(&node));
}

for &node in s.get_metadata().get_conf_state().get_learners() {
assert!(sm.prs().get(node).is_some());
assert!(sm.prs().get(node).unwrap().is_learner);
assert!(sm.prs().learner_ids().contains(&node));
}

assert!(!sm.restore(s));
Expand Down Expand Up @@ -3751,8 +3751,24 @@ fn test_add_learner() {
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage());
n1.add_learner(2);

assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), &2);
assert!(n1.prs().get(2).unwrap().is_learner);
assert_eq!(*n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().learner_ids().contains(&2));
}

// Ensure when add_voter is called on a peers own ID that it will be promoted.
// When the action fails, ensure it doesn't mutate the raft state.
#[test]
fn test_add_voter_peer_promotes_self_sets_is_learner() {
setup_for_test();
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage());
// Node is already voter.
n1.add_learner(1);
assert_eq!(n1.is_learner, false);
assert!(n1.prs().voter_ids().contains(&1));
n1.remove_node(1);
n1.add_learner(1);
assert_eq!(n1.is_learner, true);
assert!(n1.prs().learner_ids().contains(&1));
}

// TestRemoveLearner tests that removeNode could update nodes and
Expand Down
Loading

0 comments on commit dcba27f

Please sign in to comment.