Skip to content

Commit

Permalink
*: update raft (tikv#3069)
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed May 14, 2018
1 parent f67acd8 commit ca9dc9b
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 27 deletions.
61 changes: 42 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bitflags = "1.0.1"
fail = "0.2"
uuid = { version = "0.4", features = [ "serde", "v4" ] }
grpcio = { version = "0.2", features = [ "secure" ] }
raft = "0.1"
raft = "0.2"
crossbeam-channel = { version = "0.1", features = [ "nightly" ] }

[target.'cfg(unix)'.dependencies]
Expand All @@ -87,6 +87,7 @@ git = "https://github.com/pingcap/rust-rocksdb.git"

[dependencies.kvproto]
git = "https://github.com/pingcap/kvproto.git"
branch = "release-2.0"

[dependencies.tipb]
git = "https://github.com/pingcap/tipb.git"
Expand Down
7 changes: 7 additions & 0 deletions src/raftstore/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ pub use self::transport::Transport;
// Only used in tests
#[cfg(test)]
pub use self::worker::{SplitCheckRunner, SplitCheckTask};

bitflags! {
// TODO: maybe declare it as protobuf struct is better.
struct ProposalContext: u8 {
const SYNC_LOG = 0b00000001;
}
}
21 changes: 15 additions & 6 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ use raftstore::coprocessor::CoprocessorHost;
use raftstore::store::worker::apply::ExecResult;
use raftstore::store::worker::{Apply, ApplyRes, ApplyTask};
use raftstore::store::worker::{apply, Proposal, RegionProposal};
use raftstore::store::{Callback, Config, ReadResponse, RegionSnapshot};
use raftstore::store::{Callback, Config, ProposalContext, ReadResponse, RegionSnapshot};
use raftstore::{Error, Result};

use util::MustConsumeVec;
use util::collections::{FlatMap, FlatMapValues as Values, HashSet};
use util::collections::{FlatMap, HashSet};
use util::time::{duration_to_sec, monotonic_raw_now};
use util::worker::{FutureWorker, Scheduler};

Expand Down Expand Up @@ -324,7 +324,7 @@ impl Peer {
..Default::default()
};

let raft_group = RawNode::new(&raft_cfg, ps, &[])?;
let raft_group = RawNode::new(&raft_cfg, ps, vec![])?;

let mut peer = Peer {
kv_engine: store.kv_engine(),
Expand Down Expand Up @@ -1229,7 +1229,10 @@ impl Peer {
/// 2. it's a follower, and it does not lag behind the leader a lot.
/// If a snapshot is involved between it and the Raft leader, it's not healthy since
/// it cannot works as a node in the quorum to receive replicating logs from leader.
fn count_healthy_node(&self, progress: Values<u64, Progress>) -> usize {
fn count_healthy_node<'a, I>(&self, progress: I) -> usize
where
I: Iterator<Item = &'a Progress>,
{
let mut healthy = 0;
for pr in progress {
if pr.matched >= self.get_store().truncated_index() {
Expand Down Expand Up @@ -1519,7 +1522,12 @@ impl Peer {

let sync_log = get_sync_log_from_request(&req);
let propose_index = self.next_proposal_index();
self.raft_group.propose(data, sync_log)?;
let context = if !sync_log {
vec![]
} else {
vec![ProposalContext::SYNC_LOG.bits()]
};
self.raft_group.propose(context, data)?;
if self.next_proposal_index() == propose_index {
// The message is dropped silently, this usually due to leader absence
// or transferring leader. Both cases can be considered as NotLeader error.
Expand Down Expand Up @@ -1598,7 +1606,8 @@ impl Peer {
);

let propose_index = self.next_proposal_index();
self.raft_group.propose_conf_change(cc)?;
self.raft_group
.propose_conf_change(vec![ProposalContext::SYNC_LOG.bits()], cc)?;
if self.next_proposal_index() == propose_index {
// The message is dropped silently, this usually due to leader absence
// or transferring leader. Both cases can be considered as NotLeader error.
Expand Down
9 changes: 9 additions & 0 deletions src/raftstore/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use kvproto::raft_serverpb::{MergeState, PeerState, RaftApplyState, RaftLocalSta
RaftSnapshotData, RegionLocalState};
use raft::eraftpb::{ConfState, Entry, HardState, Snapshot};
use raft::{self, Error as RaftError, RaftState, Ready, Storage, StorageError};
use raftstore::store::ProposalContext;
use raftstore::store::util::conf_state_from_region;
use raftstore::{Error, Result};
use storage::CF_RAFT;
Expand Down Expand Up @@ -763,6 +764,14 @@ impl PeerStorage {
for entry in entries {
if entry.get_sync_log() {
ready_ctx.sync_log = true;
} else {
let ctx = entry.get_context();
if !ctx.is_empty() {
let ctx = ProposalContext::from_bits_truncate(ctx[0]);
if ctx.contains(ProposalContext::SYNC_LOG) {
ready_ctx.sync_log = true;
}
}
}
ready_ctx.raft_wb.put_msg(
&keys::raft_log_key(self.get_region_id(), entry.get_index()),
Expand Down
2 changes: 1 addition & 1 deletion src/server/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl Debugger {
..Default::default()
};

box_try!(RawNode::new(&raft_cfg, peer_storage, &[]));
box_try!(RawNode::new(&raft_cfg, peer_storage, vec![]));
Ok(())
};

Expand Down

0 comments on commit ca9dc9b

Please sign in to comment.