Skip to content

Commit

Permalink
Fix existing tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
utsl42 committed Nov 9, 2018
1 parent 7f2ad5b commit 9fa860e
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 57 deletions.
13 changes: 10 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::{
errors::{Error, Result},
INVALID_ID,
};
use raft_log::NO_SIZE_LIMIT;

/// Config contains the parameters to start a raft.
pub struct Config {
Expand Down Expand Up @@ -111,8 +112,8 @@ pub struct Config {
pub tag: String,

/// Limits the aggregate byte size of the uncommitted entries that may be appended to a leader's
/// log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors.
/// Note: 0 for no limit.
/// log. Once this limit is exceeded, proposals will begin to return ProposalDropped errors.
/// Defaults to no limit.
pub max_uncommitted_entries_size: usize,
}

Expand All @@ -135,7 +136,7 @@ impl Default for Config {
read_only_option: ReadOnlyOption::Safe,
skip_bcast_commit: false,
tag: "".into(),
max_uncommitted_entries_size: 0,
max_uncommitted_entries_size: NO_SIZE_LIMIT,
}
}
}
Expand Down Expand Up @@ -210,6 +211,12 @@ impl Config {
));
}

if self.max_uncommitted_entries_size == 0 {
return Err(Error::ConfigInvalid(
"max uncommitted entries size must be greater than 0".to_owned(),
));
}

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ pub use self::progress::{Inflights, Progress, ProgressSet, ProgressState};
pub use self::raft::{
quorum, vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX,
};
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::raft_log::{RaftLog, NO_LIMIT, NO_SIZE_LIMIT};
pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus};
pub use self::read_only::{ReadOnlyOption, ReadState};
pub use self::status::Status;
Expand Down
22 changes: 13 additions & 9 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ pub struct Raft<T: Storage> {
tag: String,

/// Limits the aggregate byte size of the uncommitted entries that may be appended to a leader's
/// log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors.
/// Note: 0 for no limit.
/// log. Once this limit is exceeded, proposals will begin to return ProposalDropped errors.
/// Defaults to no limit.
max_uncommitted_entries_size: usize,
}

Expand Down Expand Up @@ -388,11 +388,11 @@ impl<T: Storage> Raft<T> {
}

#[inline]
fn get_max_uncommitted_size(&self) -> usize {
fn max_uncommitted_size(&self) -> usize {
if self.state == StateRole::Leader {
return self.max_uncommitted_entries_size
return self.max_uncommitted_entries_size;
}
return 0
raft_log::NO_SIZE_LIMIT
}

// send persists state to stable storage and then sends to its mailbox.
Expand Down Expand Up @@ -645,7 +645,7 @@ impl<T: Storage> Raft<T> {
e.set_term(self.term);
e.set_index(li + 1 + i as u64);
}
let max = self.get_max_uncommitted_size();
let max = self.max_uncommitted_size();
// use latest "last" index after truncate/append
li = self.raft_log.append(es, max)?;

Expand Down Expand Up @@ -779,8 +779,6 @@ impl<T: Storage> Raft<T> {
);
let term = self.term;
self.reset(term);
self.leader_id = self.id;
self.state = StateRole::Leader;

// Conservatively set the pending_conf_index to the last index in the
// log. There may or may not be a pending config change, but it's
Expand All @@ -789,7 +787,13 @@ impl<T: Storage> Raft<T> {
// could be expensive.
self.pending_conf_index = self.raft_log.last_index();

// This unwrap is safe, because append_entry only returns a ProposalDropped
// error if self.state is set to Leader, which it is not, yet.
self.append_entry(&mut [Entry::new()]).unwrap();

self.leader_id = self.id;
self.state = StateRole::Leader;

info!("{} became leader at term {}", self.tag, self.term);
}

Expand Down Expand Up @@ -1696,7 +1700,7 @@ impl<T: Storage> Raft<T> {
m.get_log_term(),
m.get_commit(),
m.get_entries(),
0,
raft_log::NO_SIZE_LIMIT,
) {
Some(mlast_index) => {
to_send.set_index(mlast_index);
Expand Down
83 changes: 61 additions & 22 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use util;

pub use util::NO_LIMIT;

/// A number to represent that there is no limit.
pub const NO_SIZE_LIMIT: usize = usize::max_value();

/// Raft log implementation
#[derive(Default)]
pub struct RaftLog<T: Storage> {
Expand All @@ -60,7 +63,7 @@ pub struct RaftLog<T: Storage> {
pub tag: String,

/// Size of uncommitted entries
uncommitted_size: usize,
pub uncommitted_size: usize,
}

impl<T> ToString for RaftLog<T>
Expand Down Expand Up @@ -231,7 +234,8 @@ impl<T: Storage> RaftLog<T> {
)
} else {
let offset = idx + 1;
self.append(&ents[(conflict_idx - offset) as usize..], max).ok()?;
self.append(&ents[(conflict_idx - offset) as usize..], max)
.ok()?;
}
self.commit_to(cmp::min(committed, last_new_index));
return Some(last_new_index);
Expand All @@ -257,7 +261,12 @@ impl<T: Storage> RaftLog<T> {
self.last_index()
)
}
let entries = self.slice(self.committed, to_commit, util::NO_LIMIT).unwrap();
let entries = self
.slice(
cmp::max(self.committed, self.first_index()),
to_commit,
util::NO_LIMIT,
).unwrap();
self.reduce_uncommitted_size(&entries);
self.committed = to_commit;
}
Expand Down Expand Up @@ -313,9 +322,10 @@ impl<T: Storage> RaftLog<T> {
self.tag, after, self.committed
)
}

