feature: report snapshot metrics to RaftMetrics::snapshot, which is a LogId: (term, index) that a snapshot includes

- Add: `Wait.snapshot()` to watch snapshot changes.
- Test: replace `sleep()` with `wait_for_snapshot()` to speed up tests.
drmingdrmer committed Jul 9, 2021
1 parent 5eb9d3a commit 8e0b0df
Showing 8 changed files with 110 additions and 22 deletions.
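
The test-side effect is easiest to see in the compaction.rs diff below: instead of sleeping for a fixed interval and hoping compaction has finished, a test now blocks until the node reports the expected snapshot log id through its metrics. The two lines here are lifted from that diff to show the pattern:

    // Before: sleep for a fixed 10 seconds and hope the snapshot has been built.
    sleep(Duration::from_secs(10)).await;

    // After: block until the node's metrics report the expected snapshot log id.
    router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;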
1 change: 1 addition & 0 deletions async-raft/src/core/install_snapshot.rs
@@ -186,6 +186,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.last_log_term = req.last_included.term;
self.last_applied = req.last_included.index;
self.snapshot_last_included = req.last_included;
self.report_metrics(Update::Ignore);
Ok(())
}
}
7 changes: 5 additions & 2 deletions async-raft/src/core/mod.rs
@@ -213,6 +213,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
self.storage.get_current_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?
{
self.snapshot_last_included = snapshot.included;
self.report_metrics(Update::Ignore);
}

let has_log = self.last_log_index != u64::min_value();
@@ -287,6 +288,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
last_applied: self.last_applied,
current_leader: self.current_leader,
membership_config: self.membership.clone(),
snapshot: self.snapshot_last_included,
leader_metrics,
});

@@ -409,11 +411,12 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
#[tracing::instrument(level = "trace", skip(self))]
fn update_snapshot_state(&mut self, update: SnapshotUpdate) {
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.snapshot_last_included = log_id
self.snapshot_last_included = log_id;
self.report_metrics(Update::Ignore);
}
// If snapshot state is anything other than streaming, then drop it.
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
self.snapshot_state = Some(state)
self.snapshot_state = Some(state);
}
}

16 changes: 16 additions & 0 deletions async-raft/src/metrics.rs
@@ -18,6 +18,7 @@ use tokio::time::Duration;

use crate::core::State;
use crate::raft::MembershipConfig;
use crate::LogId;
use crate::NodeId;
use crate::RaftError;
use crate::ReplicationMetrics;
@@ -40,6 +41,10 @@ pub struct RaftMetrics {
/// The current membership config of the cluster.
pub membership_config: MembershipConfig,

/// The id of the last log included in the snapshot.
/// If there is no snapshot, it is (0, 0).
pub snapshot: LogId,

/// The metrics about the leader. It is Some() only when this node is leader.
pub leader_metrics: Option<LeaderMetrics>,
}
@@ -62,6 +67,7 @@ impl RaftMetrics {
last_applied: 0,
current_leader: None,
membership_config,
snapshot: LogId { term: 0, index: 0 },
leader_metrics: None,
}
}
@@ -190,4 +196,14 @@ impl Wait {
)
.await
}

/// Wait for `snapshot` to become `want_snapshot` or timeout.
#[tracing::instrument(level = "debug", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn snapshot(&self, want_snapshot: LogId, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
self.metrics(
|x| x.snapshot == want_snapshot,
&format!("{} .snapshot -> {:?}", msg.to_string(), want_snapshot),
)
.await
}
}
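
Because the new field rides on the existing RaftMetrics watch channel, any consumer of the crate's `Raft::metrics()` accessor (a `tokio::sync::watch::Receiver<RaftMetrics>`) can observe the snapshot id without extra plumbing. A minimal sketch, not part of this commit; the helper name is made up:

    use async_raft::AppData;
    use async_raft::AppDataResponse;
    use async_raft::LogId;
    use async_raft::Raft;
    use async_raft::RaftNetwork;
    use async_raft::RaftStorage;

    /// Read the last snapshot's log id from a running node's metrics channel.
    fn last_snapshot_id<D, R, N, S>(raft: &Raft<D, R, N, S>) -> LogId
    where
        D: AppData,
        R: AppDataResponse,
        N: RaftNetwork<D>,
        S: RaftStorage<D, R>,
    {
        // LogId { term: 0, index: 0 } means the node has not taken a snapshot yet.
        raft.metrics().borrow().snapshot.clone()
    }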
44 changes: 44 additions & 0 deletions async-raft/src/metrics_wait_test.rs
@@ -7,6 +7,7 @@ use tokio::time::sleep;
use crate::metrics::Wait;
use crate::metrics::WaitError;
use crate::raft::MembershipConfig;
use crate::LogId;
use crate::RaftMetrics;
use crate::State;

@@ -99,6 +100,48 @@ async fn test_wait() -> anyhow::Result<()> {
assert_eq!(Some(hashset![1, 2]), got.membership_config.members_after_consensus);
}

tracing::info!("--- wait for snapshot, Ok");
{
let (init, w, tx) = init_wait_test();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.snapshot = LogId { term: 1, index: 2 };
let rst = tx.send(update);
assert!(rst.is_ok());
});
let got = w.snapshot(LogId { term: 1, index: 2 }, "snapshot").await?;
h.await?;

assert_eq!(LogId { term: 1, index: 2 }, got.snapshot);
}

tracing::info!("--- wait for snapshot, only index matches");
{
let (init, w, tx) = init_wait_test();

let h = tokio::spawn(async move {
sleep(Duration::from_millis(10)).await;
let mut update = init.clone();
update.snapshot = LogId { term: 3, index: 2 };
let rst = tx.send(update);
assert!(rst.is_ok());
// Keep the sender alive a bit longer; if the channel closes too early the wait fails with Shutdown instead of Timeout.
sleep(Duration::from_millis(200)).await;
});
let got = w.snapshot(LogId { term: 1, index: 2 }, "snapshot").await;
h.await?;
match got.unwrap_err() {
WaitError::Timeout(t, _) => {
assert_eq!(Duration::from_millis(100), t);
}
_ => {
panic!("expect WaitError::Timeout");
}
}
}

{
// timeout
let (_init, w, _tx) = init_wait_test();
@@ -136,6 +179,7 @@ fn init_wait_test() -> (RaftMetrics, Wait, watch::Sender<RaftMetrics>) {
members: Default::default(),
members_after_consensus: None,
},
snapshot: LogId { term: 0, index: 0 },
leader_metrics: None,
};
let (tx, rx) = watch::channel(init.clone());
24 changes: 11 additions & 13 deletions async-raft/tests/compaction.rs
@@ -1,16 +1,15 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::LogId;
use async_raft::SnapshotPolicy;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;
use tokio::time::sleep;

/// Compaction test.
///
@@ -25,10 +24,12 @@ use tokio::time::sleep;
async fn compaction() -> Result<()> {
fixtures::init_tracing();

let snapshot_threshold: u64 = 50;

// Setup test dependencies.
let config = Arc::new(
Config::build("test".into())
.snapshot_policy(SnapshotPolicy::LogsSinceLast(500))
.snapshot_policy(SnapshotPolicy::LogsSinceLast(snapshot_threshold))
.validate()
.expect("failed to build Raft config"),
);
@@ -53,22 +54,19 @@ async fn compaction() -> Result<()> {

// Send enough requests to the cluster that compaction on the node should be triggered.
// Puts us exactly at the configured snapshot policy threshold.
router.client_request_many(0, "0", 499).await;
want += 499;
router.client_request_many(0, "0", (snapshot_threshold - want) as usize).await;
want = snapshot_threshold;

router.wait_for_log(&hashset![0], want, None, "write").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// TODO: add snapshot info into metrics.
// Then watch metrics instead of waiting.
sleep(Duration::from_secs(10)).await;
router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
router
.assert_storage_state(
1,
500,
want,
Some(0),
500,
Some((500.into(), 1, MembershipConfig {
want,
Some((want.into(), 1, MembershipConfig {
members: hashset![0],
members_after_consensus: None,
})),
@@ -85,7 +83,7 @@ async fn compaction() -> Result<()> {
want += 1;

router.wait_for_log(&hashset![0, 1], want, None, "add follower").await?;
let expected_snap = Some((500.into(), 1, MembershipConfig {
let expected_snap = Some((snapshot_threshold.into(), 1, MembershipConfig {
members: hashset![0u64],
members_after_consensus: None,
}));
16 changes: 16 additions & 0 deletions async-raft/tests/fixtures/mod.rs
@@ -25,6 +25,7 @@ use async_raft::raft::VoteRequest;
use async_raft::raft::VoteResponse;
use async_raft::storage::RaftStorage;
use async_raft::Config;
use async_raft::LogId;
use async_raft::NodeId;
use async_raft::Raft;
use async_raft::RaftMetrics;
@@ -232,6 +233,21 @@ impl RaftRouter {
Ok(())
}

/// Wait for the specified nodes until their snapshot becomes `want`.
#[tracing::instrument(level = "info", skip(self))]
pub async fn wait_for_snapshot(
&self,
node_ids: &HashSet<u64>,
want: LogId,
timeout: Option<Duration>,
msg: &str,
) -> Result<()> {
for i in node_ids.iter() {
self.wait(i, timeout).await?.snapshot(want, msg).await?;
}
Ok(())
}

/// Get the ID of the current leader.
pub async fn leader(&self) -> Option<NodeId> {
let isolated = self.isolated_nodes.read().await;
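A usage sketch of the new helper with an explicit timeout, instead of the `None` used by compaction.rs above. The node ids, log id, and function name are illustrative, and the snippet assumes the same test-crate context (`mod fixtures;`, anyhow, maplit) as the tests in this commit:

    use std::time::Duration;

    use anyhow::Result;
    use async_raft::LogId;
    use fixtures::RaftRouter;
    use maplit::hashset;

    /// Wait until nodes 0 and 1 both report the wanted snapshot, or give up after 500 ms per node.
    async fn wait_both_snapshotted(router: &RaftRouter, want: u64) -> Result<()> {
        router
            .wait_for_snapshot(
                &hashset![0, 1],
                LogId { term: 1, index: want },
                Some(Duration::from_millis(500)),
                "snapshot on both nodes",
            )
            .await?;
        Ok(())
    }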
8 changes: 3 additions & 5 deletions async-raft/tests/snapshot_ge_half_threshold.rs
@@ -1,16 +1,15 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use async_raft::LogId;
use async_raft::SnapshotPolicy;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;
use tokio::time::sleep;

/// A leader should create and send a snapshot when the snapshot is old, but not yet old enough to trigger a new snapshot, i.e.:
/// `threshold/2 < leader.last_log_index - snapshot.applied_index < threshold`
@@ -63,9 +62,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
router.wait_for_log(&hashset![0], want, None, "send log to trigger snapshot").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// TODO: add snapshot info into metrics.
// Then watch metrics instead of waiting.
sleep(Duration::from_secs(5)).await;
router.wait_for_snapshot(&hashset![0], LogId { term: 1, index: want }, None, "snapshot").await?;
router
.assert_storage_state(
1,
@@ -96,6 +93,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
members: hashset![0u64],
members_after_consensus: None,
}));
router.wait_for_snapshot(&hashset![1], LogId { term: 1, index: want }, None, "").await?;
router.assert_storage_state(1, want, None /* non-voter does not vote */, want, expected_snap).await;
}

16 changes: 14 additions & 2 deletions memstore/src/lib.rs
@@ -377,9 +377,21 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
{ snapshot_size = snapshot.get_ref().len() },
"decoding snapshot for installation"
);
let raw = serde_json::to_string_pretty(snapshot.get_ref().as_slice())?;
println!("JSON SNAP:\n{}", raw);

{
let t = snapshot.get_ref().as_slice();
let y = std::str::from_utf8(t).unwrap();
tracing::debug!("JSON SNAP:\n{}", y);
}

let new_snapshot: MemStoreSnapshot = serde_json::from_slice(snapshot.get_ref().as_slice())?;

{
let t = &new_snapshot.data;
let y = std::str::from_utf8(t).unwrap();
tracing::debug!("JSON SNAP DATA:\n{}", y);
}

// Update log.
{
// Go backwards through the log to find the most recent membership config <= the `through` index.
