Skip to content

Commit

Permalink
Feature: add RaftMetrics.vote, Wait::vote()
Browse files Browse the repository at this point in the history
`RaftMetrics.vote` is the latest accepted value of `Vote` that has been
saved to disk. Additionally, a new `vote()` method has been added to
`Wait` so that an application can wait for `vote` to reach an expected
value.
  • Loading branch information
drmingdrmer committed May 2, 2023
1 parent 8bf82a9 commit 0b419eb
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 11 deletions.
18 changes: 11 additions & 7 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,21 +507,24 @@ where
/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&self, replication: Option<ReplicationMetrics<C::NodeId>>) {
let st = &self.engine.state;

let m = RaftMetrics {
running_state: Ok(()),
id: self.id,

// --- data ---
current_term: self.engine.state.vote_ref().leader_id().get_term(),
last_log_index: self.engine.state.last_log_id().index(),
last_applied: self.engine.state.io_applied().copied(),
snapshot: self.engine.state.snapshot_meta.last_log_id,
purged: self.engine.state.io_purged().copied(),
current_term: st.vote_ref().leader_id().get_term(),
vote: *st.io_state().vote(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().copied(),
snapshot: st.snapshot_meta.last_log_id,
purged: st.io_purged().copied(),

// --- cluster ---
state: self.engine.state.server_state,
state: st.server_state,
current_leader: self.current_leader(),
membership_config: self.engine.state.membership_state.effective().stored_membership().clone(),
membership_config: st.membership_state.effective().stored_membership().clone(),

// --- replication ---
replication,
Expand Down Expand Up @@ -1520,6 +1523,7 @@ where
}
Command::SaveVote { vote } => {
self.log_store.save_vote(&vote).await?;
self.engine.state.io_state_mut().update_vote(vote);
}
Command::PurgeLog { upto } => {
self.log_store.purge(upto).await?;
Expand Down
8 changes: 7 additions & 1 deletion openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::summary::MessageSummary;
use crate::LogId;
use crate::NodeId;
use crate::StoredMembership;
use crate::Vote;

/// A set of metrics describing the current state of a Raft node.
#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -28,6 +29,9 @@ where
/// The current term of the Raft node.
pub current_term: u64,

/// The last accepted vote.
pub vote: Vote<NID>,

/// The last log index has been appended to this Raft node's log.
pub last_log_index: Option<u64>,

Expand Down Expand Up @@ -70,10 +74,11 @@ where
{
// TODO: make this more readable
fn summary(&self) -> String {
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, purged:{}, replication:{{{}}}",
format!("Metrics{{id:{},{:?}, term:{}, vote:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, purged:{}, replication:{{{}}}",
self.id,
self.state,
self.current_term,
self.vote,
self.last_log_index,
self.last_applied.summary(),
self.current_leader,
Expand All @@ -98,6 +103,7 @@ where
id,

current_term: 0,
vote: Vote::default(),
last_log_index: None,
last_applied: None,
snapshot: None,
Expand Down
7 changes: 7 additions & 0 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::LogId;
use crate::LogIdOptionExt;
use crate::MessageSummary;
use crate::NodeId;
use crate::Vote;

/// Error variants related to metrics.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -105,6 +106,12 @@ where
}
}

/// Block until the reported `vote` equals `want`, or fail on timeout.
///
/// `msg` is a caller-supplied label; it is prepended to the description handed to
/// `metrics()` and recorded as the `msg` field of the tracing span.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn vote(&self, want: Vote<NID>, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
    // Build the human-readable description of the awaited condition up front.
    let description = format!("{} .vote -> {}", msg.to_string(), want);

    self.metrics(|metrics| metrics.vote == want, &description).await
}

/// Wait for `current_leader` to become `Some(leader_id)` until timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn current_leader(&self, leader_id: NID, msg: impl ToString) -> Result<RaftMetrics<NID, N>, WaitError> {
Expand Down
25 changes: 25 additions & 0 deletions openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::Node;
use crate::NodeId;
use crate::RaftMetrics;
use crate::StoredMembership;
use crate::Vote;

/// Test wait for different state changes
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
Expand Down Expand Up @@ -171,6 +172,29 @@ async fn test_wait() -> anyhow::Result<()> {
Ok(())
}

/// `Wait::vote()` fails while the requested vote is never published, and resolves once a
/// metrics update carrying exactly the requested vote has been sent.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait_vote() -> anyhow::Result<()> {
    let (initial, waiter, sender) = init_wait_test::<u64, ()>();

    // Background task: after a short delay, publish metrics whose vote is
    // `Vote::new_committed(1, 2)`.
    let publisher = tokio::spawn(async move {
        sleep(Duration::from_millis(10)).await;

        let mut changed = initial.clone();
        changed.vote = Vote::new_committed(1, 2);

        let sent = sender.send(changed);
        assert!(sent.is_ok());
    });

    // timeout: a vote built with `Vote::new(1, 2)` is never published by the task above.
    let not_reached = waiter.vote(Vote::new(1, 2), "vote").await;
    assert!(not_reached.is_err());

    // This is the vote the background task publishes, so the wait must succeed.
    let metrics = waiter.vote(Vote::new_committed(1, 2), "vote").await?;
    publisher.await?;
    assert_eq!(Vote::new_committed(1, 2), metrics.vote);

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_wait_purged() -> anyhow::Result<()> {
let (init, w, tx) = init_wait_test::<u64, ()>();
Expand Down Expand Up @@ -203,6 +227,7 @@ where
id: NID::default(),
state: ServerState::Learner,
current_term: 0,
vote: Vote::default(),
last_log_index: None,
last_applied: None,
purged: None,
Expand Down
21 changes: 20 additions & 1 deletion openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::display_ext::DisplayOption;
use crate::LeaderId;
use crate::LogId;
use crate::NodeId;
use crate::Vote;

#[derive(Debug, Clone, Copy)]
#[derive(Default)]
Expand All @@ -23,6 +24,9 @@ pub(crate) struct IOState<NID: NodeId> {
/// Whether it is building a snapshot
building_snapshot: bool,

/// The last flushed vote.
pub(crate) vote: Vote<NID>,

/// The last log id that has been flushed to storage.
pub(crate) flushed: LogIOId<NID>,

Expand All @@ -38,14 +42,29 @@ pub(crate) struct IOState<NID: NodeId> {
}

impl<NID: NodeId> IOState<NID> {
pub(crate) fn new(flushed: LogIOId<NID>, applied: Option<LogId<NID>>, purged: Option<LogId<NID>>) -> Self {
/// Build an `IOState` from the given vote, flushed log IO id, applied log id and purged log id.
///
/// `building_snapshot` always starts as `false`; every other field is taken verbatim from the
/// corresponding argument.
pub(crate) fn new(
vote: Vote<NID>,
flushed: LogIOId<NID>,
applied: Option<LogId<NID>>,
purged: Option<LogId<NID>>,
) -> Self {
Self {
building_snapshot: false,
vote,
flushed,
applied,
purged,
}
}

/// Replace the stored vote with `vote`.
///
/// NOTE(review): presumably this should be called only after `vote` has been persisted to
/// storage — confirm against callers.
pub(crate) fn update_vote(&mut self, vote: Vote<NID>) {
self.vote = vote;
}

/// Returns a reference to the stored vote.
pub(crate) fn vote(&self) -> &Vote<NID> {
&self.vote
}

pub(crate) fn update_applied(&mut self, log_id: Option<LogId<NID>>) {
tracing::debug!(applied = display(DisplayOption(&log_id)), "{}", func_name!());

Expand Down
14 changes: 14 additions & 0 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,20 @@ where
&mut self.io_state
}

// TODO: move this doc to [`IOState`]
/// Returns the state of the IO that has already happened.
///
/// [`RaftState`] stores the expected state when all queued IO are completed,
/// which may be ahead of the actual IO state.
///
/// [`IOState`] stores the actual state of the storage.
///
/// Usually, when a client request is handled, [`RaftState`] is updated and several IO commands
/// are enqueued. When the IO commands are completed, [`IOState`] is updated.
pub(crate) fn io_state(&self) -> &IOState<NID> {
&self.io_state
}

/// Find the first entry in the input that does not exist on local raft-log,
/// by comparing the log id.
pub(crate) fn first_conflicting_index<Ent>(&self, entries: &[Ent]) -> usize
Expand Down
6 changes: 4 additions & 2 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ where
/// state from stable storage.
pub async fn get_initial_state(&mut self) -> Result<RaftState<C::NodeId, C::Node>, StorageError<C::NodeId>> {
let vote = self.log_store.read_vote().await?;
let vote = vote.unwrap_or_default();

let st = self.log_store.get_log_state().await?;
let mut last_purged_log_id = st.last_purged_log_id;
let mut last_log_id = st.last_log_id;
Expand All @@ -75,7 +77,7 @@ where
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;

// TODO: `flushed` is not set.
let io_state = IOState::new(LogIOId::default(), last_applied, last_purged_log_id);
let io_state = IOState::new(vote, LogIOId::default(), last_applied, last_purged_log_id);

let snapshot = self.state_machine.get_current_snapshot().await?;

Expand All @@ -101,7 +103,7 @@ where
committed: last_applied,
// The initial value for `vote` is the minimal possible value.
// See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization)
vote: UTime::new(now, vote.unwrap_or_default()),
vote: UTime::new(now, vote),
purged_next: last_purged_log_id.next_index(),
log_ids,
membership_state: mem_state,
Expand Down

0 comments on commit 0b419eb

Please sign in to comment.