Skip to content

Commit

Permalink
Fix test by adding a delay before pushing messages to Gossip.
Browse files Browse the repository at this point in the history
  • Loading branch information
wen-coding authored and AshwinSekar committed Jan 3, 2024
1 parent 03cdbbd commit fe43382
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
5 changes: 4 additions & 1 deletion gossip/src/duplicate_shred_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,12 @@ impl DuplicateShredHandler {
shred2.into_payload(),
)?;
if let Some(epoch_schedule) = self.cached_epoch_schedule {
// feature_epoch can be 0 only in tests and in new cluster setups.
if self
.enable_gossip_duplicate_proof_ingestion_epoch
.is_some_and(|feature_epoch| epoch_schedule.get_epoch(slot) > feature_epoch)
.is_some_and(|feature_epoch| {
epoch_schedule.get_epoch(slot) > feature_epoch || feature_epoch == 0
})
{
// Notify duplicate consensus state machine
self.duplicate_slots_sender
Expand Down
38 changes: 31 additions & 7 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
cluster_info::{self, ClusterInfo},
contact_info::{ContactInfo, LegacyContactInfo},
crds::Cursor,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel},
gossip_error::GossipError,
gossip_service::{self, discover_cluster, GossipService},
Expand All @@ -41,7 +42,7 @@ use {
solana_vote_program::vote_transaction,
std::{
borrow::Borrow,
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
path::Path,
sync::{
Expand Down Expand Up @@ -485,11 +486,12 @@ pub fn start_gossip_voter(
vote_filter: impl Fn((CrdsValueLabel, Transaction)) -> Option<(VoteTransaction, Transaction)>
+ std::marker::Send
+ 'static,
mut process_vote_tx: impl FnMut(Slot, &Transaction, &VoteTransaction, &ClusterInfo)
mut process_vote_tx: impl FnMut(Slot, &Transaction, &VoteTransaction, &mut VecDeque<(Transaction, u8)>)
+ std::marker::Send
+ 'static,
sleep_ms: u64,
) -> GossipVoter {
let mut votes_to_sendout = VecDeque::new();
let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, tcp_listener, cluster_info) = gossip_service::make_gossip_node(
// Need to use our validator's keypair to gossip EpochSlots and votes for our
Expand All @@ -503,11 +505,14 @@ pub fn start_gossip_voter(
SocketAddrSpace::Unspecified,
);

let start_time = timestamp();
let t_voter = {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
std::thread::spawn(move || {
let mut cursor = Cursor::default();
let mut latest_vote_slot = 0;
let mut max_vote_slot = 0;
loop {
if exit.load(Ordering::Relaxed) {
return;
Expand All @@ -527,20 +532,39 @@ pub fn start_gossip_voter(
});

for (parsed_vote, leader_vote_tx) in &parsed_vote_iter {
if let Some(latest_vote_slot) = parsed_vote.last_voted_slot() {
if let Some(vote_slot) = parsed_vote.last_voted_slot() {
latest_vote_slot = vote_slot;
info!("received vote for {}", latest_vote_slot);
process_vote_tx(
latest_vote_slot,
leader_vote_tx,
parsed_vote,
&cluster_info,
&mut votes_to_sendout,
)
}
// Give vote some time to propagate
sleep(Duration::from_millis(sleep_ms));
}

if parsed_vote_iter.is_empty() {
// When push_active_set() is first called, Gossip has neither the correct shred_version set nor
// any ping responses from peers, so messages pushed at this point will not be sent correctly.
// Wait CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 + 1000 ms after start_time to give Gossip time to
// discover peers and set the correct shred_version.
if timestamp() - start_time > (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 + 1000)
&& !votes_to_sendout.is_empty()
{
while let Some((vote, vote_index)) = votes_to_sendout.pop_front() {
cluster_info.push_vote_at_index(vote, vote_index);
}
if max_vote_slot != latest_vote_slot {
let new_epoch_slots: Vec<Slot> =
(max_vote_slot + 1..latest_vote_slot + 1).collect();
info!(
"Simulating epoch slots from our node: {:?}",
new_epoch_slots
);
cluster_info.push_epoch_slots(&new_epoch_slots);
max_vote_slot = latest_vote_slot;
}
// Give vote some time to propagate
sleep(Duration::from_millis(sleep_ms));
}
}
Expand Down
22 changes: 4 additions & 18 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2708,7 +2708,7 @@ fn test_oc_bad_signatures() {
let node_keypair = node_keypair.insecure_clone();
let vote_keypair = vote_keypair.insecure_clone();
let num_votes_simulated = num_votes_simulated.clone();
move |vote_slot, leader_vote_tx, parsed_vote, _cluster_info| {
move |vote_slot, leader_vote_tx, parsed_vote, _votes_to_sendout| {
info!("received vote for {}", vote_slot);
let vote_hash = parsed_vote.hash();
info!(
Expand Down Expand Up @@ -3739,10 +3739,8 @@ fn test_kill_partition_switch_threshold_progress() {
#[test]
#[serial]
#[allow(unused_attributes)]
// This test does not work under the assumption that duplicate proofs are propagated through gossip
// ignore for now.
#[ignore]
fn test_duplicate_shreds_broadcast_leader() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// Create 4 nodes:
// 1) Bad leader sending different versions of shreds to both of the other nodes
// 2) 1 node whose voting behavior in gossip
Expand Down Expand Up @@ -3839,21 +3837,9 @@ fn test_duplicate_shreds_broadcast_leader() {
{
let node_keypair = node_keypair.insecure_clone();
let vote_keypair = vote_keypair.insecure_clone();
let mut max_vote_slot = 0;
let mut gossip_vote_index = 0;
move |latest_vote_slot, leader_vote_tx, parsed_vote, cluster_info| {
move |latest_vote_slot, leader_vote_tx, parsed_vote, votes_to_sendout| {
info!("received vote for {}", latest_vote_slot);
// Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot.
if latest_vote_slot > max_vote_slot {
let new_epoch_slots: Vec<Slot> =
(max_vote_slot + 1..latest_vote_slot + 1).collect();
info!(
"Simulating epoch slots from our node: {:?}",
new_epoch_slots
);
cluster_info.push_epoch_slots(&new_epoch_slots);
max_vote_slot = latest_vote_slot;
}

// Only vote on even slots. Note this may violate lockouts if the
// validator started voting on a different fork before we could exit
Expand Down Expand Up @@ -3892,7 +3878,7 @@ fn test_duplicate_shreds_broadcast_leader() {
);
gossip_vote_index += 1;
gossip_vote_index %= MAX_LOCKOUT_HISTORY;
cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8)
votes_to_sendout.push_back((vote_tx, gossip_vote_index as u8));
}
}
},
Expand Down

0 comments on commit fe43382

Please sign in to comment.