Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
removes delayed crds inserts when upserting gossip table (#16806) (#1…
Browse files Browse the repository at this point in the history
…6905)

It is crucial that VersionedCrdsValue::insert_timestamp does not go
backward in time:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L67-L79

Otherwise methods such as get_votes and get_epoch_slots_since will
break, which will break their downstream flow, including vote-listener
and optimistic confirmation:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1197-L1215
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L1274-L1298

For that, Crds::new_versioned is intended to be called "atomically" with
Crds::insert_versioned (as the comment already says):
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/crds.rs#L126-L129

However, currently this is violated in the code. For example,
filter_pull_responses creates VersionedCrdsValues (with the current
timestamp), then acquires an exclusive lock on gossip, then
process_pull_responses writes those values to the crds table:
https://github.com/solana-labs/solana/blob/ec37a843a/core/src/cluster_info.rs#L2375-L2392

Depending on the workload and lock contention, the insert_timestamps may
well be in the past when these values finally are inserted into gossip.

To avoid such scenarios, this commit:
  * removes Crds::new_versioned and Crds::insert_versioned.
  * makes VersionedCrdsValue constructor private, only invoked in
    Crds::insert, so that insert_timestamp is populated right before
    insert.

This will improve insert_timestamp monotonicity as long as Crds::insert
is not called with a stale timestamp. Following commits may further
improve this by calling timestamp() inside Crds::insert, and/or
switching to std::time::Instant which guarantees monotonicity.

(cherry picked from commit 1ac2a8c)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
  • Loading branch information
mergify[bot] and behzadnouri authored Apr 28, 2021
1 parent ed8c796 commit d8e8528
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 170 deletions.
24 changes: 14 additions & 10 deletions core/benches/crds_shards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,34 @@
extern crate test;

use rand::{thread_rng, Rng};
use solana_core::contact_info::ContactInfo;
use solana_core::crds::VersionedCrdsValue;
use solana_core::crds_shards::CrdsShards;
use solana_core::crds_value::{CrdsData, CrdsValue};
use solana_sdk::pubkey;
use solana_core::{
crds::{Crds, VersionedCrdsValue},
crds_shards::CrdsShards,
crds_value::CrdsValue,
};
use solana_sdk::timing::timestamp;
use std::iter::repeat_with;
use test::Bencher;

const CRDS_SHARDS_BITS: u32 = 8;

