Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Track gossip vote updates per hash for replay stage (#16421) (#16468)
Browse files Browse the repository at this point in the history
* Track gossip vote updates per hash for replay stage

(cherry picked from commit 99b3aab)

Co-authored-by: carllin <carl@solana.com>
  • Loading branch information
mergify[bot] and carllin authored Apr 11, 2021
1 parent 24075ce commit 60fba7b
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 33 deletions.
106 changes: 89 additions & 17 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@ use std::{

// Map from a vote account to the authorized voter for an epoch
pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>;
pub type VotedHashUpdates = HashMap<Hash, Vec<Pubkey>>;
pub type VerifiedLabelVotePacketsSender = CrossbeamSender<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedLabelVotePacketsReceiver = CrossbeamReceiver<Vec<(CrdsValueLabel, Slot, Packets)>>;
pub type VerifiedVoteTransactionsSender = CrossbeamSender<Vec<Transaction>>;
pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver<Vec<Transaction>>;
pub type VerifiedVoteSender = CrossbeamSender<(Pubkey, Vec<Slot>)>;
pub type VerifiedVoteReceiver = CrossbeamReceiver<(Pubkey, Vec<Slot>)>;
pub type GossipVerifiedVoteHashSender = CrossbeamSender<(Pubkey, Slot, Hash)>;
pub type GossipVerifiedVoteHashReceiver = CrossbeamReceiver<(Pubkey, Slot, Hash)>;
pub type GossipDuplicateConfirmedSlotsSender = CrossbeamSender<ThresholdConfirmedSlots>;
pub type GossipDuplicateConfirmedSlotsReceiver = CrossbeamReceiver<ThresholdConfirmedSlots>;

Expand All @@ -65,14 +68,13 @@ pub struct SlotVoteTracker {
// True if seen on gossip, false if only seen in replay.
voted: HashMap<Pubkey, bool>,
optimistic_votes_tracker: HashMap<Hash, VoteStakeTracker>,
updates: Option<Vec<Pubkey>>,
voted_slot_updates: Option<Vec<Pubkey>>,
gossip_only_stake: u64,
}

impl SlotVoteTracker {
#[allow(dead_code)]
pub fn get_updates(&mut self) -> Option<Vec<Pubkey>> {
self.updates.take()
pub fn get_voted_slot_updates(&mut self) -> Option<Vec<Pubkey>> {
self.voted_slot_updates.take()
}

pub fn get_or_insert_optimistic_votes_tracker(&mut self, hash: Hash) -> &mut VoteStakeTracker {
Expand Down Expand Up @@ -119,7 +121,7 @@ impl VoteTracker {
let new_slot_tracker = Arc::new(RwLock::new(SlotVoteTracker {
voted: HashMap::new(),
optimistic_votes_tracker: HashMap::default(),
updates: None,
voted_slot_updates: None,
gossip_only_stake: 0,
}));
self.slot_vote_trackers
Expand Down Expand Up @@ -170,10 +172,10 @@ impl VoteTracker {
let mut w_slot_vote_tracker = slot_vote_tracker.write().unwrap();

w_slot_vote_tracker.voted.insert(pubkey, true);
if let Some(ref mut updates) = w_slot_vote_tracker.updates {
updates.push(pubkey)
if let Some(ref mut voted_slot_updates) = w_slot_vote_tracker.voted_slot_updates {
voted_slot_updates.push(pubkey)
} else {
w_slot_vote_tracker.updates = Some(vec![pubkey]);
w_slot_vote_tracker.voted_slot_updates = Some(vec![pubkey]);
}
}

Expand Down Expand Up @@ -249,6 +251,7 @@ impl ClusterInfoVoteListener {
bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>,
verified_vote_sender: VerifiedVoteSender,
gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>,
bank_notification_sender: Option<BankNotificationSender>,
Expand Down Expand Up @@ -295,6 +298,7 @@ impl ClusterInfoVoteListener {
vote_tracker,
bank_forks,
subscriptions,
gossip_verified_vote_hash_sender,
verified_vote_sender,
replay_votes_receiver,
blockstore,
Expand Down Expand Up @@ -422,6 +426,7 @@ impl ClusterInfoVoteListener {
vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>,
gossip_verified_vote_hash_sender: GossipVerifiedVoteHashSender,
verified_vote_sender: VerifiedVoteSender,
replay_votes_receiver: ReplayVoteReceiver,
blockstore: Arc<Blockstore>,
Expand Down Expand Up @@ -457,6 +462,7 @@ impl ClusterInfoVoteListener {
&vote_tracker,
&root_bank,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver,
&bank_notification_sender,
Expand Down Expand Up @@ -484,6 +490,7 @@ impl ClusterInfoVoteListener {
vote_tracker: &VoteTracker,
root_bank: &Bank,
subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVoteReceiver,
) -> Result<ThresholdConfirmedSlots> {
Expand All @@ -492,6 +499,7 @@ impl ClusterInfoVoteListener {
vote_tracker,
root_bank,
subscriptions,
gossip_verified_vote_hash_sender,
verified_vote_sender,
replay_votes_receiver,
&None,
Expand All @@ -504,6 +512,7 @@ impl ClusterInfoVoteListener {
vote_tracker: &VoteTracker,
root_bank: &Bank,
subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender,
replay_votes_receiver: &ReplayVoteReceiver,
bank_notification_sender: &Option<BankNotificationSender>,
Expand Down Expand Up @@ -535,6 +544,7 @@ impl ClusterInfoVoteListener {
replay_votes,
root_bank,
subscriptions,
gossip_verified_vote_hash_sender,
verified_vote_sender,
bank_notification_sender,
cluster_confirmed_slot_sender,
Expand All @@ -555,6 +565,7 @@ impl ClusterInfoVoteListener {
root_bank: &Bank,
subscriptions: &RpcSubscriptions,
verified_vote_sender: &VerifiedVoteSender,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
diff: &mut HashMap<Slot, HashMap<Pubkey, bool>>,
new_optimistic_confirmed_slots: &mut ThresholdConfirmedSlots,
is_gossip_vote: bool,
Expand Down Expand Up @@ -604,6 +615,14 @@ impl ClusterInfoVoteListener {
total_stake,
);

if is_gossip_vote && is_new && stake > 0 {
let _ = gossip_verified_vote_hash_sender.send((
*vote_pubkey,
last_vote_slot,
last_vote_hash,
));
}

if reached_threshold_results[0] {
if let Some(sender) = cluster_confirmed_slot_sender {
let _ = sender.send(vec![(last_vote_slot, last_vote_hash)]);
Expand Down Expand Up @@ -691,6 +710,7 @@ impl ClusterInfoVoteListener {
replayed_votes: Vec<ReplayedVote>,
root_bank: &Bank,
subscriptions: &RpcSubscriptions,
gossip_verified_vote_hash_sender: &GossipVerifiedVoteHashSender,
verified_vote_sender: &VerifiedVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
cluster_confirmed_slot_sender: &Option<GossipDuplicateConfirmedSlotsSender>,
Expand All @@ -717,6 +737,7 @@ impl ClusterInfoVoteListener {
root_bank,
subscriptions,
verified_vote_sender,
gossip_verified_vote_hash_sender,
&mut diff,
&mut new_optimistic_confirmed_slots,
is_gossip,
Expand All @@ -742,8 +763,8 @@ impl ClusterInfoVoteListener {
});
}
let mut w_slot_tracker = slot_tracker.write().unwrap();
if w_slot_tracker.updates.is_none() {
w_slot_tracker.updates = Some(vec![]);
if w_slot_tracker.voted_slot_updates.is_none() {
w_slot_tracker.voted_slot_updates = Some(vec![]);
}
let mut gossip_only_stake = 0;
let epoch = root_bank.epoch_schedule().get_epoch(slot);
Expand All @@ -764,7 +785,11 @@ impl ClusterInfoVoteListener {
// `is_new || is_new_from_gossip`. In both cases we want to record
// `is_new_from_gossip` for the `pubkey` entry.
w_slot_tracker.voted.insert(pubkey, seen_in_gossip_above);
w_slot_tracker.updates.as_mut().unwrap().push(pubkey);
w_slot_tracker
.voted_slot_updates
.as_mut()
.unwrap()
.push(pubkey);
}

w_slot_tracker.gossip_only_stake += gossip_only_stake
Expand Down Expand Up @@ -997,6 +1022,7 @@ mod tests {
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
let (votes_sender, votes_receiver) = unbounded();
let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();

let GenesisConfigInfo { genesis_config, .. } =
Expand Down Expand Up @@ -1027,6 +1053,7 @@ mod tests {
&vote_tracker,
&bank3,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver,
&None,
Expand Down Expand Up @@ -1057,6 +1084,7 @@ mod tests {
&vote_tracker,
&bank3,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver,
&None,
Expand Down Expand Up @@ -1109,6 +1137,7 @@ mod tests {
let (vote_tracker, _, validator_voting_keypairs, subscriptions) = setup();
let (votes_txs_sender, votes_txs_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, gossip_verified_vote_hash_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded();

let GenesisConfigInfo { genesis_config, .. } =
Expand Down Expand Up @@ -1136,18 +1165,52 @@ mod tests {
&vote_tracker,
&bank0,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver,
&None,
&None,
)
.unwrap();

let mut gossip_verified_votes: HashMap<Slot, HashMap<Hash, Vec<Pubkey>>> = HashMap::new();
for (pubkey, slot, hash) in gossip_verified_vote_hash_receiver.try_iter() {
// send_vote_txs() will send each vote twice, but we should only get a notification
// once for each via this channel
let exists = gossip_verified_votes
.get(&slot)
.and_then(|slot_hashes| slot_hashes.get(&hash))
.map(|slot_hash_voters| slot_hash_voters.contains(&pubkey))
.unwrap_or(false);
assert!(!exists);
gossip_verified_votes
.entry(slot)
.or_default()
.entry(hash)
.or_default()
.push(pubkey);
}

// Only the last vote in the `gossip_vote` set should count towards
// the `voted_hash_updates` set. Important to note here that replay votes
// should not count
let last_gossip_vote_slot = *gossip_vote_slots.last().unwrap();
assert_eq!(gossip_verified_votes.len(), 1);
let slot_hashes = gossip_verified_votes.get(&last_gossip_vote_slot).unwrap();
assert_eq!(slot_hashes.len(), 1);
let slot_hash_votes = slot_hashes.get(&Hash::default()).unwrap();
assert_eq!(slot_hash_votes.len(), validator_voting_keypairs.len());
for voting_keypairs in &validator_voting_keypairs {
let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(slot_hash_votes.contains(&pubkey));
}

// Check that the received votes were pushed to other commponents
// subscribing via `verified_vote_receiver`
let all_expected_slots: BTreeSet<_> = gossip_vote_slots
.clone()
.into_iter()
.chain(replay_vote_slots.into_iter())
.chain(replay_vote_slots.clone().into_iter())
.collect();
let mut pubkey_to_votes: HashMap<Pubkey, BTreeSet<Slot>> = HashMap::new();
for (received_pubkey, new_votes) in verified_vote_receiver.try_iter() {
Expand Down Expand Up @@ -1175,15 +1238,17 @@ mod tests {
let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(r_slot_vote_tracker.voted.contains_key(&pubkey));
assert!(r_slot_vote_tracker
.updates
.voted_slot_updates
.as_ref()
.unwrap()
.contains(&Arc::new(pubkey)));
// Only the last vote in the stack of `gossip_votes` should count towards
// the `optimistic` vote set.
// Only the last vote in the stack of `gossip_vote` and `replay_vote_slots`
// should count towards the `optimistic` vote set,
let optimistic_votes_tracker =
r_slot_vote_tracker.optimistic_votes_tracker(&Hash::default());
if vote_slot == 2 || vote_slot == 4 {
if vote_slot == *gossip_vote_slots.last().unwrap()
|| vote_slot == *replay_vote_slots.last().unwrap()
{
let optimistic_votes_tracker = optimistic_votes_tracker.unwrap();
assert!(optimistic_votes_tracker.voted().contains(&pubkey));
assert_eq!(
Expand Down Expand Up @@ -1220,6 +1285,7 @@ mod tests {

// Send some votes to process
let (votes_txs_sender, votes_txs_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (verified_vote_sender, verified_vote_receiver) = unbounded();
let (_replay_votes_sender, replay_votes_receiver) = unbounded();

Expand Down Expand Up @@ -1256,6 +1322,7 @@ mod tests {
&vote_tracker,
&bank0,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver,
&None,
Expand All @@ -1281,7 +1348,7 @@ mod tests {
let pubkey = voting_keypairs.vote_keypair.pubkey();
assert!(r_slot_vote_tracker.voted.contains_key(&pubkey));
assert!(r_slot_vote_tracker
.updates
.voted_slot_updates
.as_ref()
.unwrap()
.contains(&Arc::new(pubkey)));
Expand All @@ -1302,6 +1369,7 @@ mod tests {
fn run_test_process_votes3(switch_proof_hash: Option<Hash>) {
let (votes_sender, votes_receiver) = unbounded();
let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
let (replay_votes_sender, replay_votes_receiver) = unbounded();

let vote_slot = 1;
Expand Down Expand Up @@ -1352,6 +1420,7 @@ mod tests {
&vote_tracker,
&bank,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&replay_votes_receiver,
&None,
Expand Down Expand Up @@ -1487,6 +1556,7 @@ mod tests {
)];

let (verified_vote_sender, _verified_vote_receiver) = unbounded();
let (gossip_verified_vote_hash_sender, _gossip_verified_vote_hash_receiver) = unbounded();
ClusterInfoVoteListener::filter_and_confirm_with_new_votes(
&vote_tracker,
vote_tx,
Expand All @@ -1498,6 +1568,7 @@ mod tests {
)],
&bank,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&None,
&None,
Expand Down Expand Up @@ -1553,6 +1624,7 @@ mod tests {
)],
&new_root_bank,
&subscriptions,
&gossip_verified_vote_hash_sender,
&verified_vote_sender,
&None,
&None,
Expand Down
3 changes: 2 additions & 1 deletion core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1454,9 +1454,10 @@ pub mod test {
&AbsRequestSender::default(),
None,
&mut self.heaviest_subtree_fork_choice,
&mut BTreeMap::new(),
&mut BTreeMap::new(),
&mut true,
&mut Vec::new(),
&mut BTreeMap::new(),
)
}

Expand Down
Loading

0 comments on commit 60fba7b

Please sign in to comment.