diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 16f2f2426d69af..bd565e19ee32f0 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -2562,7 +2562,7 @@ impl ClusterInfo { thread_pool: &ThreadPool, recycler: &PacketsRecycler, response_sender: &PacketSender, - stakes: HashMap, + stakes: &HashMap, feature_set: Option<&FeatureSet>, epoch_time_ms: u64, should_check_duplicate_instance: bool, @@ -2635,13 +2635,13 @@ impl ClusterInfo { self.stats .packets_received_prune_messages_count .add_relaxed(prune_messages.len() as u64); - let require_stake_for_gossip = self.require_stake_for_gossip(feature_set, &stakes); + let require_stake_for_gossip = self.require_stake_for_gossip(feature_set, stakes); if require_stake_for_gossip { for (_, data) in &mut pull_responses { - retain_staked(data, &stakes); + retain_staked(data, stakes); } for (_, data) in &mut push_messages { - retain_staked(data, &stakes); + retain_staked(data, stakes); } pull_responses.retain(|(_, data)| !data.is_empty()); push_messages.retain(|(_, data)| !data.is_empty()); @@ -2652,18 +2652,18 @@ impl ClusterInfo { push_messages, thread_pool, recycler, - &stakes, + stakes, response_sender, require_stake_for_gossip, ); - self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms); - self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes); + self.handle_batch_pull_responses(pull_responses, thread_pool, stakes, epoch_time_ms); + self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes); self.handle_batch_pong_messages(pong_messages, Instant::now()); self.handle_batch_pull_requests( pull_requests, thread_pool, recycler, - &stakes, + stakes, response_sender, require_stake_for_gossip, ); @@ -2682,6 +2682,7 @@ impl ClusterInfo { should_check_duplicate_instance: bool, ) -> Result<()> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); + const SUBMIT_GOSSIP_STATS_INTERVAL: Duration = Duration::from_secs(2); let packets: Vec<_> = requests_receiver.recv_timeout(RECV_TIMEOUT)?.packets.into(); let mut packets = VecDeque::from(packets); while let Ok(packet) = requests_receiver.try_recv() { @@ -2712,13 +2713,13 @@ impl ClusterInfo { thread_pool, recycler, response_sender, - stakes, + &stakes, feature_set.as_deref(), epoch_time_ms, should_check_duplicate_instance, )?; - if last_print.elapsed().as_millis() > 2000 { - submit_gossip_stats(&self.stats, &self.gossip); + if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL { + submit_gossip_stats(&self.stats, &self.gossip, &stakes); *last_print = Instant::now(); } Ok(()) diff --git a/core/src/cluster_info_metrics.rs b/core/src/cluster_info_metrics.rs index 1250824560502c..2bdb5e00c3be1d 100644 --- a/core/src/cluster_info_metrics.rs +++ b/core/src/cluster_info_metrics.rs @@ -1,6 +1,8 @@ use crate::crds_gossip::CrdsGossip; use solana_measure::measure::Measure; +use solana_sdk::pubkey::Pubkey; use std::{ + collections::HashMap, sync::{ atomic::{AtomicU64, Ordering}, RwLock, @@ -116,15 +118,21 @@ pub(crate) struct GossipStats { pub(crate) tvu_peers: Counter, } -pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock) { - let (table_size, purged_values_size, failed_inserts_size) = { +pub(crate) fn submit_gossip_stats( + stats: &GossipStats, + gossip: &RwLock, + stakes: &HashMap, +) { + let (table_size, num_nodes, purged_values_size, failed_inserts_size) = { let gossip = gossip.read().unwrap(); ( gossip.crds.len(), + gossip.crds.num_nodes(), gossip.pull.purged_values.len(), gossip.pull.failed_inserts.len(), ) }; + let num_nodes_staked = stakes.values().filter(|stake| **stake > 0).count(); datapoint_info!( "cluster_info_stats", ("entrypoint", stats.entrypoint.clear(), i64), @@ -142,6 +150,8 @@ pub(crate) fn submit_gossip_stats(stats: &GossipStats, gossip: &RwLock