Skip to content

Commit

Permalink
Cherry pick all 0.4.x (#263)
Browse files Browse the repository at this point in the history
* Check pending conf change before campaign (#225)

Fix #221.

* Add more convenient lite-weight interfaces (#227)

This PR introduces two simple and lite weight interfaces:
- ping to trigger heartbeats without ticking,
- status_ref to borrow the progress set instead of cloning.

* *: bump to 0.4.2 (#228)

* Bump to v0.4.3 (#231)

* raft: leader respond to learner read index message (#220)

Signed-off-by: nolouch <nolouch@gmail.com>

* Bump to v0.4.3

Signed-off-by: Neil Shen <overvenus@gmail.com>

* Request snapshot (#243)

Signed-off-by: Neil Shen <overvenus@gmail.com>

* fix tests

* cargo fmt

* address comments.
  • Loading branch information
hicqu authored and Hoverbear committed Jul 19, 2019
1 parent ff96aad commit 5a9b67d
Show file tree
Hide file tree
Showing 13 changed files with 599 additions and 57 deletions.
8 changes: 6 additions & 2 deletions harness/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ impl Network {
npeers.insert(*id, r);
}
Some(r) => {
if r.raft.as_ref().map_or(false, |r| r.id != *id) {
panic!("peer {} in peers has a wrong position", r.id);
if let Some(raft) = r.raft.as_ref() {
if raft.id != *id {
panic!("peer {} in peers has a wrong position", r.id);
}
let store = raft.raft_log.store.clone();
nstorage.insert(*id, store);
}
npeers.insert(*id, r);
}
Expand Down
1 change: 1 addition & 0 deletions proto/proto/eraftpb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message Message {
repeated Entry entries = 7;
uint64 commit = 8;
Snapshot snapshot = 9;
uint64 request_snapshot = 13;
bool reject = 10;
uint64 reject_hint = 11;
bytes context = 12;
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ quick_error! {
ViolatesContract(contract: String) {
display("An argument violate a calling contract: {}", contract)
}
/// The request snapshot is dropped.
RequestSnapshotDropped {
description("raft: request snapshot dropped")
}
}
}

Expand All @@ -88,6 +92,7 @@ impl cmp::PartialEq for Error {
(&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
(&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
(&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
(&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
_ => false,
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ before taking old, removed peers offline.
*/

#![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))]
#![deny(clippy::all)]
#![deny(missing_docs)]
#![recursion_limit = "128"]
Expand Down Expand Up @@ -410,7 +411,7 @@ pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID,
pub use self::raft_log::{RaftLog, NO_LIMIT};
pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus};
pub use self::read_only::{ReadOnlyOption, ReadState};
pub use self::status::Status;
pub use self::status::{Status, StatusRef};
pub use self::storage::{RaftState, Storage};
pub use raft_proto::eraftpb;
use slog::{Drain, Logger};
Expand Down Expand Up @@ -439,7 +440,7 @@ pub mod prelude {

pub use crate::progress::Progress;

pub use crate::status::Status;
pub use crate::status::{Status, StatusRef};

pub use crate::read_only::{ReadOnlyOption, ReadState};
}
Expand Down
41 changes: 32 additions & 9 deletions src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::cmp;

use self::inflights::Inflights;
use std::cmp;
use crate::raft::INVALID_INDEX;
pub mod inflights;
pub mod progress_set;

Expand Down Expand Up @@ -73,6 +74,10 @@ pub struct Progress {
/// this Progress will be paused. raft will not resend snapshot until the pending one
/// is reported to be failed.
pub pending_snapshot: u64,
/// This field is used in request snapshot.
/// If there is a pending request snapshot, this will be set to the request
/// index of the snapshot.
pub pending_request_snapshot: u64,

/// This is true if the progress is recently active. Receiving any messages
/// from the corresponding follower indicates the progress is active.
Expand All @@ -98,6 +103,7 @@ impl Progress {
state: ProgressState::default(),
paused: false,
pending_snapshot: 0,
pending_request_snapshot: 0,
recent_active: false,
ins: Inflights::new(ins_size),
}
Expand All @@ -116,6 +122,7 @@ impl Progress {
self.state = ProgressState::default();
self.paused = false;
self.pending_snapshot = 0;
self.pending_request_snapshot = INVALID_INDEX;
self.recent_active = false;
debug_assert!(self.ins.cap() != 0);
self.ins.reset();
Expand Down Expand Up @@ -188,25 +195,41 @@ impl Progress {
/// Returns false if the given index comes from an out of order message.
/// Otherwise it decreases the progress next index to min(rejected, last)
/// and returns true.
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
if self.state == ProgressState::Replicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= self.matched {
// Or rejected equals to matched and request_snapshot is the INVALID_INDEX.
if rejected < self.matched
|| (rejected == self.matched && request_snapshot == INVALID_INDEX)
{
return false;
}
self.next_idx = self.matched + 1;
if request_snapshot == INVALID_INDEX {
self.next_idx = self.matched + 1;
} else {
self.pending_request_snapshot = request_snapshot;
}
return true;
}

// the rejection must be stale if "rejected" does not match next - 1
if self.next_idx == 0 || self.next_idx - 1 != rejected {
// The rejection must be stale if "rejected" does not match next - 1.
// Do not consider it stale if it is a request snapshot message.
if (self.next_idx == 0 || self.next_idx - 1 != rejected)
&& request_snapshot == INVALID_INDEX
{
return false;
}

self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
// Do not decrease next index if it's requesting snapshot.
if request_snapshot == INVALID_INDEX {
self.next_idx = cmp::min(rejected, last + 1);
if self.next_idx < 1 {
self.next_idx = 1;
}
} else if self.pending_request_snapshot == INVALID_INDEX {
// Allow requesting snapshot even if it's not Replicate.
self.pending_request_snapshot = request_snapshot;
}
self.resume();
true
Expand Down
113 changes: 87 additions & 26 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ pub struct Raft<T: Storage> {
/// The maximum length (in bytes) of all the entries.
pub max_msg_size: u64,

/// The peer is requesting snapshot, it is the index that the follower
/// needs it to be included in a snapshot.
pub pending_request_snapshot: u64,

prs: Option<ProgressSet>,

/// The current role of this node.
Expand Down Expand Up @@ -253,6 +257,7 @@ impl<T: Storage> Raft<T> {
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
prs: Some(ProgressSet::with_capacity(peers.len(), learners.len())),
pending_request_snapshot: INVALID_INDEX,
state: StateRole::Follower,
is_learner: false,
check_quorum: c.check_quorum,
Expand Down Expand Up @@ -525,7 +530,7 @@ impl<T: Storage> Raft<T> {
}

m.set_msg_type(MessageType::MsgSnapshot);
let snapshot_r = self.raft_log.snapshot();
let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot);
if let Err(e) = snapshot_r {
if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
debug!(
Expand Down Expand Up @@ -620,34 +625,28 @@ impl<T: Storage> Raft<T> {
);
return;
}
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
let mut m = Message::default();
m.to = to;
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries
trace!(
self.logger,
"Skipping sending to {to}",
to = to;
"index" => pr.next_idx,
"tag" => &self.tag,
"term" => ?term,
"ents" => ?ents,
);
if pr.pending_request_snapshot != INVALID_INDEX {
// Check pending request snapshot first to avoid unnecessary loading entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
let mut ents = ents.unwrap();
if self.batch_append {
let batched = self.try_batching(to, pr, &mut ents);
if batched {
let term = self.raft_log.term(pr.next_idx - 1);
let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
if term.is_err() || ents.is_err() {
// send snapshot if we failed to get term or entries.
if !self.prepare_send_snapshot(&mut m, pr, to) {
return;
}
} else {
let mut ents = ents.unwrap();
if self.batch_append && self.try_batching(to, pr, &mut ents) {
return;
}
self.prepare_send_entries(&mut m, pr, term.unwrap(), ents);
}
let term = term.unwrap();
self.prepare_send_entries(&mut m, pr, term, ents);
}
self.send(m);
}
Expand Down Expand Up @@ -682,9 +681,7 @@ impl<T: Storage> Raft<T> {
self.set_prs(prs);
}

/// Broadcast heartbeats to all the followers.
///
/// If it's not leader, nothing will happen.
/// Broadcasts heartbeats to all the followers if it's leader.
pub fn ping(&mut self) {
if self.state == StateRole::Leader {
self.bcast_heartbeat();
Expand Down Expand Up @@ -770,6 +767,7 @@ impl<T: Storage> Raft<T> {

self.pending_conf_index = 0;
self.read_only = ReadOnly::new(self.read_only.option);
self.pending_request_snapshot = INVALID_INDEX;

let last_index = self.raft_log.last_index();
let self_id = self.id;
Expand Down Expand Up @@ -859,9 +857,11 @@ impl<T: Storage> Raft<T> {

/// Converts this node to a follower.
pub fn become_follower(&mut self, term: u64, leader_id: u64) {
let pending_request_snapshot = self.pending_request_snapshot;
self.reset(term);
self.leader_id = leader_id;
self.state = StateRole::Follower;
self.pending_request_snapshot = pending_request_snapshot;
info!(
self.logger,
"became follower at term {term}",
Expand Down Expand Up @@ -1463,7 +1463,7 @@ impl<T: Storage> Raft<T> {
"tag" => &self.tag,
);

if pr.maybe_decr_to(m.index, m.reject_hint) {
if pr.maybe_decr_to(m.index, m.reject_hint, m.request_snapshot) {
debug!(
self.logger,
"decreased progress of {}",
Expand Down Expand Up @@ -1535,7 +1535,10 @@ impl<T: Storage> Raft<T> {
if pr.state == ProgressState::Replicate && pr.ins.full() {
pr.ins.free_first_one();
}
if pr.matched < self.raft_log.last_index() {
// Does it request snapshot?
if pr.matched < self.raft_log.last_index()
|| pr.pending_request_snapshot != INVALID_INDEX
{
*send_append = true;
}

Expand Down Expand Up @@ -1661,6 +1664,7 @@ impl<T: Storage> Raft<T> {
// out the next msgAppend.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause();
pr.pending_request_snapshot = INVALID_INDEX;
}

/// Check message's progress to decide which action should be taken.
Expand Down Expand Up @@ -2066,9 +2070,47 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Request a snapshot from a leader.
pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
if self.state == StateRole::Leader {
info!(
self.logger,
"can not request snapshot on leader; dropping request snapshot";
"tag" => &self.tag,
);
} else if self.leader_id == INVALID_ID {
info!(
self.logger,
"drop request snapshot because of no leader";
"tag" => &self.tag, "term" => self.term,
);
} else if self.get_snap().is_some() {
info!(
self.logger,
"there is a pending snapshot; dropping request snapshot";
"tag" => &self.tag,
);
} else if self.pending_request_snapshot != INVALID_INDEX {
info!(
self.logger,
"there is a pending snapshot; dropping request snapshot";
"tag" => &self.tag,
);
} else {
self.pending_request_snapshot = request_index;
self.send_request_snapshot();
return Ok(());
}
Err(Error::RequestSnapshotDropped)
}

// TODO: revoke pub when there is a better way to test.
/// For a given message, append the entries to the log.
pub fn handle_append_entries(&mut self, m: &Message) {
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
if m.index < self.raft_log.committed {
debug!(
self.logger,
Expand Down Expand Up @@ -2119,6 +2161,10 @@ impl<T: Storage> Raft<T> {
/// For a message, commit and send out heartbeat.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.commit);
if self.pending_request_snapshot != INVALID_INDEX {
self.send_request_snapshot();
return;
}
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
to_send.to = m.from;
Expand Down Expand Up @@ -2164,7 +2210,10 @@ impl<T: Storage> Raft<T> {

fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
let meta = snap.get_metadata();
if self.raft_log.match_term(meta.index, meta.term) {
// Do not fast-forward commit if we are requesting snapshot.
if self.pending_request_snapshot == INVALID_INDEX
&& self.raft_log.match_term(meta.index, meta.term)
{
info!(
self.logger,
"[commit: {commit}, lastindex: {last_index}, lastterm: {last_term}] fast-forwarded commit to \
Expand Down Expand Up @@ -2224,6 +2273,7 @@ impl<T: Storage> Raft<T> {
conf_change.start_index = meta.pending_membership_change_index;
self.pending_membership_change = Some(conf_change);
}
self.pending_request_snapshot = INVALID_INDEX;
None
}

Expand Down Expand Up @@ -2506,4 +2556,15 @@ impl<T: Storage> Raft<T> {
pub fn is_in_membership_change(&self) -> bool {
self.prs().is_in_membership_change()
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
m.index = self.raft_log.committed;
m.reject = true;
m.reject_hint = self.raft_log.last_index();
m.to = self.leader_id;
m.request_snapshot = self.pending_request_snapshot;
self.send(m);
}
}
Loading

0 comments on commit 5a9b67d

Please sign in to comment.