Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
filters crds values obtained through gossip by their shred version (#…
Browse files Browse the repository at this point in the history
…18072)

filter_by_shred_version does not check the shred-version of the owner of
the crds-value. It only checks the shred-version of the node which is
relaying the value:
https://github.com/solana-labs/solana/blob/5cc073420/gossip/src/cluster_info.rs#L2274-L2289

So crds-values with different shred versions can still pass through this
function as long as they are relayed by a node with matching shred
version; and so, a single node can bridge different shred values
through-out the cluster.

(cherry picked from commit 69a5f0e)

# Conflicts:
#	gossip/src/cluster_info.rs
  • Loading branch information
behzadnouri authored and mergify-bot committed Jun 23, 2021
1 parent 088b389 commit 5f12952
Showing 1 changed file with 95 additions and 123 deletions.
218 changes: 95 additions & 123 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
crate::{
cluster_info_metrics::{submit_gossip_stats, Counter, GossipStats, ScopedTimer},
contact_info::ContactInfo,
crds::Cursor,
crds::{Crds, Cursor},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
Expand Down Expand Up @@ -1930,7 +1930,6 @@ impl ClusterInfo {
return;
}
let self_pubkey = self.id();
let self_shred_version = self.my_shred_version();
let requests: Vec<_> = thread_pool.install(|| {
requests
.into_par_iter()
Expand All @@ -1942,17 +1941,7 @@ impl ClusterInfo {
inc_new_counter_debug!("cluster_info-window-request-loopback", 1);
false
}
Some(caller) => {
if self_shred_version != 0
&& caller.shred_version != 0
&& caller.shred_version != self_shred_version
{
self.stats.skip_pull_shred_version.add_relaxed(1);
false
} else {
true
}
}
Some(_) => true,
})
.map(|(from_addr, filter, caller)| PullData {
from_addr,
Expand Down Expand Up @@ -2205,10 +2194,11 @@ impl ClusterInfo {
fn handle_pull_response(
&self,
from: &Pubkey,
mut crds_values: Vec<CrdsValue>,
crds_values: Vec<CrdsValue>,
timeouts: &HashMap<Pubkey, u64>,
) -> (usize, usize, usize) {
let len = crds_values.len();
<<<<<<< HEAD
trace!("PullResponse me: {} from: {} len={}", self.id, from, len);
let shred_version = {
let gossip = self.gossip.read().unwrap();
Expand All @@ -2222,6 +2212,9 @@ impl ClusterInfo {
);
let filtered_len = crds_values.len();

=======
trace!("PullResponse me: {} from: {} len={}", self.id(), from, len);
>>>>>>> 69a5f0e6c (filters crds values obtained through gossip by their shred version (#18072))
let mut pull_stats = ProcessPullStats::default();
let (filtered_pulls, filtered_pulls_expired_timeout, failed_inserts) = self
.time_gossip_read_lock("filter_pull_resp", &self.stats.filter_pull_response)
Expand All @@ -2241,14 +2234,8 @@ impl ClusterInfo {
&mut pull_stats,
);
}

self.stats
.skip_pull_response_shred_version
.add_relaxed((len - filtered_len) as u64);
self.stats.process_pull_response_count.add_relaxed(1);
self.stats
.process_pull_response_len
.add_relaxed(filtered_len as u64);
self.stats.process_pull_response_len.add_relaxed(len as u64);
self.stats
.process_pull_response_timeout
.add_relaxed(pull_stats.timeout_count as u64);
Expand All @@ -2269,23 +2256,6 @@ impl ClusterInfo {
)
}

fn filter_by_shred_version(
from: &Pubkey,
crds_values: &mut Vec<CrdsValue>,
shred_version: u16,
my_shred_version: u16,
) {
// Always run filter on spies
if my_shred_version != 0 && shred_version != my_shred_version {
// Allow someone to update their own ContactInfo so they
// can change shred versions if needed.
crds_values.retain(|crds_value| match &crds_value.data {
CrdsData::ContactInfo(contact_info) => contact_info.id == *from,
_ => false,
});
}
}

fn handle_batch_ping_messages<I>(
&self,
pings: I,
Expand Down Expand Up @@ -2358,41 +2328,10 @@ impl ClusterInfo {
self.stats
.push_message_count
.add_relaxed(messages.len() as u64);
// Obtain shred versions of the origins.
let shred_versions: Vec<_> = {
let gossip = self.gossip.read().unwrap();
messages
.iter()
.map(|(from, _)| gossip.crds.get_shred_version(from).unwrap_or_default())
.collect()
};
// Filter out data if the origin has different shred version.
let self_shred_version = self.my_shred_version();
let num_crds_values: u64 = messages.iter().map(|(_, data)| data.len() as u64).sum();
let messages: Vec<_> = messages
.into_iter()
.zip(shred_versions)
.filter_map(|((from, mut crds_values), shred_version)| {
Self::filter_by_shred_version(
&from,
&mut crds_values,
shred_version,
self_shred_version,
);
if crds_values.is_empty() {
None
} else {
Some((from, crds_values))
}
})
.collect();
let num_filtered_crds_values = messages.iter().map(|(_, data)| data.len() as u64).sum();
self.stats
.push_message_value_count
.add_relaxed(num_filtered_crds_values);
self.stats
.skip_push_message_shred_version
.add_relaxed(num_crds_values - num_filtered_crds_values);
.add_relaxed(num_crds_values);
// Origins' pubkeys of upserted crds values.
let origins: HashSet<_> = {
let mut gossip =
Expand Down Expand Up @@ -2515,6 +2454,28 @@ impl ClusterInfo {
should_check_duplicate_instance: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.process_gossip_packets_time);
// Filter out values if the shred-versions are different.
let self_shred_version = self.my_shred_version();
let packets = if self_shred_version == 0 {
packets
} else {
let gossip = self.gossip.read().unwrap();
thread_pool.install(|| {
packets
.into_par_iter()
.with_min_len(1024)
.filter_map(|(from, msg)| {
let msg = filter_on_shred_version(
msg,
self_shred_version,
&gossip.crds,
&self.stats,
)?;
Some((from, msg))
})
.collect()
})
};
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = |values: &[CrdsValue]| {
Expand Down Expand Up @@ -3102,6 +3063,70 @@ pub fn stake_weight_peers(
ClusterInfo::sorted_stakes_with_index(peers, stakes)
}

// Filters out values from nodes with different shred-version.
fn filter_on_shred_version(
mut msg: Protocol,
self_shred_version: u16,
crds: &Crds,
stats: &GossipStats,
) -> Option<Protocol> {
let filter_values = |from: &Pubkey, values: &mut Vec<CrdsValue>, skipped_counter: &Counter| {
let num_values = values.len();
if crds.get_shred_version(from) == Some(self_shred_version) {
// Retain values with the same shred-vesion, or those which are
// contact-info so that shred-versions can be updated.
values.retain(|value| match &value.data {
CrdsData::ContactInfo(_) => true,
_ => crds.get_shred_version(&value.pubkey()) == Some(self_shred_version),
})
} else {
// Only allow node to update its own contact info in case their
// shred-version changes.
values.retain(|value| match &value.data {
CrdsData::ContactInfo(node) => node.id == *from,
_ => false,
})
}
let num_skipped = num_values - values.len();
if num_skipped != 0 {
skipped_counter.add_relaxed(num_skipped as u64);
}
};
match &mut msg {
Protocol::PullRequest(_, caller) => match &caller.data {
// Allow spy nodes with shred-verion == 0 to pull from other nodes.
CrdsData::ContactInfo(node)
if node.shred_version == 0 || node.shred_version == self_shred_version =>
{
Some(msg)
}
_ => {
stats.skip_pull_shred_version.add_relaxed(1);
None
}
},
Protocol::PullResponse(from, values) => {
filter_values(from, values, &stats.skip_pull_response_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
}
Protocol::PushMessage(from, values) => {
filter_values(from, values, &stats.skip_push_message_shred_version);
if values.is_empty() {
None
} else {
Some(msg)
}
}
Protocol::PruneMessage(_, _) | Protocol::PingMessage(_) | Protocol::PongMessage(_) => {
Some(msg)
}
}
}

#[cfg(test)]
mod tests {
use {
Expand Down Expand Up @@ -3297,59 +3322,6 @@ mod tests {
vec![entrypoint_crdsvalue]
}

#[test]
fn test_filter_shred_version() {
let from = solana_sdk::pubkey::new_rand();
let my_shred_version = 1;
let other_shred_version = 1;

// Allow same shred_version
let mut values = test_crds_values(from);
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Allow shred_version=0.
let other_shred_version = 0;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);

let snapshot_hash_data = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
from: solana_sdk::pubkey::new_rand(),
hashes: vec![],
wallclock: 0,
}));
values.push(snapshot_hash_data);
// Change to sender's ContactInfo version, allow that.
let other_shred_version = 2;
ClusterInfo::filter_by_shred_version(
&from,
&mut values,
other_shred_version,
my_shred_version,
);
assert_eq!(values.len(), 1);
}

#[test]
fn test_max_snapshot_hashes_with_push_messages() {
let mut rng = rand::thread_rng();
Expand Down

0 comments on commit 5f12952

Please sign in to comment.