diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 79687b7e224d15..782c5a6fc232c7 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -16,7 +16,7 @@ use { crate::{ cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer}, contact_info::ContactInfo, - crds::Cursor, + crds::{Crds, Cursor}, crds_gossip::CrdsGossip, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, @@ -1930,7 +1930,6 @@ impl ClusterInfo { return; } let self_pubkey = self.id(); - let self_shred_version = self.my_shred_version(); let requests: Vec<_> = thread_pool.install(|| { requests .into_par_iter() @@ -1942,17 +1941,7 @@ impl ClusterInfo { inc_new_counter_debug!("cluster_info-window-request-loopback", 1); false } - Some(caller) => { - if self_shred_version != 0 - && caller.shred_version != 0 - && caller.shred_version != self_shred_version - { - self.stats.skip_pull_shred_version.add_relaxed(1); - false - } else { - true - } - } + Some(_) => true, }) .map(|(from_addr, filter, caller)| PullData { from_addr, @@ -2205,23 +2194,11 @@ impl ClusterInfo { fn handle_pull_response( &self, from: &Pubkey, - mut crds_values: Vec, + crds_values: Vec, timeouts: &HashMap, ) -> (usize, usize, usize) { let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", self.id, from, len); - let shred_version = { - let gossip = self.gossip.read().unwrap(); - gossip.crds.get_shred_version(from).unwrap_or_default() - }; - Self::filter_by_shred_version( - from, - &mut crds_values, - shred_version, - self.my_shred_version(), - ); - let filtered_len = crds_values.len(); - let mut pull_stats = ProcessPullStats::default(); let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self .time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response) @@ -2241,14 +2218,8 @@ impl ClusterInfo { &mut pull_stats, ); } - - self.stats - .skip_pull_response_shred_version - .add_relaxed((len - filtered_len) as u64); self.stats.process_pull_response_count.add_relaxed(1); - self.stats - .process_pull_response_len - .add_relaxed(filtered_len as u64); + self.stats.process_pull_response_len.add_relaxed(len as u64); self.stats .process_pull_response_timeout .add_relaxed(pull_stats.timeout_count as u64); @@ -2269,23 +2240,6 @@ impl ClusterInfo { ) } - fn filter_by_shred_version( - from: &Pubkey, - crds_values: &mut Vec, - shred_version: u16, - my_shred_version: u16, - ) { - // Always run filter on spies - if my_shred_version != 0 && shred_version != my_shred_version { - // Allow someone to update their own ContactInfo so they - // can change shred versions if needed. - crds_values.retain(|crds_value| match &crds_value.data { - CrdsData::ContactInfo(contact_info) => contact_info.id == *from, - _ => false, - }); - } - } - fn handle_batch_ping_messages( &self, pings: I, @@ -2358,41 +2312,10 @@ impl ClusterInfo { self.stats .push_message_count .add_relaxed(messages.len() as u64); - // Obtain shred versions of the origins. - let shred_versions: Vec<_> = { - let gossip = self.gossip.read().unwrap(); - messages - .iter() - .map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default()) - .collect() - }; - // Filter out data if the origin has different shred version. - let self_shred_version = self.my_shred_version(); let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum(); - let messages: Vec<_> = messages - .into_iter() - .zip(shred_versions) - .filter_map(|((from, mut crds_values), shred_version)| { - Self::filter_by_shred_version( - &from, - &mut crds_values, - shred_version, - self_shred_version, - ); - if crds_values.is_empty() { - None - } else { - Some((from, crds_values)) - } - }) - .collect(); - let num_filtered_crds_values = messages.iter().map(|(_, data)| data.len() as u64).sum(); self.stats .push_message_value_count - .add_relaxed(num_filtered_crds_values); - self.stats - .skip_push_message_shred_version - .add_relaxed(num_crds_values - num_filtered_crds_values); + .add_relaxed(num_crds_values); // Origins' pubkeys of upserted crds values. let origins: HashSet<_> = { let mut gossip = @@ -2515,6 +2438,28 @@ impl ClusterInfo { should_check_duplicate_instance: bool, ) -> Result<(), GossipError> { let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time); + // Filter out values if the shred-versions are different. + let self_shred_version = self.my_shred_version(); + let packets = if self_shred_version == 0 { + packets + } else { + let gossip = self.gossip.read().unwrap(); + thread_pool.install(|| { + packets + .into_par_iter() + .with_min_len(1024) + .filter_map(|(from, msg)| { + let msg = filter_on_shred_version( + msg, + self_shred_version, + &gossip.crds, + &self.stats, + )?; + Some((from, msg)) + }) + .collect() + }) + }; // Check if there is a duplicate instance of // this node with more recent timestamp. let check_duplicate_instance = |values: &[CrdsValue]| { @@ -3102,6 +3047,70 @@ pub fn stake_weight_peers( ClusterInfo::sorted_stakes_with_index(peers, stakes) } +// Filters out values from nodes with different shred-version. +fn filter_on_shred_version( + mut msg: Protocol, + self_shred_version: u16, + crds: &Crds, + stats: &GossipStats, +) -> Option { + let filter_values = |from: &Pubkey, values: &mut Vec, skipped_counter: &Counter| { + let num_values = values.len(); + if crds.get_shred_version(from) == Some(self_shred_version) { + // Retain values with the same shred-vesion, or those which are + // contact-info so that shred-versions can be updated. + values.retain(|value| match &value.data { + CrdsData::ContactInfo(_) => true, + _ => crds.get_shred_version(&value.pubkey()) == Some(self_shred_version), + }) + } else { + // Only allow node to update its own contact info in case their + // shred-version changes. + values.retain(|value| match &value.data { + CrdsData::ContactInfo(node) => node.id == *from, + _ => false, + }) + } + let num_skipped = num_values - values.len(); + if num_skipped != 0 { + skipped_counter.add_relaxed(num_skipped as u64); + } + }; + match &mut msg { + Protocol::PullRequest(_, caller) => match &caller.data { + // Allow spy nodes with shred-verion == 0 to pull from other nodes. + CrdsData::ContactInfo(node) + if node.shred_version == 0 || node.shred_version == self_shred_version => + { + Some(msg) + } + _ => { + stats.skip_pull_shred_version.add_relaxed(1); + None + } + }, + Protocol::PullResponse(from, values) => { + filter_values(from, values, &stats.skip_pull_response_shred_version); + if values.is_empty() { + None + } else { + Some(msg) + } + } + Protocol::PushMessage(from, values) => { + filter_values(from, values, &stats.skip_push_message_shred_version); + if values.is_empty() { + None + } else { + Some(msg) + } + } + Protocol::PruneMessage(_, _) | Protocol::PingMessage(_) | Protocol::PongMessage(_) => { + Some(msg) + } + } +} + #[cfg(test)] mod tests { use { @@ -3297,59 +3306,6 @@ mod tests { vec![entrypoint_crdsvalue] } - #[test] - fn test_filter_shred_version() { - let from = solana_sdk::pubkey::new_rand(); - let my_shred_version = 1; - let other_shred_version = 1; - - // Allow same shred_version - let mut values = test_crds_values(from); - ClusterInfo::filter_by_shred_version( - &from, - &mut values, - other_shred_version, - my_shred_version, - ); - assert_eq!(values.len(), 1); - - // Allow shred_version=0. - let other_shred_version = 0; - ClusterInfo::filter_by_shred_version( - &from, - &mut values, - other_shred_version, - my_shred_version, - ); - assert_eq!(values.len(), 1); - - // Change to sender's ContactInfo version, allow that. - let other_shred_version = 2; - ClusterInfo::filter_by_shred_version( - &from, - &mut values, - other_shred_version, - my_shred_version, - ); - assert_eq!(values.len(), 1); - - let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash { - from: solana_sdk::pubkey::new_rand(), - hashes: vec![], - wallclock: 0, - })); - values.push(snapshot_hash_data); - // Change to sender's ContactInfo version, allow that. - let other_shred_version = 2; - ClusterInfo::filter_by_shred_version( - &from, - &mut values, - other_shred_version, - my_shred_version, - ); - assert_eq!(values.len(), 1); - } - #[test] fn test_max_snapshot_hashes_with_push_messages() { let mut rng = rand::thread_rng();