Skip to content

Commit

Permalink
Merge branch 'master' into fix_deadlock_during_prevote_migration_process
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed May 10, 2018
2 parents f8c289e + 1beea8c commit fa74996
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 89 deletions.
15 changes: 11 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,28 @@ sudo: false
env:
global:
- RUST_BACKTRACE=1
- CLIPPY_VERSION=0.0.179
- CLIPPY_VERSION=0.0.192
- RUSTFLAGS="-D warnings"
cache: cargo

# `allow_failures` requires this, but we set it below.
rust:

matrix:
include:
# This build uses stable and checks rustfmt (clippy is not on stable).
- rust: stable
- rust: nightly-2018-01-12
install:
- rustup component add rustfmt-preview
before_script:
- cargo fmt --all -- --write-mode diff
# This build uses the nightly used by TiKV and checks clippy.
- rust: nightly-2018-04-06
install:
- export PATH="$PATH:$HOME/.cargo/bin"
- if [[ `cargo clippy -- --version` != $CLIPPY_VERSION* ]]; then cargo install -f clippy --version $CLIPPY_VERSION; fi
before_script:
- cargo clippy


script:
- if `which cargo-clippy &>/dev/null`; then cargo clippy; fi
- cargo test --all -- --nocapture
12 changes: 8 additions & 4 deletions examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@

extern crate raft;

use std::collections::HashMap;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::{Duration, Instant};
use std::thread;
use std::collections::HashMap;
use std::time::{Duration, Instant};

use raft::prelude::*;
use raft::storage::MemStorage;

type ProposeCallback = Box<Fn() + Send>;

enum Msg {
Propose { id: u8, cb: ProposeCallback },
Propose {
id: u8,
cb: ProposeCallback,
},
// Here we don't use Raft Message, so use dead_code to
// avoid the compiler warning.
#[allow(dead_code)] Raft(Message),
#[allow(dead_code)]
Raft(Message),
}

// A simple example about how to use the Raft library in Rust.
Expand Down
4 changes: 2 additions & 2 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{cmp, io, result};
use std::error;
use std::{cmp, io, result};

use protobuf::ProtobufError;

Expand Down Expand Up @@ -110,8 +110,8 @@ pub type Result<T> = result::Result<T, Error>;

