Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a single set of Progresses for ProgressSet. #108

Merged
merged 7 commits into from
Sep 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use DEFAULT_RAFT_SETS;

pub fn bench_progress_set(c: &mut Criterion) {
bench_progress_set_new(c);
bench_progress_set_with_capacity(c);
bench_progress_set_insert_voter(c);
bench_progress_set_insert_learner(c);
bench_progress_set_promote_learner(c);
Expand All @@ -14,7 +15,7 @@ pub fn bench_progress_set(c: &mut Criterion) {
}

fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::new(voters, learners);
let mut set = ProgressSet::with_capacity(voters, learners);
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Default::default()).ok();
});
Expand All @@ -25,16 +26,25 @@ fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
}

pub fn bench_progress_set_new(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new());
};

c.bench_function("ProgressSet::new", bench);
}

pub fn bench_progress_set_with_capacity(c: &mut Criterion) {
let bench = |voters, learners| {
move |b: &mut Bencher| {
// No setup.
b.iter(|| ProgressSet::new(voters, learners));
b.iter(|| ProgressSet::with_capacity(voters, learners));
}
};

DEFAULT_RAFT_SETS.iter().for_each(|(voters, learners)| {
c.bench_function(
&format!("ProgressSet::new ({}, {})", voters, learners),
&format!("ProgressSet::with_capacity ({}, {})", voters, learners),
bench(*voters, *learners),
);
});
Expand Down
201 changes: 140 additions & 61 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
// limitations under the License.

use errors::Error;
use fxhash::FxHashMap;
use fxhash::{FxBuildHasher, FxHashMap, FxHashSet};
use std::cmp;
use std::collections::hash_map::HashMap;
use std::collections::{HashMap, HashSet};

/// The state of the progress.
#[derive(Debug, PartialEq, Clone, Copy)]
Expand All @@ -47,119 +47,188 @@ impl Default for ProgressState {
}
}

#[derive(Clone, Debug, Default)]
struct Configuration {
voters: FxHashSet<u64>,
learners: FxHashSet<u64>,
}

/// `ProgressSet` contains several `Progress`es,
/// which could be `Leader`, `Follower` and `Learner`.
#[derive(Default, Clone)]
pub struct ProgressSet {
voters: FxHashMap<u64, Progress>,
learners: FxHashMap<u64, Progress>,
progress: FxHashMap<u64, Progress>,
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
configuration: Configuration,
}

