From 7f2ad5bbd5a4ac691eca8d6a2e4bc79deffaf604 Mon Sep 17 00:00:00 2001 From: Nathan Hawkins Date: Thu, 8 Nov 2018 11:13:14 -0500 Subject: [PATCH 1/2] Prevent unbounded Raft log growth. --- src/config.rs | 6 ++++++ src/raft.rs | 25 +++++++++++++++++++++---- src/raft_log.rs | 45 +++++++++++++++++++++++++++++++++++++++++---- src/raw_node.rs | 2 +- src/util.rs | 39 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 108 insertions(+), 9 deletions(-) diff --git a/src/config.rs b/src/config.rs index 3a11a6013..f8577a2a7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -109,6 +109,11 @@ pub struct Config { /// A human-friendly tag used for logging. pub tag: String, + + /// Limits the aggregate byte size of the uncommitted entries that may be appended to a leader's + /// log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors. + /// Note: 0 for no limit. + pub max_uncommitted_entries_size: usize, } impl Default for Config { @@ -130,6 +135,7 @@ impl Default for Config { read_only_option: ReadOnlyOption::Safe, skip_bcast_commit: false, tag: "".into(), + max_uncommitted_entries_size: 0, } } } diff --git a/src/raft.rs b/src/raft.rs index 4457babb1..7660e1db5 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -178,6 +178,11 @@ pub struct Raft { /// Tag is only used for logging tag: String, + + /// Limits the aggregate byte size of the uncommitted entries that may be appended to a leader's + /// log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors. + /// Note: 0 for no limit. + max_uncommitted_entries_size: usize, } trait AssertSend: Send {} @@ -258,6 +263,7 @@ impl Raft { max_election_timeout: c.max_election_tick(), skip_bcast_commit: c.skip_bcast_commit, tag: c.tag.to_owned(), + max_uncommitted_entries_size: c.max_uncommitted_entries_size, }; for p in peers { let pr = Progress::new(1, r.max_inflight); @@ -381,6 +387,14 @@ impl Raft { self.skip_bcast_commit = skip; } + #[inline] + fn get_max_uncommitted_size(&self) -> usize { + if self.state == StateRole::Leader { + return self.max_uncommitted_entries_size + } + return 0 + } + // send persists state to stable storage and then sends to its mailbox. fn send(&mut self, mut m: Message) { m.set_from(self.id); @@ -625,20 +639,22 @@ impl Raft { /// Appends a slice of entries to the log. The entries are updated to match /// the current index and term. - pub fn append_entry(&mut self, es: &mut [Entry]) { + pub fn append_entry(&mut self, es: &mut [Entry]) -> Result<()> { let mut li = self.raft_log.last_index(); for (i, e) in es.iter_mut().enumerate() { e.set_term(self.term); e.set_index(li + 1 + i as u64); } + let max = self.get_max_uncommitted_size(); // use latest "last" index after truncate/append - li = self.raft_log.append(es); + li = self.raft_log.append(es, max)?; let self_id = self.id; self.mut_prs().get_mut(self_id).unwrap().maybe_update(li); // Regardless of maybe_commit's return, our caller will call bcastAppend. self.maybe_commit(); + Ok(()) } /// Returns true to indicate that there will probably be some readiness need to be handled. @@ -773,7 +789,7 @@ impl Raft { // could be expensive. self.pending_conf_index = self.raft_log.last_index(); - self.append_entry(&mut [Entry::new()]); + self.append_entry(&mut [Entry::new()]).unwrap(); info!("{} became leader at term {}", self.tag, self.term); } @@ -1404,7 +1420,7 @@ impl Raft { } } } - self.append_entry(&mut m.mut_entries()); + self.append_entry(&mut m.mut_entries())?; self.bcast_append(); return Ok(()); } @@ -1680,6 +1696,7 @@ impl Raft { m.get_log_term(), m.get_commit(), m.get_entries(), + 0, ) { Some(mlast_index) => { to_send.set_index(mlast_index); diff --git a/src/raft_log.rs b/src/raft_log.rs index facc04977..94d3efbaf 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -58,6 +58,9 @@ pub struct RaftLog { /// A tag associated with this raft for logging purposes. pub tag: String, + + /// Size of uncommitted entries + uncommitted_size: usize, } impl ToString for RaftLog @@ -88,6 +91,7 @@ impl RaftLog { applied: first_index - 1, unstable: Unstable::new(last_index + 1, tag.clone()), tag, + uncommitted_size: 0, } } @@ -214,6 +218,7 @@ impl RaftLog { term: u64, committed: u64, ents: &[Entry], + max: usize, ) -> Option { let last_new_index = idx + ents.len() as u64; if self.match_term(idx, term) { @@ -226,7 +231,7 @@ impl RaftLog { ) } else { let offset = idx + 1; - self.append(&ents[(conflict_idx - offset) as usize..]); + self.append(&ents[(conflict_idx - offset) as usize..], max).ok()?; } self.commit_to(cmp::min(committed, last_new_index)); return Some(last_new_index); @@ -252,6 +257,8 @@ impl RaftLog { self.last_index() ) } + let entries = self.slice(self.committed, to_commit, util::NO_LIMIT).unwrap(); + self.reduce_uncommitted_size(&entries); self.committed = to_commit; } @@ -294,9 +301,9 @@ impl RaftLog { } /// Appends a set of entries to the unstable list. - pub fn append(&mut self, ents: &[Entry]) -> u64 { + pub fn append(&mut self, ents: &[Entry], max: usize) -> Result { if ents.is_empty() { - return self.last_index(); + return Ok(self.last_index()); } let after = ents[0].get_index() - 1; @@ -306,8 +313,9 @@ impl RaftLog { self.tag, after, self.committed ) } + self.increase_uncommitted_size(ents, max)?; self.unstable.truncate_and_append(ents); - self.last_index() + return Ok(self.last_index()) } /// Returns slice of entries that are not committed. @@ -481,6 +489,35 @@ impl RaftLog { self.committed = snapshot.get_metadata().get_index(); self.unstable.restore(snapshot); } + + /// increase_uncommitted_size computes the size of the proposed entries and + /// determines whether they would push leader over its max_uncommitted_entries_size limit. + /// If the new entries would exceed the limit, the method returns false. If not, + /// the increase in uncommitted entry size is recorded and the method returns + /// true. + fn increase_uncommitted_size(&mut self, entries: &[Entry], max: usize) -> Result<()> { + let sum = util::get_size(entries); + if max > 0 && self.uncommitted_size+sum > max { + return Err(Error::ProposalDropped) + } + self.uncommitted_size += sum; + Ok(()) + } + + /// reduce_uncommitted_size accounts for the newly committed entries by decreasing + /// the uncommitted entry size limit. + fn reduce_uncommitted_size(&mut self, entries: &[Entry]) { + let sum = util::get_size(entries); + + if sum > self.uncommitted_size { + // uncommittedSize may underestimate the size of the uncommitted Raft + // log tail but will never overestimate it. Saturate at 0 instead of + // allowing overflow. + self.uncommitted_size = 0 + } else { + self.uncommitted_size -= sum + } + } } #[cfg(test)] diff --git a/src/raw_node.rs b/src/raw_node.rs index aaecaf5f4..6bcc39bc4 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -246,7 +246,7 @@ impl RawNode { e.set_data(data); ents.push(e); } - rn.raft.raft_log.append(&ents); + rn.raft.raft_log.append(&ents, 0).unwrap(); rn.raft.raft_log.committed = ents.len() as u64; for peer in peers { rn.raft.add_node(peer.id); diff --git a/src/util.rs b/src/util.rs index e5f1491ea..c7f59c00c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -68,3 +68,42 @@ pub fn limit_size(entries: &mut Vec, max: u64) { entries.truncate(limit); } + +/// Calculate the total size in bytes of the list of entries. +/// +/// # Examples +/// +/// ``` +/// use raft::{util::get_size, prelude::*}; +/// +/// let template = { +/// let mut entry = Entry::new(); +/// entry.set_data("*".repeat(100).into_bytes()); +/// entry +/// }; +/// +/// // Make a bunch of entries that are ~100 bytes long +/// let mut entries = vec![ +/// template.clone(), +/// template.clone(), +/// template.clone(), +/// template.clone(), +/// template.clone(), +/// ]; +/// +/// assert_eq!(entries.len(), 5); +/// assert_eq!(get_size(&mut entries), 220); +/// ``` +pub fn get_size(entries: &[T]) -> usize { + let sum : u32 = entries + .iter() + .map( | e | -> u32 { + let sz = e.get_cached_size(); + if sz == 0 { + e.compute_size() + } else { + sz + } + }).sum(); + sum as usize +} From 9fa860eb18c0b17dff369ad844188a0bbc5d7870 Mon Sep 17 00:00:00 2001 From: Nathan Hawkins Date: Thu, 8 Nov 2018 19:05:06 -0500 Subject: [PATCH 2/2] Fix existing tests. --- src/config.rs | 13 +++- src/lib.rs | 2 +- src/raft.rs | 22 +++--- src/raft_log.rs | 83 ++++++++++++++++------ src/raw_node.rs | 3 +- src/util.rs | 18 ++--- tests/integration_cases/test_raft.rs | 17 ++--- tests/integration_cases/test_raft_paper.rs | 2 +- 8 files changed, 103 insertions(+), 57 deletions(-) diff --git a/src/config.rs b/src/config.rs index f8577a2a7..0a07079b9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -30,6 +30,7 @@ use super::{ errors::{Error, Result}, INVALID_ID, }; +use raft_log::NO_SIZE_LIMIT; /// Config contains the parameters to start a raft. pub struct Config { @@ -111,8 +112,8 @@ pub struct Config { pub tag: String, /// Limits the aggregate byte size of the uncommitted entries that may be appended to a leader's - /// log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors. - /// Note: 0 for no limit. + /// log. Once this limit is exceeded, proposals will begin to return ProposalDropped errors. + /// Defaults to no limit. pub max_uncommitted_entries_size: usize, } @@ -135,7 +136,7 @@ impl Default for Config { read_only_option: ReadOnlyOption::Safe, skip_bcast_commit: false, tag: "".into(), - max_uncommitted_entries_size: 0, + max_uncommitted_entries_size: NO_SIZE_LIMIT, } } } @@ -210,6 +211,12 @@ impl Config { )); } + if self.max_uncommitted_entries_size == 0 { + return Err(Error::ConfigInvalid( + "max uncommitted entries size must be greater than 0".to_owned(), + )); + } + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 805e05d3c..c9626ff56 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -301,7 +301,7 @@ pub use self::progress::{Inflights, Progress, ProgressSet, ProgressState}; pub use self::raft::{ quorum, vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID, INVALID_INDEX, }; -pub use self::raft_log::{RaftLog, NO_LIMIT}; +pub use self::raft_log::{RaftLog, NO_LIMIT, NO_SIZE_LIMIT}; pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus}; pub use self::read_only::{ReadOnlyOption, ReadState}; pub use self::status::Status; diff --git a/src/raft.rs b/src/raft.rs index 7660e1db5..121bef6f5 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -180,8 +180,8 @@ pub struct Raft { tag: String, /// Limits the aggregate byte size of the uncommitted entries that may be appended to a leader's - /// log. Once this limit is exceeded, proposals will begin to return ErrProposalDropped errors. - /// Note: 0 for no limit. + /// log. Once this limit is exceeded, proposals will begin to return ProposalDropped errors. + /// Defaults to no limit. max_uncommitted_entries_size: usize, } @@ -388,11 +388,11 @@ impl Raft { } #[inline] - fn get_max_uncommitted_size(&self) -> usize { + fn max_uncommitted_size(&self) -> usize { if self.state == StateRole::Leader { - return self.max_uncommitted_entries_size + return self.max_uncommitted_entries_size; } - return 0 + raft_log::NO_SIZE_LIMIT } // send persists state to stable storage and then sends to its mailbox. @@ -645,7 +645,7 @@ impl Raft { e.set_term(self.term); e.set_index(li + 1 + i as u64); } - let max = self.get_max_uncommitted_size(); + let max = self.max_uncommitted_size(); // use latest "last" index after truncate/append li = self.raft_log.append(es, max)?; @@ -779,8 +779,6 @@ impl Raft { ); let term = self.term; self.reset(term); - self.leader_id = self.id; - self.state = StateRole::Leader; // Conservatively set the pending_conf_index to the last index in the // log. There may or may not be a pending config change, but it's @@ -789,7 +787,13 @@ impl Raft { // could be expensive. self.pending_conf_index = self.raft_log.last_index(); + // This unwrap is safe, because append_entry only returns a ProposalDropped + // error if self.state is set to Leader, which it is not, yet. self.append_entry(&mut [Entry::new()]).unwrap(); + + self.leader_id = self.id; + self.state = StateRole::Leader; + info!("{} became leader at term {}", self.tag, self.term); } @@ -1696,7 +1700,7 @@ impl Raft { m.get_log_term(), m.get_commit(), m.get_entries(), - 0, + raft_log::NO_SIZE_LIMIT, ) { Some(mlast_index) => { to_send.set_index(mlast_index); diff --git a/src/raft_log.rs b/src/raft_log.rs index 94d3efbaf..17ad41ff3 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -36,6 +36,9 @@ use util; pub use util::NO_LIMIT; +/// A number to represent that there is no limit. +pub const NO_SIZE_LIMIT: usize = usize::max_value(); + /// Raft log implementation #[derive(Default)] pub struct RaftLog { @@ -60,7 +63,7 @@ pub struct RaftLog { pub tag: String, /// Size of uncommitted entries - uncommitted_size: usize, + pub uncommitted_size: usize, } impl ToString for RaftLog @@ -231,7 +234,8 @@ impl RaftLog { ) } else { let offset = idx + 1; - self.append(&ents[(conflict_idx - offset) as usize..], max).ok()?; + self.append(&ents[(conflict_idx - offset) as usize..], max) + .ok()?; } self.commit_to(cmp::min(committed, last_new_index)); return Some(last_new_index); @@ -257,7 +261,12 @@ impl RaftLog { self.last_index() ) } - let entries = self.slice(self.committed, to_commit, util::NO_LIMIT).unwrap(); + let entries = self + .slice( + cmp::max(self.committed, self.first_index()), + to_commit, + util::NO_LIMIT, + ).unwrap(); self.reduce_uncommitted_size(&entries); self.committed = to_commit; } @@ -313,9 +322,10 @@ impl RaftLog { self.tag, after, self.committed ) } + self.increase_uncommitted_size(ents, max)?; self.unstable.truncate_and_append(ents); - return Ok(self.last_index()) + Ok(self.last_index()) } /// Returns slice of entries that are not committed. @@ -497,8 +507,11 @@ impl RaftLog { /// true. fn increase_uncommitted_size(&mut self, entries: &[Entry], max: usize) -> Result<()> { let sum = util::get_size(entries); - if max > 0 && self.uncommitted_size+sum > max { - return Err(Error::ProposalDropped) + + assert_ne!(0, max); + + if max != NO_SIZE_LIMIT && self.uncommitted_size + sum > max { + return Err(Error::ProposalDropped); } self.uncommitted_size += sum; Ok(()) @@ -601,7 +614,9 @@ mod test { for (i, &(ref ents, wconflict)) in tests.iter().enumerate() { let store = MemStorage::new(); let mut raft_log = new_raft_log(store); - raft_log.append(&previous_ents); + raft_log + .append(&previous_ents, raft_log::NO_SIZE_LIMIT) + .unwrap(); let gconflict = raft_log.find_conflict(ents); if gconflict != wconflict { panic!("#{}: conflict = {}, want {}", i, gconflict, wconflict) @@ -615,7 +630,9 @@ mod test { let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)]; let store = MemStorage::new(); let mut raft_log = new_raft_log(store); - raft_log.append(&previous_ents); + raft_log + .append(&previous_ents, raft_log::NO_SIZE_LIMIT) + .unwrap(); let tests = vec![ // greater term, ignore lastIndex (raft_log.last_index() - 1, 4, true), @@ -664,7 +681,7 @@ mod test { let store = MemStorage::new(); store.wl().append(&previous_ents).expect("append failed"); let mut raft_log = new_raft_log(store); - let index = raft_log.append(ents); + let index = raft_log.append(ents, raft_log::NO_SIZE_LIMIT).unwrap(); if index != windex { panic!("#{}: last_index = {}, want {}", i, index, windex); } @@ -696,7 +713,11 @@ mod test { } let mut raft_log = new_raft_log(storage); for i in unstable_index..last_index { - raft_log.append(&[new_entry(i as u64 + 1, i as u64 + 1)]); + raft_log + .append( + &[new_entry(i as u64 + 1, i as u64 + 1)], + raft_log::NO_SIZE_LIMIT, + ).unwrap(); } assert!( @@ -724,7 +745,9 @@ mod test { } let mut prev = raft_log.last_index(); - raft_log.append(&[new_entry(prev + 1, prev + 1)]); + raft_log + .append(&[new_entry(prev + 1, prev + 1)], raft_log::NO_SIZE_LIMIT) + .unwrap(); assert_eq!(prev + 1, raft_log.last_index()); prev = raft_log.last_index(); @@ -778,7 +801,9 @@ mod test { .expect("apply failed."); let mut raft_log = new_raft_log(store); for i in 1..num { - raft_log.append(&[new_entry(offset + i, i)]); + raft_log + .append(&[new_entry(offset + i, i)], raft_log::NO_SIZE_LIMIT) + .unwrap(); } let tests = vec![ @@ -872,7 +897,7 @@ mod test { .apply_snapshot(new_snapshot(snap_index, snap_term)) .expect(""); let mut raft_log = new_raft_log(store); - raft_log.append(new_ents); + raft_log.append(new_ents, raft_log::NO_SIZE_LIMIT).unwrap(); raft_log.stable_to(stablei, stablet); if raft_log.unstable.offset != wunstable { panic!( @@ -890,7 +915,9 @@ mod test { for (i, &(stablei, stablet, wunstable)) in tests.iter().enumerate() { let store = MemStorage::new(); let mut raft_log = new_raft_log(store); - raft_log.append(&[new_entry(1, 1), new_entry(2, 2)]); + raft_log + .append(&[new_entry(1, 1), new_entry(2, 2)], raft_log::NO_SIZE_LIMIT) + .unwrap(); raft_log.stable_to(stablei, stablet); if raft_log.unstable.offset != wunstable { panic!( @@ -919,7 +946,9 @@ mod test { // append unstable entries to raftlog let mut raft_log = new_raft_log(store); - raft_log.append(&previous_ents[(unstable - 1)..]); + raft_log + .append(&previous_ents[(unstable - 1)..], raft_log::NO_SIZE_LIMIT) + .unwrap(); let ents = raft_log.unstable_entries().unwrap_or(&[]).to_vec(); let l = ents.len(); @@ -951,7 +980,7 @@ mod test { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); let mut raft_log = new_raft_log(store); - raft_log.append(&ents); + raft_log.append(&ents, raft_log::NO_SIZE_LIMIT).unwrap(); raft_log.maybe_commit(5, 1); raft_log.applied_to(applied); @@ -975,7 +1004,7 @@ mod test { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); let mut raft_log = new_raft_log(store); - raft_log.append(&ents); + raft_log.append(&ents, raft_log::NO_SIZE_LIMIT).unwrap(); raft_log.maybe_commit(5, 1); raft_log.applied_to(applied); @@ -1007,7 +1036,11 @@ mod test { } let mut raft_log = new_raft_log(store); for i in (num / 2)..num { - raft_log.append(&[new_entry(offset + i, offset + i)]); + raft_log + .append( + &[new_entry(offset + i, offset + i)], + raft_log::NO_SIZE_LIMIT, + ).unwrap(); } let tests = vec![ @@ -1270,10 +1303,12 @@ mod test { { let store = MemStorage::new(); let mut raft_log = new_raft_log(store); - raft_log.append(&previous_ents); + raft_log + .append(&previous_ents, raft_log::NO_SIZE_LIMIT) + .unwrap(); raft_log.committed = commit; let res = panic::catch_unwind(AssertUnwindSafe(|| { - raft_log.maybe_append(index, log_term, committed, ents) + raft_log.maybe_append(index, log_term, committed, ents, raft_log::NO_SIZE_LIMIT) })); if res.is_err() ^ wpanic { panic!("#{}: panic = {}, want {}", i, res.is_err(), wpanic); @@ -1316,7 +1351,9 @@ mod test { for (i, &(commit, wcommit, wpanic)) in tests.iter().enumerate() { let store = MemStorage::new(); let mut raft_log = new_raft_log(store); - raft_log.append(&previous_ents); + raft_log + .append(&previous_ents, raft_log::NO_SIZE_LIMIT) + .unwrap(); raft_log.committed = previous_commit; let has_panic = panic::catch_unwind(AssertUnwindSafe(|| raft_log.commit_to(commit))).is_err(); @@ -1392,7 +1429,9 @@ mod test { .expect(""); let mut raft_log = new_raft_log(store); for i in 1u64..(num + 1) { - raft_log.append(&[new_entry(i + offset, 0)]); + raft_log + .append(&[new_entry(i + offset, 0)], raft_log::NO_SIZE_LIMIT) + .unwrap(); } let first = offset + 1; let tests = vec![ diff --git a/src/raw_node.rs b/src/raw_node.rs index 6bcc39bc4..4bbe6bed8 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -40,6 +40,7 @@ use protobuf::{self, RepeatedField}; use super::config::Config; use super::errors::{Error, Result}; +use super::raft_log::NO_SIZE_LIMIT; use super::read_only::ReadState; use super::Status; use super::Storage; @@ -246,7 +247,7 @@ impl RawNode { e.set_data(data); ents.push(e); } - rn.raft.raft_log.append(&ents, 0).unwrap(); + rn.raft.raft_log.append(&ents, NO_SIZE_LIMIT).unwrap(); rn.raft.raft_log.committed = ents.len() as u64; for peer in peers { rn.raft.add_node(peer.id); diff --git a/src/util.rs b/src/util.rs index c7f59c00c..845d3d8ca 100644 --- a/src/util.rs +++ b/src/util.rs @@ -16,6 +16,7 @@ use std::u64; +use eraftpb::Entry; use protobuf::Message; /// A number to represent that there is no limit. @@ -92,18 +93,11 @@ pub fn limit_size(entries: &mut Vec, max: u64) { /// ]; /// /// assert_eq!(entries.len(), 5); -/// assert_eq!(get_size(&mut entries), 220); +/// assert_eq!(get_size(&mut entries), 500); /// ``` -pub fn get_size(entries: &[T]) -> usize { - let sum : u32 = entries +pub fn get_size(entries: &[Entry]) -> usize { + entries .iter() - .map( | e | -> u32 { - let sz = e.get_cached_size(); - if sz == 0 { - e.compute_size() - } else { - sz - } - }).sum(); - sum as usize + .map(|e| e.get_data().len() + e.get_context().len()) + .sum() } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index cacbeb7be..b613334ca 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2476,7 +2476,8 @@ fn test_bcast_beat() { sm.become_candidate(); sm.become_leader(); for i in 0..10 { - sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]); + sm.append_entry(&mut [empty_entry(0, i as u64 + 1)]) + .unwrap(); } // slow follower let mut_pr = |sm: &mut Interface, n, matched, next_idx| { @@ -2590,7 +2591,7 @@ fn test_leader_increase_next() { ]; for (i, (state, next_idx, wnext)) in tests.drain(..).enumerate() { let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage()); - sm.raft_log.append(&previous_ents); + sm.raft_log.append(&previous_ents, NO_SIZE_LIMIT).unwrap(); sm.become_candidate(); sm.become_leader(); sm.mut_prs().get_mut(2).unwrap().state = state; @@ -2624,7 +2625,7 @@ fn test_send_append_for_progress_probe() { // we expect that raft will only send out one msgAPP on the first // loop. After that, the follower is paused until a heartbeat response is // received. - r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]).unwrap(); do_send_append(&mut r, 2); let msg = r.read_messages(); assert_eq!(msg.len(), 1); @@ -2633,7 +2634,7 @@ fn test_send_append_for_progress_probe() { assert!(r.prs().get(2).unwrap().paused); for _ in 0..10 { - r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]).unwrap(); do_send_append(&mut r, 2); assert_eq!(r.read_messages().len(), 0); } @@ -2670,7 +2671,7 @@ fn test_send_append_for_progress_replicate() { r.mut_prs().get_mut(2).unwrap().become_replicate(); for _ in 0..10 { - r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]).unwrap(); do_send_append(&mut r, 2); assert_eq!(r.read_messages().len(), 1); } @@ -2686,7 +2687,7 @@ fn test_send_append_for_progress_snapshot() { r.mut_prs().get_mut(2).unwrap().become_snapshot(10); for _ in 0..10 { - r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]); + r.append_entry(&mut [new_entry(0, 0, SOME_DATA)]).unwrap(); do_send_append(&mut r, 2); assert_eq!(r.read_messages().len(), 0); } @@ -2746,7 +2747,7 @@ fn test_restore_ignore_snapshot() { let previous_ents = vec![empty_entry(1, 1), empty_entry(1, 2), empty_entry(1, 3)]; let commit = 1u64; let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage()); - sm.raft_log.append(&previous_ents); + sm.raft_log.append(&previous_ents, NO_SIZE_LIMIT).unwrap(); sm.raft_log.commit_to(commit); let mut s = new_snapshot(commit, 1, vec![1, 2]); @@ -2923,7 +2924,7 @@ fn test_new_leader_pending_config() { let mut e = Entry::new(); if add_entry { e.set_entry_type(EntryType::EntryNormal); - r.append_entry(&mut [e]); + r.append_entry(&mut [e]).unwrap(); } r.become_candidate(); r.become_leader(); diff --git a/tests/integration_cases/test_raft_paper.rs b/tests/integration_cases/test_raft_paper.rs index d5bd026bb..cfcd3c204 100644 --- a/tests/integration_cases/test_raft_paper.rs +++ b/tests/integration_cases/test_raft_paper.rs @@ -126,7 +126,7 @@ fn test_leader_bcast_beat() { r.become_candidate(); r.become_leader(); for i in 0..10 { - r.append_entry(&mut [empty_entry(0, i as u64 + 1)]); + r.append_entry(&mut [empty_entry(0, i as u64 + 1)]).unwrap(); } for _ in 0..hi {