fn new_test_crds_value() -> VersionedCrdsValue {
let data = CrdsData::ContactInfo(ContactInfo::new_localhost(&pubkey::new_rand(), timestamp()));
VersionedCrdsValue::new(timestamp(), CrdsValue::new_unsigned(data))
/// Produces a `VersionedCrdsValue` wrapping a random `CrdsValue` for the
/// shard benchmarks.
///
/// The value is obtained by inserting into a throwaway `Crds` table with the
/// current `timestamp()` and immediately removing it again by its label.
/// Panics if either the insert or the removal fails.
fn new_test_crds_value<R: Rng>(rng: &mut R) -> VersionedCrdsValue {
    // Scratch table, used only to obtain the versioned wrapper.
    let mut crds = Crds::default();
    let value = CrdsValue::new_rand(rng, None);
    // The label must be captured before `value` is moved into the table.
    let label = value.label();
    crds.insert(value, timestamp()).unwrap();
    crds.remove(&label).unwrap()
}

fn bench_crds_shards_find(bencher: &mut Bencher, num_values: usize, mask_bits: u32) {
let values: Vec<VersionedCrdsValue> = std::iter::repeat_with(new_test_crds_value)
let mut rng = thread_rng();
let values: Vec<_> = repeat_with(|| new_test_crds_value(&mut rng))
.take(num_values)
.collect();
let mut shards = CrdsShards::new(CRDS_SHARDS_BITS);
for (index, value) in values.iter().enumerate() {
assert!(shards.insert(index, value));
}
let mut rng = thread_rng();
bencher.iter(|| {
let mask = rng.gen();
let _hits = shards.find(mask, mask_bits).count();
Expand Down
190 changes: 89 additions & 101 deletions core/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ use indexmap::set::IndexSet;
use rayon::{prelude::*, ThreadPool};
use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::{hash_map, BTreeSet, HashMap};
use std::ops::{Bound, Index, IndexMut};
use std::{
cmp::Ordering,
collections::{hash_map, BTreeSet, HashMap},
ops::{Bound, Index, IndexMut},
};

const CRDS_SHARDS_BITS: u32 = 8;
// Limit number of crds values associated with each unique pubkey. This
Expand All @@ -60,7 +60,9 @@ pub struct Crds {

#[derive(PartialEq, Debug)]
pub enum CrdsError {
InsertFailed,
// Hash of the crds value which failed to insert should be recorded in
// failed_inserts to be excluded from the next pull-request.
InsertFailed(Hash),
UnknownStakes,
}

Expand All @@ -71,26 +73,15 @@ pub enum CrdsError {
pub struct VersionedCrdsValue {
pub value: CrdsValue,
/// local time when inserted
pub insert_timestamp: u64,
pub(crate) insert_timestamp: u64,
/// local time when updated
pub(crate) local_timestamp: u64,
/// value hash
pub value_hash: Hash,
pub(crate) value_hash: Hash,
}

impl PartialOrd for VersionedCrdsValue {
fn partial_cmp(&self, other: &VersionedCrdsValue) -> Option<cmp::Ordering> {
if self.value.label() != other.value.label() {
None
} else if self.value.wallclock() == other.value.wallclock() {
Some(self.value_hash.cmp(&other.value_hash))
} else {
Some(self.value.wallclock().cmp(&other.value.wallclock()))
}
}
}
impl VersionedCrdsValue {
pub fn new(local_timestamp: u64, value: CrdsValue) -> Self {
fn new(local_timestamp: u64, value: CrdsValue) -> Self {
let value_hash = hash(&serialize(&value).unwrap());
VersionedCrdsValue {
value,
Expand All @@ -99,13 +90,6 @@ impl VersionedCrdsValue {
value_hash,
}
}

/// New random VersionedCrdsValue for tests and simulations.
pub fn new_rand<R: rand::Rng>(rng: &mut R, keypair: Option<&Keypair>) -> Self {
let delay = 10 * 60 * 1000; // 10 minutes
let now = timestamp() - delay + rng.gen_range(0, 2 * delay);
Self::new(now, CrdsValue::new_rand(rng, keypair))
}
}

impl Default for Crds {
Expand All @@ -122,34 +106,46 @@ impl Default for Crds {
}
}

// Decides whether `value` should replace `other`, the entry currently stored
// under the same key/label.
// Panics with "labels mismatch!" if the two labels differ.
fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
    assert_eq!(value.label(), other.value.label(), "labels mismatch!");
    let incoming_wallclock = value.wallclock();
    let existing_wallclock = other.value.wallclock();
    // Distinct wallclocks settle it: only a strictly newer value wins.
    if incoming_wallclock != existing_wallclock {
        return incoming_wallclock > existing_wallclock;
    }
    // Equal wallclocks: ties should be broken in a deterministic way across
    // the cluster. For backward compatibility this is done by comparing hash
    // of serialized values; the larger hash wins, and identical hashes do
    // not override.
    let value_hash = hash(&serialize(&value).unwrap());
    value_hash > other.value_hash
}

impl Crds {
/// must be called atomically with `insert_versioned`
pub fn new_versioned(&self, local_timestamp: u64, value: CrdsValue) -> VersionedCrdsValue {
VersionedCrdsValue::new(local_timestamp, value)
/// Reports whether inserting `value` would add to or update the table.
/// A label that is not yet in the table always yields `true`; otherwise the
/// answer is whatever `overrides` decides against the stored entry, so an
/// outdated value yields `false`.
pub(crate) fn upserts(&self, value: &CrdsValue) -> bool {
    self.table
        .get(&value.label())
        .map_or(true, |existing| overrides(value, existing))
}
pub fn would_insert(
&self,

pub fn insert(
&mut self,
value: CrdsValue,
local_timestamp: u64,
) -> (bool, VersionedCrdsValue) {
let new_value = self.new_versioned(local_timestamp, value);
let label = new_value.value.label();
// New value is outdated and fails to insert, if it already exists in
// the table with a more recent wallclock.
let outdated = matches!(self.table.get(&label), Some(current) if new_value <= *current);
(!outdated, new_value)
}
/// insert the new value, returns the old value if insert succeeds
pub fn insert_versioned(
&mut self,
new_value: VersionedCrdsValue,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let label = new_value.value.label();
let label = value.label();
let value = VersionedCrdsValue::new(local_timestamp, value);
match self.table.entry(label) {
Entry::Vacant(entry) => {
let entry_index = entry.index();
self.shards.insert(entry_index, &new_value);
match new_value.value.data {
self.shards.insert(entry_index, &value);
match value.value.data {
CrdsData::ContactInfo(_) => {
self.nodes.insert(entry_index);
}
Expand All @@ -158,52 +154,45 @@ impl Crds {
}
CrdsData::EpochSlots(_, _) => {
self.epoch_slots
.insert((new_value.insert_timestamp, entry_index));
.insert((value.insert_timestamp, entry_index));
}
_ => (),
};
self.records
.entry(new_value.value.pubkey())
.entry(value.value.pubkey())
.or_default()
.insert(entry_index);
entry.insert(new_value);
entry.insert(value);
self.num_inserts += 1;
Ok(None)
}
Entry::Occupied(mut entry) if *entry.get() < new_value => {
Entry::Occupied(mut entry) if overrides(&value.value, entry.get()) => {
let entry_index = entry.index();
self.shards.remove(entry_index, entry.get());
self.shards.insert(entry_index, &new_value);
if let CrdsData::EpochSlots(_, _) = new_value.value.data {
self.shards.insert(entry_index, &value);
if let CrdsData::EpochSlots(_, _) = value.value.data {
self.epoch_slots
.remove(&(entry.get().insert_timestamp, entry_index));
self.epoch_slots
.insert((new_value.insert_timestamp, entry_index));
.insert((value.insert_timestamp, entry_index));
}
self.num_inserts += 1;
// As long as the pubkey does not change, self.records
// does not need to be updated.
debug_assert_eq!(entry.get().value.pubkey(), new_value.value.pubkey());
Ok(Some(entry.insert(new_value)))
debug_assert_eq!(entry.get().value.pubkey(), value.value.pubkey());
Ok(Some(entry.insert(value)))
}
_ => {
trace!(
"INSERT FAILED data: {} new.wallclock: {}",
new_value.value.label(),
new_value.value.wallclock(),
value.value.label(),
value.value.wallclock(),
);
Err(CrdsError::InsertFailed)
Err(CrdsError::InsertFailed(value.value_hash))
}
}
}
pub fn insert(
&mut self,
value: CrdsValue,
local_timestamp: u64,
) -> Result<Option<VersionedCrdsValue>, CrdsError> {
let new_value = self.new_versioned(local_timestamp, value);
self.insert_versioned(new_value)
}

pub fn lookup(&self, label: &CrdsValueLabel) -> Option<&CrdsValue> {
self.table.get(label).map(|x| &x.value)
}
Expand Down Expand Up @@ -504,10 +493,13 @@ impl Crds {
#[cfg(test)]
mod test {
use super::*;
use crate::{contact_info::ContactInfo, crds_value::NodeInstance};
use crate::{
contact_info::ContactInfo,
crds_value::{new_rand_timestamp, NodeInstance},
};
use rand::{thread_rng, Rng};
use rayon::ThreadPoolBuilder;
use solana_sdk::signature::Signer;
use solana_sdk::signature::{Keypair, Signer};
use std::{collections::HashSet, iter::repeat_with};

#[test]
Expand All @@ -523,8 +515,12 @@ mod test {
fn test_update_old() {
let mut crds = Crds::default();
let val = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::default()));
let value_hash = hash(&serialize(&val).unwrap());
assert_eq!(crds.insert(val.clone(), 0), Ok(None));
assert_eq!(crds.insert(val.clone(), 1), Err(CrdsError::InsertFailed));
assert_eq!(
crds.insert(val.clone(), 1),
Err(CrdsError::InsertFailed(value_hash))
);
assert_eq!(crds.table[&val.label()].local_timestamp, 0);
}
#[test]
Expand Down Expand Up @@ -718,8 +714,9 @@ mod test {
let mut num_overrides = 0;
for _ in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
match crds.insert_versioned(value) {
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
match crds.insert(value, local_timestamp) {
Ok(None) => {
num_inserts += 1;
check_crds_shards(&crds);
Expand Down Expand Up @@ -811,8 +808,9 @@ mod test {
let mut num_overrides = 0;
for k in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
match crds.insert_versioned(value) {
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
match crds.insert(value, local_timestamp) {
Ok(None) => {
num_inserts += 1;
}
Expand Down Expand Up @@ -870,8 +868,9 @@ mod test {
let mut crds = Crds::default();
for k in 0..4096 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
let _ = crds.insert_versioned(value);
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
let _ = crds.insert(value, local_timestamp);
if k % 64 == 0 {
check_crds_records(&crds);
}
Expand Down Expand Up @@ -911,8 +910,9 @@ mod test {
let mut crds = Crds::default();
for _ in 0..2048 {
let keypair = &keypairs[rng.gen_range(0, keypairs.len())];
let value = VersionedCrdsValue::new_rand(&mut rng, Some(keypair));
let _ = crds.insert_versioned(value);
let value = CrdsValue::new_rand(&mut rng, Some(keypair));
let local_timestamp = new_rand_timestamp(&mut rng);
let _ = crds.insert(value, local_timestamp);
}
let num_values = crds.table.len();
let num_pubkeys = num_unique_pubkeys(crds.table.values());
Expand Down Expand Up @@ -967,8 +967,8 @@ mod test {
let v2 = VersionedCrdsValue::new(1, val);
assert_eq!(v1, v2);
assert!(!(v1 != v2));
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Equal));
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Equal));
assert!(!overrides(&v1.value, &v2));
assert!(!overrides(&v2.value, &v1));
}
#[test]
#[allow(clippy::neg_cmp_op_on_partial_ord)]
Expand All @@ -991,18 +991,12 @@ mod test {
assert_ne!(v1.value_hash, v2.value_hash);
assert!(v1 != v2);
assert!(!(v1 == v2));
if v1 > v2 {
assert!(v1 > v2);
assert!(v2 < v1);
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Greater));
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Less));
} else if v2 > v1 {
assert!(v1 < v2);
assert!(v2 > v1);
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Less));
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Greater));
if v1.value_hash > v2.value_hash {
assert!(overrides(&v1.value, &v2));
assert!(!overrides(&v2.value, &v1));
} else {
panic!("bad PartialOrd implementation?");
assert!(overrides(&v2.value, &v1));
assert!(!overrides(&v1.value, &v2));
}
}
#[test]
Expand All @@ -1023,14 +1017,13 @@ mod test {
))),
);
assert_eq!(v1.value.label(), v2.value.label());
assert!(v1 > v2);
assert!(!(v1 < v2));
assert!(overrides(&v1.value, &v2));
assert!(!overrides(&v2.value, &v1));
assert!(v1 != v2);
assert!(!(v1 == v2));
assert_eq!(v1.partial_cmp(&v2), Some(cmp::Ordering::Greater));
assert_eq!(v2.partial_cmp(&v1), Some(cmp::Ordering::Less));
}
#[test]
#[should_panic(expected = "labels mismatch!")]
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_label_order() {
let v1 = VersionedCrdsValue::new(
Expand All @@ -1049,11 +1042,6 @@ mod test {
);
assert_ne!(v1, v2);
assert!(!(v1 == v2));
assert!(!(v1 < v2));
assert!(!(v1 > v2));
assert!(!(v2 < v1));
assert!(!(v2 > v1));
assert_eq!(v1.partial_cmp(&v2), None);
assert_eq!(v2.partial_cmp(&v1), None);
assert!(!overrides(&v2.value, &v1));
}
}
Loading

0 comments on commit d8e8528

Please sign in to comment.