Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
adds mapping from nodes pubkeys to their shred-version (#17940)
Browse files Browse the repository at this point in the history
Crds values of nodes with different shred versions are creeping into
gossip table resulting in runtime issues as the one addressed in:
#17899

This commit works towards enforcing more checks and filtering based on
shred version by adding necessary mapping and api to gossip table.
Once populated, pubkey->shred-version mapping persists as long as there
are any values associated with the pubkey.
  • Loading branch information
behzadnouri authored Jun 18, 2021
1 parent abc9839 commit 5a99fa3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 20 deletions.
1 change: 1 addition & 0 deletions gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ indexmap = { version = "1.6", features = ["rayon"] }
itertools = "0.10.1"
log = "0.4.11"
lru = "0.6.1"
matches = "0.1.8"
num-traits = "0.2"
rand = "0.7.0"
rand_chacha = "0.2.2"
Expand Down
23 changes: 9 additions & 14 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,16 +1173,13 @@ impl ClusterInfo {
/// Returns epoch-slots inserted since the given cursor.
/// Excludes entries from nodes with unkown or different shred version.
pub fn get_epoch_slots(&self, cursor: &mut Cursor) -> Vec<EpochSlots> {
let self_shred_version = self.my_shred_version();
let self_shred_version = Some(self.my_shred_version());
let gossip = self.gossip.read().unwrap();
let entries = gossip.crds.get_epoch_slots(cursor);
entries
.filter(
|entry| match gossip.crds.get_contact_info(entry.value.pubkey()) {
Some(node) => node.shred_version == self_shred_version,
None => false,
},
)
.filter(|entry| {
gossip.crds.get_shred_version(&entry.value.pubkey()) == self_shred_version
})
.map(|entry| match &entry.value.data {
CrdsData::EpochSlots(_, slots) => slots.clone(),
_ => panic!("this should not happen!"),
Expand Down Expand Up @@ -2213,9 +2210,10 @@ impl ClusterInfo {
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
let shred_version = self
.lookup_contact_info(from, |ci| ci.shred_version)
.unwrap_or(0);
let shred_version = {
let gossip = self.gossip.read().unwrap();
gossip.crds.get_shred_version(from).unwrap_or_default()
};
Self::filter_by_shred_version(
from,
&mut crds_values,
Expand Down Expand Up @@ -2365,10 +2363,7 @@ impl ClusterInfo {
let gossip = self.gossip.read().unwrap();
messages
.iter()
.map(|(from, _)| match gossip.crds.get_contact_info(*from) {
None => 0,
Some(info) => info.shred_version,
})
.map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default())
.collect()
};
// Filter out data if the origin has different shred version.
Expand Down
78 changes: 72 additions & 6 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use {
map::{rayon::ParValues, Entry, IndexMap},
set::IndexSet,
},
matches::debug_assert_matches,
rayon::{prelude::*, ThreadPool},
solana_sdk::{
hash::{hash, Hash},
Expand Down Expand Up @@ -66,6 +67,8 @@ pub struct Crds {
entries: BTreeMap<u64 /*insert order*/, usize /*index*/>,
// Hash of recently purged values.
purged: VecDeque<(Hash, u64 /*timestamp*/)>,
// Mapping from nodes' pubkeys to their respective shred-version.
shred_versions: HashMap<Pubkey, u16>,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -125,6 +128,7 @@ impl Default for Crds {
records: HashMap::default(),
entries: BTreeMap::default(),
purged: VecDeque::default(),
shred_versions: HashMap::default(),
}
}
}
Expand Down Expand Up @@ -173,9 +177,10 @@ impl Crds {
Entry::Vacant(entry) => {
let entry_index = entry.index();
self.shards.insert(entry_index, &value);
match value.value.data {
CrdsData::ContactInfo(_) => {
match &value.value.data {
CrdsData::ContactInfo(node) => {
self.nodes.insert(entry_index);
self.shred_versions.insert(pubkey, node.shred_version);
}
CrdsData::Vote(_, _) => {
self.votes.insert(value.ordinal, entry_index);
Expand All @@ -195,7 +200,13 @@ impl Crds {
let entry_index = entry.index();
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &value);
match value.value.data {
match &value.value.data {
CrdsData::ContactInfo(node) => {
self.shred_versions.insert(pubkey, node.shred_version);
// self.nodes does not need to be updated since the
// entry at this index was and stays contact-info.
debug_assert_matches!(entry.get().value.data, CrdsData::ContactInfo(_));
}
CrdsData::Vote(_, _) => {
self.votes.remove(&entry.get().ordinal);
self.votes.insert(value.ordinal, entry_index);
Expand Down Expand Up @@ -239,6 +250,10 @@ impl Crds {
self.table.get(&label)?.value.contact_info()
}

pub(crate) fn get_shred_version(&self, pubkey: &Pubkey) -> Option<u16> {
self.shred_versions.get(pubkey).copied()
}

pub fn get_lowest_slot(&self, pubkey: Pubkey) -> Option<&LowestSlot> {
let lable = CrdsValueLabel::LowestSlot(pubkey);
self.table.get(&lable)?.value.lowest_slot()
Expand Down Expand Up @@ -449,6 +464,7 @@ impl Crds {
records_entry.get_mut().swap_remove(&index);
if records_entry.get().is_empty() {
records_entry.remove();
self.shred_versions.remove(&pubkey);
}
// If index == self.table.len(), then the removed entry was the last
// entry in the table, in which case no other keys were modified.
Expand Down Expand Up @@ -544,17 +560,20 @@ impl Crds {
}

#[cfg(test)]
mod test {
mod tests {
use {
super::*,
crate::{
contact_info::ContactInfo,
crds_value::{new_rand_timestamp, NodeInstance},
crds_value::{new_rand_timestamp, NodeInstance, SnapshotHash},
},
rand::{thread_rng, Rng, SeedableRng},
rand_chacha::ChaChaRng,
rayon::ThreadPoolBuilder,
solana_sdk::signature::{Keypair, Signer},
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
},
std::{collections::HashSet, iter::repeat_with},
};

Expand Down Expand Up @@ -1017,6 +1036,53 @@ mod test {
assert!(crds.records.is_empty());
}

#[test]
fn test_get_shred_version() {
let mut rng = rand::thread_rng();
let pubkey = Pubkey::new_unique();
let mut crds = Crds::default();
assert_eq!(crds.get_shred_version(&pubkey), None);
// Initial insertion of a node with shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let wallclock = node.wallclock;
node.shred_version = 42;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
node.wallclock = wallclock - 1; // outdated.
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Err(CrdsError::InsertFailed));
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// Update shred version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
node.wallclock = wallclock + 1; // so that it overrides the prev one.
node.shred_version = 8;
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(crds.insert(node, timestamp()), Ok(()));
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Add other crds values with the same pubkey.
let val = SnapshotHash::new_rand(&mut rng, Some(pubkey));
let val = CrdsData::SnapshotHashes(val);
let val = CrdsValue::new_unsigned(val);
assert_eq!(crds.insert(val, timestamp()), Ok(()));
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove contact-info. Shred version should stay there since there
// are still values associated with the pubkey.
crds.remove(&CrdsValueLabel::ContactInfo(pubkey), timestamp());
assert_eq!(crds.get_contact_info(pubkey), None);
assert_eq!(crds.get_shred_version(&pubkey), Some(8));
// Remove the remaining entry with the same pubkey.
crds.remove(&CrdsValueLabel::SnapshotHashes(pubkey), timestamp());
assert_eq!(crds.get_records(&pubkey).count(), 0);
assert_eq!(crds.get_shred_version(&pubkey), None);
}

#[test]
#[allow(clippy::needless_collect)]
fn test_drop() {
Expand Down

0 comments on commit 5a99fa3

Please sign in to comment.