Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

filters crds values obtained through gossip by their shred version (backport #18072) #18175

Merged
merged 2 commits into from
Jun 23, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 91 additions & 135 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
crate::{
cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
contact_info::ContactInfo,
crds::Cursor,
crds::{Crds, Cursor},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
Expand Down Expand Up @@ -1930,7 +1930,6 @@ impl ClusterInfo {
return;
}
let self_pubkey = self.id();
let self_shred_version = self.my_shred_version();
let requests: Vec<_> = thread_pool.install(|| {
requests
.into_par_iter()
Expand All @@ -1942,17 +1941,7 @@ impl ClusterInfo {
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
false
}
Some(caller) => {
if self_shred_version != 0
&& caller.shred_version != 0
&& caller.shred_version != self_shred_version
{
self.stats.skip_pull_shred_version.add_relaxed(1);
false
} else {
true
}
}
Some(_) => true,
})
.map(|(from_addr, filter, caller)| PullData {
from_addr,
Expand Down Expand Up @@ -2205,23 +2194,11 @@ impl ClusterInfo {
fn handle_pull_response(
&self,
from: &Pubkey,
mut crds_values: Vec<CrdsValue>,
crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) -> (usize, usize, usize) {
let len = crds_values.len();
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
let shred_version = {
let gossip = self.gossip.read().unwrap();
gossip.crds.get_shred_version(from).unwrap_or_default()
};
Self::filter_by_shred_version(
from,
&mut crds_values,
shred_version,
self.my_shred_version(),
);
let filtered_len = crds_values.len();

let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self
.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response)
Expand All @@ -2241,14 +2218,8 @@ impl ClusterInfo {
&mut pull_stats,
);
}

self.stats
.skip_pull_response_shred_version
.add_relaxed((len - filtered_len) as u64);
self.stats.process_pull_response_count.add_relaxed(1);
self.stats
.process_pull_response_len
.add_relaxed(filtered_len as u64);
self.stats.process_pull_response_len.add_relaxed(len as u64);
self.stats
.process_pull_response_timeout
.add_relaxed(pull_stats.timeout_count as u64);
Expand All @@ -2269,23 +2240,6 @@ impl ClusterInfo {
)
}

fn filter_by_shred_version(
from: &Pubkey,
crds_values: &mut Vec<CrdsValue>,
shred_version: u16,
my_shred_version: u16,
) {
// Always run filter on spies
if my_shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they
// can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data {
CrdsData::ContactInfo(contact_info) => contact_info.id == *from,
_ => false,
});
}
}

fn handle_batch_ping_messages<I>(
&self,
pings: I,
Expand Down Expand Up @@ -2358,41 +2312,10 @@ impl ClusterInfo {
self.stats
.push_message_count
.add_relaxed(messages.len() as u64);
// Obtain shred versions of the origins.
let shred_versions: Vec<_> = {
let gossip = self.gossip.read().unwrap();
messages
.iter()
.map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default())
.collect()
};
// Filter out data if the origin has different shred version.
let self_shred_version = self.my_shred_version();
let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
let messages: Vec<_> = messages
.into_iter()
.zip(shred_versions)
.filter_map(|((from, mut crds_values), shred_version)| {
Self::filter_by_shred_version(
&from,
&mut crds_values,
shred_version,
self_shred_version,
);
if crds_values.is_empty() {
None
} else {
Some((from, crds_values))
}
})
.collect();
let num_filtered_crds_values = messages.iter().map(|(_, data)| data.len() as u64).sum();
self.stats
.push_message_value_count
.add_relaxed(num_filtered_crds_values);
self.stats
.skip_push_message_shred_version
.add_relaxed(num_crds_values - num_filtered_crds_values);
.add_relaxed(num_crds_values);
// Origins' pubkeys of upserted crds values.
let origins: HashSet<_> = {
let mut gossip =
Expand Down Expand Up @@ -2515,6 +2438,28 @@ impl ClusterInfo {
should_check_duplicate_instance: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
// Filter out values if the shred-versions are different.
let self_shred_version = self.my_shred_version();
let packets = if self_shred_version == 0 {
packets
} else {
let gossip = self.gossip.read().unwrap();
thread_pool.install(|| {
packets
.into_par_iter()
.with_min_len(1024)
.filter_map(|(from, msg)| {
let msg = filter_on_shred_version(
msg,
self_shred_version,
&gossip.crds,
&self.stats,
)?;
Some((from, msg))
})
.collect()
})
};
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = |values: &[CrdsValue]| {
Expand Down Expand Up @@ -3102,6 +3047,70 @@ pub fn stake_weight_peers(
ClusterInfo::sorted_stakes_with_index(peers, stakes)
}

// Filters out values from nodes with different shred-version.
fn filter_on_shred_version(
mut msg: Protocol,
self_shred_version: u16,
crds: &Crds,
stats: &GossipStats,
) -> Option<Protocol> {
let filter_values = |from: &Pubkey, values: &mut Vec<CrdsValue>, skipped_counter: &Counter| {
let num_values = values.len();
if crds.get_shred_version(from) == Some(self_shred_version) {
// Retain values with the same shred-vesion, or those which are
// contact-info so that shred-versions can be updated.
values.retain(|value| match &value.data {
CrdsData::ContactInfo(_) => true,
_ => crds.get_shred_version(&value.pubkey()) == Some(self_shred_version),
})
} else {
// Only allow node to update its own contact info in case their
// shred-version changes.
values.retain(|value| match &value.data {
CrdsData::ContactInfo(node) => node.id == *from,
_ => false,
})
}
let num_skipped = num_values - values.len();
if num_skipped != 0 {
skipped_counter.add_relaxed(num_skipped as u64);
}
};
match &mut msg {
Protocol::PullRequest(_, caller) => match &caller.data {
// Allow spy nodes with shred-verion == 0 to pull from other nodes.
CrdsData::ContactInfo(node)
if node.shred_version == 0 || node.shred_version == self_shred_version =>
{
Some(msg)
}
_ => {
stats.skip_pull_shred_version.add_relaxed(1);
None
}
},
Protocol::PullResponse(from, values) => {
filter_values(from, values, &stats.skip_pull_response_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
}
Protocol::PushMessage(from, values) => {
filter_values(from, values, &stats.skip_push_message_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
}
Protocol::PruneMessage(_, _) | Protocol::PingMessage(_) | Protocol::PongMessage(_) => {
Some(msg)
}
}
}

#[cfg(test)]
mod tests {
use {
Expand Down Expand Up @@ -3297,59 +3306,6 @@ mod tests {
vec![entrypoint_crdsvalue]
}

#[test]
fn test_filter_shred_version() {
let from = solana_sdk::pubkey::new_rand();
let my_shred_version = 1;
let other_shred_version = 1;

// Allow same shred_version
let mut values = test_crds_values(from);
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Allow shred_version=0.
let other_shred_version = 0;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: solana_sdk::pubkey::new_rand(),
hashes: vec![],
wallclock: 0,
}));
values.push(snapshot_hash_data);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
}

#[test]
fn test_max_snapshot_hashes_with_push_messages() {
let mut rng = rand::thread_rng();
Expand Down