Skip to content

Commit

Permalink
chains Merkle shreds in broadcast duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Feb 7, 2024
1 parent 7a95e4f commit 72bbc24
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions turbine/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct BroadcastDuplicatesConfig {
pub(super) struct BroadcastDuplicatesRun {
config: BroadcastDuplicatesConfig,
current_slot: Slot,
chained_merkle_root: Hash,
next_shred_index: u32,
next_code_index: u32,
shred_version: u16,
Expand All @@ -57,6 +58,7 @@ impl BroadcastDuplicatesRun {
));
Self {
config,
chained_merkle_root: Hash::default(),
next_shred_index: u32::MAX,
next_code_index: 0,
shred_version,
Expand All @@ -76,7 +78,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
fn run(
&mut self,
keypair: &Keypair,
_blockstore: &Blockstore,
blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
Expand All @@ -87,6 +89,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let last_tick_height = receive_results.last_tick_height;

if bank.slot() != self.current_slot {
self.chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
bank.slot(),
bank.parent_slot(),
blockstore,
)
.unwrap();
self.next_shred_index = 0;
self.next_code_index = 0;
self.current_slot = bank.slot();
Expand Down Expand Up @@ -173,14 +181,16 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
None, // chained_merkle_root
Some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

if let Some(shred) = data_shreds.iter().max_by_key(|shred| shred.index()) {
self.chained_merkle_root = shred.merkle_root().unwrap();
}
self.next_shred_index += data_shreds.len() as u32;
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
self.next_code_index = index + 1;
Expand All @@ -191,7 +201,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&[original_last_entry],
true,
None, // chained_merkle_root
Some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
Expand All @@ -205,7 +215,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&duplicate_extra_last_entries,
true,
None, // chained_merkle_root
Some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
Expand All @@ -222,6 +232,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
sigs,
);

assert_eq!(original_last_data_shred.len(), 1);
assert_eq!(partition_last_data_shred.len(), 1);
self.next_shred_index += 1;
(original_last_data_shred, partition_last_data_shred)
});
Expand Down

0 comments on commit 72bbc24

Please sign in to comment.