diff --git a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs index 3190c039a116d0..adca69ed4938cd 100644 --- a/turbine/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/turbine/src/broadcast_stage/broadcast_duplicates_run.rs @@ -37,6 +37,7 @@ pub struct BroadcastDuplicatesConfig { pub(super) struct BroadcastDuplicatesRun { config: BroadcastDuplicatesConfig, current_slot: Slot, + chained_merkle_root: Hash, next_shred_index: u32, next_code_index: u32, shred_version: u16, @@ -57,6 +58,7 @@ impl BroadcastDuplicatesRun { )); Self { config, + chained_merkle_root: Hash::default(), next_shred_index: u32::MAX, next_code_index: 0, shred_version, @@ -76,7 +78,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { fn run( &mut self, keypair: &Keypair, - _blockstore: &Blockstore, + blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -87,6 +89,12 @@ impl BroadcastRun for BroadcastDuplicatesRun { let last_tick_height = receive_results.last_tick_height; if bank.slot() != self.current_slot { + self.chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent( + bank.slot(), + bank.parent_slot(), + blockstore, + ) + .unwrap(); self.next_shred_index = 0; self.next_code_index = 0; self.current_slot = bank.slot(); @@ -169,18 +177,25 @@ impl BroadcastRun for BroadcastDuplicatesRun { ) .expect("Expected to create a new shredder"); + // Chained Merkle shreds are always discarded in epoch 0, due to + // feature_set::enable_chained_merkle_shreds. Below can be removed once + // the feature gated code is removed. + let should_chain_merkle_shreds = bank.epoch() > 0; + let (data_shreds, coding_shreds) = shredder.entries_to_shreds( keypair, &receive_results.entries, last_tick_height == bank.max_tick_height() && last_entries.is_none(), - None, // chained_merkle_root + should_chain_merkle_shreds.then_some(self.chained_merkle_root), self.next_shred_index, self.next_code_index, true, // merkle_variant &self.reed_solomon_cache, &mut ProcessShredsStats::default(), ); - + if let Some(shred) = data_shreds.iter().max_by_key(|shred| shred.index()) { + self.chained_merkle_root = shred.merkle_root().unwrap(); + } self.next_shred_index += data_shreds.len() as u32; if let Some(index) = coding_shreds.iter().map(Shred::index).max() { self.next_code_index = index + 1; @@ -191,7 +206,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { keypair, &[original_last_entry], true, - None, // chained_merkle_root + should_chain_merkle_shreds.then_some(self.chained_merkle_root), self.next_shred_index, self.next_code_index, true, // merkle_variant @@ -205,7 +220,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { keypair, &duplicate_extra_last_entries, true, - None, // chained_merkle_root + should_chain_merkle_shreds.then_some(self.chained_merkle_root), self.next_shred_index, self.next_code_index, true, // merkle_variant @@ -222,6 +237,8 @@ impl BroadcastRun for BroadcastDuplicatesRun { sigs, ); + assert_eq!(original_last_data_shred.len(), 1); + assert_eq!(partition_last_data_shred.len(), 1); self.next_shred_index += 1; (original_last_data_shred, partition_last_data_shred) });