From fe4338245c830d89f855bbecdd77babbda056b10 Mon Sep 17 00:00:00 2001 From: Wen Date: Wed, 20 Dec 2023 17:44:05 -0800 Subject: [PATCH] Fix test by adding a delay before pushing messages to Gossip. --- gossip/src/duplicate_shred_handler.rs | 5 +++- local-cluster/src/cluster_tests.rs | 38 ++++++++++++++++++++++----- local-cluster/tests/local_cluster.rs | 22 +++------------- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs index cc3b3d4e017e00..b6a3b98e5abb44 100644 --- a/gossip/src/duplicate_shred_handler.rs +++ b/gossip/src/duplicate_shred_handler.rs @@ -149,9 +149,12 @@ impl DuplicateShredHandler { shred2.into_payload(), )?; if let Some(epoch_schedule) = self.cached_epoch_schedule { + // feature_epoch could only be 0 in tests and new cluster setup. if self .enable_gossip_duplicate_proof_ingestion_epoch - .is_some_and(|feature_epoch| epoch_schedule.get_epoch(slot) > feature_epoch) + .is_some_and(|feature_epoch| { + epoch_schedule.get_epoch(slot) > feature_epoch || feature_epoch == 0 + }) { // Notify duplicate consensus state machine self.duplicate_slots_sender diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index b410585396f8f0..27693b51e56591 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -16,6 +16,7 @@ use { cluster_info::{self, ClusterInfo}, contact_info::{ContactInfo, LegacyContactInfo}, crds::Cursor, + crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel}, gossip_error::GossipError, gossip_service::{self, discover_cluster, GossipService}, @@ -41,7 +42,7 @@ use { solana_vote_program::vote_transaction, std::{ borrow::Borrow, - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener}, path::Path, sync::{ @@ -485,11 +486,12 @@ pub fn start_gossip_voter( vote_filter: impl 
Fn((CrdsValueLabel, Transaction)) -> Option<(VoteTransaction, Transaction)> + std::marker::Send + 'static, - mut process_vote_tx: impl FnMut(Slot, &Transaction, &VoteTransaction, &ClusterInfo) + mut process_vote_tx: impl FnMut(Slot, &Transaction, &VoteTransaction, &mut VecDeque<(Transaction, u8)>) + std::marker::Send + 'static, sleep_ms: u64, ) -> GossipVoter { + let mut votes_to_sendout = VecDeque::new(); let exit = Arc::new(AtomicBool::new(false)); let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node( // Need to use our validator's keypair to gossip EpochSlots and votes for our @@ -503,11 +505,14 @@ pub fn start_gossip_voter( SocketAddrSpace::Unspecified, ); + let start_time = timestamp(); let t_voter = { let exit = exit.clone(); let cluster_info = cluster_info.clone(); std::thread::spawn(move || { let mut cursor = Cursor::default(); + let mut latest_vote_slot = 0; + let mut max_vote_slot = 0; loop { if exit.load(Ordering::Relaxed) { return; @@ -527,20 +532,39 @@ pub fn start_gossip_voter( }); for (parsed_vote, leader_vote_tx) in &parsed_vote_iter { - if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() { + if let Some(vote_slot) = parsed_vote.last_voted_slot() { + latest_vote_slot = vote_slot; info!("received vote for {}", latest_vote_slot); process_vote_tx( latest_vote_slot, leader_vote_tx, parsed_vote, - &cluster_info, + &mut votes_to_sendout, ) } - // Give vote some time to propagate - sleep(Duration::from_millis(sleep_ms)); } - if parsed_vote_iter.is_empty() { + // When push_active_set() is first called, Gossip doesn't have the correct shred_version set, nor + // does it have any ping responses from peers, so messages pushed at this point will not be sent + // correctly. Wait until CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 + 1000 ms to give Gossip time to + // discover peers and set the correct shred_version. 
+ if timestamp() - start_time > (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 + 1000) + && !votes_to_sendout.is_empty() + { + while let Some((vote, vote_index)) = votes_to_sendout.pop_front() { + cluster_info.push_vote_at_index(vote, vote_index); + } + if max_vote_slot != latest_vote_slot { + let new_epoch_slots: Vec<Slot> = + (max_vote_slot + 1..latest_vote_slot + 1).collect(); + info!( + "Simulating epoch slots from our node: {:?}", + new_epoch_slots + ); + cluster_info.push_epoch_slots(&new_epoch_slots); + max_vote_slot = latest_vote_slot; + } + // Give vote some time to propagate sleep(Duration::from_millis(sleep_ms)); } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index db7ab0519b6ea0..cb9c1271b50e05 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2708,7 +2708,7 @@ fn test_oc_bad_signatures() { let node_keypair = node_keypair.insecure_clone(); let vote_keypair = vote_keypair.insecure_clone(); let num_votes_simulated = num_votes_simulated.clone(); - move |vote_slot, leader_vote_tx, parsed_vote, _cluster_info| { + move |vote_slot, leader_vote_tx, parsed_vote, _votes_to_sendout| { info!("received vote for {}", vote_slot); let vote_hash = parsed_vote.hash(); info!( @@ -3739,10 +3739,8 @@ fn test_kill_partition_switch_threshold_progress() { #[test] #[serial] #[allow(unused_attributes)] -// This test does not work under the assumption that duplicate proofs are propagated through gossip -// ignore for now. 
-#[ignore] fn test_duplicate_shreds_broadcast_leader() { + solana_logger::setup_with_default(RUST_LOG_FILTER); // Create 4 nodes: // 1) Bad leader sending different versions of shreds to both of the other nodes // 2) 1 node who's voting behavior in gossip @@ -3839,21 +3837,9 @@ fn test_duplicate_shreds_broadcast_leader() { { let node_keypair = node_keypair.insecure_clone(); let vote_keypair = vote_keypair.insecure_clone(); - let mut max_vote_slot = 0; let mut gossip_vote_index = 0; - move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| { + move |latest_vote_slot, leader_vote_tx, parsed_vote, votes_to_sendout| { info!("received vote for {}", latest_vote_slot); - // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot. - if latest_vote_slot > max_vote_slot { - let new_epoch_slots: Vec<Slot> = - (max_vote_slot + 1..latest_vote_slot + 1).collect(); - info!( - "Simulating epoch slots from our node: {:?}", - new_epoch_slots - ); - cluster_info.push_epoch_slots(&new_epoch_slots); - max_vote_slot = latest_vote_slot; - } // Only vote on even slots. Note this may violate lockouts if the // validator started voting on a different fork before we could exit @@ -3892,7 +3878,7 @@ fn test_duplicate_shreds_broadcast_leader() { ); gossip_vote_index += 1; gossip_vote_index %= MAX_LOCKOUT_HISTORY; - cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) + votes_to_sendout.push_back((vote_tx, gossip_vote_index as u8)); } } },