Skip to content

Commit

Permalink
Merge pull request #242 from lichuang/optimize_report_metrics
Browse files Browse the repository at this point in the history
Refactor: optimize `report_metrics` call, avoids call this function everywhere
  • Loading branch information
mergify[bot] authored Mar 12, 2022
2 parents 70689b7 + 16a2cc5 commit f5b5d54
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 33 deletions.
20 changes: 7 additions & 13 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Update;

impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C, N, S> {
/// An RPC invoked by the leader to replicate log entries (§5.3); also used as heartbeat (§5.2).
Expand Down Expand Up @@ -47,7 +46,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.report_metrics(Update::AsIs);
self.update_other_metrics_option();
}

// Caveat: [commit-index must not advance the last known consistent log](https://datafuselabs.github.io/openraft/replication.html#caveat-commit-index-must-not-advance-the-last-known-consistent-log)
Expand Down Expand Up @@ -211,11 +210,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// This is guaranteed by caller.
self.committed = committed;

let need_to_report_metrics = !self.replicate_to_state_machine_if_needed().await?;

if need_to_report_metrics {
self.report_metrics(Update::AsIs);
}
self.replicate_to_state_machine_if_needed().await?;

Ok(AppendEntriesResponse::Success)
}
Expand Down Expand Up @@ -334,9 +329,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
///
/// Very importantly, this routine must not block the main control loop main task, else it
/// may cause the Raft leader to timeout the requests to this node.
/// return if or not has `report_metrics`, than caller donot need to call it again.
#[tracing::instrument(level = "debug", skip(self))]
async fn replicate_to_state_machine_if_needed(&mut self) -> Result<bool, StorageError<C>> {
async fn replicate_to_state_machine_if_needed(&mut self) -> Result<(), StorageError<C>> {
tracing::debug!(?self.last_applied, ?self.committed, "replicate_to_sm_if_needed");

// If we don't have any new entries to replicate, then do nothing.
Expand All @@ -346,7 +340,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.committed,
self.last_applied
);
return Ok(false);
self.update_other_metrics_option();
return Ok(());
}

// Drain entries from the beginning of the cache up to commit index.
Expand All @@ -364,9 +359,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

self.last_applied = Some(last_log_id);

self.report_metrics(Update::AsIs);
self.trigger_log_compaction_if_needed(false).await;

Ok(true)
self.update_other_metrics_option();
Ok(())
}
}
5 changes: 2 additions & 3 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::RaftTypeConfig;
use crate::SnapshotSegmentId;
use crate::StorageError;
use crate::StorageIOError;
use crate::Update;

impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C, N, S> {
/// Invoked by leader to send chunks of a snapshot to a follower (§7).
Expand Down Expand Up @@ -52,7 +51,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.report_metrics(Update::AsIs);
self.update_other_metrics_option();
}

// Compare current snapshot state with received RPC and handle as needed.
Expand Down Expand Up @@ -240,7 +239,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.update_membership(membership);

self.snapshot_last_log_id = self.last_applied;
self.report_metrics(Update::AsIs);
self.update_other_metrics_option();

Ok(())
}
Expand Down
128 changes: 116 additions & 12 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
use crate::Node;
use crate::NodeId;
use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Update;
use crate::UpdateMetricsOption;

/// The currently active membership config.
///
Expand Down Expand Up @@ -148,6 +150,13 @@ impl<C: RaftTypeConfig> MessageSummary for EffectiveMembership<C> {
}
}

pub trait MetricsOptionUpdater<NID: NodeId> {
// the default impl of `get_leader_metrics_option`, for the non-leader state
fn get_leader_metrics_option(&self, _option: Update<()>) -> Update<Option<Versioned<LeaderMetrics<NID>>>> {
Update::Update(None)
}
}

/// The core type implementing the Raft protocol.
pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
/// This node's ID.
Expand Down Expand Up @@ -195,6 +204,9 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
/// The last time a heartbeat was received.
last_heartbeat: Option<Instant>,

/// Options of update the metrics
update_metrics: UpdateMetricsOption,

/// The duration until the next election timeout.
next_election_timeout: Option<Instant>,

Expand Down Expand Up @@ -238,6 +250,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
snapshot_state: None,
snapshot_last_log_id: None,
last_heartbeat: None,
update_metrics: UpdateMetricsOption::default(),
next_election_timeout: None,

