Skip to content

Commit

Permalink
Add RestartHeaviestFork to Gossip (solana-labs#34161)
Browse files Browse the repository at this point in the history
* Add RestartHeaviestFork to Gossip.

* Add a test for out of bound value.

* Send observed_stake and total_epoch_stake in ResatartHeaviestFork.

* Remove total_epoch_stake from RestartHeaviestFork.

* Forgot to update ABI digest.

* Remove checking of whether stake is zero.

* Remove unnecessary new function and make new_rand pub(crate).
  • Loading branch information
wen-coding authored Jan 19, 2024
1 parent 3eb06b4 commit 4a2871f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 6 deletions.
3 changes: 2 additions & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "7a2P1GeQjyqCHMyBrhNPTKfPfG4iv32vki7XHahoN55z")]
#[frozen_abi(digest = "ogEqvffeEkPpojAaSiUbCv2HdJcdXDQ1ykgYyvKvLo2")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -395,6 +395,7 @@ fn retain_staked(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartHeaviestFork(_)
| CrdsData::RestartLastVotedForkSlots(_) => {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
Expand Down
4 changes: 4 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.counts[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.counts[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.counts[13], i64),
(
"all-push",
crds_stats.push.counts.iter().sum::<usize>(),
Expand Down Expand Up @@ -684,6 +686,8 @@ pub(crate) fn submit_gossip_stats(
crds_stats.pull.fails[12],
i64
),
("RestartHeaviestFork-push", crds_stats.push.fails[13], i64),
("RestartHeaviestFork-pull", crds_stats.pull.fails[13], i64),
("all-push", crds_stats.push.fails.iter().sum::<usize>(), i64),
("all-pull", crds_stats.pull.fails.iter().sum::<usize>(), i64),
);
Expand Down
3 changes: 2 additions & 1 deletion gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum GossipRoute<'a> {
PushMessage(/*from:*/ &'a Pubkey),
}

type CrdsCountsArray = [usize; 13];
type CrdsCountsArray = [usize; 14];

pub(crate) struct CrdsDataStats {
pub(crate) counts: CrdsCountsArray,
Expand Down Expand Up @@ -722,6 +722,7 @@ impl CrdsDataStats {
CrdsData::SnapshotHashes(_) => 10,
CrdsData::ContactInfo(_) => 11,
CrdsData::RestartLastVotedForkSlots(_) => 12,
CrdsData::RestartHeaviestFork(_) => 13,
// Update CrdsCountsArray if new items are added here.
}
}
Expand Down
15 changes: 13 additions & 2 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
duplicate_shred::{DuplicateShred, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
epoch_slots::EpochSlots,
legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots,
restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots},
},
bincode::{serialize, serialized_size},
rand::{CryptoRng, Rng},
Expand Down Expand Up @@ -96,6 +96,7 @@ pub enum CrdsData {
SnapshotHashes(SnapshotHashes),
ContactInfo(ContactInfo),
RestartLastVotedForkSlots(RestartLastVotedForkSlots),
RestartHeaviestFork(RestartHeaviestFork),
}

impl Sanitize for CrdsData {
Expand Down Expand Up @@ -135,6 +136,7 @@ impl Sanitize for CrdsData {
CrdsData::SnapshotHashes(val) => val.sanitize(),
CrdsData::ContactInfo(node) => node.sanitize(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.sanitize(),
CrdsData::RestartHeaviestFork(fork) => fork.sanitize(),
}
}
}
Expand All @@ -148,7 +150,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..8);
let kind = rng.gen_range(0..9);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -163,6 +165,7 @@ impl CrdsData {
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -508,6 +511,7 @@ pub enum CrdsValueLabel {
SnapshotHashes(Pubkey),
ContactInfo(Pubkey),
RestartLastVotedForkSlots(Pubkey),
RestartHeaviestFork(Pubkey),
}

impl fmt::Display for CrdsValueLabel {
Expand All @@ -534,6 +538,9 @@ impl fmt::Display for CrdsValueLabel {
CrdsValueLabel::RestartLastVotedForkSlots(_) => {
write!(f, "RestartLastVotedForkSlots({})", self.pubkey())
}
CrdsValueLabel::RestartHeaviestFork(_) => {
write!(f, "RestartHeaviestFork({})", self.pubkey())
}
}
}
}
Expand All @@ -554,6 +561,7 @@ impl CrdsValueLabel {
CrdsValueLabel::SnapshotHashes(p) => *p,
CrdsValueLabel::ContactInfo(pubkey) => *pubkey,
CrdsValueLabel::RestartLastVotedForkSlots(p) => *p,
CrdsValueLabel::RestartHeaviestFork(p) => *p,
}
}
}
Expand Down Expand Up @@ -605,6 +613,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.wallclock,
CrdsData::ContactInfo(node) => node.wallclock(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.wallclock,
CrdsData::RestartHeaviestFork(fork) => fork.wallclock,
}
}
pub fn pubkey(&self) -> Pubkey {
Expand All @@ -622,6 +631,7 @@ impl CrdsValue {
CrdsData::SnapshotHashes(hash) => hash.from,
CrdsData::ContactInfo(node) => *node.pubkey(),
CrdsData::RestartLastVotedForkSlots(slots) => slots.from,
CrdsData::RestartHeaviestFork(fork) => fork.from,
}
}
pub fn label(&self) -> CrdsValueLabel {
Expand All @@ -643,6 +653,7 @@ impl CrdsValue {
CrdsData::RestartLastVotedForkSlots(_) => {
CrdsValueLabel::RestartLastVotedForkSlots(self.pubkey())
}
CrdsData::RestartHeaviestFork(_) => CrdsValueLabel::RestartHeaviestFork(self.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&LegacyContactInfo> {
Expand Down
54 changes: 52 additions & 2 deletions gossip/src/restart_crds_values.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::crds_value::new_rand_timestamp,
crate::crds_value::{new_rand_timestamp, sanitize_wallclock},
bv::BitVec,
itertools::Itertools,
rand::Rng,
Expand Down Expand Up @@ -29,6 +29,16 @@ pub enum RestartLastVotedForkSlotsError {
LastVotedForkEmpty,
}

#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, AbiExample, Debug)]
pub struct RestartHeaviestFork {
pub from: Pubkey,
pub wallclock: u64,
pub last_slot: Slot,
pub last_slot_hash: Hash,
pub observed_stake: u64,
pub shred_version: u16,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, AbiExample, AbiEnumVisitor)]
enum SlotsOffsets {
RunLengthEncoding(RunLengthEncoding),
Expand All @@ -48,6 +58,7 @@ struct RawOffsets(BitVec<u8>);

impl Sanitize for RestartLastVotedForkSlots {
fn sanitize(&self) -> std::result::Result<(), SanitizeError> {
sanitize_wallclock(self.wallclock)?;
self.last_voted_hash.sanitize()
}
}
Expand Down Expand Up @@ -94,7 +105,7 @@ impl RestartLastVotedForkSlots {
}

/// New random Version for tests and benchmarks.
pub fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
pub(crate) fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
let pubkey = pubkey.unwrap_or_else(solana_sdk::pubkey::new_rand);
let num_slots = rng.gen_range(2..20);
let slots = std::iter::repeat_with(|| 47825632 + rng.gen_range(0..512))
Expand Down Expand Up @@ -122,6 +133,27 @@ impl RestartLastVotedForkSlots {
}
}

impl Sanitize for RestartHeaviestFork {
fn sanitize(&self) -> Result<(), SanitizeError> {
sanitize_wallclock(self.wallclock)?;
self.last_slot_hash.sanitize()
}
}

impl RestartHeaviestFork {
pub(crate) fn new_rand<R: Rng>(rng: &mut R, from: Option<Pubkey>) -> Self {
let from = from.unwrap_or_else(solana_sdk::pubkey::new_rand);
Self {
from,
wallclock: new_rand_timestamp(rng),
last_slot: rng.gen_range(0..1000),
last_slot_hash: Hash::new_unique(),
observed_stake: rng.gen_range(1..u64::MAX),
shred_version: 1,
}
}
}

impl RunLengthEncoding {
fn new(bits: &BitVec<u8>) -> Self {
let encoded = (0..bits.len())
Expand Down Expand Up @@ -317,4 +349,22 @@ mod test {
let range: Vec<Slot> = make_rand_slots(&mut rng).take(large_length).collect();
check_run_length_encoding(range);
}

#[test]
fn test_restart_heaviest_fork() {
let keypair = Keypair::new();
let slot = 53;
let mut fork = RestartHeaviestFork {
from: keypair.pubkey(),
wallclock: timestamp(),
last_slot: slot,
last_slot_hash: Hash::default(),
observed_stake: 800_000,
shred_version: 1,
};
assert_eq!(fork.sanitize(), Ok(()));
assert_eq!(fork.observed_stake, 800_000);
fork.wallclock = crate::crds_value::MAX_WALLCLOCK;
assert_eq!(fork.sanitize(), Err(SanitizeError::ValueOutOfBounds));
}
}

0 comments on commit 4a2871f

Please sign in to comment.