From f1e371039b3defb15ad5df8732d5557d2f344836 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Mon, 10 Feb 2025 06:38:19 -0600
Subject: [PATCH] removes redundant allocations when generating new gossip
 pull/push messages

Instead of collecting the intermediate values into vectors:
https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/cluster_info.rs#L1311-L1318
https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/cluster_info.rs#L1257-L1268
https://github.com/anza-xyz/agave/blob/2b0966de4/gossip/src/crds_gossip_pull.rs#L292-L302
gossip new_{push,pull}_requests can just return an iterator. This iterator
can then be written directly to a PacketBatch, bypassing the unnecessary
vector allocations.
---
 gossip/src/cluster_info.rs     | 202 ++++++++++++++++++++-------
 gossip/src/crds_gossip.rs      |   2 +-
 gossip/src/crds_gossip_pull.rs |  88 ++++++++++----
 gossip/tests/crds_gossip.rs    |  18 +++
 4 files changed, 205 insertions(+), 105 deletions(-)

diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs
index 9b7db1b27bdf94..ac97ca51ef05e5 100644
--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -45,7 +45,7 @@ use {
         weighted_shuffle::WeightedShuffle,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
-    itertools::Itertools,
+    itertools::{Either, Itertools},
     rand::{seq::SliceRandom, CryptoRng, Rng},
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_ledger::shred::Shred,
@@ -90,6 +90,7 @@ use {
         num::NonZeroUsize,
         ops::{Deref, Div},
         path::{Path, PathBuf},
+        rc::Rc,
         result::Result,
         sync::{
             atomic::{AtomicBool, Ordering},
@@ -1160,18 +1161,19 @@ impl ClusterInfo {
         &self,
         thread_pool: &ThreadPool,
         max_bloom_filter_bytes: usize,
-        pulls: &mut Vec<(ContactInfo, Vec<CrdsFilter>)>,
-    ) {
+        pulls: impl Iterator<Item = (SocketAddr, CrdsFilter)> + Clone,
+    ) -> impl Iterator<Item = (SocketAddr, CrdsFilter)> {
         const THROTTLE_DELAY: u64 = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2;
+        let mut pulls = pulls.peekable();
         let entrypoint = {
             let mut entrypoints = self.entrypoints.write().unwrap();
             let Some(entrypoint) = entrypoints.choose_mut(&mut rand::thread_rng()) else {
-                return;
+                return Either::Left(pulls);
             };
-            if !pulls.is_empty() {
+            if pulls.peek().is_some() {
                 let now = timestamp();
                 if now <= entrypoint.wallclock().saturating_add(THROTTLE_DELAY) {
-                    return;
+                    return Either::Left(pulls);
                 }
                 entrypoint.set_wallclock(now);
                 if let Some(entrypoint_gossip) = entrypoint.gossip() {
@@ -1180,45 +1182,42 @@ impl ClusterInfo {
                         .get_nodes_contact_info()
                         .any(|node| node.gossip() == Some(entrypoint_gossip))
                     {
-                        return; // Found the entrypoint, no need to pull from it
+                        // Found the entrypoint, no need to pull from it.
+                        return Either::Left(pulls);
                     }
                 }
             }
-            entrypoint.clone()
+            let Some(entrypoint) = entrypoint.gossip() else {
+                return Either::Left(pulls);
+            };
+            entrypoint
         };
-        let filters = if pulls.is_empty() {
+        let filters = if pulls.peek().is_none() {
             let _st = ScopedTimer::from(&self.stats.entrypoint2);
-            self.gossip.pull.build_crds_filters(
-                thread_pool,
-                &self.gossip.crds,
-                max_bloom_filter_bytes,
+            Either::Left(
+                self.gossip
+                    .pull
+                    .build_crds_filters(thread_pool, &self.gossip.crds, max_bloom_filter_bytes)
+                    .into_iter(),
             )
         } else {
-            pulls
-                .iter()
-                .flat_map(|(_, filters)| filters)
-                .cloned()
-                .collect()
+            Either::Right(pulls.clone().map(|(_, filter)| filter))
        };
         self.stats.pull_from_entrypoint_count.add_relaxed(1);
-        pulls.push((entrypoint, filters));
+        Either::Right(pulls.chain(repeat(entrypoint).zip(filters)))
     }
 
-    #[allow(clippy::type_complexity)]
     fn new_pull_requests(
         &self,
         thread_pool: &ThreadPool,
         gossip_validators: Option<&HashSet<Pubkey>>,
         stakes: &HashMap<Pubkey, u64>,
-    ) -> (
-        Vec<(SocketAddr, Ping)>,     // Ping packets.
-        Vec<(SocketAddr, Protocol)>, // Pull requests
-    ) {
+    ) -> impl Iterator<Item = (SocketAddr, Protocol)> {
         let now = timestamp();
         let self_info = CrdsValue::new(CrdsData::from(self.my_contact_info()), &self.keypair());
         let max_bloom_filter_bytes = get_max_bloom_filter_bytes(&self_info);
         let mut pings = Vec::new();
-        let mut pulls = {
+        let pulls = {
             let _st = ScopedTimer::from(&self.stats.new_pull_requests);
             self.gossip
                 .new_pull_request(
@@ -1233,18 +1232,18 @@ impl ClusterInfo {
                     &mut pings,
                     &self.socket_addr_space,
                 )
-                .unwrap_or_default()
+                .into_iter()
+                .flatten()
         };
-        self.append_entrypoint_to_pulls(thread_pool, max_bloom_filter_bytes, &mut pulls);
-        let pulls = pulls
+        let pings = pings
             .into_iter()
-            .filter_map(|(peer, filters)| Some((peer.gossip()?, filters)))
-            .flat_map(|(addr, filters)| repeat(addr).zip(filters))
-            .map(|(gossip_addr, filter)| {
+            .map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
+        self.append_entrypoint_to_pulls(thread_pool, max_bloom_filter_bytes, pulls)
+            .map(move |(gossip_addr, filter)| {
                 let request = Protocol::PullRequest(filter, self_info.clone());
                 (gossip_addr, request)
-            });
-        (pings, pulls.collect())
+            })
+            .chain(pings)
     }
 
     pub fn flush_push_queue(&self) {
@@ -1258,7 +1257,10 @@ impl ClusterInfo {
             }
         }
     }
-    fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
+    fn new_push_requests(
+        &self,
+        stakes: &HashMap<Pubkey, u64>,
+    ) -> impl Iterator<Item = (SocketAddr, Protocol)> {
         let self_id = self.id();
         let (entries, push_messages, num_pushes) = {
             let _st = ScopedTimer::from(&self.stats.new_push_requests);
@@ -1282,19 +1284,26 @@ impl ClusterInfo {
             push_messages
                 .into_iter()
                 .filter_map(|(pubkey, messages)| {
-                    let peer: &ContactInfo = gossip_crds.get(pubkey)?;
-                    Some((peer.gossip()?, messages))
+                    let addr = get_node_addr(
+                        pubkey,
+                        ContactInfo::gossip,
+                        &gossip_crds,
+                        &self.socket_addr_space,
+                    )?;
+                    Some((addr, messages))
                 })
                 .collect()
         };
+        let entries = Rc::new(entries);
         push_messages
             .into_iter()
-            .flat_map(|(peer, msgs)| {
-                let msgs = msgs.into_iter().map(|k| entries[k].clone());
-                split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
-                    .map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
+            .flat_map(move |(peer, msgs): (SocketAddr, Vec<usize>)| {
+                let entries = Rc::clone(&entries);
+                let msgs = msgs.into_iter().map(move |k| entries[k].clone());
+                let msgs = split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
+                    .map(move |msgs| Protocol::PushMessage(self_id, msgs));
+                repeat(peer).zip(msgs)
             })
-            .collect()
     }
 
     // Generate new push and pull requests
@@ -1304,23 +1313,19 @@ impl ClusterInfo {
         gossip_validators: Option<&HashSet<Pubkey>>,
         stakes: &HashMap<Pubkey, u64>,
         generate_pull_requests: bool,
-    ) -> Vec<(SocketAddr, Protocol)> {
+    ) -> impl Iterator<Item = (SocketAddr, Protocol)> {
         self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
         // This will flush local pending push messages before generating
         // pull-request bloom filters, preventing pull responses to return the
         // same values back to the node itself. Note that packets will arrive
         // and are processed out of order.
-        let mut out: Vec<_> = self.new_push_requests(stakes);
+        let out = self.new_push_requests(stakes);
         if generate_pull_requests {
-            let (pings, pull_requests) =
-                self.new_pull_requests(thread_pool, gossip_validators, stakes);
-            let pings = pings
-                .into_iter()
-                .map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
-            out.extend(pull_requests);
-            out.extend(pings);
+            let reqs = self.new_pull_requests(thread_pool, gossip_validators, stakes);
+            Either::Right(out.chain(reqs))
+        } else {
+            Either::Left(out)
         }
-        out
     }
 
     /// At random pick a node and try to get updated changes from them
@@ -1334,13 +1339,18 @@ impl ClusterInfo {
         generate_pull_requests: bool,
     ) -> Result<(), GossipError> {
         let _st = ScopedTimer::from(&self.stats.gossip_transmit_loop_time);
-        let reqs = self.generate_new_gossip_requests(
+        let mut packet_batch = PacketBatch::new_unpinned_with_recycler(recycler, 0, "run_gossip");
+        self.generate_new_gossip_requests(
             thread_pool,
             gossip_validators,
             stakes,
             generate_pull_requests,
-        );
-        send_gossip_packets(reqs, recycler, sender, &self.stats);
+        )
+        .filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats))
+        .for_each(|pkt| packet_batch.push(pkt));
+        if !packet_batch.is_empty() {
+            sender.send(packet_batch)?;
+        }
         self.stats
             .gossip_transmit_loop_iterations_since_last_report
             .add_relaxed(1);
@@ -1843,10 +1853,6 @@ impl ClusterInfo {
         if messages.is_empty() {
             return;
         }
-        let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
-        self.stats
-            .push_message_value_count
-            .add_relaxed(num_crds_values);
         // Origins' pubkeys of upserted crds values.
         let origins: HashSet<_> = {
             let _st = ScopedTimer::from(&self.stats.process_push_message);
@@ -1856,16 +1862,9 @@ impl ClusterInfo {
         // Generate prune messages.
         let prune_messages = self.generate_prune_messages(thread_pool, origins, stakes);
         let mut packet_batch = make_gossip_packet_batch(prune_messages, recycler, &self.stats);
-        let new_push_requests = self.new_push_requests(stakes);
-        for (address, request) in new_push_requests {
-            if ContactInfo::is_valid_address(&address, &self.socket_addr_space) {
-                if let Some(pkt) = make_gossip_packet(address, &request, &self.stats) {
-                    packet_batch.push(pkt);
-                }
-            } else {
-                trace!("Dropping Gossip push response, as destination is unknown");
-            }
-        }
+        self.new_push_requests(stakes)
+            .filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats))
+            .for_each(|pkt| packet_batch.push(pkt));
         if !packet_batch.is_empty() {
             let _ = response_sender.send(packet_batch);
         }
@@ -1897,7 +1896,12 @@ impl ClusterInfo {
             prunes
                 .into_par_iter()
                 .filter_map(|(pubkey, prunes)| {
-                    let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip()?;
+                    let addr = get_node_addr(
+                        pubkey,
+                        ContactInfo::gossip,
+                        &gossip_crds,
+                        &self.socket_addr_space,
+                    )?;
                     Some((pubkey, addr, prunes))
                 })
                 .collect()
@@ -2034,6 +2038,9 @@ impl ClusterInfo {
             }
             data.retain(&mut verify_gossip_addr);
             if !data.is_empty() {
+                self.stats
+                    .push_message_value_count
+                    .add_relaxed(data.len() as u64);
                 push_messages.push((from, data));
             }
         }
@@ -2937,6 +2944,17 @@ fn filter_on_shred_version(
     }
 }
 
+#[inline]
+fn get_node_addr(
+    pubkey: Pubkey,
+    query: impl Fn(&ContactInfo) -> Option<SocketAddr>,
+    crds: &Crds,
+    socket_addr_space: &SocketAddrSpace,
+) -> Option<SocketAddr> {
+    let node = crds.get::<&ContactInfo>(pubkey)?;
+    query(node).filter(|addr| socket_addr_space.check(addr))
+}
+
 // If the CRDS value is an unstaked contact-info, verifies if
 // it has responded to ping on its gossip socket address.
 // Returns false if the CRDS value should be discarded.
@@ -3042,6 +3060,30 @@ mod tests {
     const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize =
         unsafe { NonZeroUsize::new_unchecked(DEFAULT_QUIC_ENDPOINTS) };
 
+    impl ClusterInfo {
+        // Wrapper for ClusterInfo.new_pull_requests replicating old return
+        // type for legacy tests.
+        #[allow(clippy::type_complexity)]
+        fn old_pull_requests(
+            &self,
+            thread_pool: &ThreadPool,
+            gossip_validators: Option<&HashSet<Pubkey>>,
+            stakes: &HashMap<Pubkey, u64>,
+        ) -> (
+            Vec<(SocketAddr, Ping)>,     // Ping packets
+            Vec<(SocketAddr, Protocol)>, // Pull requests
+        ) {
+            self.new_pull_requests(thread_pool, gossip_validators, stakes)
+                .partition_map(|(addr, protocol)| {
+                    if let Protocol::PingMessage(ping) = protocol {
+                        Either::Left((addr, ping))
+                    } else {
+                        Either::Right((addr, protocol))
+                    }
+                })
+        }
+    }
+
     #[test]
     fn test_gossip_node() {
         //check that a gossip nodes always show up as spies
@@ -3211,18 +3253,16 @@ mod tests {
             &mut Vec::new(), // pings
             &SocketAddrSpace::Unspecified,
         );
-        let reqs = cluster_info.generate_new_gossip_requests(
+        let mut reqs = cluster_info.generate_new_gossip_requests(
             &thread_pool,
             None,            // gossip_validators
             &HashMap::new(), // stakes
             true,            // generate_pull_requests
         );
         //assert none of the addrs are invalid.
-        reqs.iter().all(|(addr, _)| {
-            let res = ContactInfo::is_valid_address(addr, &SocketAddrSpace::Unspecified);
-            assert!(res);
-            res
-        });
+        assert!(reqs.all(|(addr, _)| {
+            ContactInfo::is_valid_address(&addr, &SocketAddrSpace::Unspecified)
+        }));
     }
 
     #[test]
@@ -3370,7 +3410,7 @@ mod tests {
            .for_each(|v| v.par_iter().for_each(|v| assert!(v.verify())));
 
         let mut pings = Vec::new();
-        cluster_info
+        let _ = cluster_info
             .gossip
             .new_pull_request(
                 &thread_pool,
@@ -3707,7 +3747,7 @@ mod tests {
         let entrypoint_pubkey = solana_pubkey::new_rand();
         let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
         cluster_info.set_entrypoint(entrypoint.clone());
-        let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
+        let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &HashMap::new());
         assert!(pings.is_empty());
         assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
         for (addr, msg) in pulls {
@@ -3730,7 +3770,7 @@ mod tests {
             Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
         );
         cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts);
-        let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
+        let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &HashMap::new());
         assert_eq!(pings.len(), 1);
         assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
         assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
@@ -3805,7 +3845,7 @@ mod tests {
 
         // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
         // fresh timestamp). There should only be one pull request to `other_node`
-        let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
+        let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &stakes);
         assert!(pings.is_empty());
         assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
         assert!(pulls
@@ -3815,7 +3855,7 @@ mod tests {
         // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
         // now be two pull requests
         cluster_info.entrypoints.write().unwrap()[0].set_wallclock(0);
-        let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
+        let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &stakes);
         assert!(pings.is_empty());
         assert_eq!(pulls.len(), 2 * MIN_NUM_BLOOM_FILTERS);
         for node in [&other_node, &entrypoint] {
             assert_eq!(
                 pulls
                     .iter()
                     .filter(|(addr, _)| addr == &node.gossip().unwrap())
                     .count(),
                 MIN_NUM_BLOOM_FILTERS
             );
         }
         // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
         // only be one pull request to `other_node`
-        let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
+        let (pings, pulls) = cluster_info.old_pull_requests(&thread_pool, None, &stakes);
         assert!(pings.is_empty());
         assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
         assert!(pulls
diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs
index 408902fce4b762..909c0409908e19 100644
--- a/gossip/src/crds_gossip.rs
+++ b/gossip/src/crds_gossip.rs
@@ -214,7 +214,7 @@ impl CrdsGossip {
         ping_cache: &Mutex<PingCache>,
         pings: &mut Vec<(SocketAddr, Ping)>,
         socket_addr_space: &SocketAddrSpace,
-    ) -> Result<Vec<(ContactInfo, Vec<CrdsFilter>)>, CrdsGossipError> {
+    ) -> Result<impl Iterator<Item = (SocketAddr, CrdsFilter)> + Clone, CrdsGossipError> {
         self.pull.new_pull_request(
             thread_pool,
             &self.crds,
diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs
index 7501449a1ae352..f9c929af39687b 100644
--- a/gossip/src/crds_gossip_pull.rs
+++ b/gossip/src/crds_gossip_pull.rs
@@ -37,7 +37,7 @@ use {
     },
     solana_streamer::socket::SocketAddrSpace,
     std::{
-        collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
+        collections::{HashMap, HashSet, VecDeque},
         convert::TryInto,
         iter::{repeat, repeat_with},
         net::SocketAddr,
@@ -246,7 +246,7 @@ impl CrdsGossipPull {
         ping_cache: &Mutex<PingCache>,
         pings: &mut Vec<(SocketAddr, Ping)>,
         socket_addr_space: &SocketAddrSpace,
-    ) -> Result<Vec<(ContactInfo, Vec<CrdsFilter>)>, CrdsGossipError> {
+    ) -> Result<impl Iterator<Item = (SocketAddr, CrdsFilter)> + Clone, CrdsGossipError> {
         let mut rng = rand::thread_rng();
         // Active and valid gossip nodes with matching shred-version.
         let nodes = crds_gossip::get_gossip_nodes(
@@ -289,17 +289,10 @@ impl CrdsGossipPull {
         let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
         // Associate each pull-request filter with a randomly selected peer.
         let dist = WeightedIndex::new(weights).unwrap();
-        let out = filters.into_iter().fold(HashMap::new(), |mut out, filter| {
+        Ok(filters.into_iter().filter_map(move |filter| {
             let node = &nodes[dist.sample(&mut rng)];
-            match out.entry(*node.pubkey()) {
-                Entry::Vacant(entry) => {
-                    entry.insert((node.clone(), vec![filter]));
-                }
-                Entry::Occupied(mut entry) => entry.get_mut().1.push(filter),
-            };
-            out
-        });
-        Ok(out.into_values().collect())
+            Some((node.gossip()?, filter))
+        }))
     }
 
     /// Create gossip responses to pull requests
@@ -694,6 +687,51 @@ pub(crate) mod tests {
     #[cfg(not(debug_assertions))]
     pub(crate) const MIN_NUM_BLOOM_FILTERS: usize = 64;
 
+    impl CrdsGossipPull {
+        // Wrapper for CrdsGossipPull.new_pull_request replicating old return
+        // type for legacy tests.
+        #[allow(clippy::too_many_arguments)]
+        fn old_pull_request(
+            &self,
+            thread_pool: &ThreadPool,
+            crds: &RwLock<Crds>,
+            self_keypair: &Keypair,
+            self_shred_version: u16,
+            now: u64,
+            gossip_validators: Option<&HashSet<Pubkey>>,
+            stakes: &HashMap<Pubkey, u64>,
+            bloom_size: usize,
+            ping_cache: &Mutex<PingCache>,
+            pings: &mut Vec<(SocketAddr, Ping)>,
+            socket_addr_space: &SocketAddrSpace,
+        ) -> Result<Vec<(ContactInfo, Vec<CrdsFilter>)>, CrdsGossipError> {
+            let out = self.new_pull_request(
+                thread_pool,
+                crds,
+                self_keypair,
+                self_shred_version,
+                now,
+                gossip_validators,
+                stakes,
+                bloom_size,
+                ping_cache,
+                pings,
+                socket_addr_space,
+            )?;
+            let nodes: HashMap<SocketAddr, ContactInfo> = crds
+                .read()
+                .unwrap()
+                .get_nodes_contact_info()
+                .map(|node| (node.gossip().unwrap(), node.clone()))
+                .collect();
+            Ok(out
+                .into_group_map()
+                .into_iter()
+                .map(|(addr, filters)| (nodes.get(&addr).cloned().unwrap(), filters))
+                .collect())
+        }
+    }
+
     fn new_ping_cache() -> PingCache {
         PingCache::new(
             &mut rand::thread_rng(),
@@ -878,7 +916,7 @@ pub(crate) mod tests {
         let mut pings = Vec::new();
         let ping_cache = Mutex::new(new_ping_cache());
         assert_eq!(
-            node.new_pull_request(
+            node.old_pull_request(
                 &thread_pool,
                 &crds,
                 &node_keypair,
@@ -899,7 +937,7 @@ pub(crate) mod tests {
             .insert(entry, 0, GossipRoute::LocalMessage)
             .unwrap();
         assert_eq!(
-            node.new_pull_request(
+            node.old_pull_request(
                 &thread_pool,
                 &crds,
                 &node_keypair,
@@ -915,7 +953,8 @@ pub(crate) mod tests {
             Err(CrdsGossipError::NoPeers)
         );
         let now = 1625029781069;
-        let new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
+        let mut new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
+        new.set_gossip(([127, 0, 0, 1], 8020)).unwrap();
         ping_cache
             .lock()
             .unwrap()
@@ -925,7 +964,7 @@ pub(crate) mod tests {
             .unwrap()
             .insert(new.clone(), now, GossipRoute::LocalMessage)
             .unwrap();
-        let req = node.new_pull_request(
+        let req = node.old_pull_request(
             &thread_pool,
             &crds,
             &node_keypair,
@@ -941,13 +980,14 @@ pub(crate) mod tests {
         let peers: Vec<_> = req.unwrap().into_iter().map(|(node, _)| node).collect();
         assert_eq!(peers, vec![new.contact_info().unwrap().clone()]);
 
-        let offline = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
+        let mut offline = ContactInfo::new_localhost(&solana_pubkey::new_rand(), now);
+        offline.set_gossip(([127, 0, 0, 1], 8021)).unwrap();
         let offline = CrdsValue::new_unsigned(CrdsData::from(offline));
         crds.write()
             .unwrap()
             .insert(offline, now, GossipRoute::LocalMessage)
             .unwrap();
-        let req = node.new_pull_request(
+        let req = node.old_pull_request(
             &thread_pool,
             &crds,
             &node_keypair,
@@ -979,12 +1019,14 @@ pub(crate) mod tests {
         )));
         let node = CrdsGossipPull::default();
         crds.insert(entry, now, GossipRoute::LocalMessage).unwrap();
-        let old = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);
+        let mut old = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);
+        old.set_gossip(([127, 0, 0, 1], 8020)).unwrap();
         ping_cache.mock_pong(*old.pubkey(), old.gossip().unwrap(), Instant::now());
         let old = CrdsValue::new_unsigned(CrdsData::from(old));
         crds.insert(old.clone(), now, GossipRoute::LocalMessage)
             .unwrap();
-        let new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);
+        let mut new = ContactInfo::new_localhost(&solana_pubkey::new_rand(), 0);
+        new.set_gossip(([127, 0, 0, 1], 8021)).unwrap();
         ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
         let new = CrdsValue::new_unsigned(CrdsData::from(new));
         crds.insert(new, now, GossipRoute::LocalMessage).unwrap();
@@ -1000,7 +1042,7 @@ pub(crate) mod tests {
         let old = old.contact_info().unwrap();
         let count = repeat_with(|| {
             let requests = node
-                .new_pull_request(
+                .old_pull_request(
                     &thread_pool,
                     &crds,
                     &node_keypair,
@@ -1047,7 +1089,7 @@ pub(crate) mod tests {
             .unwrap();
         let node_crds = RwLock::new(node_crds);
         let mut pings = Vec::new();
-        let req = node.new_pull_request(
+        let req = node.old_pull_request(
             &thread_pool,
             &node_crds,
             &node_keypair,
@@ -1192,7 +1234,7 @@ pub(crate) mod tests {
         let ping_cache = Mutex::new(ping_cache);
         for _ in 0..30 {
             // there is a chance of a false positive with bloom filters
-            let req = node.new_pull_request(
+            let req = node.old_pull_request(
                &thread_pool,
                &node_crds,
                &node_keypair,
diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs
index 8b93a5e1db4a44..2027ecb0e58de7 100644
--- a/gossip/tests/crds_gossip.rs
+++ b/gossip/tests/crds_gossip.rs
@@ -1,6 +1,7 @@
 #![allow(clippy::arithmetic_side_effects)]
 use {
     bincode::serialized_size,
+    itertools::Itertools,
     log::*,
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     serial_test::serial,
@@ -531,6 +532,14 @@ fn network_run_pull(
             }
         }
     }
+    let nodes: HashMap<SocketAddr, ContactInfo> = network
+        .nodes
+        .values()
+        .map(|node| {
+            let node = &node.contact_info;
+            (node.gossip().unwrap(), node.clone())
+        })
+        .collect();
     for t in start..end {
         let now = t as u64 * 100;
         let requests: Vec<_> = {
@@ -552,6 +561,15 @@ fn network_run_pull(
                         &mut pings,
                         &SocketAddrSpace::Unspecified,
                     )
+                    .map(|requests| {
+                        requests
+                            .into_group_map()
+                            .into_iter()
+                            .map(|(addr, filters)| {
+                                (nodes.get(&addr).cloned().unwrap(), filters)
+                            })
+                            .collect::<Vec<_>>()
+                    })
                    .unwrap_or_default();
                 let from_pubkey = from.keypair.pubkey();
                 let label = CrdsValueLabel::ContactInfo(from_pubkey);
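
Note on the core pattern of this patch: functions that previously collected into a Vec now return impl Iterator, and itertools::Either is used so that early-return and normal paths still share a single concrete iterator type. A minimal, self-contained sketch of that shape, with illustrative names that are not taken from the Agave codebase:

use itertools::Either;

// Yield the even numbers, optionally followed by a trailing marker value,
// without collecting into an intermediate Vec.
fn even_numbers(
    values: impl Iterator<Item = u32>,
    marker: Option<u32>,
) -> impl Iterator<Item = u32> {
    let evens = values.filter(|x| x % 2 == 0);
    match marker {
        // Both arms must have the same type; Either bridges the two iterator types.
        None => Either::Left(evens),
        Some(marker) => Either::Right(evens.chain(std::iter::once(marker))),
    }
}

fn main() {
    assert_eq!(
        even_numbers(1..=6, Some(42)).collect::<Vec<_>>(),
        [2, 4, 6, 42]
    );
    assert_eq!(even_numbers(1..=6, None).collect::<Vec<_>>(), [2, 4, 6]);
}

The same idea appears in append_entrypoint_to_pulls and generate_new_gossip_requests above, where Either::Left carries the unchanged input iterator and Either::Right carries the extended one.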
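new_push_requests wraps entries in Rc so that the move closure handed to flat_map, and the nested per-peer closure it builds, can share one copy of the table instead of cloning the whole Vec. A stripped-down illustration of why the Rc is needed, again with made-up names and data:

use std::rc::Rc;

// Expand (peer, indices) pairs into (peer, value) pairs. All per-peer closures
// share a single copy of `entries` through cheap Rc handles.
fn expand(
    entries: Vec<String>,
    peers: Vec<(u64, Vec<usize>)>,
) -> impl Iterator<Item = (u64, String)> {
    let entries = Rc::new(entries);
    peers.into_iter().flat_map(move |(peer, indices)| {
        // Cloning the Rc only bumps a reference count; the Vec itself is not copied.
        let entries = Rc::clone(&entries);
        indices.into_iter().map(move |k| (peer, entries[k].clone()))
    })
}

fn main() {
    let entries = vec!["a".to_string(), "b".to_string()];
    let out: Vec<_> = expand(entries, vec![(1, vec![0, 1]), (2, vec![1])]).collect();
    assert_eq!(
        out,
        vec![
            (1, "a".to_string()),
            (1, "b".to_string()),
            (2, "b".to_string())
        ]
    );
}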
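Finally, run_gossip and handle_batch_push_messages now drain the request iterator straight into a PacketBatch via filter_map/for_each instead of materializing a Vec of requests first. A rough sketch of that shape, using hypothetical stand-in types rather than the real PacketBatch/make_gossip_packet APIs:

use std::net::SocketAddr;

// Hypothetical stand-ins for the real packet types.
struct Packet {
    addr: SocketAddr,
    payload: Vec<u8>,
}

#[derive(Default)]
struct Batch {
    packets: Vec<Packet>,
}

// Serialization can fail (e.g. the payload does not fit), mirroring
// make_gossip_packet returning Option; such entries are silently skipped.
fn make_packet(addr: SocketAddr, payload: &[u8]) -> Option<Packet> {
    const MAX_PAYLOAD: usize = 1232; // illustrative size limit
    (payload.len() <= MAX_PAYLOAD).then(|| Packet {
        addr,
        payload: payload.to_vec(),
    })
}

// Drain the request iterator directly into the batch; no intermediate
// Vec<(SocketAddr, Vec<u8>)> is ever allocated.
fn drain_into_batch(requests: impl Iterator<Item = (SocketAddr, Vec<u8>)>) -> Batch {
    let mut batch = Batch::default();
    requests
        .filter_map(|(addr, payload)| make_packet(addr, &payload))
        .for_each(|pkt| batch.packets.push(pkt));
    batch
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:8000".parse().unwrap();
    let batch = drain_into_batch(vec![(addr, vec![0u8; 64]), (addr, vec![0u8; 2048])].into_iter());
    assert_eq!(batch.packets.len(), 1); // the oversized payload was skipped
    assert_eq!(batch.packets[0].addr, addr);
    assert_eq!(batch.packets[0].payload.len(), 64);
}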