#[cfg(test)]
mod tests {
use std::io;
use super::*;
use std::io;

#[test]
fn test_error_equal() {
Expand Down
18 changes: 9 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,27 @@ extern crate quick_error;
extern crate rand;

pub mod eraftpb;
mod raft_log;
pub mod storage;
mod raft;
mod progress;
mod errors;
mod log_unstable;
mod status;
mod progress;
mod raft;
mod raft_log;
pub mod raw_node;
mod read_only;
mod status;
pub mod storage;
pub mod util;

pub use self::storage::{RaftState, Storage};
pub use self::errors::{Error, Result, StorageError};
pub use self::log_unstable::Unstable;
pub use self::progress::{Inflights, Progress, ProgressSet, ProgressState};
pub use self::raft::{quorum, vote_resp_msg_type, Config, Raft, SoftState, StateRole, INVALID_ID,
INVALID_INDEX};
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus};
pub use self::status::Status;
pub use self::log_unstable::Unstable;
pub use self::progress::{Inflights, Progress, ProgressSet, ProgressState};
pub use self::read_only::{ReadOnlyOption, ReadState};
pub use self::status::Status;
pub use self::storage::{RaftState, Storage};

pub mod prelude {
//! A "prelude" for crates using the `raft` crate.
Expand Down
29 changes: 14 additions & 15 deletions src/log_unstable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ pub struct Unstable {
impl Unstable {
pub fn new(offset: u64, tag: String) -> Unstable {
Unstable {
offset: offset,
offset,
snapshot: None,
entries: vec![],
tag: tag,
tag,
}
}
// maybe_first_index returns the index of the first possible entry in entries
Expand All @@ -74,22 +74,21 @@ impl Unstable {
// is any.
pub fn maybe_term(&self, idx: u64) -> Option<u64> {
if idx < self.offset {
if self.snapshot.is_none() {
return None;
}

let meta = self.snapshot.as_ref().unwrap().get_metadata();
let snapshot = self.snapshot.as_ref()?;
let meta = snapshot.get_metadata();
if idx == meta.get_index() {
return Some(meta.get_term());
Some(meta.get_term())
} else {
None
}
return None;
} else {
self.maybe_last_index().and_then(|last| {
if idx > last {
return None;
}
Some(self.entries[(idx - self.offset) as usize].get_term())
})
}
self.maybe_last_index().and_then(|last| {
if idx > last {
return None;
}
Some(self.entries[(idx - self.offset) as usize].get_term())
})
}

pub fn stable_to(&mut self, idx: u64, term: u64) {
Expand Down
140 changes: 137 additions & 3 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use fxhash::FxHashMap;
use std::cmp;
use std::iter::Chain;
use std::collections::hash_map::{HashMap, Iter, IterMut};
use fxhash::FxHashMap;
use std::iter::Chain;

#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ProgressState {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Progress {
}
}

#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Inflights {
// the starting index in the buffer
start: usize,
Expand Down Expand Up @@ -370,3 +370,137 @@ impl Inflights {
self.start = 0;
}
}

#[cfg(test)]
mod test {
use progress::Inflights;

#[test]
fn test_inflight_add() {
let mut inflight = Inflights::new(10);
for i in 0..5 {
inflight.add(i);
}

let wantin = Inflights {
start: 0,
count: 5,
buffer: vec![0, 1, 2, 3, 4],
};

assert_eq!(inflight, wantin);

for i in 5..10 {
inflight.add(i);
}

let wantin2 = Inflights {
start: 0,
count: 10,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
};

assert_eq!(inflight, wantin2);

let mut inflight2 = Inflights {
start: 5,
buffer: Vec::with_capacity(10),
..Default::default()
};
inflight2.buffer.extend_from_slice(&vec![0, 0, 0, 0, 0]);

for i in 0..5 {
inflight2.add(i);
}

let wantin21 = Inflights {
start: 5,
count: 5,
buffer: vec![0, 0, 0, 0, 0, 0, 1, 2, 3, 4],
};

assert_eq!(inflight2, wantin21);

for i in 5..10 {
inflight2.add(i);
}

let wantin22 = Inflights {
start: 5,
count: 10,
buffer: vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4],
};

assert_eq!(inflight2, wantin22);
}

#[test]
fn test_inflight_free_to() {
let mut inflight = Inflights::new(10);
for i in 0..10 {
inflight.add(i);
}

inflight.free_to(4);

let wantin = Inflights {
start: 5,
count: 5,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
};

assert_eq!(inflight, wantin);

inflight.free_to(8);

let wantin2 = Inflights {
start: 9,
count: 1,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
};

assert_eq!(inflight, wantin2);

for i in 10..15 {
inflight.add(i);
}

inflight.free_to(12);

let wantin3 = Inflights {
start: 3,
count: 2,
buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9],
};

assert_eq!(inflight, wantin3);

inflight.free_to(14);

let wantin4 = Inflights {
start: 5,
count: 0,
buffer: vec![10, 11, 12, 13, 14, 5, 6, 7, 8, 9],
};

assert_eq!(inflight, wantin4);
}

#[test]
fn test_inflight_free_first_one() {
let mut inflight = Inflights::new(10);
for i in 0..10 {
inflight.add(i);
}

inflight.free_first_one();

let wantin = Inflights {
start: 1,
count: 9,
buffer: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
};

assert_eq!(inflight, wantin);
}
}
10 changes: 5 additions & 5 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@

use std::cmp;

use rand::{self, Rng};
use eraftpb::{Entry, EntryType, HardState, Message, MessageType, Snapshot};
use fxhash::FxHashMap;
use protobuf::repeated::RepeatedField;
use rand::{self, Rng};

use super::storage::Storage;
use super::progress::{Inflights, Progress, ProgressSet, ProgressState};
use super::errors::{Error, Result, StorageError};
use super::progress::{Inflights, Progress, ProgressSet, ProgressState};
use super::raft_log::{self, RaftLog};
use super::read_only::{ReadOnly, ReadOnlyOption, ReadState};
use super::storage::Storage;

// CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when
// Config.pre_vote is true.
Expand Down Expand Up @@ -246,7 +246,7 @@ pub struct Raft<T: Storage> {

fn new_progress(next_idx: u64, ins_size: usize) -> Progress {
Progress {
next_idx: next_idx,
next_idx,
ins: Inflights::new(ins_size),
..Default::default()
}
Expand Down Expand Up @@ -300,7 +300,7 @@ impl<T: Storage> Raft<T> {
let mut r = Raft {
id: c.id,
read_states: Default::default(),
raft_log: raft_log,
raft_log,
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
prs: Some(ProgressSet::new(peers.len(), learners.len())),
Expand Down
Loading

0 comments on commit fa74996

Please sign in to comment.