Skip to content

Commit

Permalink
*: add protection against unlimited log growth(tikv#131)
Browse files Browse the repository at this point in the history
- Add protection against unbounded growth of the leader's uncommitted log.
  The limit is enforced on the estimated size of entry data
  rather than on the number of entries, so an entry with no data
  is never dropped.
- Add tests for raft and raw_node

Signed-off-by: lyzongyuan <lyzongyuan@gmail.com>
  • Loading branch information
lizongyuan.0x0 authored and c0x0o committed Sep 20, 2020
1 parent 0c53e7b commit 680b93c
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 5 deletions.
92 changes: 92 additions & 0 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5075,3 +5075,95 @@ fn test_read_when_quorum_becomes_less() {
.unwrap();
assert!(!network.peers[&1].read_states.is_empty());
}

/// Checks that a leader configured with `max_uncommitted_size` rejects
/// proposals that would push the tracked uncommitted data size past the
/// limit, still accepts entries without data, and accepts new proposals again
/// once the tracked size has been reduced.
#[test]
fn test_uncommitted_entries_size_limit() {
    /// Builds a `MsgPropose` from node 1 to node 1 carrying a single entry
    /// whose payload is `data`.
    fn propose_msg(data: Vec<u8>) -> Message {
        let mut entry = Entry::default();
        entry.data = data;
        let mut msg = Message::default();
        msg.from = 1;
        msg.to = 1;
        msg.set_msg_type(MessageType::MsgPropose);
        msg.set_entries(vec![entry].into());
        msg
    }

    let l = default_logger();
    let config = &Config {
        id: 1,
        max_uncommitted_size: 12,
        ..Config::default()
    };
    let mut nt = Network::new_with_config(vec![None, None, None], config, &l);
    // Exactly 12 bytes, i.e. exactly the configured limit.
    let data = b"hello world!".to_vec();

    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    // A 20-byte payload alone exceeds the 12-byte limit: ProposalDropped.
    let result = nt.dispatch(vec![propose_msg(b"hello world and raft".to_vec())]);
    assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped);

    // A payload that exactly fills the limit is accepted.
    let result = nt.dispatch(vec![propose_msg(data.clone())]);
    assert!(result.is_ok());

    // The limit is now used up, so even a one-byte proposal is dropped.
    let result = nt.dispatch(vec![propose_msg(b"!".to_vec())]);
    assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped);

    // An entry without any data is still accepted.
    let result = nt.dispatch(vec![propose_msg(Vec::new())]);
    assert!(result.is_ok());

    // After the tracked size is reduced, a new proposal is accepted again.
    let msg = propose_msg(data.clone());

    // Hand an equally sized entry to the leader so it subtracts 12 bytes
    // from its uncommitted size.
    let mut consumed = Entry::default();
    consumed.data = data.clone();
    nt.peers
        .get_mut(&1)
        .unwrap()
        .reduce_uncommitted_size(&[consumed]);

    let result = nt.dispatch(vec![msg]);
    assert!(result.is_ok());
}
70 changes: 70 additions & 0 deletions harness/tests/integration_cases/test_raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,24 @@ fn new_raw_node(
RawNode::new(&config, storage, logger).unwrap()
}

/// Creates a `RawNode` over `storage` using the caller-supplied `config`.
///
/// When `peers` is non-empty and the storage is not yet initialized, the
/// storage is first seeded with a snapshot built from `peers`.
///
/// # Panics
///
/// Panics if `peers` is empty while the storage is already initialized, or if
/// reading the initial state, applying the snapshot, or constructing the node
/// fails.
fn new_raw_node_with_config(
    peers: Vec<u64>,
    config: &Config,
    storage: MemStorage,
    logger: &Logger,
) -> RawNode<MemStorage> {
    let initialized = storage.initial_state().unwrap().initialized();
    match (initialized, peers.is_empty()) {
        (true, true) => panic!("new_raw_node with empty peers on initialized store"),
        (false, false) => {
            // Fresh store with a peer list: bootstrap it from a snapshot.
            storage
                .wl()
                .apply_snapshot(new_snapshot(1, 1, peers))
                .unwrap();
        }
        _ => {}
    }
    RawNode::new(config, storage, logger).unwrap()
}

