diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 1fbf211124c0b3..d48d844a7fe0b4 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -214,7 +214,7 @@ impl Tvu {
                 leader_schedule_cache.clone(),
                 verified_vote_receiver,
                 completed_data_sets_sender,
-                duplicate_slots_sender,
+                duplicate_slots_sender.clone(),
                 ancestor_hashes_replay_update_receiver,
                 dumped_slots_receiver,
                 popular_pruned_forks_sender,
@@ -322,6 +322,7 @@ impl Tvu {
                 blockstore,
                 leader_schedule_cache.clone(),
                 bank_forks.clone(),
+                duplicate_slots_sender,
             ),
         );
 
diff --git a/gossip/src/duplicate_shred.rs b/gossip/src/duplicate_shred.rs
index 0d7d35d26afa5c..a23752cda501dd 100644
--- a/gossip/src/duplicate_shred.rs
+++ b/gossip/src/duplicate_shred.rs
@@ -29,7 +29,7 @@ pub struct DuplicateShred {
     pub(crate) from: Pubkey,
     pub(crate) wallclock: u64,
     pub(crate) slot: Slot,
-    shred_index: u32,
+    pub(crate) shred_index: u32,
     shred_type: ShredType,
     // Serialized DuplicateSlotProof split into chunks.
     num_chunks: u8,
@@ -84,6 +84,8 @@ pub enum Error {
     TryFromIntError(#[from] TryFromIntError),
     #[error("unknown slot leader: {0}")]
     UnknownSlotLeader(Slot),
+    #[error("unable to send duplicate slot to state machine")]
+    DuplicateSlotSenderFailure,
 }
 
 // Asserts that the two shreds can indicate duplicate proof for
diff --git a/gossip/src/duplicate_shred_handler.rs b/gossip/src/duplicate_shred_handler.rs
index 7789404b970d08..8132e391a7f919 100644
--- a/gossip/src/duplicate_shred_handler.rs
+++ b/gossip/src/duplicate_shred_handler.rs
@@ -3,6 +3,7 @@ use {
         duplicate_shred::{self, DuplicateShred, Error},
         duplicate_shred_listener::DuplicateShredHandlerTrait,
     },
+    crossbeam_channel::Sender,
     log::error,
     solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
     solana_runtime::bank_forks::BankForks,
@@ -25,6 +26,7 @@ const MAX_NUM_ENTRIES_PER_PUBKEY: usize = 128;
 const BUFFER_CAPACITY: usize = 512 * MAX_NUM_ENTRIES_PER_PUBKEY;
 
 type BufferEntry = [Option<DuplicateShred>; MAX_NUM_CHUNKS];
+type DuplicateSlotSender = Sender<Slot>;
 
 pub struct DuplicateShredHandler {
     // Because we use UDP for packet transfer, we can normally only send ~1500 bytes
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
     cached_on_epoch: Epoch,
     cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
     cached_slots_in_epoch: u64,
+    // Used to notify duplicate consensus state machine
+    duplicate_slots_sender: DuplicateSlotSender,
 }
 
 impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         bank_forks: Arc<RwLock<BankForks>>,
+        duplicate_slots_sender: DuplicateSlotSender,
     ) -> Self {
         Self {
             buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@ impl DuplicateShredHandler {
             blockstore,
             leader_schedule_cache,
             bank_forks,
+            duplicate_slots_sender,
         }
     }
 
@@ -131,6 +137,10 @@ impl DuplicateShredHandler {
                     shred1.into_payload(),
                     shred2.into_payload(),
                 )?;
+                // Notify duplicate consensus state machine
+                self.duplicate_slots_sender
+                    .send(slot)
+                    .map_err(|_| Error::DuplicateSlotSenderFailure)?;
             }
             self.consumed.insert(slot, true);
         }
@@ -211,6 +221,8 @@ mod tests {
             cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
             duplicate_shred::{from_shred, tests::new_rand_shred},
         },
