Skip to content

Commit

Permalink
Merge #130
Browse files Browse the repository at this point in the history
130: raft: reduce memory allocation when reset r=Hoverbear a=BusyJay

Create a new progress will allocate a vec. So this pr adds a reset function to avoid the allocation. Little performance enhancement is detected:

```
Raft::campaign (7, 3, CampaignElection)
                        time:   [2.8076 us 2.8291 us 2.8565 us]
                        change: [-4.6982% -4.2007% -3.6530%] (p = 0.00 < 0.05)
                        Performance has improved.
```

Co-authored-by: Jay Lee <busyjaylee@gmail.com>
Co-authored-by: Hoverbear <operator@hoverbear.org>
Co-authored-by: qupeng <onlyqupeng@gmail.com>
  • Loading branch information
4 people committed Nov 8, 2018
2 parents d68ad38 + cded3d7 commit 3799cfa
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 67 deletions.
2 changes: 1 addition & 1 deletion benches/suites/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn bench_progress(c: &mut Criterion) {
pub fn bench_progress_default(c: &mut Criterion) {
let bench = |b: &mut Bencher| {
// No setup.
b.iter(Progress::default);
b.iter(|| Progress::new(9, 10));
};

c.bench_function("Progress::default", bench);
Expand Down
12 changes: 6 additions & 6 deletions benches/suites/progress_set.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use criterion::{Bencher, Criterion};
use raft::ProgressSet;
use raft::{Progress, ProgressSet};
use DEFAULT_RAFT_SETS;

pub fn bench_progress_set(c: &mut Criterion) {
Expand All @@ -17,10 +17,10 @@ pub fn bench_progress_set(c: &mut Criterion) {
fn quick_progress_set(voters: usize, learners: usize) -> ProgressSet {
let mut set = ProgressSet::with_capacity(voters, learners);
(0..voters).for_each(|id| {
set.insert_voter(id as u64, Default::default()).ok();
set.insert_voter(id as u64, Progress::new(0, 10)).ok();
});
(voters..learners).for_each(|id| {
set.insert_learner(id as u64, Default::default()).ok();
(voters..(learners + voters)).for_each(|id| {
set.insert_learner(id as u64, Progress::new(0, 10)).ok();
});
set
}
Expand Down Expand Up @@ -56,7 +56,7 @@ pub fn bench_progress_set_insert_voter(c: &mut Criterion) {
let set = quick_progress_set(voters, learners);
b.iter(|| {
let mut set = set.clone();
set.insert_voter(99, Default::default()).ok()
set.insert_voter(99, Progress::new(0, 10)).ok()
});
}
};
Expand All @@ -75,7 +75,7 @@ pub fn bench_progress_set_insert_learner(c: &mut Criterion) {
let set = quick_progress_set(voters, learners);
b.iter(|| {
let mut set = set.clone();
set.insert_learner(99, Default::default()).ok()
set.insert_learner(99, Progress::new(0, 10)).ok()
});
}
};
Expand Down
65 changes: 35 additions & 30 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl ProgressSet {
}

/// The progress of catching up from a restart.
#[derive(Debug, Default, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct Progress {
/// How much state is matched.
pub matched: u64,
Expand Down Expand Up @@ -274,9 +274,13 @@ impl Progress {
/// Creates a new progress with the given settings.
pub fn new(next_idx: u64, ins_size: usize) -> Self {
Progress {
matched: 0,
next_idx,
state: ProgressState::default(),
paused: false,
pending_snapshot: 0,
recent_active: false,
ins: Inflights::new(ins_size),
..Default::default()
}
}

Expand All @@ -287,6 +291,17 @@ impl Progress {
self.ins.reset();
}

pub(crate) fn reset(&mut self, next_idx: u64) {
self.matched = 0;
self.next_idx = next_idx;
self.state = ProgressState::default();
self.paused = false;
self.pending_snapshot = 0;
self.recent_active = false;
debug_assert!(self.ins.cap() != 0);
self.ins.reset();
}

/// Changes the progress to a probe.
pub fn become_probe(&mut self) {
// If the original state is ProgressStateSnapshot, progress knows that
Expand Down Expand Up @@ -394,7 +409,7 @@ impl Progress {
}

/// A buffer of inflight messages.
#[derive(Debug, Default, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct Inflights {
// the starting index in the buffer
start: usize,
Expand All @@ -410,7 +425,8 @@ impl Inflights {
pub fn new(cap: usize) -> Inflights {
Inflights {
buffer: Vec::with_capacity(cap),
..Default::default()
start: 0,
count: 0,
}
}

Expand Down Expand Up @@ -518,11 +534,8 @@ mod test {

assert_eq!(inflight, wantin2);

let mut inflight2 = Inflights {
start: 5,
buffer: Vec::with_capacity(10),
..Default::default()
};
let mut inflight2 = Inflights::new(10);
inflight2.start = 5;
inflight2.buffer.extend_from_slice(&[0, 0, 0, 0, 0]);

for i in 0..5 {
Expand Down Expand Up @@ -635,11 +648,9 @@ mod test_progress_set {
#[test]
fn test_insert_redundant_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
let default_progress = Progress::new(0, 256);
let mut canary_progress = Progress::new(0, 256);
canary_progress.matched = CANARY;
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
Expand All @@ -656,11 +667,9 @@ mod test_progress_set {
#[test]
fn test_insert_redundant_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
let default_progress = Progress::new(0, 256);
let mut canary_progress = Progress::new(0, 256);
canary_progress.matched = CANARY;
set.insert_learner(1, default_progress.clone())?;
assert!(
set.insert_learner(1, canary_progress).is_err(),
Expand All @@ -677,11 +686,9 @@ mod test_progress_set {
#[test]
fn test_insert_learner_that_is_voter() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
let default_progress = Progress::new(0, 256);
let mut canary_progress = Progress::new(0, 256);
canary_progress.matched = CANARY;
set.insert_voter(1, default_progress.clone())?;
assert!(
set.insert_learner(1, canary_progress).is_err(),
Expand All @@ -698,11 +705,9 @@ mod test_progress_set {
#[test]
fn test_insert_voter_that_is_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let canary_progress = Progress {
matched: CANARY,
..Default::default()
};
let default_progress = Progress::new(0, 256);
let mut canary_progress = Progress::new(0, 256);
canary_progress.matched = CANARY;
set.insert_learner(1, default_progress.clone())?;
assert!(
set.insert_voter(1, canary_progress).is_err(),
Expand All @@ -719,7 +724,7 @@ mod test_progress_set {
#[test]
fn test_promote_learner() -> Result<()> {
let mut set = ProgressSet::default();
let default_progress = Progress::default();
let default_progress = Progress::new(0, 256);
set.insert_voter(1, default_progress)?;
let pre = set.get(1).expect("Should have been inserted").clone();
assert!(
Expand Down
6 changes: 3 additions & 3 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,15 +608,15 @@ impl<T: Storage> Raft<T> {

self.abort_leader_transfer();

self.votes = FxHashMap::default();
self.votes.clear();

self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);

let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
let last_index = self.raft_log.last_index();
let self_id = self.id;
for (&id, pr) in self.mut_prs().iter_mut() {
*pr = Progress::new(last_index + 1, max_inflight);
pr.reset(last_index + 1);
if id == self_id {
pr.matched = last_index;
}
Expand Down
35 changes: 11 additions & 24 deletions tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,11 @@ fn new_progress(
pending_snapshot: u64,
ins_size: usize,
) -> Progress {
Progress {
state,
matched,
next_idx,
pending_snapshot,
ins: Inflights::new(ins_size),
..Default::default()
}
let mut p = Progress::new(next_idx, ins_size);
p.state = state;
p.matched = matched;
p.pending_snapshot = pending_snapshot;
p
}

fn read_messages<T: Storage>(raft: &mut Raft<T>) -> Vec<Message> {
Expand Down Expand Up @@ -193,11 +190,8 @@ fn test_progress_update() {
(prev_m + 2, prev_m + 2, prev_n + 1, true),
];
for (i, &(update, wm, wn, wok)) in tests.iter().enumerate() {
let mut p = Progress {
matched: prev_m,
next_idx: prev_n,
..Default::default()
};
let mut p = Progress::new(prev_n, 256);
p.matched = prev_m;
let ok = p.maybe_update(update);
if ok != wok {
panic!("#{}: ok= {}, want {}", i, ok, wok);
Expand Down Expand Up @@ -263,12 +257,8 @@ fn test_progress_is_paused() {
(ProgressState::Snapshot, true, true),
];
for (i, &(state, paused, w)) in tests.iter().enumerate() {
let p = Progress {
state,
paused,
ins: Inflights::new(256),
..Default::default()
};
let mut p = new_progress(state, 0, 0, 0, 256);
p.paused = paused;
if p.is_paused() != w {
panic!("#{}: shouldwait = {}, want {}", i, p.is_paused(), w)
}
Expand All @@ -280,11 +270,8 @@ fn test_progress_is_paused() {
#[test]
fn test_progress_resume() {
setup_for_test();
let mut p = Progress {
next_idx: 2,
paused: true,
..Default::default()
};
let mut p = Progress::new(2, 256);
p.paused = true;
p.maybe_decr_to(1, 1);
assert!(!p.paused, "paused= true, want false");
p.paused = true;
Expand Down
8 changes: 5 additions & 3 deletions tests/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ impl Interface {
prs.learner_ids().len(),
));
for id in ids {
let progress = Progress::default();
let progress = Progress::new(0, 256);
if prs.learner_ids().contains(id) {
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
} else {
if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
}
let term = self.term;
Expand Down

0 comments on commit 3799cfa

Please sign in to comment.