diff --git a/gossip/Cargo.toml b/gossip/Cargo.toml index 19aa27b97edf31..3318e148c1ff5d 100644 --- a/gossip/Cargo.toml +++ b/gossip/Cargo.toml @@ -18,6 +18,7 @@ indexmap = { version = "1.6", features = ["rayon"] } itertools = "0.10.1" log = "0.4.11" lru = "0.6.1" +matches = "0.1.8" num-traits = "0.2" rand = "0.7.0" rand_chacha = "0.2.2" diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 452c7c0999d2af..636c3f93966b82 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1173,16 +1173,13 @@ impl ClusterInfo { /// Returns epoch-slots inserted since the given cursor. /// Excludes entries from nodes with unkown or different shred version. pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec { - let self_shred_version = self.my_shred_version(); + let self_shred_version = Some(self.my_shred_version()); let gossip = self.gossip.read().unwrap(); let entries = gossip.crds.get_epoch_slots(cursor); entries - .filter( - |entry| match gossip.crds.get_contact_info(entry.value.pubkey()) { - Some(node) => node.shred_version == self_shred_version, - None => false, - }, - ) + .filter(|entry| { + gossip.crds.get_shred_version(&entry.value.pubkey()) == self_shred_version + }) .map(|entry| match &entry.value.data { CrdsData::EpochSlots(_, slots) => slots.clone(), _ => panic!("this should not happen!"), @@ -2213,9 +2210,10 @@ impl ClusterInfo { ) -> (usize, usize, usize) { let len = crds_values.len(); trace!("PullResponse me: {} from: {} len={}", self.id, from, len); - let shred_version = self - .lookup_contact_info(from, |ci| ci.shred_version) - .unwrap_or(0); + let shred_version = { + let gossip = self.gossip.read().unwrap(); + gossip.crds.get_shred_version(from).unwrap_or_default() + }; Self::filter_by_shred_version( from, &mut crds_values, @@ -2365,10 +2363,7 @@ impl ClusterInfo { let gossip = self.gossip.read().unwrap(); messages .iter() - .map(|(from, _)| match gossip.crds.get_contact_info(*from) { - None => 0, - Some(info) => info.shred_version, - }) + .map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default()) .collect() }; // Filter out data if the origin has different shred version. diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index a350f306b71e13..13b3ed08e23344 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -35,6 +35,7 @@ use { map::{rayon::ParValues, Entry, IndexMap}, set::IndexSet, }, + matches::debug_assert_matches, rayon::{prelude::*, ThreadPool}, solana_sdk::{ hash::{hash, Hash}, @@ -66,6 +67,8 @@ pub struct Crds { entries: BTreeMap, // Hash of recently purged values. purged: VecDeque<(Hash, u64 /*timestamp*/)>, + // Mapping from nodes' pubkeys to their respective shred-version. + shred_versions: HashMap, } #[derive(PartialEq, Debug)] @@ -125,6 +128,7 @@ impl Default for Crds { records: HashMap::default(), entries: BTreeMap::default(), purged: VecDeque::default(), + shred_versions: HashMap::default(), } } } @@ -173,9 +177,10 @@ impl Crds { Entry::Vacant(entry) => { let entry_index = entry.index(); self.shards.insert(entry_index, &value); - match value.value.data { - CrdsData::ContactInfo(_) => { + match &value.value.data { + CrdsData::ContactInfo(node) => { self.nodes.insert(entry_index); + self.shred_versions.insert(pubkey, node.shred_version); } CrdsData::Vote(_, _) => { self.votes.insert(value.ordinal, entry_index); @@ -195,7 +200,13 @@ impl Crds { let entry_index = entry.index(); self.shards.remove(entry_index, entry.get()); self.shards.insert(entry_index, &value); - match value.value.data { + match &value.value.data { + CrdsData::ContactInfo(node) => { + self.shred_versions.insert(pubkey, node.shred_version); + // self.nodes does not need to be updated since the + // entry at this index was and stays contact-info. + debug_assert_matches!(entry.get().value.data, CrdsData::ContactInfo(_)); + } CrdsData::Vote(_, _) => { self.votes.remove(&entry.get().ordinal); self.votes.insert(value.ordinal, entry_index); @@ -239,6 +250,10 @@ impl Crds { self.table.get(&label)?.value.contact_info() } + pub(crate) fn get_shred_version(&self, pubkey: &Pubkey) -> Option { + self.shred_versions.get(pubkey).copied() + } + pub fn get_lowest_slot(&self, pubkey: Pubkey) -> Option<&LowestSlot> { let lable = CrdsValueLabel::LowestSlot(pubkey); self.table.get(&lable)?.value.lowest_slot() @@ -449,6 +464,7 @@ impl Crds { records_entry.get_mut().swap_remove(&index); if records_entry.get().is_empty() { records_entry.remove(); + self.shred_versions.remove(&pubkey); } // If index == self.table.len(), then the removed entry was the last // entry in the table, in which case no other keys were modified. @@ -544,17 +560,20 @@ impl Crds { } #[cfg(test)] -mod test { +mod tests { use { super::*, crate::{ contact_info::ContactInfo, - crds_value::{new_rand_timestamp, NodeInstance}, + crds_value::{new_rand_timestamp, NodeInstance, SnapshotHash}, }, rand::{thread_rng, Rng, SeedableRng}, rand_chacha::ChaChaRng, rayon::ThreadPoolBuilder, - solana_sdk::signature::{Keypair, Signer}, + solana_sdk::{ + signature::{Keypair, Signer}, + timing::timestamp, + }, std::{collections::HashSet, iter::repeat_with}, }; @@ -1017,6 +1036,53 @@ mod test { assert!(crds.records.is_empty()); } + #[test] + fn test_get_shred_version() { + let mut rng = rand::thread_rng(); + let pubkey = Pubkey::new_unique(); + let mut crds = Crds::default(); + assert_eq!(crds.get_shred_version(&pubkey), None); + // Initial insertion of a node with shred version: + let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); + let wallclock = node.wallclock; + node.shred_version = 42; + let node = CrdsData::ContactInfo(node); + let node = CrdsValue::new_unsigned(node); + assert_eq!(crds.insert(node, timestamp()), Ok(())); + assert_eq!(crds.get_shred_version(&pubkey), Some(42)); + // An outdated value should not update shred-version: + let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); + node.wallclock = wallclock - 1; // outdated. + node.shred_version = 8; + let node = CrdsData::ContactInfo(node); + let node = CrdsValue::new_unsigned(node); + assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed)); + assert_eq!(crds.get_shred_version(&pubkey), Some(42)); + // Update shred version: + let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey)); + node.wallclock = wallclock + 1; // so that it overrides the prev one. + node.shred_version = 8; + let node = CrdsData::ContactInfo(node); + let node = CrdsValue::new_unsigned(node); + assert_eq!(crds.insert(node, timestamp()), Ok(())); + assert_eq!(crds.get_shred_version(&pubkey), Some(8)); + // Add other crds values with the same pubkey. + let val = SnapshotHash::new_rand(&mut rng, Some(pubkey)); + let val = CrdsData::SnapshotHashes(val); + let val = CrdsValue::new_unsigned(val); + assert_eq!(crds.insert(val, timestamp()), Ok(())); + assert_eq!(crds.get_shred_version(&pubkey), Some(8)); + // Remove contact-info. Shred version should stay there since there + // are still values associated with the pubkey. + crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp()); + assert_eq!(crds.get_contact_info(pubkey), None); + assert_eq!(crds.get_shred_version(&pubkey), Some(8)); + // Remove the remaining entry with the same pubkey. + crds.remove(&CrdsValueLabel::SnapshotHashes(pubkey), timestamp()); + assert_eq!(crds.get_records(&pubkey).count(), 0); + assert_eq!(crds.get_shred_version(&pubkey), None); + } + #[test] #[allow(clippy::needless_collect)] fn test_drop() {