/// Ensures that RawNode::step ignore local message.
#[test]
fn test_raw_node_step() {
Expand Down Expand Up @@ -728,3 +746,55 @@ fn test_set_priority() {
assert_eq!(raw_node.raft.priority, p);
}
}

/// Drives a single-node `RawNode` with a 12-byte `max_uncommitted_size` and
/// checks that a second 12-byte proposal is dropped until the first one has
/// gone through a ready/advance cycle.
#[test]
fn test_bounded_uncommitted_entries_growth_with_partition() {
    let logger = default_logger();
    let config = &Config {
        id: 1,
        max_uncommitted_size: 12,
        ..Config::default()
    };
    let storage = new_storage();
    let mut raw_node = new_raw_node_with_config(vec![1], config, storage.clone(), &logger);

    // Campaign, then pump readies until one reports a soft state whose leader
    // matches the node's current leader. That ready is left un-advanced.
    raw_node.campaign().unwrap();
    loop {
        let ready = raw_node.ready();
        let leader_reported = ready
            .ss()
            .map_or(false, |ss| ss.leader_id == raw_node.raft.leader_id);
        if leader_reported {
            break;
        }

        raw_node.advance(ready);
    }

    // The first 12-byte proposal fits the limit exactly and is accepted.
    assert!(raw_node.propose(vec![], b"hello world!".to_vec()).is_ok());

    // A second 12-byte proposal would exceed the limit and is dropped.
    let dropped = raw_node.propose(vec![], b"hello world!".to_vec());
    assert!(dropped.is_err());
    assert_eq!(dropped.unwrap_err(), Error::ProposalDropped);

    // Persist the pending entries and advance; after that the same proposal
    // is accepted again.
    let ready = raw_node.ready();
    storage.wl().append(ready.entries()).unwrap();
    raw_node.advance(ready);

    assert!(raw_node.propose(vec![], b"hello world!".to_vec()).is_ok());
}
12 changes: 12 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

pub use super::read_only::{ReadOnlyOption, ReadState};
use super::util::NO_LIMIT;
use super::{
errors::{Error, Result},
INVALID_ID,
Expand Down Expand Up @@ -90,6 +91,10 @@ pub struct Config {

/// The election priority of this node.
pub priority: u64,

/// Maximum total size, in bytes of entry data, of uncommitted entries.
/// Proposals that would push the total past this limit are dropped.
pub max_uncommitted_size: usize,
}

impl Default for Config {
Expand All @@ -110,6 +115,7 @@ impl Default for Config {
skip_bcast_commit: false,
batch_append: false,
priority: 0,
max_uncommitted_size: NO_LIMIT as usize,
}
}
}
Expand Down Expand Up @@ -189,6 +195,12 @@ impl Config {
));
}

if self.max_uncommitted_size == 0 {
return Err(Error::ConfigInvalid(
"max uncommitted entries size must be greater than 0".to_owned(),
));
}

Ok(())
}
}
4 changes: 4 additions & 0 deletions src/log_unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl Unstable {
}

