gossip: notify state machine of duplicate proofs
AshwinSekar committed Aug 23, 2023
1 parent 329c6f1 commit 2331e41
Showing 3 changed files with 43 additions and 9 deletions.
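In short, the change threads a crossbeam channel from Tvu into gossip's DuplicateShredHandler: once a complete duplicate-shred proof has been verified and stored in the blockstore, the offending slot is also sent to the duplicate-slot consensus state machine rather than only being recorded locally. The minimal Rust sketch below illustrates just that notification pattern; ProofNotifier and on_proof_verified are hypothetical stand-ins for illustration, not the actual handler API.

use crossbeam_channel::{unbounded, Sender};

type Slot = u64;
// Mirrors the DuplicateSlotSender alias introduced in duplicate_shred_handler.rs.
type DuplicateSlotSender = Sender<Slot>;

// Hypothetical stand-in for the gossip-side handler: it owns the sender and
// notifies the consensus side after a proof for `slot` has been verified.
struct ProofNotifier {
    duplicate_slots_sender: DuplicateSlotSender,
}

impl ProofNotifier {
    fn on_proof_verified(&self, slot: Slot) -> Result<(), &'static str> {
        // As in the new code path, a disconnected receiver surfaces as an
        // error rather than being silently ignored.
        self.duplicate_slots_sender
            .send(slot)
            .map_err(|_| "unable to send duplicate slot to state machine")
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    let notifier = ProofNotifier { duplicate_slots_sender: sender };

    notifier.on_proof_verified(42).expect("receiver should still be alive");

    // The replay/consensus side drains the channel and reacts to the slot.
    assert_eq!(receiver.try_recv().unwrap(), 42);
    assert!(receiver.is_empty());
}

The tests in the diff below exercise the real wiring the same way: an unbounded() pair is created, the sender is handed to DuplicateShredHandler::new, and the receiver is checked after valid and invalid proofs.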
3 changes: 2 additions & 1 deletion core/src/tvu.rs
@@ -214,7 +214,7 @@ impl Tvu {
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
- duplicate_slots_sender,
+ duplicate_slots_sender.clone(),
ancestor_hashes_replay_update_receiver,
dumped_slots_receiver,
popular_pruned_forks_sender,
@@ -322,6 +322,7 @@ impl Tvu {
blockstore,
leader_schedule_cache.clone(),
bank_forks.clone(),
+ duplicate_slots_sender,
),
);

4 changes: 3 additions & 1 deletion gossip/src/duplicate_shred.rs
@@ -29,7 +29,7 @@ pub struct DuplicateShred {
pub(crate) from: Pubkey,
pub(crate) wallclock: u64,
pub(crate) slot: Slot,
- shred_index: u32,
+ pub(crate) shred_index: u32,
shred_type: ShredType,
// Serialized DuplicateSlotProof split into chunks.
num_chunks: u8,
@@ -84,6 +84,8 @@ pub enum Error {
TryFromIntError(#[from] TryFromIntError),
#[error("unknown slot leader: {0}")]
UnknownSlotLeader(Slot),
#[error("unable to send duplicate slot to state machine")]
DuplicateSlotSenderFailure,
}

// Asserts that the two shreds can indicate duplicate proof for
45 changes: 38 additions & 7 deletions gossip/src/duplicate_shred_handler.rs
@@ -3,6 +3,7 @@ use {
duplicate_shred::{self, DuplicateShred, Error},
duplicate_shred_listener::DuplicateShredHandlerTrait,
},
+ crossbeam_channel::Sender,
log::error,
solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
solana_runtime::bank_forks::BankForks,
@@ -25,6 +26,7 @@ const MAX_NUM_ENTRIES_PER_PUBKEY: usize = 128;
const BUFFER_CAPACITY: usize = 512 * MAX_NUM_ENTRIES_PER_PUBKEY;

type BufferEntry = [Option<DuplicateShred>; MAX_NUM_CHUNKS];
+ type DuplicateSlotSender = Sender<Slot>;

pub struct DuplicateShredHandler {
// Because we use UDP for packet transfer, we can normally only send ~1500 bytes
@@ -44,6 +46,8 @@ pub struct DuplicateShredHandler {
cached_on_epoch: Epoch,
cached_staked_nodes: Arc<HashMap<Pubkey, u64>>,
cached_slots_in_epoch: u64,
+ // Used to notify duplicate consensus state machine
+ duplicate_slots_sender: DuplicateSlotSender,
}

impl DuplicateShredHandlerTrait for DuplicateShredHandler {
@@ -63,6 +67,7 @@ impl DuplicateShredHandler {
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
bank_forks: Arc<RwLock<BankForks>>,
+ duplicate_slots_sender: DuplicateSlotSender,
) -> Self {
Self {
buffer: HashMap::<(Slot, Pubkey), BufferEntry>::default(),
@@ -74,6 +79,7 @@
blockstore,
leader_schedule_cache,
bank_forks,
+ duplicate_slots_sender,
}
}

@@ -131,6 +137,10 @@ impl DuplicateShredHandler {
shred1.into_payload(),
shred2.into_payload(),
)?;
+ // Notify duplicate consensus state machine
+ self.duplicate_slots_sender
+ .send(slot)
+ .map_err(|_| Error::DuplicateSlotSenderFailure)?;
}
self.consumed.insert(slot, true);
}
@@ -211,6 +221,8 @@ mod tests {
cluster_info::DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
duplicate_shred::{from_shred, tests::new_rand_shred},
},
+ crossbeam_channel::unbounded,
+ itertools::Itertools,
solana_ledger::{
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
get_tmp_ledger_path_auto_delete,
@@ -229,7 +241,7 @@
slot: u64,
expected_error: Option<Error>,
chunk_size: usize,
- ) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
+ ) -> Result<Box<dyn Iterator<Item = DuplicateShred>>, Error> {
let my_keypair = match expected_error {
Some(Error::InvalidSignature) => Arc::new(Keypair::new()),
_ => keypair,
@@ -243,9 +255,6 @@
Some(Error::SlotMismatch) => {
new_rand_shred(&mut rng, next_shred_index, &shredder1, &my_keypair)
}
- Some(Error::ShredIndexMismatch) => {
- new_rand_shred(&mut rng, next_shred_index + 1, &shredder, &my_keypair)
- }
Some(Error::InvalidDuplicateShreds) => shred1.clone(),
_ => new_rand_shred(&mut rng, next_shred_index, &shredder, &my_keypair),
};
@@ -261,7 +270,16 @@
timestamp(), // wallclock
chunk_size, // max_size
)?;
- Ok(chunks)
+ if let Some(Error::ShredIndexMismatch) = expected_error {
+ Ok(Box::new(chunks.map(|mut duplicate_shred| {
+ if duplicate_shred.chunk_index() > 0 {
+ duplicate_shred.shred_index += 1
+ }
+ duplicate_shred
+ })))
+ } else {
+ Ok(Box::new(chunks))
+ }
}

#[test]
@@ -278,10 +296,12 @@
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
&bank_forks.working_bank(),
));
+ let (sender, receiver) = unbounded();
let mut duplicate_shred_handler = DuplicateShredHandler::new(
blockstore.clone(),
leader_schedule_cache,
Arc::new(RwLock::new(bank_forks)),
+ sender,
);
let chunks = create_duplicate_proof(
my_keypair.clone(),
@@ -308,6 +328,7 @@
}
assert!(blockstore.has_duplicate_shreds_in_slot(1));
assert!(blockstore.has_duplicate_shreds_in_slot(2));
+ assert_eq!(receiver.try_iter().collect_vec(), vec![1, 2]);

// Test all kinds of bad proofs.
for error in [
@@ -329,6 +350,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(3));
+ assert!(receiver.is_empty());
}
}
}
@@ -349,8 +371,13 @@
&bank_forks.working_bank(),
));
let bank_forks_ptr = Arc::new(RwLock::new(bank_forks));
- let mut duplicate_shred_handler =
- DuplicateShredHandler::new(blockstore.clone(), leader_schedule_cache, bank_forks_ptr);
+ let (sender, receiver) = unbounded();
+ let mut duplicate_shred_handler = DuplicateShredHandler::new(
+ blockstore.clone(),
+ leader_schedule_cache,
+ bank_forks_ptr,
+ sender,
+ );
let start_slot: Slot = 1;

// This proof will not be accepted because num_chunks is too large.
@@ -366,6 +393,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert!(receiver.is_empty());

// This proof will be rejected because the slot is too far away in the future.
let future_slot =
@@ -382,6 +410,7 @@
duplicate_shred_handler.handle(chunk);
}
assert!(!blockstore.has_duplicate_shreds_in_slot(future_slot));
+ assert!(receiver.is_empty());

// Send in two proofs, the first proof showing up will be accepted, the following
// proofs will be discarded.
@@ -396,10 +425,12 @@
// handle chunk 0 of the first proof.
duplicate_shred_handler.handle(chunks.next().unwrap());
assert!(!blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert!(receiver.is_empty());
// Now send in the rest of the first proof, it will succeed.
for chunk in chunks {
duplicate_shred_handler.handle(chunk);
}
assert!(blockstore.has_duplicate_shreds_in_slot(start_slot));
+ assert_eq!(receiver.try_iter().collect_vec(), vec![start_slot]);
}
}
