Reactive syncing metrics #5410

Merged
merged 9 commits on Aug 23, 2024
11 changes: 11 additions & 0 deletions prdoc/pr_5410.prdoc
@@ -0,0 +1,11 @@
title: Reactive syncing metrics

doc:
- audience: Node Dev
description: |
Syncing metrics are now updated immediately as changes happen rather than every 1100ms as before.
This resulted in minor, but breaking, API changes.

crates:
- name: sc-network-sync
bump: major
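
For context, a minimal sketch of the pattern this PR moves to (the names `Peers` and `peers_gauge` are invented for illustration and are not the crate's actual API): instead of a periodic task copying counts into gauges roughly every second, the gauges are updated at the exact point where the state changes.

use std::collections::HashSet;

use prometheus::IntGauge;

struct Peers {
    set: HashSet<u64>,             // stand-in for the real peer map
    peers_gauge: Option<IntGauge>, // `None` when no Prometheus registry is configured
}

impl Peers {
    // Old pattern (removed by this PR): copy the count on a ~1100 ms tick.
    fn report_metrics(&self) {
        if let Some(gauge) = &self.peers_gauge {
            gauge.set(self.set.len() as i64);
        }
    }

    // New pattern: update the gauge reactively, right where the state changes.
    fn insert(&mut self, peer: u64) {
        if self.set.insert(peer) {
            if let Some(gauge) = &self.peers_gauge {
                gauge.inc();
            }
        }
    }

    fn remove(&mut self, peer: &u64) {
        if self.set.remove(peer) {
            if let Some(gauge) = &self.peers_gauge {
                gauge.dec();
            }
        }
    }
}

This mirrors the engine.rs change below, where `metrics.peers.inc()`/`dec()` on connect/disconnect replace the periodic `report_metrics()` call.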
28 changes: 13 additions & 15 deletions substrate/client/network/sync/src/engine.rs
@@ -471,15 +471,6 @@ where
))
}

/// Report Prometheus metrics.
pub fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
metrics.peers.set(n);
}
self.strategy.report_metrics();
}

fn update_peer_info(
&mut self,
peer_id: &PeerId,
@@ -606,7 +597,11 @@ where
pub async fn run(mut self) {
loop {
tokio::select! {
_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
_ = self.tick_timeout.tick() => {
// TODO: This tick should not be necessary, but
// `self.process_strategy_actions()` is not called in some cases otherwise and
// some tests fail because of this
},
command = self.service_rx.select_next_some() =>
self.process_service_command(command),
notification_event = self.notification_service.next_event() => match notification_event {
@@ -724,10 +719,6 @@ where
Ok(())
}

fn perform_periodic_actions(&mut self) {
self.report_metrics();
}

fn process_service_command(&mut self, command: ToServiceCommand<B>) {
match command {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
@@ -873,6 +864,9 @@ where
log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
return
};
if let Some(metrics) = &self.metrics {
metrics.peers.dec();
}

if self.important_peers.contains(&peer_id) {
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
@@ -1048,7 +1042,11 @@ where

log::debug!(target: LOG_TARGET, "Connected {peer_id}");

self.peers.insert(peer_id, peer);
if self.peers.insert(peer_id, peer).is_none() {
if let Some(metrics) = &self.metrics {
metrics.peers.inc();
}
}
self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());

if self.default_peers_set_no_slot_peers.contains(&peer_id) {
128 changes: 109 additions & 19 deletions substrate/client/network/sync/src/justification_requests.rs
@@ -21,12 +21,14 @@
//! that don't make sense after one of the forks is finalized).

use crate::{
request_metrics::Metrics,
strategy::chain_sync::{PeerSync, PeerSyncState},
LOG_TARGET,
};
use fork_tree::ForkTree;
use log::{debug, trace, warn};
use prometheus_endpoint::{
prometheus::core::GenericGauge, register, GaugeVec, Opts, PrometheusError, Registry, U64,
};
use sc_network_types::PeerId;
use sp_blockchain::Error as ClientError;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
@@ -41,6 +43,34 @@ const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10);
/// Pending extra data request for the given block (hash and number).
type ExtraRequest<B> = (<B as BlockT>::Hash, NumberFor<B>);

#[derive(Debug)]
struct Metrics {
pending: GenericGauge<U64>,
active: GenericGauge<U64>,
failed: GenericGauge<U64>,
importing: GenericGauge<U64>,
}

impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
let justifications = GaugeVec::<U64>::new(
Opts::new(
"substrate_sync_extra_justifications",
"Number of extra justifications requests",
),
&["status"],
)?;
let justifications = register(justifications, registry)?;

Ok(Self {
pending: justifications.with_label_values(&["pending"]),
active: justifications.with_label_values(&["active"]),
failed: justifications.with_label_values(&["failed"]),
importing: justifications.with_label_values(&["importing"]),
})
}
}

/// Manages pending block extra data (e.g. justification) requests.
///
/// Multiple extras may be requested for competing forks, or for the same branch
@@ -62,10 +92,14 @@ pub(crate) struct ExtraRequests<B: BlockT> {
importing_requests: HashSet<ExtraRequest<B>>,
/// the name of this type of extra request (useful for logging.)
request_type_name: &'static str,
metrics: Option<Metrics>,
}

impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn new(request_type_name: &'static str) -> Self {
pub(crate) fn new(
request_type_name: &'static str,
metrics_registry: Option<&Registry>,
) -> Self {
Self {
tree: ForkTree::new(),
best_seen_finalized_number: Zero::zero(),
@@ -74,6 +108,16 @@ impl<B: BlockT> ExtraRequests<B> {
failed_requests: HashMap::new(),
importing_requests: HashSet::new(),
request_type_name,
metrics: metrics_registry.and_then(|registry| {
Metrics::register(registry)
.inspect_err(|error| {
log::error!(
target: LOG_TARGET,
"Failed to register `ExtraRequests` metrics {error}",
);
})
.ok()
}),
}
}

@@ -83,6 +127,12 @@ impl<B: BlockT> ExtraRequests<B> {
self.pending_requests.clear();
self.active_requests.clear();
self.failed_requests.clear();

if let Some(metrics) = &self.metrics {
metrics.pending.set(0);
metrics.active.set(0);
metrics.failed.set(0);
}
}

/// Returns an iterator-like struct that yields peers which extra
@@ -100,6 +150,9 @@ impl<B: BlockT> ExtraRequests<B> {
Ok(true) => {
// this is a new root so we add it to the current `pending_requests`
self.pending_requests.push_back((request.0, request.1));
if let Some(metrics) = &self.metrics {
metrics.pending.inc();
}
},
Err(fork_tree::Error::Revert) => {
// we have finalized further than the given request, presumably
@@ -117,6 +170,10 @@ impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn peer_disconnected(&mut self, who: &PeerId) {
if let Some(request) = self.active_requests.remove(who) {
self.pending_requests.push_front(request);
if let Some(metrics) = &self.metrics {
metrics.active.dec();
metrics.pending.inc();
}
}
}

@@ -130,13 +187,21 @@ impl<B: BlockT> ExtraRequests<B> {
// currently enforced by the outer network protocol before passing on
// messages to chain sync.
if let Some(request) = self.active_requests.remove(&who) {
if let Some(metrics) = &self.metrics {
metrics.active.dec();
}
Comment on lines 189 to +192

Contributor:
Would grouping the request hashmaps (or vec) with metrics help here?
Some time in the future we might do an active_requests.remove() and forget to decrement the appropriate metric.

One suggestion might be to group them into a wrapper:

struct ActiveRequestsMetered {
    inner: HashMap<..>, // Current self.active_requests
    metrics: Option<Metrics>,
}

Maybe an even simpler approach would be to introduce a fn active_requests_remove() { ...; metrics.active.dec(); }.
Could also be a follow-up if it would take too long to implement :D

Contributor Author:
There are several of these in the code and I'm not sure how much it would help with ergonomics to be completely honest. Metrics are one of those things that are just inherently invasive unfortunately.
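
A hedged sketch of the wrapper idea suggested above, kept generic so it stands alone (the name `MeteredMap` and the use of the plain `prometheus` crate's `IntGauge` are assumptions, not code from this PR):

use std::collections::HashMap;
use std::hash::Hash;

use prometheus::IntGauge;

/// Map that keeps a gauge in sync with its length, so call sites
/// cannot remove an entry and forget to decrement the metric.
struct MeteredMap<K, V> {
    inner: HashMap<K, V>,
    gauge: Option<IntGauge>, // `None` when metrics are disabled
}

impl<K: Eq + Hash, V> MeteredMap<K, V> {
    fn new(gauge: Option<IntGauge>) -> Self {
        Self { inner: HashMap::new(), gauge }
    }

    fn insert(&mut self, key: K, value: V) -> Option<V> {
        let previous = self.inner.insert(key, value);
        if previous.is_none() {
            if let Some(gauge) = &self.gauge {
                gauge.inc();
            }
        }
        previous
    }

    fn remove(&mut self, key: &K) -> Option<V> {
        let removed = self.inner.remove(key);
        if removed.is_some() {
            if let Some(gauge) = &self.gauge {
                gauge.dec();
            }
        }
        removed
    }
}

As the author notes, metrics are invasive either way, so this stayed a suggestion / possible follow-up rather than part of the PR.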


if let Some(r) = resp {
trace!(target: LOG_TARGET,
"Queuing import of {} from {:?} for {:?}",
self.request_type_name, who, request,
);

self.importing_requests.insert(request);
if self.importing_requests.insert(request) {
if let Some(metrics) = &self.metrics {
metrics.importing.inc();
}
}
return Some((who, request.0, request.1, r))
} else {
trace!(target: LOG_TARGET,
Expand All @@ -146,6 +211,10 @@ impl<B: BlockT> ExtraRequests<B> {
}
self.failed_requests.entry(request).or_default().push((who, Instant::now()));
self.pending_requests.push_front(request);
if let Some(metrics) = &self.metrics {
metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
metrics.pending.inc();
}
} else {
trace!(target: LOG_TARGET,
@dmitry-markin (Contributor), Aug 20, 2024:

This is out of scope of this PR, but it looks like we can legitimately hit this if the response was delivered after the peer-disconnected event arrived. This can happen depending on the order in which different protocols/streams are polled, as these events are emitted by different objects (the notifications protocol for sync peers and the request-response protocol for responses).

Created an issue for this: #5414.
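
For intuition, a hedged sketch (not the actual SyncingEngine code) of why the ordering is not guaranteed: the disconnect notification and the response arrive on independent channels, and `tokio::select!` handles whichever branch is ready, so a response can be observed after the peer was already removed.

use futures::channel::mpsc;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Two independent event sources, analogous to the sync engine: one for
    // peer connect/disconnect notifications, one for request-response results.
    let (disconnects_tx, mut disconnects) = mpsc::unbounded::<&'static str>();
    let (responses_tx, mut responses) = mpsc::unbounded::<&'static str>();

    // Both events are already queued; which one is handled first depends on
    // polling order, not on the order they happened on the network.
    disconnects_tx.unbounded_send("peer disconnected").unwrap();
    responses_tx.unbounded_send("justification response").unwrap();

    for _ in 0..2 {
        tokio::select! {
            Some(event) = disconnects.next() => println!("handled: {event}"),
            Some(event) = responses.next() => println!("handled: {event}"),
        }
    }
}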

"No active {} request to {:?}",
@@ -194,6 +263,11 @@ impl<B: BlockT> ExtraRequests<B> {
self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &())));
self.active_requests.retain(|_, (h, n)| roots.contains(&(h, n, &())));
self.failed_requests.retain(|(h, n), _| roots.contains(&(h, n, &())));
if let Some(metrics) = &self.metrics {
metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
metrics.active.set(self.active_requests.len().try_into().unwrap_or(u64::MAX));
metrics.failed.set(self.failed_requests.len().try_into().unwrap_or(u64::MAX));
}

Ok(())
}
@@ -210,12 +284,18 @@ impl<B: BlockT> ExtraRequests<B> {
if !self.importing_requests.remove(&request) {
return false
}
if let Some(metrics) = &self.metrics {
metrics.importing.dec();
}

let (finalized_hash, finalized_number) = match result {
Ok(req) => (req.0, req.1),
Err(_) => {
if reschedule_on_failure {
self.pending_requests.push_front(request);
if let Some(metrics) = &self.metrics {
metrics.pending.inc();
}
}
return true
},
@@ -233,6 +313,11 @@ impl<B: BlockT> ExtraRequests<B> {
self.active_requests.clear();
self.pending_requests.clear();
self.pending_requests.extend(self.tree.roots().map(|(&h, &n, _)| (h, n)));
if let Some(metrics) = &self.metrics {
metrics.failed.set(0);
metrics.active.set(0);
metrics.pending.set(self.pending_requests.len().try_into().unwrap_or(u64::MAX));
}
self.best_seen_finalized_number = finalized_number;

true
@@ -249,16 +334,6 @@ impl<B: BlockT> ExtraRequests<B> {
pub(crate) fn pending_requests(&self) -> impl Iterator<Item = &ExtraRequest<B>> {
self.pending_requests.iter()
}

/// Get some key metrics.
pub(crate) fn metrics(&self) -> Metrics {
Metrics {
pending_requests: self.pending_requests.len().try_into().unwrap_or(std::u32::MAX),
active_requests: self.active_requests.len().try_into().unwrap_or(std::u32::MAX),
failed_requests: self.failed_requests.len().try_into().unwrap_or(std::u32::MAX),
importing_requests: self.importing_requests.len().try_into().unwrap_or(std::u32::MAX),
}
}
}

/// Matches peers with pending extra requests.
@@ -301,8 +376,17 @@ impl<'a, B: BlockT> Matcher<'a, B> {
for requests in self.extras.failed_requests.values_mut() {
requests.retain(|(_, instant)| instant.elapsed() < EXTRA_RETRY_WAIT);
}
if let Some(metrics) = &self.extras.metrics {
metrics
.failed
.set(self.extras.failed_requests.len().try_into().unwrap_or(u64::MAX));
}

while let Some(request) = self.extras.pending_requests.pop_front() {
if let Some(metrics) = &self.extras.metrics {
metrics.pending.dec();
}

for (peer, sync) in
peers.iter().filter(|(_, sync)| sync.state == PeerSyncState::Available)
{
@@ -326,6 +410,9 @@ impl<'a, B: BlockT> Matcher<'a, B> {
continue
}
self.extras.active_requests.insert(*peer, request);
if let Some(metrics) = &self.extras.metrics {
metrics.active.inc();
}

trace!(target: LOG_TARGET,
"Sending {} request to {:?} for {:?}",
@@ -336,6 +423,9 @@ impl<'a, B: BlockT> Matcher<'a, B> {
}

self.extras.pending_requests.push_back(request);
if let Some(metrics) = &self.extras.metrics {
metrics.pending.inc();
}
self.remaining -= 1;

if self.remaining == 0 {
@@ -359,7 +449,7 @@ mod tests {
#[test]
fn requests_are_processed_in_order() {
fn property(mut peers: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
@@ -385,7 +475,7 @@
#[test]
fn new_roots_schedule_new_request() {
fn property(data: Vec<BlockNumber>) {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);
for (i, number) in data.into_iter().enumerate() {
let hash = [i as u8; 32].into();
let pending = requests.pending_requests.len();
@@ -402,7 +492,7 @@
#[test]
fn disconnecting_implies_rescheduling() {
fn property(mut peers: ArbitraryPeers) -> bool {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
@@ -438,7 +528,7 @@ mod tests {
#[test]
fn no_response_reschedules() {
fn property(mut peers: ArbitraryPeers) {
let mut requests = ExtraRequests::<Block>::new("test");
let mut requests = ExtraRequests::<Block>::new("test", None);

let num_peers_available =
peers.0.values().filter(|s| s.state == PeerSyncState::Available).count();
@@ -480,7 +570,7 @@
fn request_is_rescheduled_when_earlier_block_is_finalized() {
sp_tracing::try_init_simple();

let mut finality_proofs = ExtraRequests::<Block>::new("test");
let mut finality_proofs = ExtraRequests::<Block>::new("test", None);

let hash4 = [4; 32].into();
let hash5 = [5; 32].into();
@@ -521,7 +611,7 @@

#[test]
fn ancestor_roots_are_finalized_when_finality_notification_is_missed() {
let mut finality_proofs = ExtraRequests::<Block>::new("test");
let mut finality_proofs = ExtraRequests::<Block>::new("test", None);

let hash4 = [4; 32].into();
let hash5 = [5; 32].into();