Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
expands number of erasure coding shreds in the last batch in slots (b…
Browse files Browse the repository at this point in the history
…ackport #16484) (#16707)

* expands number of erasure coding shreds in the last batch in slots (#16484)

Number of parity coding shreds is always less than the number of data
shreds in FEC blocks:
https://github.com/solana-labs/solana/blob/6907a2366/ledger/src/shred.rs#L719

Data shreds are batched in chunks of 32 shreds each:
https://github.com/solana-labs/solana/blob/6907a2366/ledger/src/shred.rs#L714

However the very last batch of data shreds in a slot can be small, in
which case the loss rate can be exacerbated.

This commit expands the number of coding shreds in the last FEC block in
slots to: 64 - number of data shreds; so that FEC blocks are always 64
data and parity coding shreds each.

As a consequence of this, the last FEC block has more parity coding
shreds than data shreds. So for some shred indices we will have a coding
shred but no data shreds. This should not cause any kind of overlapping
FEC blocks as in:
#10095
since this is done only for the very last batch in a slot, and the next
slot will reset the shred index.

(cherry picked from commit 37b8587)

# Conflicts:
#	core/benches/shredder.rs
#	ledger/src/shred.rs

* removes backport merge conflicts

* ignore the flaky test for now

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
  • Loading branch information
mergify[bot] and behzadnouri authored Apr 21, 2021
1 parent ea2cc90 commit ae605f8
Show file tree
Hide file tree
Showing 14 changed files with 247 additions and 264 deletions.
3 changes: 1 addition & 2 deletions core/benches/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
let keypair = Arc::new(Keypair::new());
let slot = 0;
let parent = 0;
let shredder =
Shredder::new(slot, parent, 0.0, keypair, 0, 0).expect("Failed to create entry shredder");
let shredder = Shredder::new(slot, parent, keypair, 0, 0).unwrap();
let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;

let num_packets = data_shreds.len();
Expand Down
19 changes: 8 additions & 11 deletions core/benches/shredder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use raptorq::{Decoder, Encoder};
use solana_ledger::entry::{create_ticks, Entry};
use solana_ledger::shred::{
max_entries_per_n_shred, max_ticks_per_n_shreds, ProcessShredsStats, Shred, Shredder,
MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE, SHRED_PAYLOAD_SIZE,
SIZE_OF_DATA_SHRED_IGNORED_TAIL, SIZE_OF_DATA_SHRED_PAYLOAD,
MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_PAYLOAD_SIZE, SIZE_OF_DATA_SHRED_IGNORED_TAIL,
SIZE_OF_DATA_SHRED_PAYLOAD,
};
use solana_perf::test_tx;
use solana_sdk::hash::Hash;
Expand Down Expand Up @@ -39,8 +39,7 @@ fn make_shreds(num_shreds: usize) -> Vec<Shred> {
Some(shred_size),
);
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
let shredder =
Shredder::new(1, 0, RECOMMENDED_FEC_RATE, Arc::new(Keypair::new()), 0, 0).unwrap();
let shredder = Shredder::new(1, 0, Arc::new(Keypair::new()), 0, 0).unwrap();
let data_shreds = shredder
.entries_to_data_shreds(
&entries,
Expand Down Expand Up @@ -75,7 +74,7 @@ fn bench_shredder_ticks(bencher: &mut Bencher) {
let num_ticks = max_ticks_per_n_shreds(1, Some(SIZE_OF_DATA_SHRED_PAYLOAD)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
bencher.iter(|| {
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap();
shredder.entries_to_shreds(&entries, true, 0);
})
}
Expand All @@ -94,7 +93,7 @@ fn bench_shredder_large_entries(bencher: &mut Bencher) {
let entries = make_large_unchained_entries(txs_per_entry, num_entries);
// 1Mb
bencher.iter(|| {
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone(), 0, 0).unwrap();
let shredder = Shredder::new(1, 0, kp.clone(), 0, 0).unwrap();
shredder.entries_to_shreds(&entries, true, 0);
})
}
Expand All @@ -107,7 +106,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_n_shreds(1, Some(shred_size)) * num_shreds as u64;
let entries = create_ticks(num_ticks, 0, Hash::default());
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp, 0, 0).unwrap();
let shredder = Shredder::new(1, 0, kp, 0, 0).unwrap();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
bencher.iter(|| {
let raw = &mut Shredder::deshred(&data_shreds).unwrap();
Expand All @@ -133,9 +132,8 @@ fn bench_shredder_coding(bencher: &mut Bencher) {
let data_shreds = make_shreds(symbol_count);
bencher.iter(|| {
Shredder::generate_coding_shreds(
RECOMMENDED_FEC_RATE,
&data_shreds[..symbol_count],
symbol_count,
true, // is_last_in_slot
)
.len();
})
Expand All @@ -146,9 +144,8 @@ fn bench_shredder_decoding(bencher: &mut Bencher) {
let symbol_count = MAX_DATA_SHREDS_PER_FEC_BLOCK as usize;
let data_shreds = make_shreds(symbol_count);
let coding_shreds = Shredder::generate_coding_shreds(
RECOMMENDED_FEC_RATE,
&data_shreds[..symbol_count],
symbol_count,
true, // is_last_in_slot
);
bencher.iter(|| {
Shredder::try_recovery(
Expand Down
4 changes: 2 additions & 2 deletions core/src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ pub mod test {
entry::create_ticks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder, RECOMMENDED_FEC_RATE},
shred::{max_ticks_per_n_shreds, ProcessShredsStats, Shredder},
};
use solana_runtime::bank::Bank;
use solana_sdk::{
Expand Down Expand Up @@ -476,7 +476,7 @@ pub mod test {
let coding_shreds = Shredder::data_shreds_to_coding_shreds(
&keypair,
&data_shreds[0..],
RECOMMENDED_FEC_RATE,
true, // is_last_in_slot
&mut ProcessShredsStats::default(),
)
.unwrap();
Expand Down
3 changes: 1 addition & 2 deletions core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use solana_ledger::entry::Entry;
use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_ledger::shred::Shredder;
use solana_sdk::hash::Hash;
use solana_sdk::signature::Keypair;

Expand Down Expand Up @@ -47,7 +47,6 @@ impl BroadcastRun for BroadcastFakeShredsRun {
let shredder = Shredder::new(
bank.slot(),
bank.parent().unwrap().slot(),
RECOMMENDED_FEC_RATE,
self.keypair.clone(),
(bank.tick_height() % bank.ticks_per_slot()) as u8,
self.shred_version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let shredder = Shredder::new(
bank.slot(),
bank.parent().unwrap().slot(),
0.0,
self.keypair.clone(),
(bank.tick_height() % bank.ticks_per_slot()) as u8,
self.shred_version,
Expand Down
6 changes: 2 additions & 4 deletions core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use solana_ledger::{
entry::Entry,
shred::{
ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE,
ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
SHRED_TICK_REFERENCE_MASK,
},
};
Expand Down Expand Up @@ -121,7 +121,6 @@ impl StandardBroadcastRun {
let (data_shreds, next_shred_index) = Shredder::new(
slot,
parent_slot,
RECOMMENDED_FEC_RATE,
self.keypair.clone(),
reference_tick,
self.shred_version,
Expand Down Expand Up @@ -451,8 +450,7 @@ fn make_coding_shreds(
.collect()
}
};
Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, RECOMMENDED_FEC_RATE, stats)
.unwrap()
Shredder::data_shreds_to_coding_shreds(keypair, &data_shreds, is_slot_end, stats).unwrap()
}

impl BroadcastRun for StandardBroadcastRun {
Expand Down
17 changes: 4 additions & 13 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3519,7 +3519,6 @@ mod tests {
use itertools::izip;
use rand::{seq::SliceRandom, SeedableRng};
use rand_chacha::ChaChaRng;
use serial_test::serial;
use solana_ledger::shred::Shredder;
use solana_sdk::signature::{Keypair, Signer};
use solana_vote_program::{vote_instruction, vote_state::Vote};
Expand Down Expand Up @@ -3812,17 +3811,9 @@ mod tests {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let keypair = Keypair::new();
let (slot, parent_slot, fec_rate, reference_tick, version) =
(53084024, 53084023, 0.0, 0, 0);
let shredder = Shredder::new(
slot,
parent_slot,
fec_rate,
leader.clone(),
reference_tick,
version,
)
.unwrap();
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder =
Shredder::new(slot, parent_slot, leader.clone(), reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let shred = new_rand_shred(&mut rng, next_shred_index, &shredder);
let other_payload = new_rand_shred(&mut rng, next_shred_index, &shredder).payload;
Expand Down Expand Up @@ -4798,7 +4789,7 @@ mod tests {
}

#[test]
#[serial]
#[ignore] // TODO: debug why this is flaky on buildkite!
fn test_pull_request_time_pruning() {
let node = Node::new_localhost();
let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info));
Expand Down
14 changes: 3 additions & 11 deletions core/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,17 +343,9 @@ pub(crate) mod tests {
fn test_duplicate_shred_round_trip() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, fec_rate, reference_tick, version) =
(53084024, 53084023, 0.0, 0, 0);
let shredder = Shredder::new(
slot,
parent_slot,
fec_rate,
leader.clone(),
reference_tick,
version,
)
.unwrap();
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder =
Shredder::new(slot, parent_slot, leader.clone(), reference_tick, version).unwrap();
let next_shred_index = rng.gen();
let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder);
let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder);
Expand Down
6 changes: 4 additions & 2 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,10 @@ mod tests {
&hasher,
);
assert!(!packet.meta.discard);

let coding = solana_ledger::shred::Shredder::generate_coding_shreds(1.0f32, &[shred], 1);
let coding = solana_ledger::shred::Shredder::generate_coding_shreds(
&[shred],
false, // is_last_in_slot
);
coding[0].copy_to_packet(&mut packet);
ShredFetchStage::process_packet(
&mut packet,
Expand Down
3 changes: 1 addition & 2 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,7 @@ mod test {
parent: Slot,
keypair: &Arc<Keypair>,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent, 0.0, keypair.clone(), 0, 0)
.expect("Failed to create entry shredder");
let shredder = Shredder::new(slot, parent, keypair.clone(), 0, 0).unwrap();
shredder.entries_to_shreds(&entries, true, 0).0
}

Expand Down
50 changes: 18 additions & 32 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,7 @@ impl Blockstore {

// Only used by tests
#[allow(clippy::too_many_arguments)]
pub fn write_entries(
pub(crate) fn write_entries(
&self,
start_slot: Slot,
num_ticks_in_start_slot: u64,
Expand All @@ -1529,16 +1529,15 @@ impl Blockstore {
keypair: &Arc<Keypair>,
entries: Vec<Entry>,
version: u16,
) -> Result<usize> {
) -> Result<usize /*num of data shreds*/> {
let mut parent_slot = parent.map_or(start_slot.saturating_sub(1), |v| v);
let num_slots = (start_slot - parent_slot).max(1); // Note: slot 0 has parent slot 0
assert!(num_ticks_in_start_slot < num_slots * ticks_per_slot);
let mut remaining_ticks_in_slot = num_slots * ticks_per_slot - num_ticks_in_start_slot;

let mut current_slot = start_slot;
let mut shredder =
Shredder::new(current_slot, parent_slot, 0.0, keypair.clone(), 0, version)
.expect("Failed to create entry shredder");
Shredder::new(current_slot, parent_slot, keypair.clone(), 0, version).unwrap();
let mut all_shreds = vec![];
let mut slot_entries = vec![];
// Find all the entries for start_slot
Expand All @@ -1563,12 +1562,11 @@ impl Blockstore {
shredder = Shredder::new(
current_slot,
parent_slot,
0.0,
keypair.clone(),
(ticks_per_slot - remaining_ticks_in_slot) as u8,
version,
)
.expect("Failed to create entry shredder");
.unwrap();
}

if entry.is_tick() {
Expand All @@ -1583,10 +1581,9 @@ impl Blockstore {
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
}

let num_shreds = all_shreds.len();
let num_data = all_shreds.iter().filter(|shred| shred.is_data()).count();
self.insert_shreds(all_shreds, None, false)?;
Ok(num_shreds)
Ok(num_data)
}

pub fn get_index(&self, slot: Slot) -> Result<Option<Index>> {
Expand Down Expand Up @@ -3381,8 +3378,7 @@ pub fn create_new_ledger(
let last_hash = entries.last().unwrap().hash;
let version = solana_sdk::shred_version::version_from_hash(&last_hash);

let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()), 0, version)
.expect("Failed to create entry shredder");
let shredder = Shredder::new(0, 0, Arc::new(Keypair::new()), 0, version).unwrap();
let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
assert!(shreds.last().unwrap().last_in_slot());

Expand Down Expand Up @@ -3558,10 +3554,10 @@ pub fn entries_to_test_shreds(
is_full_slot: bool,
version: u16,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()), 0, version)
.expect("Failed to create entry shredder");

shredder.entries_to_shreds(&entries, is_full_slot, 0).0
Shredder::new(slot, parent_slot, Arc::new(Keypair::new()), 0, version)
.unwrap()
.entries_to_shreds(&entries, is_full_slot, 0)
.0
}

// used for tests only
Expand Down Expand Up @@ -7480,7 +7476,7 @@ pub mod tests {
fn test_recovery() {
let slot = 1;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
setup_erasure_shreds(slot, 0, 100);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
Expand Down Expand Up @@ -7513,7 +7509,7 @@ pub mod tests {
let slot = 1;
let num_entries = 100;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, num_entries, 1.0);
setup_erasure_shreds(slot, 0, num_entries);
assert!(data_shreds.len() > 3);
assert!(coding_shreds.len() > 3);
let blockstore_path = get_tmp_ledger_path!();
Expand Down Expand Up @@ -7650,19 +7646,10 @@ pub mod tests {
slot: u64,
parent_slot: u64,
num_entries: u64,
erasure_rate: f32,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(
slot,
parent_slot,
erasure_rate,
leader_keypair.clone(),
0,
0,
)
.expect("Failed in creating shredder");
let shredder = Shredder::new(slot, parent_slot, leader_keypair.clone(), 0, 0).unwrap();
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);

let genesis_config = create_genesis_config(2).genesis_config;
Expand Down Expand Up @@ -7714,8 +7701,7 @@ pub mod tests {
let entries1 = make_slot_entries_with_transactions(1);
let entries2 = make_slot_entries_with_transactions(1);
let leader_keypair = Arc::new(Keypair::new());
let shredder =
Shredder::new(slot, 0, 1.0, leader_keypair, 0, 0).expect("Failed in creating shredder");
let shredder = Shredder::new(slot, 0, leader_keypair, 0, 0).unwrap();
let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0);
let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0);
let shred = shreds[0].clone();
Expand Down Expand Up @@ -8026,8 +8012,8 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();

let coding1 = Shredder::generate_coding_shreds(0.5f32, &shreds, usize::MAX);
let coding2 = Shredder::generate_coding_shreds(1.0f32, &shreds, usize::MAX);
let coding1 = Shredder::generate_coding_shreds(&shreds, false);
let coding2 = Shredder::generate_coding_shreds(&shreds, true);
for shred in &shreds {
info!("shred {:?}", shred);
}
Expand All @@ -8051,7 +8037,7 @@ pub mod tests {
solana_logger::setup();
let slot = 1;
let (_data_shreds, mut coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
setup_erasure_shreds(slot, 0, 100);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
Expand Down
10 changes: 6 additions & 4 deletions ledger/src/erasure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ impl Session {
}

/// Create coding blocks by overwriting `parity`
pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> {
self.0.encode_sep(data, parity)?;

Ok(())
pub fn encode<T, U>(&self, data: &[T], parity: &mut [U]) -> Result<()>
where
T: AsRef<[u8]>,
U: AsRef<[u8]> + AsMut<[u8]>,
{
self.0.encode_sep(data, parity)
}

/// Recover data + coding blocks into data blocks
Expand Down
Loading

0 comments on commit ae605f8

Please sign in to comment.