Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
limits number of unique pubkeys in the crds table (bp #15539) (#16475)
Browse files Browse the repository at this point in the history
* limits number of unique pubkeys in the crds table (#15539)

(cherry picked from commit 56923c9)

# Conflicts:
#	core/src/cluster_info.rs

* removes backport merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
  • Loading branch information
mergify[bot] and behzadnouri authored Apr 12, 2021
1 parent 127e740 commit 4276591
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 47 deletions.
91 changes: 47 additions & 44 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ struct GossipStats {
prune_message_len: Counter,
pull_request_ping_pong_check_failed_count: Counter,
purge: Counter,
trim_crds_table_failed: Counter,
trim_crds_table_purged_values_count: Counter,
epoch_slots_lookup: Counter,
new_pull_requests: Counter,
new_pull_requests_count: Counter,
Expand Down Expand Up @@ -605,16 +607,6 @@ impl ClusterInfo {
self.contact_debug_interval = new;
}

pub fn update_contact_info<F>(&self, modify: F)
where
F: FnOnce(&mut ContactInfo),
{
let my_id = self.id();
modify(&mut self.my_contact_info.write().unwrap());
assert_eq!(self.my_contact_info.read().unwrap().id, my_id);
self.insert_self()
}

fn push_self(
&self,
stakes: &HashMap<Pubkey, u64>,
Expand Down Expand Up @@ -1733,6 +1725,7 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>,
generate_pull_requests: bool,
) -> Vec<(SocketAddr, Protocol)> {
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
let mut pulls: Vec<_> = if generate_pull_requests {
self.new_pull_requests(&thread_pool, gossip_validators, stakes)
} else {
Expand Down Expand Up @@ -1846,6 +1839,39 @@ impl ClusterInfo {
inc_new_counter_info!("cluster_info-purge-count", num_purged);
}

// Trims the CRDS table by dropping all values associated with the pubkeys
// with the lowest stake, so that the number of unique pubkeys are bounded.
fn trim_crds_table(&self, cap: usize, stakes: &HashMap<Pubkey, u64>) {
if !self.gossip.read().unwrap().crds.should_trim(cap) {
return;
}
let keep: Vec<_> = self
.entrypoints
.read()
.unwrap()
.iter()
.map(|k| k.id)
.chain(std::iter::once(self.id))
.collect();
let mut gossip = self.gossip.write().unwrap();
match gossip.crds.trim(cap, &keep, stakes) {
Err(err) => {
self.stats.trim_crds_table_failed.add_relaxed(1);
error!("crds table trim failed: {:?}", err);
}
Ok(purged_values) => {
self.stats
.trim_crds_table_purged_values_count
.add_relaxed(purged_values.len() as u64);
gossip.pull.purged_values.extend(
purged_values
.into_iter()
.map(|v| (v.value_hash, v.local_timestamp)),
);
}
}
}

/// randomly pick a node and ask them for updates asynchronously
pub fn gossip(
self: Arc<Self>,
Expand Down Expand Up @@ -2670,6 +2696,7 @@ impl ClusterInfo {
response_sender,
);
self.handle_batch_pull_responses(pull_responses, thread_pool, &stakes, epoch_time_ms);
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, &stakes);
self.handle_batch_pong_messages(pong_messages, Instant::now());
self.handle_batch_pull_requests(
pull_requests,
Expand Down Expand Up @@ -3017,6 +3044,16 @@ impl ClusterInfo {
self.stats.packets_sent_push_messages_count.clear(),
i64
),
(
"trim_crds_table_failed",
self.stats.trim_crds_table_failed.clear(),
i64
),
(
"trim_crds_table_purged_values_count",
self.stats.trim_crds_table_purged_values_count.clear(),
i64
),
);

*last_print = Instant::now();
Expand Down Expand Up @@ -3726,40 +3763,6 @@ mod tests {
.lookup(&label)
.is_some());
}
#[test]
#[should_panic]
fn test_update_contact_info() {
let d = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), timestamp());
let cluster_info = ClusterInfo::new_with_invalid_keypair(d);
let entry_label = CrdsValueLabel::ContactInfo(cluster_info.id());
assert!(cluster_info
.gossip
.read()
.unwrap()
.crds
.lookup(&entry_label)
.is_some());

let now = timestamp();
cluster_info.update_contact_info(|ci| ci.wallclock = now);
assert_eq!(
cluster_info
.gossip
.read()
.unwrap()
.crds
.lookup(&entry_label)
.unwrap()
.contact_info()
.unwrap()
.wallclock,
now
);

// Inserting Contactinfo with different pubkey should panic,
// and update should fail
cluster_info.update_contact_info(|ci| ci.id = solana_sdk::pubkey::new_rand())
}

fn assert_in_range(x: u16, range: (u16, u16)) {
assert!(x >= range.0);
Expand Down
7 changes: 4 additions & 3 deletions core/src/cluster_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ impl ClusterSlots {
{
let mut cluster_slots = self.cluster_slots.write().unwrap();
*cluster_slots = cluster_slots.split_off(&(root + 1));
// Trimming is done at 2x size so that amortized it has a constant
// cost. The slots furthest away from the root are discarded.
if cluster_slots.len() > 2 * CLUSTER_SLOTS_TRIM_SIZE {
// Allow 10% overshoot so that the computation cost is amortized
// down. The slots furthest away from the root are discarded.
if 10 * cluster_slots.len() > 11 * CLUSTER_SLOTS_TRIM_SIZE {
warn!("trimming cluster slots");
let key = *cluster_slots.keys().nth(CLUSTER_SLOTS_TRIM_SIZE).unwrap();
cluster_slots.split_off(&key);
}
Expand Down
108 changes: 108 additions & 0 deletions core/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct Crds {
#[derive(PartialEq, Debug)]
pub enum CrdsError {
InsertFailed,
UnknownStakes,
}

/// This structure stores some local metadata associated with the CrdsValue
Expand Down Expand Up @@ -430,6 +431,62 @@ impl Crds {
}
Some(value)
}

/// Returns true if the number of unique pubkeys in the table exceeds the
/// given capacity (plus some margin).
/// Allows skipping unnecessary calls to trim without obtaining a write
/// lock on gossip.
pub(crate) fn should_trim(&self, cap: usize) -> bool {
// Allow 10% overshoot so that the computation cost is amortized down.
10 * self.records.len() > 11 * cap
}

/// Trims the table by dropping all values associated with the pubkeys with
/// the lowest stake, so that the number of unique pubkeys are bounded.
pub(crate) fn trim(
&mut self,
cap: usize, // Capacity hint for number of unique pubkeys.
// Set of pubkeys to never drop.
// e.g. trusted validators, self pubkey, ...
keep: &[Pubkey],
stakes: &HashMap<Pubkey, u64>,
) -> Result<Vec<VersionedCrdsValue>, CrdsError> {
if self.should_trim(cap) {
let size = self.records.len().saturating_sub(cap);
self.drop(size, keep, stakes)
} else {
Ok(Vec::default())
}
}

// Drops 'size' many pubkeys with the lowest stake.
fn drop(
&mut self,
size: usize,
keep: &[Pubkey],
stakes: &HashMap<Pubkey, u64>,
) -> Result<Vec<VersionedCrdsValue>, CrdsError> {
if stakes.is_empty() {
return Err(CrdsError::UnknownStakes);
}
let mut keys: Vec<_> = self
.records
.keys()
.map(|k| (stakes.get(k).copied().unwrap_or_default(), *k))
.collect();
if size < keys.len() {
keys.select_nth_unstable(size);
}
let keys: Vec<_> = keys
.into_iter()
.take(size)
.map(|(_, k)| k)
.filter(|k| !keep.contains(k))
.flat_map(|k| &self.records[&k])
.map(|k| self.table.get_index(*k).unwrap().0.clone())
.collect();
Ok(keys.iter().map(|k| self.remove(k).unwrap()).collect())
}
}

#[cfg(test)]
Expand All @@ -438,6 +495,7 @@ mod test {
use crate::{contact_info::ContactInfo, crds_value::NodeInstance};
use rand::{thread_rng, Rng};
use rayon::ThreadPoolBuilder;
use solana_sdk::signature::Signer;
use std::{collections::HashSet, iter::repeat_with};

#[test]
Expand Down Expand Up @@ -820,6 +878,56 @@ mod test {
assert!(crds.records.is_empty());
}

#[test]
fn test_drop() {
fn num_unique_pubkeys<'a, I>(values: I) -> usize
where
I: IntoIterator<Item = &'a VersionedCrdsValue>,
{
values
.into_iter()
.map(|v| v.value.pubkey())
.collect::<HashSet<_>>()
.len()
}
let mut rng = thread_rng();
let keypairs: Vec<_> = repeat_with(Keypair::new).take(64).collect();
let stakes = keypairs
.iter()
.map(|k| (k.pubkey(), rng.gen_range(0, 1000)))
.collect();
let mut crds = Crds::default();
for _ in 0..2048 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
let _ = crds.insert_versioned(value);
}
let num_values = crds.table.len();
let num_pubkeys = num_unique_pubkeys(crds.table.values());
assert!(!crds.should_trim(num_pubkeys));
assert!(crds.should_trim(num_pubkeys * 5 / 6));
let purged = crds.drop(16, &[], &stakes).unwrap();
assert_eq!(purged.len() + crds.table.len(), num_values);
assert_eq!(num_unique_pubkeys(&purged), 16);
assert_eq!(num_unique_pubkeys(crds.table.values()), num_pubkeys - 16);
let attach_stake = |v: &VersionedCrdsValue| {
let pk = v.value.pubkey();
(stakes[&pk], pk)
};
assert!(
purged.iter().map(attach_stake).max().unwrap()
< crds.table.values().map(attach_stake).min().unwrap()
);
let purged = purged
.into_iter()
.map(|v| v.value.pubkey())
.collect::<HashSet<_>>();
for (k, v) in crds.table {
assert!(!purged.contains(&k.pubkey()));
assert!(!purged.contains(&v.value.pubkey()));
}
}

#[test]
fn test_remove_staked() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
Expand Down

0 comments on commit 4276591

Please sign in to comment.