tx_compaction,
Expand Down Expand Up @@ -292,7 +305,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Fetch the most recent snapshot in the system.
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
self.snapshot_last_log_id = Some(snapshot.meta.last_log_id);
self.report_metrics(Update::AsIs);
self.update_other_metrics_option();
}

let has_log = if self.last_log_id.is_some() {
Expand Down Expand Up @@ -368,11 +381,40 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

pub fn update_leader_metrics_option(&mut self) {
self.update_metrics.leader = Update::Update(());
}

pub fn update_other_metrics_option(&mut self) {
self.update_metrics.other_metrics = Update::Update(());
}

pub fn reset_update_metrics(&mut self) {
// reset update_metrics
self.update_metrics = UpdateMetricsOption::default();
}

#[tracing::instrument(level = "trace", skip(self, metrics_reporter))]
pub fn check_report_metrics(&self, metrics_reporter: &impl MetricsOptionUpdater<C::NodeId>) {
let update_metrics = self.update_metrics.clone();
let leader_metrics = metrics_reporter.get_leader_metrics_option(update_metrics.leader);
let other_metrics = update_metrics.other_metrics;

match leader_metrics {
Update::AsIs => {
if other_metrics == Update::Update(()) {
self.report_metrics(Update::Update(None));
}
}
Update::Update(u) => self.report_metrics(Update::Update(u)),
}
}

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&mut self, leader_metrics: Update<Option<&Versioned<LeaderMetrics<C::NodeId>>>>) {
fn report_metrics(&self, leader_metrics: Update<Option<Versioned<LeaderMetrics<C::NodeId>>>>) {
let leader_metrics = match leader_metrics {
Update::Update(v) => v.cloned(),
Update::Update(v) => v,
Update::AsIs => self.tx_metrics.borrow().leader_metrics.clone(),
};

Expand Down Expand Up @@ -482,7 +524,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
fn update_snapshot_state(&mut self, update: SnapshotUpdate<C>) {
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.snapshot_last_log_id = Some(log_id);
self.report_metrics(Update::AsIs);
self.update_other_metrics_option();
}
// If snapshot state is anything other than streaming, then drop it.
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
Expand Down Expand Up @@ -770,6 +812,17 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
pub(super) awaiting_committed: Vec<ClientRequestEntry<C>>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
for LeaderState<'a, C, N, S>
{
fn get_leader_metrics_option(&self, option: Update<()>) -> Update<Option<Versioned<LeaderMetrics<C::NodeId>>>> {
match option {
Update::AsIs => Update::AsIs,
Update::Update(_) => Update::Update(Some(self.leader_metrics.clone())),
}
}
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
/// Create a new instance.
pub(self) fn new(core: &'a mut RaftCore<C, N, S>) -> Self {
Expand Down Expand Up @@ -806,8 +859,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.nodes.insert(target, state);
}

self.leader_report_metrics();

self.commit_initial_leader_entry().await?;

self.leader_loop().await?;
Expand All @@ -817,6 +868,10 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

#[tracing::instrument(level="debug", skip(self), fields(id=display(self.core.id)))]
pub(self) async fn leader_loop(mut self) -> Result<(), Fatal<C>> {
// report the leader metrics every time there came to a new leader
// if not `report_metrics` before the leader loop, the leader metrics may not be updated cause no coming event.
self.core.report_metrics(Update::Update(Some(self.leader_metrics.clone())));

loop {
if !self.core.target_state.is_leader() {
tracing::info!("id={} state becomes: {:?}", self.core.id, self.core.target_state);
Expand All @@ -829,6 +884,9 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
let span = tracing::debug_span!("CHrx:LeaderState");
let _ent = span.enter();

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();

tokio::select! {
Some((msg,span)) = self.core.rx_api.recv() => {
self.handle_msg(msg).instrument(span).await?;
Expand Down Expand Up @@ -898,9 +956,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

/// Report metrics with leader specific states.
#[tracing::instrument(level = "trace", skip(self))]
pub fn leader_report_metrics(&mut self) {
self.core.report_metrics(Update::Update(Some(&self.leader_metrics)));
self.core.update_leader_metrics_option();
}
}

Expand Down Expand Up @@ -950,6 +1007,12 @@ struct CandidateState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftSt
granted: BTreeSet<C::NodeId>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
for CandidateState<'a, C, N, S>
{
// the non-leader state use the default impl of `get_leader_metrics_option`
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> CandidateState<'a, C, N, S> {
pub(self) fn new(core: &'a mut RaftCore<C, N, S>) -> Self {
Self {
Expand All @@ -960,21 +1023,30 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida

/// Run the candidate loop.
#[tracing::instrument(level="debug", skip(self), fields(id=display(self.core.id), raft_state="candidate"))]
pub(self) async fn run(mut self) -> Result<(), Fatal<C>> {
pub(self) async fn run(self) -> Result<(), Fatal<C>> {
// Each iteration of the outer loop represents a new term.
self.candidate_loop().await?;
Ok(())
}

async fn candidate_loop(mut self) -> Result<(), Fatal<C>> {
// report the new state before enter the loop
self.core.report_metrics(Update::Update(None));

loop {
if !self.core.target_state.is_candidate() {
return Ok(());
}

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();

// Setup new term.
self.core.update_next_election_timeout(false); // Generates a new rand value within range.

self.core.vote = Vote::new(self.core.vote.term + 1, self.core.id);

self.core.save_vote().await?;
self.core.report_metrics(Update::Update(None));

// vote for itself.
self.handle_vote_response(
Expand All @@ -998,6 +1070,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida
if !self.core.target_state.is_candidate() {
return Ok(());
}

let timeout_fut = sleep_until(self.core.get_next_election_timeout());

let span = tracing::debug_span!("CHrx:CandidateState");
Expand Down Expand Up @@ -1065,21 +1138,37 @@ pub struct FollowerState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: Raf
core: &'a mut RaftCore<C, N, S>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
for FollowerState<'a, C, N, S>
{
// the non-leader state use the default impl of `get_leader_metrics_option`
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> FollowerState<'a, C, N, S> {
pub(self) fn new(core: &'a mut RaftCore<C, N, S>) -> Self {
Self { core }
}

/// Run the follower loop.
#[tracing::instrument(level="debug", skip(self), fields(id=display(self.core.id), raft_state="follower"))]
pub(self) async fn run(mut self) -> Result<(), Fatal<C>> {
pub(self) async fn run(self) -> Result<(), Fatal<C>> {
self.follower_loop().await?;
Ok(())
}

#[tracing::instrument(level="debug", skip(self), fields(id=display(self.core.id), raft_state="follower"))]
pub(self) async fn follower_loop(mut self) -> Result<(), Fatal<C>> {
// report the new state before enter the loop
self.core.report_metrics(Update::Update(None));

loop {
if !self.core.target_state.is_follower() {
return Ok(());
}

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();

let election_timeout = sleep_until(self.core.get_next_election_timeout()); // Value is updated as heartbeats are received.

tokio::select! {
Expand Down Expand Up @@ -1144,21 +1233,36 @@ pub struct LearnerState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: Raft
core: &'a mut RaftCore<C, N, S>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
for LearnerState<'a, C, N, S>
{
// the non-leader state use the default impl of `get_leader_metrics_option`
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LearnerState<'a, C, N, S> {
pub(self) fn new(core: &'a mut RaftCore<C, N, S>) -> Self {
Self { core }
}

/// Run the learner loop.
#[tracing::instrument(level="debug", skip(self), fields(id=display(self.core.id), raft_state="learner"))]
pub(self) async fn run(mut self) -> Result<(), Fatal<C>> {
pub(self) async fn run(self) -> Result<(), Fatal<C>> {
self.learner_loop().await?;
Ok(())
}

pub(self) async fn learner_loop(mut self) -> Result<(), Fatal<C>> {
// report the new state before enter the loop
self.core.report_metrics(Update::Update(None));

loop {
if !self.core.target_state.is_learner() {
return Ok(());
}

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();

let span = tracing::debug_span!("CHrx:LearnerState");
let _ent = span.enter();

Expand Down
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub use crate::raft_types::SnapshotId;
pub use crate::raft_types::SnapshotSegmentId;
pub use crate::raft_types::StateMachineChanges;
pub use crate::raft_types::Update;
pub use crate::raft_types::UpdateMetricsOption;
pub use crate::replication::ReplicationMetrics;
pub use crate::storage::RaftLogReader;
pub use crate::storage::RaftSnapshotBuilder;
Expand Down
Loading

0 comments on commit f5b5d54

Please sign in to comment.