/// Append entries to unstable, truncate local block first if overlapped.
///
/// # Panics
///
/// Panics if truncate logs to the entry before snapshot
pub fn truncate_and_append(&mut self, ents: &[Entry]) {
let after = ents[0].index;
if after == self.offset + self.entries.len() as u64 {
Expand Down
73 changes: 68 additions & 5 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use super::Config;
use crate::confchange::Changer;
use crate::quorum::VoteResult;
use crate::util;
use crate::util::NO_LIMIT;
use crate::{confchange, Progress, ProgressState, ProgressTracker};

// CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when
Expand Down Expand Up @@ -177,6 +178,11 @@ pub struct RaftCore<T: Storage> {

/// The election priority of this node.
pub priority: u64,

/// Maximum total size, in bytes of entry data, of uncommitted entries.
/// Proposals that would push the total past this limit are dropped.
max_uncommitted_size: usize,
uncommitted_size: usize,
}

/// A struct that represents the raft consensus itself. Stores details concerning the current
Expand Down Expand Up @@ -269,13 +275,15 @@ impl<T: Storage> Raft<T> {
pending_conf_index: Default::default(),
vote: Default::default(),
heartbeat_elapsed: Default::default(),
randomized_election_timeout: 0,
randomized_election_timeout: Default::default(),
min_election_timeout: c.min_election_tick(),
max_election_timeout: c.max_election_tick(),
skip_bcast_commit: c.skip_bcast_commit,
batch_append: c.batch_append,
logger,
priority: c.priority,
max_uncommitted_size: c.max_uncommitted_size,
uncommitted_size: Default::default(),
},
};
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
Expand Down Expand Up @@ -861,6 +869,7 @@ impl<T: Storage> Raft<T> {

let last_index = self.raft_log.last_index();
let committed = self.raft_log.committed;
self.uncommitted_size = 0;
let self_id = self.id;
for (&id, mut pr) in self.mut_prs().iter_mut() {
pr.reset(last_index + 1);
Expand All @@ -871,9 +880,13 @@ impl<T: Storage> Raft<T> {
}
}

/// Appends a slice of entries to the log. The entries are updated to match
/// the current index and term.
pub fn append_entry(&mut self, es: &mut [Entry]) {
/// Appends a slice of entries to the log.
/// The entries are updated to match the current index and term.
pub fn append_entry(&mut self, es: &mut [Entry]) -> bool {
if !self.maybe_increase_uncommitted_size(es) {
return false;
}

let mut li = self.raft_log.last_index();
for (i, e) in es.iter_mut().enumerate() {
e.term = self.term;
Expand All @@ -887,6 +900,8 @@ impl<T: Storage> Raft<T> {

// Regardless of maybe_commit's return, our caller will call bcastAppend.
self.maybe_commit();

true
}

/// Returns true to indicate that there will probably be some readiness need to be handled.
Expand Down Expand Up @@ -1043,6 +1058,8 @@ impl<T: Storage> Raft<T> {
// could be expensive.
self.pending_conf_index = self.raft_log.last_index();

// No need to check the result because append_entry never refuses entries
// whose data size is zero.
self.append_entry(&mut [Entry::default()]);

info!(
Expand Down Expand Up @@ -1761,7 +1778,10 @@ impl<T: Storage> Raft<T> {
e.set_entry_type(EntryType::EntryNormal);
}
}
self.append_entry(&mut m.mut_entries());
if !self.append_entry(&mut m.mut_entries()) {
// return ProposalDropped when uncommitted size limit is reached
return Err(Error::ProposalDropped);
}
self.bcast_append();
return Ok(());
}
Expand Down Expand Up @@ -2457,4 +2477,47 @@ impl<T: Storage> Raft<T> {
to_send.set_entries(req.take_entries());
Some(to_send)
}

/// Subtracts the total data size of `ents` from the tracked uncommitted size.
///
/// Does nothing when no limit is configured (`max_uncommitted_size` equals
/// `NO_LIMIT`) or when this node is not the leader.
///
/// If `ents` carries more data than is currently tracked, the counter is
/// clamped to zero instead of underflowing.
pub fn reduce_uncommitted_size(&mut self, ents: &[Entry]) {
    // fast path for NO_LIMIT and non-leader endpoint
    if self.max_uncommitted_size == NO_LIMIT as usize || self.state != StateRole::Leader {
        return;
    }

    let size: usize = ents.iter().map(|entry| entry.get_data().len()).sum();

    // `uncommitted_size` can be zeroed while the log still holds uncommitted
    // entries with data. When those entries commit later they arrive here
    // without ever having been counted, so `size` may legitimately exceed the
    // tracked value. Clamp at zero rather than aborting the process.
    self.uncommitted_size = self.uncommitted_size.saturating_sub(size);
}

/// Increase size of 'ents' to uncommitted size. Return true when size limit
/// is satisfied. Otherwise return false and uncommitted size remains unchanged.
pub fn maybe_increase_uncommitted_size(&mut self, ents: &[Entry]) -> bool {
let mut size: usize = 0;
for entry in ents {
size += entry.get_data().len()
}

if size + self.uncommitted_size > self.max_uncommitted_size {
false
} else {
self.uncommitted_size += size;
true
}
}
}
5 changes: 5 additions & 0 deletions src/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ impl<T: Storage> RawNode<T> {
if !rd.read_states.is_empty() {
self.raft.read_states.clear();
}
// update raft uncommitted entries size
if rd.committed_entries.is_some() {
self.raft
.reduce_uncommitted_size(&rd.committed_entries.unwrap_or_default())
}
}

fn commit_apply(&mut self, applied: u64) {
Expand Down

0 comments on commit 680b93c

Please sign in to comment.