Skip to content

Commit

Permalink
fix: leader should re-create and send snapshot when `threshold/2 < la…
Browse files Browse the repository at this point in the history
…st_log_index - snapshot < threshold`

The problem:

If `last_log_index` advances `snapshot.applied_index` too many, i.e.:
`threshold/2 < last_log_index - snapshot < threshold`
(e.g., `10/2 < 16-10 < 20` in the test that reproduce this bug), the leader
tries to re-create a new snapshot. But when
`last_log_index < threshold`, it won't create, which result in a dead
loop.

Solution:

In such case, force to create a snapshot without considering the
threshold.
  • Loading branch information
drmingdrmer committed Jul 8, 2021
1 parent b351c87 commit cf4badd
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 9 deletions.
2 changes: 1 addition & 1 deletion async-raft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

// Trigger log compaction if needed.
self.core.trigger_log_compaction_if_needed();
self.core.trigger_log_compaction_if_needed(false);
}

/// Apply the given log entry to the state machine.
Expand Down
16 changes: 9 additions & 7 deletions async-raft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
}

/// Trigger a log compaction (snapshot) job if needed.
/// If force is True, it will skip the threshold check and start creating snapshot as demanded.
#[tracing::instrument(level = "trace", skip(self))]
pub(self) fn trigger_log_compaction_if_needed(&mut self) {
pub(self) fn trigger_log_compaction_if_needed(&mut self, force: bool) {
if self.snapshot_state.is_some() {
return;
}
Expand All @@ -426,11 +427,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
if self.last_applied == 0 || self.last_applied < self.snapshot_index {
return;
}
// If we are below the threshold, then there is nothing to do.
let is_below_threshold =
self.last_applied.checked_sub(self.snapshot_index).map(|diff| diff < *threshold).unwrap_or(false);
if is_below_threshold {
return;

if !force {
// If we are below the threshold, then there is nothing to do.
if self.last_applied < self.snapshot_index + *threshold {
return;
}
}

// At this point, we are clear to begin a new compaction process.
Expand Down Expand Up @@ -473,7 +475,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.last_applied = last_applied;
}
self.report_metrics(Update::Ignore);
self.trigger_log_compaction_if_needed();
self.trigger_log_compaction_if_needed(false);
Ok(())
}

Expand Down
6 changes: 5 additions & 1 deletion async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.get_current_snapshot()
.await
.map_err(|err| self.core.map_fatal_storage_error(err))?;

if let Some(snapshot) = current_snapshot_opt {
// If snapshot exists, ensure its distance from the leader's last log index is <= half
// of the configured snapshot threshold, else create a new snapshot.
Expand Down Expand Up @@ -325,7 +326,10 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
//
// If this block is executed, and a snapshot is needed, the repl stream will submit another
// request here shortly, and will hit the above logic where it will await the snapshot completion.
self.core.trigger_log_compaction_if_needed();
//
// If snapshot is too old, i.e., the distance from last_log_index is greater than half of snapshot threshold,
// always force a snapshot creation.
self.core.trigger_log_compaction_if_needed(true);
Ok(())
}
}
Expand Down
103 changes: 103 additions & 0 deletions async-raft/tests/snapshot_ge_half_threshold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::SnapshotPolicy;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;
use tokio::time::sleep;

/// A leader should create and send snapshot when snapshot is old and is not that old to trigger a snapshot, i.e.:
/// `threshold/2 < leader.last_log_index - snapshot.applied_index < threshold`
///
/// What does this test do?
///
/// - build a stable single node cluster.
/// - send enough requests to the node that log compaction will be triggered.
/// - send some other log after snapshot created, to make the `leader.last_log_index - snapshot.applied_index` big
/// enough.
/// - add non-voter and assert that they receive the snapshot and logs.
///
/// export RUST_LOG=async_raft,memstore,snapshot_ge_half_threshold=trace
/// cargo test -p async-raft --test snapshot_ge_half_threshold
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn snapshot_ge_half_threshold() -> Result<()> {
fixtures::init_tracing();

let snapshot_threshold: u64 = 10;
let log_cnt = snapshot_threshold + 6;

let config = Arc::new(
Config::build("test".into())
.snapshot_policy(SnapshotPolicy::LogsSinceLast(snapshot_threshold))
.validate()
.expect("failed to build Raft config"),
);
let router = Arc::new(RaftRouter::new(config.clone()));

let mut want = 0;

tracing::info!("--- initializing cluster");
{
router.new_raft_node(0).await;

router.wait_for_log(&hashset![0], want, None, "empty").await?;
router.wait_for_state(&hashset![0], State::NonVoter, None, "empty").await?;
router.initialize_from_single_node(0).await?;
want += 1;

router.wait_for_log(&hashset![0], want, None, "init leader").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;
}

tracing::info!("--- send just enough logs to trigger snapshot");
{
router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
want = snapshot_threshold;

router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// TODO: add snapshot info into metrics.
// Then watch metrics instead of waiting.
sleep(Duration::from_secs(5)).await;
router
.assert_storage_state(
1,
want,
Some(0),
want,
Some((want.into(), 1, MembershipConfig {
members: hashset![0],
members_after_consensus: None,
})),
)
.await;
}

tracing::info!("--- send logs to make distance between snapshot index and last_log_index");
{
router.client_request_many(0, "0", (log_cnt - want) as usize).await;
want = log_cnt;
}

tracing::info!("--- add non-voter to receive snapshot and logs");
{
router.new_raft_node(1).await;
router.add_non_voter(0, 1).await.expect("failed to add new node as non-voter");

router.wait_for_log(&hashset![0, 1], want, None, "add non-voter").await?;
let expected_snap = Some((want.into(), 1, MembershipConfig {
members: hashset![0u64],
members_after_consensus: None,
}));
router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await;
}

Ok(())
}

0 comments on commit cf4badd

Please sign in to comment.