+        crossbeam_channel::unbounded,
+        itertools::Itertools,
         solana_ledger::{
             genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
             get_tmp_ledger_path_auto_delete,
@@ -229,7 +241,7 @@ mod tests {
         slot: u64,
         expected_error: Option<Error>,
         chunk_size: usize,
-    ) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
+    ) -> Result<Box<dyn Iterator<Item = DuplicateShred>>, Error> {
         let my_keypair = match expected_error {
             Some(Error::InvalidSignature) => Arc::new(Keypair::new()),
             _ => keypair,
@@ -243,9 +255,6 @@ mod tests {
             Some(Error::SlotMismatch) => {
                 new_rand_shred(&mut rng, next_shred_index, &shredder1, &my_keypair)
             }
-            Some(Error::ShredIndexMismatch) => {
-                new_rand_shred(&mut rng, next_shred_index + 1, &shredder, &my_keypair)
-            }
             Some(Error::InvalidDuplicateShreds) => shred1.clone(),
             _ => new_rand_shred(&mut rng, next_shred_index, &shredder, &my_keypair),
         };
@@ -261,7 +270,16 @@ mod tests {
             timestamp(), // wallclock
             chunk_size,  // max_size
         )?;
-        Ok(chunks)
+        if let Some(Error::ShredIndexMismatch) = expected_error {
+            Ok(Box::new(chunks.map(|mut duplicate_shred| {
+                if duplicate_shred.chunk_index() > 0 {
+                    duplicate_shred.shred_index += 1
+                }
+                duplicate_shred
+            })))
+        } else {
+            Ok(Box::new(chunks))
+        }
     }
 
     #[test]
@@ -278,10 +296,12 @@ mod tests {
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
             &bank_forks.working_bank(),
         ));
+        let (sender, receiver) = unbounded();
         let mut duplicate_shred_handler = DuplicateShredHandler::new(
             blockstore.clone(),
             leader_schedule_cache,
             Arc::new(RwLock::new(bank_forks)),
+            sender,
         );
         let chunks = create_duplicate_proof(
             my_keypair.clone(),
@@ -308,6 +328,7 @@ mod tests {
         }
         assert!(blockstore.has_duplicate_shreds_in_slot(1));
         assert!(blockstore.has_duplicate_shreds_in_slot(2));
+        assert_eq!(receiver.try_iter().collect_vec(), vec![1, 2]);
 
         // Test all kinds of bad proofs.
         for error in [
@@ -329,6 +350,7 @@ mod tests {
                     duplicate_shred_handler.handle(chunk);
                 }
                 assert!(!blockstore.has_duplicate_shreds_in_slot(3));
+                assert!(receiver.is_empty());
             }
         }
     }
@@ -349,8 +371,13 @@ mod tests {
             &bank_forks.working_bank(),
         ));
         let bank_forks_ptr = Arc::new(RwLock::new(bank_forks));
-        let mut duplicate_shred_handler =
-            DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks_ptr);
+        let (sender, receiver) = unbounded();
+        let mut duplicate_shred_handler = DuplicateShredHandler::new(
+            blockstore.clone(),
+            leader_schedule_cache,
+            bank_forks_ptr,
+            sender,
+        );
         let start_slot: Slot = 1;
 
         // This proof will not be accepted because num_chunks is too large.
@@ -366,6 +393,7 @@ mod tests {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(receiver.is_empty());
 
         // This proof will be rejected because the slot is too far away in the future.
         let future_slot =
@@ -382,6 +410,7 @@ mod tests {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
+        assert!(receiver.is_empty());
 
         // Send in two proofs, the first proof showing up will be accepted, the following
         // proofs will be discarded.
@@ -396,10 +425,12 @@ mod tests {
         // handle chunk 0 of the first proof.
         duplicate_shred_handler.handle(chunks.next().unwrap());
         assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert!(receiver.is_empty());
         // Now send in the rest of the first proof, it will succeed.
         for chunk in chunks {
             duplicate_shred_handler.handle(chunk);
         }
         assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+        assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
     }
 }