reduces intermediate vector allocations when recovering shreds (#4289)
When recovering shreds, the two .collect::<Vec<_>>() calls below unnecessarily allocate intermediate vectors; they can be skipped by working with iterators instead, as sketched below:
https://github.com/anza-xyz/agave/blob/40be9b6d1/ledger/src/shred/merkle.rs#L987
https://github.com/anza-xyz/agave/blob/40be9b6d1/ledger/src/blockstore.rs#L1089
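
A minimal sketch of that first pattern, using hypothetical stand-in names rather than the actual agave types or call sites:

// Minimal sketch: Shred and these function names are illustrative
// stand-ins, not the real agave types or APIs.
struct Shred(u64);

// Before: the callee collects into a Vec even when the caller only iterates
// over the result once.
fn recover_collected(ids: &[u64]) -> Vec<Shred> {
    ids.iter().map(|&id| Shred(id)).collect()
}

// After: returning `impl Iterator` skips the intermediate allocation; the
// caller collects only if it actually needs an owned Vec.
fn recover_lazy(ids: &[u64]) -> impl Iterator<Item = Shred> + '_ {
    ids.iter().map(|&id| Shred(id))
}

fn main() {
    let ids = [1u64, 2, 3];
    let eager: u64 = recover_collected(&ids).iter().map(|s| s.0).sum();
    let lazy: u64 = recover_lazy(&ids).map(|s| s.0).sum();
    assert_eq!(eager, lazy); // same result, one fewer intermediate Vec
}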

We can also reduce allocations by collecting a Vec<Vec<Shred>> instead of the append below; a second sketch of that pattern follows the link:
https://github.com/anza-xyz/agave/blob/40be9b6d1/ledger/src/blockstore.rs#L844
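
A minimal sketch of the second pattern, again with stand-ins rather than the real blockstore code: each successful recovery keeps its own Vec, and the caller collects a Vec<Vec<_>> and flattens it only when the shreds are consumed, instead of appending every batch into one shared buffer:

// Illustrative only: recover_set stands in for the per-erasure-set recovery
// call, and u64 stands in for a shred.
fn recover_set(set: &[u64]) -> Option<Vec<u64>> {
    // Pretend recovery fails on empty sets.
    (!set.is_empty()).then(|| set.to_vec())
}

fn main() {
    let erasure_sets: Vec<Vec<u64>> = vec![vec![1, 2, 3], vec![], vec![4, 5]];

    // Old pattern: recovered.append(&mut batch) for each successful batch,
    // repeatedly growing (and possibly reallocating) one shared Vec.
    let mut appended: Vec<u64> = Vec::new();
    for set in &erasure_sets {
        if let Some(mut batch) = recover_set(set) {
            appended.append(&mut batch);
        }
    }

    // New pattern: collect the per-set results as Vec<Vec<u64>> and flatten
    // only when the shreds are actually consumed.
    let collected: Vec<Vec<u64>> = erasure_sets
        .iter()
        .filter_map(|set| recover_set(set))
        .collect();
    let flattened: Vec<u64> = collected.into_iter().flatten().collect();

    assert_eq!(appended, flattened);
}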
behzadnouri authored Jan 6, 2025
1 parent a2b219c commit 7740b82
Showing 3 changed files with 60 additions and 69 deletions.
113 changes: 53 additions & 60 deletions ledger/src/blockstore.rs
@@ -824,10 +824,9 @@ impl Blockstore {
index: &Index,
erasure_meta: &ErasureMeta,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
recovered_shreds: &mut Vec<Shred>,
leader_schedule_cache: &LeaderScheduleCache,
reed_solomon_cache: &ReedSolomonCache,
) {
) -> std::result::Result<Vec<Shred>, shred::Error> {
// Find shreds for this erasure set and try recovery
let slot = index.slot;
let available_shreds: Vec<_> = self
@@ -837,14 +836,13 @@ impl Blockstore {
let get_slot_leader = |slot: Slot| -> Option<Pubkey> {
leader_schedule_cache.slot_leader_at(slot, /*bank:*/ None)
};
if let Ok(mut result) =
shred::recover(available_shreds, reed_solomon_cache, get_slot_leader)
{
let result = shred::recover(available_shreds, reed_solomon_cache, get_slot_leader);
if let Ok(result) = &result {
Self::submit_metrics(slot, erasure_meta, true, "complete".into(), result.len());
recovered_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, erasure_meta, true, "incomplete".into(), 0);
}
result
}

fn submit_metrics(
@@ -974,44 +972,46 @@ impl Blockstore {
prev_inserted_shreds: &HashMap<ShredId, Shred>,
leader_schedule_cache: &LeaderScheduleCache,
reed_solomon_cache: &ReedSolomonCache,
) -> Vec<Shred> {
let mut recovered_shreds = vec![];
) -> Vec<Vec<Shred>> {
// Recovery rules:
// 1. Only try recovery around indexes for which new data or coding shreds are received
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
// 3. Before trying recovery, check if enough number of shreds have been received
// 3a. Enough number of shreds = (#data + #coding shreds) > erasure.num_data
for (erasure_set, working_erasure_meta) in erasure_metas.iter() {
let erasure_meta = working_erasure_meta.as_ref();
let slot = erasure_set.slot();
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(index) {
ErasureMetaStatus::CanRecover => {
self.recover_shreds(
index,
erasure_meta,
prev_inserted_shreds,
&mut recovered_shreds,
leader_schedule_cache,
reed_solomon_cache,
);
}
ErasureMetaStatus::DataFull => {
Self::submit_metrics(slot, erasure_meta, false, "complete".into(), 0);
}
ErasureMetaStatus::StillNeed(needed) => {
Self::submit_metrics(
slot,
erasure_meta,
false,
format!("still need: {needed}"),
0,
);
erasure_metas
.iter()
.filter_map(|(erasure_set, working_erasure_meta)| {
let erasure_meta = working_erasure_meta.as_ref();
let slot = erasure_set.slot();
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(index) {
ErasureMetaStatus::CanRecover => self
.recover_shreds(
index,
erasure_meta,
prev_inserted_shreds,
leader_schedule_cache,
reed_solomon_cache,
)
.ok(),
ErasureMetaStatus::DataFull => {
Self::submit_metrics(slot, erasure_meta, false, "complete".into(), 0);
None
}
ErasureMetaStatus::StillNeed(needed) => {
Self::submit_metrics(
slot,
erasure_meta,
false,
format!("still need: {needed}"),
0,
);
None
}
}
};
}
recovered_shreds
})
.collect()
}

/// Attempts shred recovery and does the following for recovered data
@@ -1030,29 +1030,27 @@ impl Blockstore {
) {
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_shreds = self.try_shred_recovery(
&shred_insertion_tracker.erasure_metas,
&mut shred_insertion_tracker.index_working_set,
&shred_insertion_tracker.just_inserted_shreds,
leader_schedule_cache,
reed_solomon_cache,
);

metrics.num_recovered += recovered_shreds
.iter()
.filter(|shred| shred.is_data())
.count();
let recovered_shreds: Vec<_> = recovered_shreds
let recovered_shreds: Vec<_> = self
.try_shred_recovery(
&shred_insertion_tracker.erasure_metas,
&mut shred_insertion_tracker.index_working_set,
&shred_insertion_tracker.just_inserted_shreds,
leader_schedule_cache,
reed_solomon_cache,
)
.into_iter()
.flatten()
.filter_map(|shred| {
// Since the data shreds are fully recovered from the
// erasure batch, no need to store coding shreds in
// blockstore.
if shred.is_code() {
return Some(shred);
return Some(shred.into_payload());
}
metrics.num_recovered += 1;
let shred_payload = shred.payload().clone();
match self.check_insert_data_shred(
shred.clone(),
shred,
shred_insertion_tracker,
is_trusted,
leader_schedule,
@@ -1073,7 +1071,7 @@
}
Ok(()) => {
metrics.num_recovered_inserted += 1;
Some(shred)
Some(shred_payload)
}
}
})
@@ -1082,12 +1080,7 @@
.collect();
if !recovered_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(
recovered_shreds
.into_iter()
.map(Shred::into_payload)
.collect(),
);
let _ = retransmit_sender.send(recovered_shreds);
}
}
}
1 change: 0 additions & 1 deletion ledger/src/shred.rs
@@ -1138,7 +1138,6 @@ pub(crate) fn recover(
// when reconstructing the Merkle tree for the erasure batch, we
// will obtain the same Merkle root.
Ok(merkle::recover(shreds, reed_solomon_cache)?
.into_iter()
.map(|shred| {
let shred = Shred::from(shred);
debug_assert!(get_slot_leader(shred.slot())
15 changes: 7 additions & 8 deletions ledger/src/shred/merkle.rs
@@ -782,7 +782,7 @@ fn make_merkle_proof(
pub(super) fn recover(
mut shreds: Vec<Shred>,
reed_solomon_cache: &ReedSolomonCache,
) -> Result<Vec<Shred>, Error> {
) -> Result<impl Iterator<Item = Shred>, Error> {
// Grab {common, coding} headers from first coding shred.
// Incoming shreds are resigned immediately after signature verification,
// so we can just grab the retransmitter signature from one of the
@@ -983,8 +983,7 @@ pub(super) fn recover(
.into_iter()
.zip(mask)
.filter(|(_, mask)| !mask)
.map(|(shred, _)| shred)
.collect())
.map(|(shred, _)| shred))
}

// Maps number of (code + data) shreds to merkle_proof.len().
@@ -1631,19 +1630,19 @@ mod test {
)
}) {
assert_matches!(
recover(shreds, reed_solomon_cache),
Err(Error::ErasureError(TooFewParityShards))
recover(shreds, reed_solomon_cache).err(),
Some(Error::ErasureError(TooFewParityShards))
);
continue;
}
if shreds.len() < num_data_shreds {
assert_matches!(
recover(shreds, reed_solomon_cache),
Err(Error::ErasureError(TooFewShardsPresent))
recover(shreds, reed_solomon_cache).err(),
Some(Error::ErasureError(TooFewShardsPresent))
);
continue;
}
let recovered_shreds = recover(shreds, reed_solomon_cache).unwrap();
let recovered_shreds: Vec<_> = recover(shreds, reed_solomon_cache).unwrap().collect();
assert_eq!(size + recovered_shreds.len(), num_shreds);
assert_eq!(recovered_shreds.len(), removed_shreds.len());
removed_shreds.sort_by(|a, b| {