impl ProgressSet {
/// Creates a new ProgressSet.
pub fn new(voter_size: usize, learner_size: usize) -> Self {
pub fn new() -> Self {
ProgressSet {
voters: HashMap::with_capacity_and_hasher(voter_size, Default::default()),
learners: HashMap::with_capacity_and_hasher(learner_size, Default::default()),
progress: Default::default(),
configuration: Default::default(),
}
}

/// Create a progress sete with the specified sizes already reserved.
pub fn with_capacity(voters: usize, learners: usize) -> Self {
ProgressSet {
progress: HashMap::with_capacity_and_hasher(
voters + learners,
FxBuildHasher::default(),
),
configuration: Configuration {
voters: HashSet::with_capacity_and_hasher(voters, FxBuildHasher::default()),
learners: HashSet::with_capacity_and_hasher(learners, FxBuildHasher::default()),
},
}
}

/// Returns the status of voters.
pub fn voters(&self) -> &FxHashMap<u64, Progress> {
&self.voters
#[inline]
pub fn voters(&self) -> impl Iterator<Item = (&u64, &Progress)> {
BusyJay marked this conversation as resolved.
Show resolved Hide resolved
let set = self.voter_ids();
self.progress.iter().filter(move |(&k, _)| set.contains(&k))
}

/// Returns the status of learners.
pub fn learners(&self) -> &FxHashMap<u64, Progress> {
&self.learners
#[inline]
pub fn learners(&self) -> impl Iterator<Item = (&u64, &Progress)> {
let set = self.learner_ids();
self.progress.iter().filter(move |(&k, _)| set.contains(&k))
}

/// Returns the ids of all known nodes.
pub fn nodes(&self) -> Vec<u64> {
let mut nodes = Vec::with_capacity(self.voters.len());
nodes.extend(self.voters.keys());
nodes.sort();
nodes
/// Returns the mutable status of voters.
#[inline]
pub fn voters_mut(&mut self) -> impl Iterator<Item = (&u64, &mut Progress)> {
let ids = &self.configuration.voters;
self.progress
.iter_mut()
.filter(move |(k, _)| ids.contains(k))
}

/// Returns the mutable status of learners.
#[inline]
pub fn learners_mut(&mut self) -> impl Iterator<Item = (&u64, &mut Progress)> {
let ids = &self.configuration.learners;
self.progress
.iter_mut()
.filter(move |(k, _)| ids.contains(k))
}

/// Returns the ids of all known voters.
#[inline]
pub fn voter_ids(&self) -> &FxHashSet<u64> {
&self.configuration.voters
}

/// Returns the ids of all known learners.
pub fn learner_nodes(&self) -> Vec<u64> {
let mut ids = Vec::with_capacity(self.learners.len());
ids.extend(self.learners.keys());
ids.sort();
ids
#[inline]
pub fn learner_ids(&self) -> &FxHashSet<u64> {
&self.configuration.learners
}

/// Grabs a reference to the progress of a node.
#[inline]
pub fn get(&self, id: u64) -> Option<&Progress> {
self.voters.get(&id).or_else(|| self.learners.get(&id))
self.progress.get(&id)
}

/// Grabs a mutable reference to the progress of a node.
#[inline]
pub fn get_mut(&mut self, id: u64) -> Option<&mut Progress> {
let progress = self.voters.get_mut(&id);
if progress.is_none() {
return self.learners.get_mut(&id);
}
progress
self.progress.get_mut(&id)
}

/// Returns an iterator across all the nodes and their progress.
pub fn iter(&self) -> impl Iterator<Item = (&u64, &Progress)> {
self.voters.iter().chain(&self.learners)
#[inline]
pub fn iter(&self) -> impl ExactSizeIterator<Item = (&u64, &Progress)> {
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
self.progress.iter()
}

/// Returns a mutable iterator across all the nodes and their progress.
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&u64, &mut Progress)> {
self.voters.iter_mut().chain(&mut self.learners)
#[inline]
pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = (&u64, &mut Progress)> {
self.progress.iter_mut()
}

/// Adds a voter node
pub fn insert_voter(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
}
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?;
pub fn insert_voter(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
// If the progress exists already this is in error.
if self.progress.contains_key(&id) {
// Determine the correct error to return.
if self.learner_ids().contains(&id) {
return Err(Error::Exists(id, "learners"));
}
return Err(Error::Exists(id, "voters"));
}
self.voters.insert(id, pr);
pr.is_learner = false;
self.configuration.voters.insert(id);
self.progress.insert(id, pr);
self.assert_progress_and_configuration_consistent();
Ok(())
}

/// Adds a learner to the cluster
pub fn insert_learner(&mut self, id: u64, pr: Progress) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?
}
if self.learners.contains_key(&id) {
Err(Error::Exists(id, "learners"))?
pub fn insert_learner(&mut self, id: u64, mut pr: Progress) -> Result<(), Error> {
// If the progress exists already this is in error.
if self.progress.contains_key(&id) {
// Determine the correct error to return.
if self.learner_ids().contains(&id) {
return Err(Error::Exists(id, "learners"));
}
return Err(Error::Exists(id, "voters"));
}
self.learners.insert(id, pr);
pr.is_learner = true;
self.configuration.learners.insert(id);
self.progress.insert(id, pr);
self.assert_progress_and_configuration_consistent();
Ok(())
}

/// Removes the peer from the set of voters or learners.
pub fn remove(&mut self, id: u64) -> Option<Progress> {
BusyJay marked this conversation as resolved.
Show resolved Hide resolved
match self.voters.remove(&id) {
None => self.learners.remove(&id),
some => some,
}
self.configuration.voters.remove(&id);
self.configuration.learners.remove(&id);
let removed = self.progress.remove(&id);
self.assert_progress_and_configuration_consistent();
removed
}

/// Promote a learner to a peer.
pub fn promote_learner(&mut self, id: u64) -> Result<(), Error> {
if self.voters.contains_key(&id) {
Err(Error::Exists(id, "voters"))?;
}
// We don't want to remove it unless it's there.
if self.learners.contains_key(&id) {
let mut learner = self.learners.remove(&id).unwrap(); // We just checked!
learner.is_learner = false;
self.voters.insert(id, learner);
Ok(())
} else {
Err(Error::NotExists(id, "learners"))
match self.progress.get_mut(&id) {
Some(progress) => if !progress.is_learner {
Err(Error::Exists(id, "voters"))?;
} else {
progress.is_learner = false;
self.configuration.voters.insert(id);
self.configuration.learners.remove(&id);
},
None => Err(Error::NotExists(id, "learners"))?,
}
self.assert_progress_and_configuration_consistent();
Ok(())
}

#[inline(always)]
fn assert_progress_and_configuration_consistent(&self) {
debug_assert!(
self.configuration
.voters
.union(&self.configuration.learners)
.all(|v| self.progress.contains_key(v))
);
debug_assert!(
self.progress
.keys()
.all(|v| self.configuration.learners.contains(v)
|| self.configuration.voters.contains(v))
);
assert_eq!(
self.configuration.voters.len() + self.configuration.learners.len(),
self.progress.len()
);
}
}

Expand Down Expand Up @@ -209,6 +278,16 @@ pub struct Progress {
}

impl Progress {
/// Creates a new progress with the given settings.
pub fn new(next_idx: u64, ins_size: usize, is_learner: bool) -> Self {
Progress {
next_idx,
ins: Inflights::new(ins_size),
is_learner,
..Default::default()
}
}

fn reset_state(&mut self, state: ProgressState) {
self.paused = false;
self.pending_snapshot = 0;
Expand Down
Loading