Skip to content

Commit

Permalink
chains Merkle shreds in broadcast duplicates (#35058)
Browse files Browse the repository at this point in the history
The commit migrates
    turbine/src/broadcast_stage/broadcast_duplicates_run.rs
to use chained Merkle shreds variant.

(cherry picked from commit 1b9dfd4)
  • Loading branch information
behzadnouri authored and mergify[bot] committed Feb 22, 2024
1 parent b369831 commit 080484d
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions turbine/src/broadcast_stage/broadcast_duplicates_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct BroadcastDuplicatesConfig {
pub(super) struct BroadcastDuplicatesRun {
config: BroadcastDuplicatesConfig,
current_slot: Slot,
chained_merkle_root: Hash,
next_shred_index: u32,
next_code_index: u32,
shred_version: u16,
Expand All @@ -57,6 +58,7 @@ impl BroadcastDuplicatesRun {
));
Self {
config,
chained_merkle_root: Hash::default(),
next_shred_index: u32::MAX,
next_code_index: 0,
shred_version,
Expand All @@ -76,7 +78,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
fn run(
&mut self,
keypair: &Keypair,
_blockstore: &Blockstore,
blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
Expand All @@ -87,6 +89,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let last_tick_height = receive_results.last_tick_height;

if bank.slot() != self.current_slot {
self.chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
bank.slot(),
bank.parent_slot(),
blockstore,
)
.unwrap();
self.next_shred_index = 0;
self.next_code_index = 0;
self.current_slot = bank.slot();
Expand Down Expand Up @@ -169,18 +177,25 @@ impl BroadcastRun for BroadcastDuplicatesRun {
)
.expect("Expected to create a new shredder");

// Chained Merkle shreds are always discarded in epoch 0, due to
// feature_set::enable_chained_merkle_shreds. Below can be removed once
// the feature gated code is removed.
let should_chain_merkle_shreds = bank.epoch() > 0;

let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
None, // chained_merkle_root
should_chain_merkle_shreds.then_some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);

if let Some(shred) = data_shreds.iter().max_by_key(|shred| shred.index()) {
self.chained_merkle_root = shred.merkle_root().unwrap();
}
self.next_shred_index += data_shreds.len() as u32;
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
self.next_code_index = index + 1;
Expand All @@ -191,7 +206,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&[original_last_entry],
true,
None, // chained_merkle_root
should_chain_merkle_shreds.then_some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
Expand All @@ -205,7 +220,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&duplicate_extra_last_entries,
true,
None, // chained_merkle_root
should_chain_merkle_shreds.then_some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
Expand All @@ -222,6 +237,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
sigs,
);

assert_eq!(original_last_data_shred.len(), 1);
assert_eq!(partition_last_data_shred.len(), 1);
self.next_shred_index += 1;
(original_last_data_shred, partition_last_data_shred)
});
Expand Down

0 comments on commit 080484d

Please sign in to comment.