Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Progress.is_learner #119

Merged
merged 12 commits into from
Oct 22, 2018
142 changes: 123 additions & 19 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl ProgressSet {
}

/// Adds a voter node
pub fn insert_voter(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
// If the progress exists already this is in error.
if self.progress.contains_key(&id) {
// Determine the correct error to return.
Expand All @@ -162,15 +162,14 @@ impl ProgressSet {
}
return Err(Error::Exists(id, "voters"));
}
pr.is_learner = false;
self.configuration.voters.insert(id);
self.progress.insert(id, pr);
self.assert_progress_and_configuration_consistent();
Ok(())
}

/// Adds a learner to the cluster
pub fn insert_learner(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
// If the progress exists already this is in error.
if self.progress.contains_key(&id) {
// Determine the correct error to return.
Expand All @@ -179,7 +178,6 @@ impl ProgressSet {
}
return Err(Error::Exists(id, "voters"));
}
pr.is_learner = true;
self.configuration.learners.insert(id);
self.progress.insert(id, pr);
self.assert_progress_and_configuration_consistent();
Expand All @@ -197,15 +195,13 @@ impl ProgressSet {

/// Promote a learner to a peer.
pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> {
match self.progress.get_mut(&id) {
Some(progress) => if !progress.is_learner {
Err(Error::Exists(id, "voters"))?;
} else {
progress.is_learner = false;
self.configuration.voters.insert(id);
self.configuration.learners.remove(&id);
},
None => Err(Error::NotExists(id, "learners"))?,
if !self.configuration.learners.remove(&id) {
// Wasn't already a voter. We can't promote what doesn't exist.
return Err(Error::Exists(id, "learners"));
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
}
if !self.configuration.voters.insert(id) {
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
// Already existed, the caller should know this was a noop.
return Err(Error::Exists(id, "voters"));
}
self.assert_progress_and_configuration_consistent();
Ok(())
Expand Down Expand Up @@ -233,7 +229,7 @@ impl ProgressSet {
}

/// The progress of catching up from a restart.
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Progress {
/// How much state is matched.
pub matched: u64,
Expand Down Expand Up @@ -272,18 +268,14 @@ pub struct Progress {
/// When a leader receives a reply, the previous inflights should
/// be freed by calling inflights.freeTo.
pub ins: Inflights,

/// Indicates the Progress is a learner or not.
pub is_learner: bool,
}

impl Progress {
/// Creates a new progress with the given settings.
pub fn new(next_idx: u64, ins_size: usize, is_learner: bool) -> Self {
pub fn new(next_idx: u64, ins_size: usize) -> Self {
Progress {
next_idx,
ins: Inflights::new(ins_size),
is_learner,
..Default::default()
}
}
Expand Down Expand Up @@ -630,3 +622,115 @@ mod test {
assert_eq!(inflight, wantin);
}
}

// TODO: Reorganize this whole file into separate files.
// See https://github.com/pingcap/raft-rs/issues/125
#[cfg(test)]
mod test_progress_set {
use Result;
use {Progress, ProgressSet};

const CANARY: u64 = 123;

#[test]
fn test_insert_redundant_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
"Should return an error on redundant insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_voter` that returned error."
);
Ok(())
}

#[test]
fn test_insert_redundant_learner() -> Result<()> {
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
"Should return an error on redundant insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_learner` that returned error."
);
Ok(())
}

#[test]
fn test_insert_learner_that_is_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_learner(1, canary_progress).is_err(),
"Should return an error on invalid learner insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_learner` that returned error."
);
Ok(())
}

#[test]
fn test_insert_voter_that_is_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
set.insert_learner(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
"Should return an error on invalid voter insert."
);
assert_eq!(
*set.get(1).expect("Should be inserted."),
default_progress,
"The ProgressSet was mutated in a `insert_voter` that returned error."
);
Ok(())
}

#[test]
fn test_promote_learner_already_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
set.insert_voter(1, default_progress)?;
let pre = set.get(1).expect("Should have been inserted").clone();
assert!(
set.promote_learner(1).is_err(),
"Should return an error on invalid promote_learner."
);
assert!(
set.promote_learner(2).is_err(),
"Should return an error on invalid promote_learner."
);
assert_eq!(pre, *set.get(1).expect("Peer should not have been deleted"));
Ok(())
}
}
64 changes: 32 additions & 32 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ impl<T: Storage> Raft<T> {
tag: c.tag.to_owned(),
};
for p in peers {
let pr = Progress::new(1, r.max_inflight, false);
let pr = Progress::new(1, r.max_inflight);
if let Err(e) = r.mut_prs().insert_voter(*p, pr) {
panic!("{}", e);
}
}
for p in learners {
let pr = Progress::new(1, r.max_inflight, true);
let pr = Progress::new(1, r.max_inflight);
if let Err(e) = r.mut_prs().insert_learner(*p, pr) {
panic!("{}", e);
};
Expand Down Expand Up @@ -616,7 +616,7 @@ impl<T: Storage> Raft<T> {
let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let self_id = self.id;
for (&id, pr) in self.mut_prs().iter_mut() {
*pr = Progress::new(last_index + 1, max_inflight, pr.is_learner);
*pr = Progress::new(last_index + 1, max_inflight);
if id == self_id {
pr.matched = last_index;
}
Expand Down Expand Up @@ -1843,38 +1843,25 @@ impl<T: Storage> Raft<T> {
self.prs().voter_ids().contains(&self.id)
}

fn add_voter_or_learner(&mut self, id: u64, learner: bool) {
fn add_voter_or_learner(&mut self, id: u64, learner: bool) -> Result<()> {
debug!(
"Adding node (learner: {}) with ID {} to peers.",
learner, id
);

// Ignore redundant inserts.
// TODO: Remove these and have this function and related functions return errors.
if let Some(progress) = self.prs().get(id) {
// If progress.is_learner == learner, then it's already inserted as what it should be, return early to avoid error.
if progress.is_learner == learner {
info!("{} Ignoring redundant insert of ID {}.", self.tag, id);
return;
}
// If progress.is_learner == false, and learner == true, then it's a demotion, return early to avoid an error.
if !progress.is_learner && learner {
info!("{} Ignoring voter demotion of ID {}.", self.tag, id);
return;
}
};

let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight, learner);
let result = if learner {
let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight);
self.mut_prs().insert_learner(id, progress)
} else if self.prs().learner_ids().contains(&id) {
self.mut_prs().promote_learner(id)
} else {
let progress = Progress::new(self.raft_log.last_index() + 1, self.max_inflight);
self.mut_prs().insert_voter(id, progress)
};

if let Err(e) = result {
panic!("{}", e)
error!("{}", e);
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
return Err(e);
}
if self.id == id {
self.is_learner = learner
Expand All @@ -1883,16 +1870,19 @@ impl<T: Storage> Raft<T> {
// Otherwise, check_quorum may cause us to step down if it is invoked
// before the added node has a chance to commuicate with us.
self.mut_prs().get_mut(id).unwrap().recent_active = true;
result
}

/// Adds a new node to the cluster.
// TODO: Return an error on a redundant insert.
pub fn add_node(&mut self, id: u64) {
self.add_voter_or_learner(id, false);
self.add_voter_or_learner(id, false).ok();
}

/// Adds a learner node.
// TODO: Return an error on a redundant insert.
pub fn add_learner(&mut self, id: u64) {
self.add_voter_or_learner(id, true);
self.add_voter_or_learner(id, true).ok();
}

/// Removes a node from the raft.
Expand All @@ -1917,7 +1907,7 @@ impl<T: Storage> Raft<T> {

/// Updates the progress of the learner or voter.
pub fn set_progress(&mut self, id: u64, matched: u64, next_idx: u64, is_learner: bool) {
let mut p = Progress::new(next_idx, self.max_inflight, is_learner);
let mut p = Progress::new(next_idx, self.max_inflight);
p.matched = matched;
if is_learner {
if let Err(e) = self.mut_prs().insert_learner(id, p) {
Expand Down Expand Up @@ -1993,16 +1983,26 @@ impl<T: Storage> Raft<T> {
fn check_quorum_active(&mut self) -> bool {
let self_id = self.id;
let mut act = 0;
for (&id, pr) in self.mut_prs().iter_mut() {
if id == self_id {
act += 1;
continue;

self.prs = if let Some(mut prs) = self.prs.take() {
for (&id, pr) in prs.voters_mut() {
if id == self_id {
act += 1;
continue;
}
if pr.recent_active {
act += 1;
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
}
pr.recent_active = false;
}
if !pr.is_learner && pr.recent_active {
act += 1;
for (&_id, pr) in prs.learners_mut() {
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
pr.recent_active = false;
}
pr.recent_active = false;
}
Some(prs)
} else {
unreachable!("Invariant: ProgressSet is `None` and should always be `Some(prs)`.");
};

act >= self.quorum()
}

Expand Down
8 changes: 4 additions & 4 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3670,12 +3670,12 @@ fn test_restore_with_learner() {

for &node in s.get_metadata().get_conf_state().get_nodes() {
assert!(sm.prs().get(node).is_some());
assert!(!sm.prs().get(node).unwrap().is_learner);
assert!(!sm.prs().learner_ids().contains(&node));
}

for &node in s.get_metadata().get_conf_state().get_learners() {
assert!(sm.prs().get(node).is_some());
assert!(sm.prs().get(node).unwrap().is_learner);
assert!(sm.prs().learner_ids().contains(&node));
}

assert!(!sm.restore(s));
Expand Down Expand Up @@ -3751,8 +3751,8 @@ fn test_add_learner() {
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage());
n1.add_learner(2);

assert_eq!(n1.prs().learner_ids().iter().next().unwrap(), &2);
assert!(n1.prs().get(2).unwrap().is_learner);
assert_eq!(*n1.prs().learner_ids().iter().next().unwrap(), 2);
assert!(n1.prs().learner_ids().contains(&2));
}

// TestRemoveLearner tests that removeNode could update nodes and
Expand Down
14 changes: 3 additions & 11 deletions tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,13 @@ impl Interface {
prs.learner_ids().len(),
));
for id in ids {
let progress = Progress::default();
if prs.learner_ids().contains(id) {
let progress = Progress {
is_learner: true,
..Default::default()
};
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else {
let progress = Progress {
..Default::default()
};
if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
let term = self.term;
Expand Down