Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reactive syncing metrics #5410

Merged
merged 9 commits into from
Aug 23, 2024
11 changes: 11 additions & 0 deletions prdoc/pr_5410.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
title: Reactive syncing metrics

doc:
- audience: Node Dev
description: |
Syncing metrics are now updated immediate as changes happen rather than every 1100ms as it was happening before.
This resulted in minor, but breaking API changes.

crates:
- name: sc-network-sync
bump: major
2 changes: 1 addition & 1 deletion substrate/client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ schnellru = { workspace = true }
smallvec = { workspace = true, default-features = true }
thiserror = { workspace = true }
tokio-stream = { workspace = true }
tokio = { features = ["macros", "time"], workspace = true, default-features = true }
tokio = { features = ["macros"], workspace = true, default-features = true }
fork-tree = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }
sc-client-api = { workspace = true, default-features = true }
Expand Down
37 changes: 8 additions & 29 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use prometheus_endpoint::{
};
use prost::Message;
use schnellru::{ByLength, LruMap};
use tokio::time::{Interval, MissedTickBehavior};

use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider};
use sc_consensus::{import_queue::ImportQueueService, IncomingBlock};
Expand Down Expand Up @@ -93,9 +92,6 @@ use std::{
},
};

/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(1100);

/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead

Expand Down Expand Up @@ -219,9 +215,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Set of channels for other protocols that have subscribed to syncing events.
event_streams: Vec<TracingUnboundedSender<SyncEvent>>,

/// Interval at which we call `tick`.
tick_timeout: Interval,

/// All connected peers. Contains both full and light node peers.
peers: HashMap<PeerId, Peer<B>>,

Expand Down Expand Up @@ -436,12 +429,6 @@ where
let max_out_peers = net_config.network_config.default_peers_set.out_peers;
let max_in_peers = (max_full_peers - max_out_peers) as usize;

let tick_timeout = {
let mut interval = tokio::time::interval(TICK_TIMEOUT);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
interval
};

Ok((
Self {
roles,
Expand Down Expand Up @@ -469,7 +456,6 @@ where
max_in_peers,
event_streams: Vec::new(),
notification_service,
tick_timeout,
peer_store_handle,
metrics: if let Some(r) = metrics_registry {
match Metrics::register(r, is_major_syncing.clone()) {
Expand All @@ -493,15 +479,6 @@ where
))
}

/// Report Prometheus metrics.
pub fn report_metrics(&self) {
if let Some(metrics) = &self.metrics {
let n = u64::try_from(self.peers.len()).unwrap_or(std::u64::MAX);
metrics.peers.set(n);
}
self.strategy.report_metrics();
}

fn update_peer_info(
&mut self,
peer_id: &PeerId,
Expand Down Expand Up @@ -628,7 +605,6 @@ where
pub async fn run(mut self) {
loop {
tokio::select! {
_ = self.tick_timeout.tick() => self.perform_periodic_actions(),
command = self.service_rx.select_next_some() =>
self.process_service_command(command),
notification_event = self.notification_service.next_event() => match notification_event {
Expand Down Expand Up @@ -757,10 +733,6 @@ where
Ok(())
}

fn perform_periodic_actions(&mut self) {
self.report_metrics();
}

fn process_service_command(&mut self, command: ToServiceCommand<B>) {
match command {
ToServiceCommand::SetSyncForkRequest(peers, hash, number) => {
Expand Down Expand Up @@ -922,6 +894,9 @@ where
log::debug!(target: LOG_TARGET, "{peer_id} does not exist in `SyncingEngine`");
return
};
if let Some(metrics) = &self.metrics {
metrics.peers.dec();
}

if self.important_peers.contains(&peer_id) {
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
Expand Down Expand Up @@ -1097,7 +1072,11 @@ where

log::debug!(target: LOG_TARGET, "Connected {peer_id}");

self.peers.insert(peer_id, peer);
if self.peers.insert(peer_id, peer).is_none() {
if let Some(metrics) = &self.metrics {
metrics.peers.inc();
}
}
self.peer_store_handle.set_peer_role(&peer_id, status.roles.into());

if self.default_peers_set_no_slot_peers.contains(&peer_id) {
Expand Down
Loading
Loading