diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 9b7db1b27bdf94..529949260e4604 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -45,7 +45,7 @@ use { weighted_shuffle::WeightedShuffle, }, crossbeam_channel::{Receiver, RecvTimeoutError, Sender}, - itertools::Itertools, + itertools::{Either, Itertools}, rand::{seq::SliceRandom, CryptoRng, Rng}, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, solana_ledger::shred::Shred, @@ -90,6 +90,7 @@ use { num::NonZeroUsize, ops::{Deref, Div}, path::{Path, PathBuf}, + rc::Rc, result::Result, sync::{ atomic::{AtomicBool, Ordering}, @@ -1204,16 +1205,12 @@ impl ClusterInfo { pulls.push((entrypoint, filters)); } - #[allow(clippy::type_complexity)] fn new_pull_requests( &self, thread_pool: &ThreadPool, gossip_validators: Option<&HashSet>, stakes: &HashMap, - ) -> ( - Vec<(SocketAddr, Ping)>, // Ping packets. - Vec<(SocketAddr, Protocol)>, // Pull requests - ) { + ) -> impl Iterator { let now = timestamp(); let self_info = CrdsValue::new(CrdsData::from(self.my_contact_info()), &self.keypair()); let max_bloom_filter_bytes = get_max_bloom_filter_bytes(&self_info); @@ -1236,15 +1233,18 @@ impl ClusterInfo { .unwrap_or_default() }; self.append_entrypoint_to_pulls(thread_pool, max_bloom_filter_bytes, &mut pulls); - let pulls = pulls + let pings = pings + .into_iter() + .map(|(addr, ping)| (addr, Protocol::PingMessage(ping))); + pulls .into_iter() .filter_map(|(peer, filters)| Some((peer.gossip()?, filters))) .flat_map(|(addr, filters)| repeat(addr).zip(filters)) - .map(|(gossip_addr, filter)| { + .map(move |(gossip_addr, filter)| { let request = Protocol::PullRequest(filter, self_info.clone()); (gossip_addr, request) - }); - (pings, pulls.collect()) + }) + .chain(pings) } pub fn flush_push_queue(&self) { @@ -1258,7 +1258,10 @@ impl ClusterInfo { } } } - fn new_push_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { + fn new_push_requests( + &self, + stakes: &HashMap, + ) -> impl Iterator { let self_id = self.id(); let (entries, push_messages, num_pushes) = { let _st = ScopedTimer::from(&self.stats.new_push_requests); @@ -1282,19 +1285,21 @@ impl ClusterInfo { push_messages .into_iter() .filter_map(|(pubkey, messages)| { - let peer: &ContactInfo = gossip_crds.get(pubkey)?; - Some((peer.gossip()?, messages)) + let addr = get_gossip_addr(pubkey, &gossip_crds, &self.socket_addr_space)?; + Some((addr, messages)) }) .collect() }; + let entries = Rc::new(entries); push_messages .into_iter() - .flat_map(|(peer, msgs)| { - let msgs = msgs.into_iter().map(|k| entries[k].clone()); - split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs) - .map(move |payload| (peer, Protocol::PushMessage(self_id, payload))) + .flat_map(move |(peer, msgs): (SocketAddr, Vec)| { + let entries = Rc::clone(&entries); + let msgs = msgs.into_iter().map(move |k| entries[k].clone()); + let msgs = split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs) + .map(move |msgs| Protocol::PushMessage(self_id, msgs)); + repeat(peer).zip(msgs) }) - .collect() } // Generate new push and pull requests @@ -1304,23 +1309,19 @@ impl ClusterInfo { gossip_validators: Option<&HashSet>, stakes: &HashMap, generate_pull_requests: bool, - ) -> Vec<(SocketAddr, Protocol)> { + ) -> impl Iterator { self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes); // This will flush local pending push messages before generating // pull-request bloom filters, preventing pull responses to return the // same values back to the node itself. Note that packets will arrive // and are processed out of order. - let mut out: Vec<_> = self.new_push_requests(stakes); + let out = self.new_push_requests(stakes); if generate_pull_requests { - let (pings, pull_requests) = - self.new_pull_requests(thread_pool, gossip_validators, stakes); - let pings = pings - .into_iter() - .map(|(addr, ping)| (addr, Protocol::PingMessage(ping))); - out.extend(pull_requests); - out.extend(pings); + let pulls = self.new_pull_requests(thread_pool, gossip_validators, stakes); + Either::Right(out.chain(pulls)) + } else { + Either::Left(out) } - out } /// At random pick a node and try to get updated changes from them @@ -1334,13 +1335,18 @@ impl ClusterInfo { generate_pull_requests: bool, ) -> Result<(), GossipError> { let _st = ScopedTimer::from(&self.stats.gossip_transmit_loop_time); - let reqs = self.generate_new_gossip_requests( + let mut packet_batch = PacketBatch::new_unpinned_with_recycler(recycler, 0, "run_gossip"); + self.generate_new_gossip_requests( thread_pool, gossip_validators, stakes, generate_pull_requests, - ); - send_gossip_packets(reqs, recycler, sender, &self.stats); + ) + .filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats)) + .for_each(|pkt| packet_batch.push(pkt)); + if !packet_batch.is_empty() { + sender.send(packet_batch)?; + } self.stats .gossip_transmit_loop_iterations_since_last_report .add_relaxed(1); @@ -1843,10 +1849,6 @@ impl ClusterInfo { if messages.is_empty() { return; } - let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum(); - self.stats - .push_message_value_count - .add_relaxed(num_crds_values); // Origins' pubkeys of upserted crds values. let origins: HashSet<_> = { let _st = ScopedTimer::from(&self.stats.process_push_message); @@ -1856,16 +1858,9 @@ impl ClusterInfo { // Generate prune messages. let prune_messages = self.generate_prune_messages(thread_pool, origins, stakes); let mut packet_batch = make_gossip_packet_batch(prune_messages, recycler, &self.stats); - let new_push_requests = self.new_push_requests(stakes); - for (address, request) in new_push_requests { - if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { - if let Some(pkt) = make_gossip_packet(address, &request, &self.stats) { - packet_batch.push(pkt); - } - } else { - trace!("Dropping Gossip push response, as destination is unknown"); - } - } + self.new_push_requests(stakes) + .filter_map(|(addr, data)| make_gossip_packet(addr, &data, &self.stats)) + .for_each(|pkt| packet_batch.push(pkt)); if !packet_batch.is_empty() { let _ = response_sender.send(packet_batch); } @@ -1897,7 +1892,7 @@ impl ClusterInfo { prunes .into_par_iter() .filter_map(|(pubkey, prunes)| { - let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip()?; + let addr = get_gossip_addr(pubkey, &gossip_crds, &self.socket_addr_space)?; Some((pubkey, addr, prunes)) }) .collect() @@ -2034,6 +2029,9 @@ impl ClusterInfo { } data.retain(&mut verify_gossip_addr); if !data.is_empty() { + self.stats + .push_message_value_count + .add_relaxed(data.len() as u64); push_messages.push((from, data)); } } @@ -2937,6 +2935,17 @@ fn filter_on_shred_version( } } +// Returns ContactInfo.gossip address filtered out by socket_addr_space. +#[inline] +fn get_gossip_addr( + pubkey: Pubkey, + crds: &Crds, + socket_addr_space: &SocketAddrSpace, +) -> Option { + let node = crds.get::<&ContactInfo>(pubkey)?; + node.gossip().filter(|addr| socket_addr_space.check(addr)) +} + // If the CRDS value is an unstaked contact-info, verifies if // it has responded to ping on its gossip socket address. // Returns false if the CRDS value should be discarded. @@ -3042,6 +3051,30 @@ mod tests { const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(DEFAULT_QUIC_ENDPOINTS) }; + impl ClusterInfo { + // Wrapper for ClusterInfo.new_pull_requests replicating old return + // type for legacy tests. + #[allow(clippy::type_complexity)] + fn old_new_pull_requests( + &self, + thread_pool: &ThreadPool, + gossip_validators: Option<&HashSet>, + stakes: &HashMap, + ) -> ( + Vec<(SocketAddr, Ping)>, // Ping packets + Vec<(SocketAddr, Protocol)>, // Pull requests + ) { + self.new_pull_requests(thread_pool, gossip_validators, stakes) + .partition_map(|(addr, protocol)| { + if let Protocol::PingMessage(ping) = protocol { + Either::Left((addr, ping)) + } else { + Either::Right((addr, protocol)) + } + }) + } + } + #[test] fn test_gossip_node() { //check that a gossip nodes always show up as spies @@ -3211,18 +3244,16 @@ mod tests { &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, ); - let reqs = cluster_info.generate_new_gossip_requests( + let mut reqs = cluster_info.generate_new_gossip_requests( &thread_pool, None, // gossip_validators &HashMap::new(), // stakes true, // generate_pull_requests ); //assert none of the addrs are invalid. - reqs.iter().all(|(addr, _)| { - let res = ContactInfo::is_valid_address(addr, &SocketAddrSpace::Unspecified); - assert!(res); - res - }); + assert!(reqs.all(|(addr, _)| { + ContactInfo::is_valid_address(&addr, &SocketAddrSpace::Unspecified) + })); } #[test] @@ -3707,7 +3738,8 @@ mod tests { let entrypoint_pubkey = solana_pubkey::new_rand(); let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp()); cluster_info.set_entrypoint(entrypoint.clone()); - let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + let (pings, pulls) = + cluster_info.old_new_pull_requests(&thread_pool, None, &HashMap::new()); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); for (addr, msg) in pulls { @@ -3730,7 +3762,8 @@ mod tests { Duration::from_millis(cluster_info.gossip.pull.crds_timeout), ); cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts); - let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new()); + let (pings, pulls) = + cluster_info.old_new_pull_requests(&thread_pool, None, &HashMap::new()); assert_eq!(pings.len(), 1); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]); @@ -3805,7 +3838,7 @@ mod tests { // Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a // fresh timestamp). There should only be one pull request to `other_node` - let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); assert!(pulls @@ -3815,7 +3848,7 @@ mod tests { // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should // now be two pull requests cluster_info.entrypoints.write().unwrap()[0].set_wallclock(0); - let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), 2 * MIN_NUM_BLOOM_FILTERS); for node in [&other_node, &entrypoint] { @@ -3829,7 +3862,7 @@ mod tests { } // Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should // only be one pull request to `other_node` - let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes); + let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes); assert!(pings.is_empty()); assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS); assert!(pulls