Skip to content

Commit

Permalink
Fix: when lack entry, the snapshot to build has to include at least all purged logs
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jan 18, 2022
1 parent 6f1c36a commit 1a781e1
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 35 deletions.
37 changes: 25 additions & 12 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
ReplicaEvent::UpdateMatched { target, matched } => {
self.handle_update_matched(target, matched).await?;
}
ReplicaEvent::NeedsSnapshot { target, tx } => {
self.handle_needs_snapshot(target, tx).await?;
ReplicaEvent::NeedsSnapshot {
target: _,
must_include,
tx,
} => {
self.handle_needs_snapshot(must_include, tx).await?;
}
ReplicaEvent::Shutdown => {
self.core.set_target_state(State::Shutdown);
Expand Down Expand Up @@ -214,10 +218,12 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

/// A replication stream requesting snapshot info.
///
/// The snapshot has to include `must_include`.
#[tracing::instrument(level = "debug", skip(self, tx))]
async fn handle_needs_snapshot(
&mut self,
_: NodeId,
must_include: Option<LogId>,
tx: oneshot::Sender<Snapshot<S::SnapshotData>>,
) -> Result<(), StorageError> {
// Ensure snapshotting is configured, else do nothing.
Expand All @@ -229,15 +235,22 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let current_snapshot_opt = self.core.storage.get_current_snapshot().await?;

if let Some(snapshot) = current_snapshot_opt {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
if snapshot_is_within_half_of_threshold(
&snapshot.meta.last_log_id.index,
&self.core.last_log_id.unwrap_or_default().index,
&threshold,
) {
let _ = tx.send(snapshot);
return Ok(());
if let Some(must_inc) = must_include {
if snapshot.meta.last_log_id >= must_inc {
let _ = tx.send(snapshot);
return Ok(());
}
} else {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
if snapshot_is_within_half_of_threshold(
&snapshot.meta.last_log_id.index,
&self.core.last_log_id.unwrap_or_default().index,
&threshold,
) {
let _ = tx.send(snapshot);
return Ok(());
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ pub enum ReplicationError {
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("store has no log at: {index:?}")]
#[error("store has no log at: {index:?}, last purged: {last_purged_log_id:?}")]
pub struct LackEntry {
    /// The index at which the replication stream expected a log entry but the
    /// store has none (set from `max_possible_matched_index` in `check_consecutive`).
    pub index: Option<u64>,
    /// The last log id purged from the store. A snapshot built to catch up the
    /// lagging follower has to include at least this log id.
    pub last_purged_log_id: Option<LogId>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
Expand Down
61 changes: 40 additions & 21 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// If it returns Ok(), always go back to LineRate state.
let res = match &self.target_repl_state {
TargetReplState::LineRate => self.line_rate_loop().await,
TargetReplState::Snapshotting => self.replicate_snapshot().await,
TargetReplState::Snapshotting { must_include } => {
let must = *must_include;
self.replicate_snapshot(must).await
}
TargetReplState::Shutdown => return,
};

Expand Down Expand Up @@ -229,11 +232,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
// TODO(xp): tell core to quit?
return;
}
ReplicationError::LackEntry(_) => {
self.set_target_repl_state(TargetReplState::Snapshotting);
ReplicationError::LackEntry(lack_ent) => {
self.set_target_repl_state(TargetReplState::Snapshotting {
must_include: lack_ent.last_purged_log_id,
});
}
ReplicationError::CommittedAdvanceTooMany { .. } => {
self.set_target_repl_state(TargetReplState::Snapshotting);
self.set_target_repl_state(TargetReplState::Snapshotting { must_include: None });
}
ReplicationError::StorageError(_err) => {
self.set_target_repl_state(TargetReplState::Shutdown);
Expand Down Expand Up @@ -267,12 +272,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re

let log_state = self.storage.get_log_state().await?;

let smallest_prev_id = log_state.last_purged_log_id;
let last_purged = log_state.last_purged_log_id;

self.check_consecutive(smallest_prev_id)?;
self.check_consecutive(last_purged)?;

if prev_index < smallest_prev_id.index() {
prev_index = smallest_prev_id.index();
if prev_index < last_purged.index() {
prev_index = last_purged.index();
}

let start = prev_index.next_index();
Expand All @@ -281,16 +286,16 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
tracing::debug!(
?self.matched,
?self.max_possible_matched_index,
?smallest_prev_id,
?last_purged,
?prev_index,
end,
"load entries",
);

assert!(end >= prev_index.next_index());

let prev_log_id = if prev_index == smallest_prev_id.index() {
smallest_prev_id
let prev_log_id = if prev_index == last_purged.index() {
last_purged
} else if let Some(prev_i) = prev_index {
let first = self.storage.try_get_log_entry(prev_i).await?;
match first {
Expand Down Expand Up @@ -406,12 +411,13 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re

/// max_possible_matched_index is the least index for `prev_log_id` to form a consecutive log sequence
#[tracing::instrument(level = "trace", skip(self), fields(max_possible_matched_index=self.max_possible_matched_index))]
fn check_consecutive(&self, smallest_prev_id: Option<LogId>) -> Result<(), ReplicationError> {
tracing::debug!(?smallest_prev_id, ?self.max_possible_matched_index, "check_consecutive");
fn check_consecutive(&self, last_purged: Option<LogId>) -> Result<(), ReplicationError> {
tracing::debug!(?last_purged, ?self.max_possible_matched_index, "check_consecutive");

if smallest_prev_id.index() > self.max_possible_matched_index {
if last_purged.index() > self.max_possible_matched_index {
return Err(ReplicationError::LackEntry(LackEntry {
index: self.max_possible_matched_index,
last_purged_log_id: last_purged,
}));
}

Expand Down Expand Up @@ -533,8 +539,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
enum TargetReplState {
/// The replication stream is running at line rate.
LineRate,

/// The replication stream is streaming a snapshot over to the target node.
Snapshotting,
Snapshotting { must_include: Option<LogId> },

/// The replication stream is shutting down.
Shutdown,
}
Expand Down Expand Up @@ -594,8 +602,11 @@ where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
},
/// An event from a replication stream requesting snapshot info.
NeedsSnapshot {
/// The ID of the target node from which the event was sent.
target: NodeId,

/// The log id the caller requires the snapshot has to include.
must_include: Option<LogId>,

/// The response channel for delivering the snapshot data.
tx: oneshot::Sender<Snapshot<S>>,
},
Expand All @@ -615,8 +626,12 @@ impl<S: AsyncRead + AsyncSeek + Send + Unpin + 'static> MessageSummary for Repli
ReplicaEvent::RevertToFollower { ref target, ref term } => {
format!("RevertToFollower: target: {}, term: {}", target, term)
}
ReplicaEvent::NeedsSnapshot { ref target, .. } => {
format!("NeedsSnapshot: target: {}", target)
ReplicaEvent::NeedsSnapshot {
ref target,
ref must_include,
..
} => {
format!("NeedsSnapshot: target: {}, must_include: {:?}", target, must_include)
}
ReplicaEvent::Shutdown => "Shutdown".to_string(),
}
Expand Down Expand Up @@ -699,8 +714,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}

#[tracing::instrument(level = "debug", skip(self), fields(state = "snapshotting"))]
pub async fn replicate_snapshot(&mut self) -> Result<(), ReplicationError> {
let snapshot = self.wait_for_snapshot().await?;
pub async fn replicate_snapshot(&mut self, snapshot_must_include: Option<LogId>) -> Result<(), ReplicationError> {
let snapshot = self.wait_for_snapshot(snapshot_must_include).await?;
self.stream_snapshot(snapshot).await?;

Ok(())
Expand All @@ -711,7 +726,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
/// If an error comes up during processing, this routine should simply be called again after
/// issuing a new request to the storage layer.
#[tracing::instrument(level = "debug", skip(self))]
async fn wait_for_snapshot(&mut self) -> Result<Snapshot<S::SnapshotData>, ReplicationError> {
async fn wait_for_snapshot(
&mut self,
snapshot_must_include: Option<LogId>,
) -> Result<Snapshot<S::SnapshotData>, ReplicationError> {
// Ask raft core for a snapshot.
// - If raft core has a ready snapshot, it sends back through tx.
// - Otherwise raft core starts a new task taking snapshot, and **close** `tx` when finished. Thus there has to
Expand All @@ -726,6 +744,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
let _ = self.raft_core_tx.send((
ReplicaEvent::NeedsSnapshot {
target: self.target,
must_include: snapshot_must_include,
tx,
},
tracing::debug_span!("CH"),
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/snapshot/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
#[path = "../fixtures/mod.rs"]
mod fixtures;

mod after_snapshot_add_learner_and_request_a_log;
mod snapshot_chunk_size;
mod snapshot_ge_half_threshold;
mod snapshot_line_rate_to_snapshot;
mod snapshot_overrides_membership;
mod snapshot_uses_prev_snap_membership;
mod after_snapshot_add_learner_and_request_a_log;

0 comments on commit 1a781e1

Please sign in to comment.