Skip to content

Commit

Permalink
Change: Do not copy replication metrics over and over
Browse files Browse the repository at this point in the history
Replication metrics were cloned on every leader metrics update, which is unnecessary. Instead, keep a hashmap of replication metrics that stores an *atomic* index of the matching log for each replica. The map is cloned only when something other than an index changes.

The change improves the performance of the empty Raft benchmark by about 5% by reducing reallocations of the replication metrics.

Further improvement could be made by reducing the number of atomic operations on the `Arc` backing the `LeaderMetrics`, but that's likely not worth it as of now.

NOTE: This change slightly alters the API of `ReplicationMetrics`. Previously, the member `matched` was publicly accessible. Now the member has been replaced by an accessor method `matched()`, and a constructor `new()` was added to build the object. That is, any user of `ReplicationMetrics` interested in the exact log position on a replica must be adjusted to call the method `matched()` instead of reading the `matched` member. To prevent such issues in the future, the APIs should expose only accessor functions, not members. This should be taken into consideration in the upcoming API refactoring.
  • Loading branch information
schreter committed Feb 22, 2022
1 parent 2055a23 commit 5ba730c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 15 deletions.
4 changes: 3 additions & 1 deletion openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,9 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

tracing::info!("removed replication to: {}", target);
self.nodes.remove(&target);
self.leader_metrics.replication.remove(&target);
let mut metrics_clone = self.leader_metrics.as_ref().clone();
metrics_clone.replication.remove(&target);
self.leader_metrics = Arc::new(metrics_clone);
true
}
}
8 changes: 4 additions & 4 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,9 +348,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "trace", skip(self))]
fn report_metrics(&mut self, leader_metrics: Update<Option<&LeaderMetrics>>) {
fn report_metrics(&mut self, leader_metrics: Update<Option<&Arc<LeaderMetrics>>>) {
let leader_metrics = match leader_metrics {
Update::Update(v) => v.map(|m| Arc::new(m.clone())),
Update::Update(v) => v.cloned(),
Update::AsIs => self.tx_metrics.borrow().leader_metrics.clone(),
};

Expand Down Expand Up @@ -731,7 +731,7 @@ struct LeaderState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: Raf
pub(super) nodes: BTreeMap<NodeId, ReplicationState>,

/// The metrics about a leader
pub leader_metrics: LeaderMetrics,
pub leader_metrics: Arc<LeaderMetrics>,

/// The stream of events coming from replication streams.
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<S::SnapshotData>, Span)>,
Expand All @@ -750,7 +750,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
Self {
core,
nodes: BTreeMap::new(),
leader_metrics: LeaderMetrics::default(),
leader_metrics: Arc::new(LeaderMetrics::default()),
replication_tx,
replication_rx,
awaiting_committed: Vec::new(),
Expand Down
24 changes: 23 additions & 1 deletion openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use tokio::sync::oneshot;
use tracing_futures::Instrument;
Expand Down Expand Up @@ -175,7 +178,26 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// Records the latest matched log id reported for `target` in the leader metrics.
///
/// When an entry for `target` already exists with the same leader id, only its
/// atomic index is overwritten and the shared metrics object is left as is.
/// Otherwise the metrics are cloned, the entry is inserted or replaced, and the
/// clone is stored behind a new `Arc`.
#[tracing::instrument(level = "trace", skip(self))]
fn update_leader_metrics(&mut self, target: NodeId, matched: Option<LogId>) {
tracing::debug!(%target, ?matched, "update_leader_metrics");
// NOTE(review): the next line looks like the pre-change statement kept by the diff view — confirm.
self.leader_metrics.replication.insert(target, ReplicationMetrics { matched });
// Split the log id into its leader id and index; a missing log id maps to (None, 0).
let (matched_leader_id, matched_index) = if let Some(log_id) = matched {
(Some(log_id.leader_id), log_id.index)
} else {
(None, 0)
};
if let Some(target_metrics) = self.leader_metrics.replication.get(&target) {
if target_metrics.matched_leader_id == matched_leader_id {
// Same leader id: the index is atomic, so it can be updated in place
// through the shared reference without cloning the metrics.
target_metrics.matched_index.store(matched_index, Ordering::Relaxed);
return;
}
}
// Either the record does not exist or the leader id is different:
// build a new metrics object containing the updated entry.
let mut metrics_clone = self.leader_metrics.as_ref().clone();
metrics_clone.replication.insert(target, ReplicationMetrics {
matched_leader_id,
matched_index: AtomicU64::new(matched_index),
});
self.leader_metrics = Arc::new(metrics_clone);
}

#[tracing::instrument(level = "trace", skip(self))]
Expand Down
49 changes: 46 additions & 3 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Replication stream.

use std::io::SeekFrom;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use futures::future::FutureExt;
Expand Down Expand Up @@ -37,6 +39,7 @@ use crate::AppData;
use crate::AppDataResponse;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::LeaderId;
use crate::LogId;
use crate::MessageSummary;
use crate::Node;
Expand All @@ -47,14 +50,54 @@ use crate::RaftStorage;
use crate::ToStorageResult;
use crate::Vote;

/// Replication metrics for a single replication target.
///
/// The matched log id is stored as two parts: the leader id and an atomic index.
// NOTE(review): two `derive` lines and the `matched` field appear together here;
// presumably the first derive and `matched` are the pre-change lines kept by the
// diff view — confirm.
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct ReplicationMetrics {
pub matched: Option<LogId>,
/// Leader id part of the matched log id; `None` when no log id is recorded.
pub(crate) matched_leader_id: Option<LeaderId>,
/// Index part of the matched log id, kept in an atomic.
pub(crate) matched_index: AtomicU64,
}

impl Clone for ReplicationMetrics {
fn clone(&self) -> Self {
Self {
matched_leader_id: self.matched_leader_id,
matched_index: AtomicU64::new(self.matched_index.load(Ordering::Relaxed)),
}
}
}

/// Two metrics values are equal when both the leader id and the currently
/// stored index agree.
impl PartialEq for ReplicationMetrics {
    fn eq(&self, other: &Self) -> bool {
        // Compare the leader ids first; the indices are only loaded when they match.
        if self.matched_leader_id != other.matched_leader_id {
            return false;
        }
        let lhs_index = self.matched_index.load(Ordering::Relaxed);
        let rhs_index = other.matched_index.load(Ordering::Relaxed);
        lhs_index == rhs_index
    }
}

impl Eq for ReplicationMetrics {}

impl ReplicationMetrics {
    /// Builds metrics from an optional matched log id.
    ///
    /// With `Some(log_id)` the leader id and index are taken from it; with
    /// `None` the `Default` value is returned.
    pub fn new(log_id: Option<LogId>) -> Self {
        match log_id {
            Some(id) => Self {
                matched_leader_id: Some(id.leader_id),
                matched_index: AtomicU64::new(id.index),
            },
            None => Self::default(),
        }
    }

    /// Returns the matched log id, or `None` when no leader id is recorded.
    ///
    /// The index is read from the atomic at call time.
    pub fn matched(&self) -> Option<LogId> {
        self.matched_leader_id.map(|leader_id| LogId {
            leader_id,
            index: self.matched_index.load(Ordering::Relaxed),
        })
    }
}

/// Summarises the metrics as the `Debug` rendering of the matched log id.
impl MessageSummary for ReplicationMetrics {
fn summary(&self) -> String {
// NOTE(review): two `format!` lines appear here; presumably the first is the
// pre-change line kept by the diff view and the second (using the `matched()`
// accessor) is its replacement — confirm.
format!("{:?}", self.matched)
format!("{:?}", self.matched())
}
}

Expand Down
8 changes: 2 additions & 6 deletions openraft/tests/metrics/t30_leader_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ async fn leader_metrics() -> Result<()> {

router.assert_stable_cluster(Some(1), Some(log_index)).await; // Still in term 1, so leader is still node 0.

let ww = ReplicationMetrics {
matched: Some(LogId::new(LeaderId::new(1, 0), log_index)),
};
let ww = ReplicationMetrics::new(Some(LogId::new(LeaderId::new(1, 0), log_index)));
let want_repl = hashmap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone(), 4=>ww.clone(), };
router
.wait_for_metrics(
Expand Down Expand Up @@ -157,9 +155,7 @@ async fn leader_metrics() -> Result<()> {

tracing::info!("--- replication metrics should reflect the replication state");
{
let ww = ReplicationMetrics {
matched: Some(LogId::new(LeaderId::new(1, 0), log_index)),
};
let ww = ReplicationMetrics::new(Some(LogId::new(LeaderId::new(1, 0), log_index)));
let want_repl = hashmap! { 1=>ww.clone(), 2=>ww.clone(), 3=>ww.clone()};
router
.wait_for_metrics(
Expand Down

0 comments on commit 5ba730c

Please sign in to comment.