self.increase_uncommitted_size(ents, max)?;
self.unstable.truncate_and_append(ents);
return Ok(self.last_index())
Ok(self.last_index())
}

/// Returns slice of entries that are not committed.
Expand Down Expand Up @@ -497,8 +507,11 @@ impl<T: Storage> RaftLog<T> {
/// true.
fn increase_uncommitted_size(&mut self, entries: &[Entry], max: usize) -> Result<()> {
let sum = util::get_size(entries);
if max > 0 && self.uncommitted_size+sum > max {
return Err(Error::ProposalDropped)

assert_ne!(0, max);

if max != NO_SIZE_LIMIT && self.uncommitted_size + sum > max {
return Err(Error::ProposalDropped);
}
self.uncommitted_size += sum;
Ok(())
Expand Down Expand Up @@ -601,7 +614,9 @@ mod test {
for (i, &(ref ents, wconflict)) in tests.iter().enumerate() {
let store = MemStorage::new();
let mut raft_log = new_raft_log(store);
raft_log.append(&previous_ents);
raft_log
.append(&previous_ents, raft_log::NO_SIZE_LIMIT)
.unwrap();
let gconflict = raft_log.find_conflict(ents);
if gconflict != wconflict {
panic!("#{}: conflict = {}, want {}", i, gconflict, wconflict)
Expand All @@ -615,7 +630,9 @@ mod test {
let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)];
let store = MemStorage::new();
let mut raft_log = new_raft_log(store);
raft_log.append(&previous_ents);
raft_log
.append(&previous_ents, raft_log::NO_SIZE_LIMIT)
.unwrap();
let tests = vec![
// greater term, ignore lastIndex
(raft_log.last_index() - 1, 4, true),
Expand Down Expand Up @@ -664,7 +681,7 @@ mod test {
let store = MemStorage::new();
store.wl().append(&previous_ents).expect("append failed");
let mut raft_log = new_raft_log(store);
let index = raft_log.append(ents);
let index = raft_log.append(ents, raft_log::NO_SIZE_LIMIT).unwrap();
if index != windex {
panic!("#{}: last_index = {}, want {}", i, index, windex);
}
Expand Down Expand Up @@ -696,7 +713,11 @@ mod test {
}
let mut raft_log = new_raft_log(storage);
for i in unstable_index..last_index {
raft_log.append(&[new_entry(i as u64 + 1, i as u64 + 1)]);
raft_log
.append(
&[new_entry(i as u64 + 1, i as u64 + 1)],
raft_log::NO_SIZE_LIMIT,
).unwrap();
}

assert!(
Expand Down Expand Up @@ -724,7 +745,9 @@ mod test {
}

let mut prev = raft_log.last_index();
raft_log.append(&[new_entry(prev + 1, prev + 1)]);
raft_log
.append(&[new_entry(prev + 1, prev + 1)], raft_log::NO_SIZE_LIMIT)
.unwrap();
assert_eq!(prev + 1, raft_log.last_index());

prev = raft_log.last_index();
Expand Down Expand Up @@ -778,7 +801,9 @@ mod test {
.expect("apply failed.");
let mut raft_log = new_raft_log(store);
for i in 1..num {
raft_log.append(&[new_entry(offset + i, i)]);
raft_log
.append(&[new_entry(offset + i, i)], raft_log::NO_SIZE_LIMIT)
.unwrap();
}

let tests = vec![
Expand Down Expand Up @@ -872,7 +897,7 @@ mod test {
.apply_snapshot(new_snapshot(snap_index, snap_term))
.expect("");
let mut raft_log = new_raft_log(store);
raft_log.append(new_ents);
raft_log.append(new_ents, raft_log::NO_SIZE_LIMIT).unwrap();
raft_log.stable_to(stablei, stablet);
if raft_log.unstable.offset != wunstable {
panic!(
Expand All @@ -890,7 +915,9 @@ mod test {
for (i, &(stablei, stablet, wunstable)) in tests.iter().enumerate() {
let store = MemStorage::new();
let mut raft_log = new_raft_log(store);
raft_log.append(&[new_entry(1, 1), new_entry(2, 2)]);
raft_log
.append(&[new_entry(1, 1), new_entry(2, 2)], raft_log::NO_SIZE_LIMIT)
.unwrap();
raft_log.stable_to(stablei, stablet);
if raft_log.unstable.offset != wunstable {
panic!(
Expand Down Expand Up @@ -919,7 +946,9 @@ mod test {

// append unstable entries to raftlog
let mut raft_log = new_raft_log(store);
raft_log.append(&previous_ents[(unstable - 1)..]);
raft_log
.append(&previous_ents[(unstable - 1)..], raft_log::NO_SIZE_LIMIT)
.unwrap();

let ents = raft_log.unstable_entries().unwrap_or(&[]).to_vec();
let l = ents.len();
Expand Down Expand Up @@ -951,7 +980,7 @@ mod test {
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(3, 1)).expect("");
let mut raft_log = new_raft_log(store);
raft_log.append(&ents);
raft_log.append(&ents, raft_log::NO_SIZE_LIMIT).unwrap();
raft_log.maybe_commit(5, 1);
raft_log.applied_to(applied);

Expand All @@ -975,7 +1004,7 @@ mod test {
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(3, 1)).expect("");
let mut raft_log = new_raft_log(store);
raft_log.append(&ents);
raft_log.append(&ents, raft_log::NO_SIZE_LIMIT).unwrap();
raft_log.maybe_commit(5, 1);
raft_log.applied_to(applied);

Expand Down Expand Up @@ -1007,7 +1036,11 @@ mod test {
}
let mut raft_log = new_raft_log(store);
for i in (num / 2)..num {
raft_log.append(&[new_entry(offset + i, offset + i)]);
raft_log
.append(
&[new_entry(offset + i, offset + i)],
raft_log::NO_SIZE_LIMIT,
).unwrap();
}

let tests = vec![
Expand Down Expand Up @@ -1270,10 +1303,12 @@ mod test {
{
let store = MemStorage::new();
let mut raft_log = new_raft_log(store);
raft_log.append(&previous_ents);
raft_log
.append(&previous_ents, raft_log::NO_SIZE_LIMIT)
.unwrap();
raft_log.committed = commit;
let res = panic::catch_unwind(AssertUnwindSafe(|| {
raft_log.maybe_append(index, log_term, committed, ents)
raft_log.maybe_append(index, log_term, committed, ents, raft_log::NO_SIZE_LIMIT)
}));
if res.is_err() ^ wpanic {
panic!("#{}: panic = {}, want {}", i, res.is_err(), wpanic);
Expand Down Expand Up @@ -1316,7 +1351,9 @@ mod test {
for (i, &(commit, wcommit, wpanic)) in tests.iter().enumerate() {
let store = MemStorage::new();
let mut raft_log = new_raft_log(store);
raft_log.append(&previous_ents);
raft_log
.append(&previous_ents, raft_log::NO_SIZE_LIMIT)
.unwrap();
raft_log.committed = previous_commit;
let has_panic =
panic::catch_unwind(AssertUnwindSafe(|| raft_log.commit_to(commit))).is_err();
Expand Down Expand Up @@ -1392,7 +1429,9 @@ mod test {
.expect("");
let mut raft_log = new_raft_log(store);
for i in 1u64..(num + 1) {
raft_log.append(&[new_entry(i + offset, 0)]);
raft_log
.append(&[new_entry(i + offset, 0)], raft_log::NO_SIZE_LIMIT)
.unwrap();
}
let first = offset + 1;
let tests = vec![
Expand Down
3 changes: 2 additions & 1 deletion src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use protobuf::{self, RepeatedField};

use super::config::Config;
use super::errors::{Error, Result};
use super::raft_log::NO_SIZE_LIMIT;
use super::read_only::ReadState;
use super::Status;
use super::Storage;
Expand Down Expand Up @@ -246,7 +247,7 @@ impl<T: Storage> RawNode<T> {
e.set_data(data);
ents.push(e);
}
rn.raft.raft_log.append(&ents, 0).unwrap();
rn.raft.raft_log.append(&ents, NO_SIZE_LIMIT).unwrap();
rn.raft.raft_log.committed = ents.len() as u64;
for peer in peers {
rn.raft.add_node(peer.id);
Expand Down
18 changes: 6 additions & 12 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use std::u64;

use eraftpb::Entry;
use protobuf::Message;

/// A number to represent that there is no limit.
Expand Down Expand Up @@ -92,18 +93,11 @@ pub fn limit_size<T: Message + Clone>(entries: &mut Vec<T>, max: u64) {
/// ];
///
/// assert_eq!(entries.len(), 5);
/// assert_eq!(get_size(&mut entries), 220);
/// assert_eq!(get_size(&mut entries), 500);
/// ```
pub fn get_size<T: Message>(entries: &[T]) -> usize {
let sum : u32 = entries
pub fn get_size(entries: &[Entry]) -> usize {
entries
.iter()
.map( | e | -> u32 {
let sz = e.get_cached_size();
if sz == 0 {
e.compute_size()
} else {
sz
}
}).sum();
sum as usize
.map(|e| e.get_data().len() + e.get_context().len())
.sum()
}
Loading

0 comments on commit 9fa860e

Please sign in to comment.