reworks gossip packets-sent metrics
A lot of gossip packets are not counted, and for some there are
duplicate metrics. The commit adds helper methods to count gossip packets
every time we create outbound packets.
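The bodies of the new helpers are not part of the truncated diff below; what follows is a minimal sketch of what a counting packet constructor like `make_gossip_packet` might look like. The `GossipStats` stand-in and its field names are assumptions, not the actual implementation:

```rust
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};

use log::error;
use serde::Serialize;
use solana_perf::packet::Packet;

// Stand-in for the crate's GossipStats; these field names are assumptions.
#[derive(Default)]
struct GossipStats {
    packets_sent_count: AtomicU64,
    packets_serialize_errors: AtomicU64,
}

// Hypothetical sketch: serialize one outbound gossip message into a Packet
// addressed to `addr`, counting every packet (or failure) exactly once.
fn make_gossip_packet<T: Serialize>(
    addr: SocketAddr,
    data: &T,
    stats: &GossipStats,
) -> Option<Packet> {
    match Packet::from_data(Some(&addr), data) {
        Ok(packet) => {
            stats.packets_sent_count.fetch_add(1, Ordering::Relaxed);
            Some(packet)
        }
        Err(err) => {
            stats.packets_serialize_errors.fetch_add(1, Ordering::Relaxed);
            error!("failed to write gossip packet: {err:?}");
            None
        }
    }
}
```

Funneling every outbound packet through one constructor is what makes the counts exhaustive and duplicate-free: call sites can no longer create a packet without it being counted.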
behzadnouri committed Feb 11, 2025
1 parent e9d6f1e commit b8f55a7
Showing 1 changed file with 90 additions and 57 deletions.
147 changes: 90 additions & 57 deletions gossip/src/cluster_info.rs
@@ -45,7 +45,7 @@ use {
weighted_shuffle::WeightedShuffle,
},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
itertools::Itertools,
itertools::{Either, Itertools},
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_ledger::shred::Shred,
@@ -90,6 +90,7 @@ use {
num::NonZeroUsize,
ops::{Deref, Div},
path::{Path, PathBuf},
rc::Rc,
result::Result,
sync::{
atomic::{AtomicBool, Ordering},
@@ -1204,16 +1205,12 @@ impl ClusterInfo {
pulls.push((entrypoint, filters));
}

#[allow(clippy::type_complexity)]
fn new_pull_requests(
&self,
thread_pool: &ThreadPool,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
) -> (
Vec<(SocketAddr, Ping)>, // Ping packets.
Vec<(SocketAddr, Protocol)>, // Pull requests
) {
) -> impl Iterator<Item = (SocketAddr, Protocol)> {
let now = timestamp();
let self_info = CrdsValue::new(CrdsData::from(self.my_contact_info()), &self.keypair());
let max_bloom_filter_bytes = get_max_bloom_filter_bytes(&self_info);
@@ -1236,15 +1233,18 @@ impl ClusterInfo {
.unwrap_or_default()
};
self.append_entrypoint_to_pulls(thread_pool, max_bloom_filter_bytes, &mut pulls);
let pulls = pulls
let pings = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
pulls
.into_iter()
.filter_map(|(peer, filters)| Some((peer.gossip()?, filters)))
.flat_map(|(addr, filters)| repeat(addr).zip(filters))
.map(|(gossip_addr, filter)| {
.map(move |(gossip_addr, filter)| {
let request = Protocol::PullRequest(filter, self_info.clone());
(gossip_addr, request)
});
(pings, pulls.collect())
})
.chain(pings)
}

pub fn flush_push_queue(&self) {
@@ -1258,7 +1258,10 @@ impl ClusterInfo {
}
}
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
fn new_push_requests(
&self,
stakes: &HashMap<Pubkey, u64>,
) -> impl Iterator<Item = (SocketAddr, Protocol)> {
let self_id = self.id();
let (entries, push_messages, num_pushes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
@@ -1282,19 +1285,21 @@ impl ClusterInfo {
push_messages
.into_iter()
.filter_map(|(pubkey, messages)| {
let peer: &ContactInfo = gossip_crds.get(pubkey)?;
Some((peer.gossip()?, messages))
let addr = get_gossip_addr(pubkey, &gossip_crds, &self.socket_addr_space)?;
Some((addr, messages))
})
.collect()
};
let entries = Rc::new(entries);
push_messages
.into_iter()
.flat_map(|(peer, msgs)| {
let msgs = msgs.into_iter().map(|k| entries[k].clone());
split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
.flat_map(move |(peer, msgs): (SocketAddr, Vec<usize>)| {
let entries = Rc::clone(&entries);
let msgs = msgs.into_iter().map(move |k| entries[k].clone());
let msgs = split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
.map(move |msgs| Protocol::PushMessage(self_id, msgs));
repeat(peer).zip(msgs)
})
.collect()
}
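A note on the `Rc` introduced above: since `new_push_requests` now returns a lazy iterator instead of a collected `Vec`, the `entries` vector has to be owned by the nested `move` closures; wrapping it in `Rc` gives each closure a cheap shared handle instead of a per-peer copy of the vector. A minimal standalone illustration (not the diff's code):

```rust
use std::rc::Rc;

// The outer closure owns the Rc; each call clones the handle (a refcount
// bump), and the inner closure indexes the shared Vec without copying it.
fn expand(keys: Vec<Vec<usize>>, entries: Vec<String>) -> impl Iterator<Item = String> {
    let entries = Rc::new(entries);
    keys.into_iter().flat_map(move |ks| {
        let entries = Rc::clone(&entries);
        ks.into_iter().map(move |k| entries[k].clone())
    })
}

fn main() {
    let out: Vec<_> = expand(
        vec![vec![0, 1], vec![1]],
        vec!["a".to_string(), "b".to_string()],
    )
    .collect();
    assert_eq!(out, ["a", "b", "b"]);
}
```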

// Generate new push and pull requests
@@ -1304,23 +1309,19 @@ impl ClusterInfo {
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
generate_pull_requests: bool,
) -> Vec<(SocketAddr, Protocol)> {
) -> impl Iterator<Item = (SocketAddr, Protocol)> {
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
// This will flush local pending push messages before generating
// pull-request bloom filters, preventing pull responses from returning
// the same values back to the node itself. Note that packets may
// arrive and be processed out of order.
let mut out: Vec<_> = self.new_push_requests(stakes);
let out = self.new_push_requests(stakes);
if generate_pull_requests {
let (pings, pull_requests) =
self.new_pull_requests(thread_pool, gossip_validators, stakes);
let pings = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
out.extend(pull_requests);
out.extend(pings);
let pulls = self.new_pull_requests(thread_pool, gossip_validators, stakes);
Either::Right(out.chain(pulls))
} else {
Either::Left(out)
}
out
}
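The `Either` used above is `itertools::Either` (a re-export of the `either` crate), which implements `Iterator` whenever both variants do. That is what lets the two branches, whose concrete iterator types differ, share the single `impl Iterator` return type. A minimal standalone illustration (not the diff's code):

```rust
use itertools::Either;

// The branches have different concrete types (a Filter vs a plain IntoIter),
// but both fit under one Either, which itself implements Iterator.
fn evens_or_all(xs: Vec<u32>, only_evens: bool) -> impl Iterator<Item = u32> {
    if only_evens {
        Either::Left(xs.into_iter().filter(|x| x % 2 == 0))
    } else {
        Either::Right(xs.into_iter())
    }
}

fn main() {
    assert_eq!(evens_or_all(vec![1, 2, 3, 4], true).sum::<u32>(), 6);
    assert_eq!(evens_or_all(vec![1, 2, 3, 4], false).count(), 4);
}
```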

/// At random pick a node and try to get updated changes from them
@@ -1334,13 +1335,18 @@ impl ClusterInfo {
generate_pull_requests: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.gossip_transmit_loop_time);
let reqs = self.generate_new_gossip_requests(
let mut packet_batch = PacketBatch::new_unpinned_with_recycler(recycler, 0, "run_gossip");
self.generate_new_gossip_requests(
thread_pool,
gossip_validators,
stakes,
generate_pull_requests,
);
send_gossip_packets(reqs, recycler, sender, &self.stats);
)
.filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats))
.for_each(|pkt| packet_batch.push(pkt));
if !packet_batch.is_empty() {
sender.send(packet_batch)?;
}
self.stats
.gossip_transmit_loop_iterations_since_last_report
.add_relaxed(1);
@@ -1843,10 +1849,6 @@ impl ClusterInfo {
if messages.is_empty() {
return;
}
let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
self.stats
.push_message_value_count
.add_relaxed(num_crds_values);
// Origins' pubkeys of upserted crds values.
let origins: HashSet<_> = {
let _st = ScopedTimer::from(&self.stats.process_push_message);
@@ -1856,16 +1858,9 @@ impl ClusterInfo {
// Generate prune messages.
let prune_messages = self.generate_prune_messages(thread_pool, origins, stakes);
let mut packet_batch = make_gossip_packet_batch(prune_messages, recycler, &self.stats);
let new_push_requests = self.new_push_requests(stakes);
for (address, request) in new_push_requests {
if ContactInfo::is_valid_address(&address, &self.socket_addr_space) {
if let Some(pkt) = make_gossip_packet(address, &request, &self.stats) {
packet_batch.push(pkt);
}
} else {
trace!("Dropping Gossip push response, as destination is unknown");
}
}
self.new_push_requests(stakes)
.filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats))
.for_each(|pkt| packet_batch.push(pkt));
if !packet_batch.is_empty() {
let _ = response_sender.send(packet_batch);
}
@@ -1897,7 +1892,7 @@ impl ClusterInfo {
prunes
.into_par_iter()
.filter_map(|(pubkey, prunes)| {
let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip()?;
let addr = get_gossip_addr(pubkey, &gossip_crds, &self.socket_addr_space)?;
Some((pubkey, addr, prunes))
})
.collect()
@@ -2034,6 +2029,9 @@ impl ClusterInfo {
}
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
self.stats
.push_message_value_count
.add_relaxed(data.len() as u64);
push_messages.push((from, data));
}
}
@@ -2937,6 +2935,17 @@ fn filter_on_shred_version(
}
}

// Returns the ContactInfo gossip address, filtered by socket_addr_space.
#[inline]
fn get_gossip_addr(
pubkey: Pubkey,
crds: &Crds,
socket_addr_space: &SocketAddrSpace,
) -> Option<SocketAddr> {
let node = crds.get::<&ContactInfo>(pubkey)?;
node.gossip().filter(|addr| socket_addr_space.check(addr))
}
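A standalone note (not from the diff): `Option::filter` keeps a `Some` value only when the predicate holds, which is what folds the address lookup and the `SocketAddrSpace` check into a single expression above:

```rust
fn main() {
    // Keep the port only when the predicate holds; otherwise yield None.
    assert_eq!(Some(8000u16).filter(|port| *port >= 1024), Some(8000));
    assert_eq!(Some(80u16).filter(|port| *port >= 1024), None);
}
```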

// If the CRDS value is an unstaked contact-info, verifies whether it has
// responded to ping on its gossip socket address.
// Returns false if the CRDS value should be discarded.
@@ -3042,6 +3051,30 @@ mod tests {
const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize =
unsafe { NonZeroUsize::new_unchecked(DEFAULT_QUIC_ENDPOINTS) };

impl ClusterInfo {
// Wrapper around ClusterInfo::new_pull_requests replicating the old
// return type for legacy tests.
#[allow(clippy::type_complexity)]
fn old_new_pull_requests(
&self,
thread_pool: &ThreadPool,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
) -> (
Vec<(SocketAddr, Ping)>, // Ping packets
Vec<(SocketAddr, Protocol)>, // Pull requests
) {
self.new_pull_requests(thread_pool, gossip_validators, stakes)
.partition_map(|(addr, protocol)| {
if let Protocol::PingMessage(ping) = protocol {
Either::Left((addr, ping))
} else {
Either::Right((addr, protocol))
}
})
}
}
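The wrapper relies on `Itertools::partition_map`, which splits one iterator into two collections in a single pass, routing each item by the `Either` returned from the closure. A minimal standalone illustration (not the diff's code):

```rust
use itertools::{Either, Itertools};

fn main() {
    // Route evens left and odds right in one pass over the input.
    let (evens, odds): (Vec<u32>, Vec<u32>) = (0..10u32).partition_map(|x| {
        if x % 2 == 0 {
            Either::Left(x)
        } else {
            Either::Right(x)
        }
    });
    assert_eq!(evens, [0, 2, 4, 6, 8]);
    assert_eq!(odds, [1, 3, 5, 7, 9]);
}
```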

#[test]
fn test_gossip_node() {
// check that gossip nodes always show up as spies
@@ -3211,18 +3244,16 @@ mod tests {
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let reqs = cluster_info.generate_new_gossip_requests(
let mut reqs = cluster_info.generate_new_gossip_requests(
&thread_pool,
None, // gossip_validators
&HashMap::new(), // stakes
true, // generate_pull_requests
);
// assert that none of the addrs are invalid.
reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr, &SocketAddrSpace::Unspecified);
assert!(res);
res
});
assert!(reqs.all(|(addr, _)| {
ContactInfo::is_valid_address(&addr, &SocketAddrSpace::Unspecified)
}));
}

#[test]
@@ -3707,7 +3738,8 @@ mod tests {
let entrypoint_pubkey = solana_pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone());
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
let (pings, pulls) =
cluster_info.old_new_pull_requests(&thread_pool, None, &HashMap::new());
assert!(pings.is_empty());
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
for (addr, msg) in pulls {
@@ -3730,7 +3762,8 @@ mod tests {
Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
);
cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
let (pings, pulls) =
cluster_info.old_new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(pings.len(), 1);
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
Expand Down Expand Up @@ -3805,7 +3838,7 @@ mod tests {

// Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
// fresh timestamp). There should only be one pull request to `other_node`
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
assert!(pulls
@@ -3815,7 +3848,7 @@ mod tests {
// Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
// now be two pull requests
cluster_info.entrypoints.write().unwrap()[0].set_wallclock(0);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(pulls.len(), 2 * MIN_NUM_BLOOM_FILTERS);
for node in [&other_node, &entrypoint] {
@@ -3829,7 +3862,7 @@ mod tests {
}
// Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
// only be one pull request to `other_node`
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
assert!(pulls