Wen restart aggregate last voted fork slots #33892
Changes from 16 commits
Changes to the repair service code (struct RepairInfo, impl RepairService):
@@ -213,6 +213,8 @@ pub struct RepairInfo {
     pub repair_validators: Option<HashSet<Pubkey>>,
     // Validators which should be given priority when serving
     pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
+    // A given list of slots to repair when in wen_restart
+    pub slots_to_repair_for_wen_restart: Option<Arc<RwLock<Vec<Slot>>>>,
 }

 pub struct RepairSlotRange {
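For context: the new field is an `Option<Arc<RwLock<Vec<Slot>>>>`, where `None` means normal operation and `Some` carries a slot list shared with the wen_restart logic. Below is a minimal, self-contained sketch of that sharing pattern; `RepairConfigSketch` and the `main` driver are illustrative stand-ins, not the real `RepairInfo` or its construction path.

```rust
use std::sync::{Arc, RwLock};

type Slot = u64; // stand-in for solana_sdk::clock::Slot

// Stand-in for the relevant part of RepairInfo.
struct RepairConfigSketch {
    // None: normal repair. Some(list): wen_restart mode, repair exactly these slots.
    slots_to_repair_for_wen_restart: Option<Arc<RwLock<Vec<Slot>>>>,
}

fn main() {
    // The wen_restart side keeps one handle and appends slots as it aggregates
    // last voted fork slots from the cluster...
    let shared = Arc::new(RwLock::new(Vec::<Slot>::new()));
    let config = RepairConfigSketch {
        slots_to_repair_for_wen_restart: Some(shared.clone()),
    };

    shared.write().unwrap().extend([2, 3, 5, 7]);

    // ...while the repair loop takes a read lock on each iteration.
    if let Some(slots) = &config.slots_to_repair_for_wen_restart {
        assert_eq!(*slots.read().unwrap(), vec![2, 3, 5, 7]);
    }
}
```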
@@ -386,17 +388,24 @@ impl RepairService {
             );
             add_votes_elapsed.stop();

-            let repairs = repair_weight.get_best_weighted_repairs(
-                blockstore,
-                root_bank.epoch_stakes_map(),
-                root_bank.epoch_schedule(),
-                MAX_ORPHANS,
-                MAX_REPAIR_LENGTH,
-                MAX_UNKNOWN_LAST_INDEX_REPAIRS,
-                MAX_CLOSEST_COMPLETION_REPAIRS,
-                &mut repair_timing,
-                &mut best_repairs_stats,
-            );
+            let repairs = match repair_info.slots_to_repair_for_wen_restart.clone() {
+                Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
+                    blockstore,
+                    MAX_REPAIR_LENGTH,
+                    &slots_to_repair.read().unwrap(),
+                ),
+                None => repair_weight.get_best_weighted_repairs(
+                    blockstore,
+                    root_bank.epoch_stakes_map(),
+                    root_bank.epoch_schedule(),
+                    MAX_ORPHANS,
+                    MAX_REPAIR_LENGTH,
+                    MAX_UNKNOWN_LAST_INDEX_REPAIRS,
+                    MAX_CLOSEST_COMPLETION_REPAIRS,
+                    &mut repair_timing,
+                    &mut best_repairs_stats,
+                ),
+            };

             let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
                 root_bank.epoch_stakes_map(),
@@ -613,7 +622,14 @@ impl RepairService {
         slot: Slot,
         slot_meta: &SlotMeta,
         max_repairs: usize,
+        add_delay_after_first_shred: bool,
      Reviewer: nit: maybe change name to …
      Author: Done.
     ) -> Vec<ShredRepairType> {
+        // When in wen_restart, turbine is not running, so no need to wait after the first shred.
+        let defer_repair_threshold_ticks = if add_delay_after_first_shred {
+            DEFER_REPAIR_THRESHOLD_TICKS
+        } else {
+            0
+        };
         if max_repairs == 0 || slot_meta.is_full() {
             vec![]
         } else if slot_meta.consumed == slot_meta.received {
@@ -630,7 +646,7 @@ impl RepairService {
                 * timestamp().saturating_sub(slot_meta.first_shred_timestamp)
                 / 1_000;
             if ticks_since_first_insert
-                < reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS)
+                < reference_tick.saturating_add(defer_repair_threshold_ticks)
      Reviewer: Should we bypass the …
      Author: Done.
             {
                 return vec![];
             }
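To make the gate concrete, here is a stand-alone sketch of the deferment check. `should_defer_repair` and the threshold value of 8 ticks are illustrative assumptions, not the real names or constant; `DEFAULT_TICKS_PER_SECOND` mirrors the cluster default of 160.

```rust
// Stand-alone sketch of the deferment gate. DEFER_REPAIR_THRESHOLD_TICKS is
// stubbed with an illustrative value, not the real constant from repair_service.rs.
const DEFAULT_TICKS_PER_SECOND: u64 = 160;
const DEFER_REPAIR_THRESHOLD_TICKS: u64 = 8; // assumption for illustration

fn should_defer_repair(
    reference_tick: u64,
    ms_since_first_shred: u64,
    add_delay_after_first_shred: bool,
) -> bool {
    // In wen_restart (add_delay_after_first_shred == false) the threshold collapses
    // to 0, so repairs for a partially received slot are generated immediately.
    let defer_repair_threshold_ticks = if add_delay_after_first_shred {
        DEFER_REPAIR_THRESHOLD_TICKS
    } else {
        0
    };
    let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND * ms_since_first_shred / 1_000;
    ticks_since_first_insert < reference_tick.saturating_add(defer_repair_threshold_ticks)
}

fn main() {
    // 10ms after the first shred of a slot arrived:
    assert!(should_defer_repair(0, 10, true)); // normal mode still waits
    assert!(!should_defer_repair(0, 10, false)); // wen_restart repairs right away
}
```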
@@ -641,7 +657,7 @@ impl RepairService {
                 .find_missing_data_indexes(
                     slot,
                     slot_meta.first_shred_timestamp,
-                    DEFER_REPAIR_THRESHOLD_TICKS,
+                    defer_repair_threshold_ticks,
                     slot_meta.consumed,
                     slot_meta.received,
                     max_repairs,
@@ -668,6 +684,7 @@ impl RepairService {
                 slot,
                 &slot_meta,
                 max_repairs - repairs.len(),
+                true,
      Reviewer: nit: could you add the param name in a comment like …
      Author: Done.
             );
             repairs.extend(new_repairs);
             let next_slots = slot_meta.next_slots;
@@ -678,6 +695,32 @@ impl RepairService {
         }
     }

+    pub(crate) fn generate_repairs_for_wen_restart(
+        blockstore: &Blockstore,
+        max_repairs: usize,
+        slots: &Vec<Slot>,
+    ) -> Vec<ShredRepairType> {
+        let mut result: Vec<ShredRepairType> = Vec::new();
+        for slot in slots {
+            if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
+                let new_repairs = Self::generate_repairs_for_slot(
+                    blockstore,
+                    *slot,
+                    &slot_meta,
+                    max_repairs - result.len(),
+                    false,
+                );
+                result.extend(new_repairs);
+            } else {
+                result.push(ShredRepairType::HighestShred(*slot, 0));
+            }
+            if result.len() >= max_repairs {
+                break;
+            }
+        }
+        result
+    }
+
     /// Generate repairs for all slots `x` in the repair_range.start <= x <= repair_range.end
     #[cfg(test)]
     pub fn generate_repairs_in_range(
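The helper's two edge cases, capping the total at `max_repairs` and requesting from shred index 0 for slots the blockstore has never seen, can be shown with a small mock. `RepairSketch`, `wen_restart_repairs`, and the `HashMap` lookup below are hypothetical stand-ins for illustration only; the real code consults `Blockstore::meta` and reuses `generate_repairs_for_slot`. The expected values mirror those in the new unit test later in this diff.

```rust
use std::collections::HashMap;

type Slot = u64;

#[derive(Debug, PartialEq)]
enum RepairSketch {
    // (slot, shred index to start repairing from)
    HighestShred(Slot, u64),
}

// A map of slot -> first missing shred index stands in for Blockstore::meta().
fn wen_restart_repairs(
    known: &HashMap<Slot, u64>,
    max_repairs: usize,
    slots: &[Slot],
) -> Vec<RepairSketch> {
    let mut result = Vec::new();
    for slot in slots {
        match known.get(slot) {
            // Known slot: repair from the first missing shred onward.
            Some(first_missing) => result.push(RepairSketch::HighestShred(*slot, *first_missing)),
            // Unknown slot: we have nothing, so start from shred 0.
            None => result.push(RepairSketch::HighestShred(*slot, 0)),
        }
        if result.len() >= max_repairs {
            break;
        }
    }
    result
}

fn main() {
    let known = HashMap::from([(2, 1), (3, 1), (5, 1), (7, 1)]);
    // Slot 2 is missing shred 1, slot 81 is unknown to the blockstore.
    assert_eq!(
        wen_restart_repairs(&known, 3, &[2, 81]),
        vec![RepairSketch::HighestShred(2, 1), RepairSketch::HighestShred(81, 0)]
    );
    // The total is capped at max_repairs.
    assert_eq!(wen_restart_repairs(&known, 3, &[3, 82, 5, 83, 7, 84]).len(), 3);
}
```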
@@ -705,6 +748,7 @@ impl RepairService {
                     slot,
                     &meta,
                     max_repairs - repairs.len(),
+                    true,
                 );
                 repairs.extend(new_repairs);
             }
@@ -727,6 +771,7 @@ impl RepairService {
                 slot,
                 &slot_meta,
                 MAX_REPAIR_PER_DUPLICATE,
+                true,
             ))
         }
     } else {
@@ -1349,4 +1394,64 @@ mod test {
         );
         assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
     }
+
+    #[test]
+    fn test_generate_repairs_for_wen_restart() {
+        solana_logger::setup();
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+        let max_repairs = 3;
+
+        let slots: Vec<u64> = vec![2, 3, 5, 7];
+        let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
+
+        let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
+        for (mut slot_shreds, _) in shreds.into_iter() {
+            slot_shreds.remove(1);
+            blockstore.insert_shreds(slot_shreds, None, false).unwrap();
+        }
+        sleep_shred_deferment_period();
+
+        let mut slots_to_repair: Vec<Slot> = vec![];
+
+        // When slots_to_repair is empty, ignore all and return empty result.
+        let result = RepairService::generate_repairs_for_wen_restart(
+            &blockstore,
+            max_repairs,
+            &slots_to_repair,
+        );
+        assert!(result.is_empty());
+
+        // When asked to repair dead_slot and some unknown slot, return correct results.
+        slots_to_repair = vec![2, 81];
+        let result = RepairService::generate_repairs_for_wen_restart(
+            &blockstore,
+            max_repairs,
+            &slots_to_repair,
+        );
+        assert_eq!(
+            result,
+            vec![
+                ShredRepairType::HighestShred(2, 1),
+                ShredRepairType::HighestShred(81, 0),
+            ],
+        );
+
+        // Test that it will not generate more than max_repairs.
+        slots_to_repair = vec![3, 82, 5, 83, 7, 84];
+        let result = RepairService::generate_repairs_for_wen_restart(
+            &blockstore,
+            max_repairs,
+            &slots_to_repair,
+        );
+        assert_eq!(result.len(), max_repairs);
+        assert_eq!(
+            result,
+            vec![
+                ShredRepairType::HighestShred(3, 1),
+                ShredRepairType::HighestShred(82, 0),
+                ShredRepairType::HighestShred(5, 1),
+            ],
+        );
+    }
 }
Changes to TVU setup (impl Tvu):
@@ -138,6 +138,7 @@ impl Tvu {
         turbine_quic_endpoint_sender: AsyncSender<(SocketAddr, Bytes)>,
         turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
         repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
+        slots_to_repair_for_wen_restart: Option<Arc<RwLock<Vec<Slot>>>>,
     ) -> Result<Self, String> {
         let TvuSockets {
             repair: repair_socket,
@@ -205,6 +206,7 @@ impl Tvu {
             repair_whitelist: tvu_config.repair_whitelist,
             cluster_info: cluster_info.clone(),
             cluster_slots: cluster_slots.clone(),
+            slots_to_repair_for_wen_restart: slots_to_repair_for_wen_restart.clone(),
      Reviewer: do you need to …
      Author: Actually I don't now, removed.
         };
         WindowService::new(
             blockstore.clone(),
@@ -491,6 +493,7 @@ pub mod tests {
             turbine_quic_endpoint_sender,
             turbine_quic_endpoint_receiver,
             repair_quic_endpoint_sender,
+            None,
         )
         .expect("assume success");
         exit.store(true, Ordering::Relaxed);
Changes to the window service (impl WindowService):
@@ -204,15 +204,19 @@ fn prune_shreds_invalid_repair(
     shreds: &mut Vec<Shred>,
     repair_infos: &mut Vec<Option<RepairMeta>>,
     outstanding_requests: &RwLock<OutstandingShredRepairs>,
+    in_wen_restart: bool,
      Reviewer: maybe something like: … I think the function name should probably change to something more generic like …
      Author: Done.
 ) {
     assert_eq!(shreds.len(), repair_infos.len());
     let mut i = 0;
     let mut removed = HashSet::new();
     {
         let mut outstanding_requests = outstanding_requests.write().unwrap();
         shreds.retain(|shred| {
+            // In wen_restart, we discard all shreds from Turbine and keep only those from repair to
+            // avoid new shreds making the validator OOM before wen_restart is over.
      Reviewer: If the param name is changed to …
      Author: Done.
             let should_keep = (
-                verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
+                (!in_wen_restart || repair_infos[i].is_some())
+                    && verify_repair(&mut outstanding_requests, shred, &repair_infos[i]),
                 i += 1,
             )
             .0;
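The retain predicate above keeps a shred with no repair metadata only when not in wen_restart; the tuple is just a trick to bump the index `i` for every element. Below is a reduced sketch of the same filtering rule. `keep_shred` is a hypothetical stand-in, `Option<u32>` plays the role of `Option<RepairMeta>`, and the `verified` flag stands in for the result of `verify_repair` (which, in the real code, trivially passes for shreds that did not come from repair).

```rust
// Minimal sketch of the filtering rule, not the real types or function.
fn keep_shred(repair_info: &Option<u32>, verified: bool, in_wen_restart: bool) -> bool {
    // During wen_restart, a shred must have come from repair to be kept at all;
    // otherwise the node would buffer unbounded Turbine traffic while restarting.
    (!in_wen_restart || repair_info.is_some()) && verified
}

fn main() {
    let turbine: Option<u32> = None;     // arrived via Turbine
    let repaired: Option<u32> = Some(7); // arrived via repair, nonce 7

    // Normal operation: Turbine shreds are kept.
    assert!(keep_shred(&turbine, true, false));
    // During wen_restart: Turbine shreds are dropped, repaired shreds survive.
    assert!(!keep_shred(&turbine, true, true));
    assert!(keep_shred(&repaired, true, true));
}
```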
@@ -240,6 +244,7 @@ fn run_insert<F>(
     retransmit_sender: &Sender<Vec<ShredPayload>>,
     outstanding_requests: &RwLock<OutstandingShredRepairs>,
     reed_solomon_cache: &ReedSolomonCache,
+    in_wen_restart: bool,
 ) -> Result<()>
 where
     F: Fn(PossibleDuplicateShred),
@@ -285,7 +290,12 @@ where

     let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
     let num_shreds = shreds.len();
-    prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
+    prune_shreds_invalid_repair(
+        &mut shreds,
+        &mut repair_infos,
+        outstanding_requests,
+        in_wen_restart,
+    );
     ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len();
     let repairs: Vec<_> = repair_infos
         .iter()
@@ -343,6 +353,8 @@ impl WindowService {

         let cluster_info = repair_info.cluster_info.clone();

+        let in_wen_restart = repair_info.slots_to_repair_for_wen_restart.is_some();
+
         let repair_service = RepairService::new(
             blockstore.clone(),
             exit.clone(),
@@ -377,6 +389,7 @@ impl WindowService {
             completed_data_sets_sender,
             retransmit_sender,
             outstanding_requests,
+            in_wen_restart,
      Reviewer: If …
      Author: It will be enabled for as long as the validator is not restarted. When wen_restart is done, we will set the wen_restart state to DONE and restart the validator; then in_wen_restart will be false and this behavior will be turned off.
         );

         WindowService {
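As the reply above describes, the flag is derived once from the presence of the slot list and stays fixed for the lifetime of the service; turning it off requires restarting the validator without the list. A toy sketch of that construction-time capture, with `WindowServiceSketch` as an illustrative stand-in rather than the real struct:

```rust
use std::sync::{Arc, RwLock};

type Slot = u64;

struct WindowServiceSketch {
    in_wen_restart: bool,
}

impl WindowServiceSketch {
    // The flag is computed at construction time and never changes while the
    // service runs. After wen_restart completes, the validator restarts without
    // the slot list, and the flag comes up false.
    fn new(wen_restart_slots: Option<Arc<RwLock<Vec<Slot>>>>) -> Self {
        Self {
            in_wen_restart: wen_restart_slots.is_some(),
        }
    }
}

fn main() {
    let during_restart = WindowServiceSketch::new(Some(Arc::new(RwLock::new(vec![]))));
    assert!(during_restart.in_wen_restart);

    let normal_run = WindowServiceSketch::new(None);
    assert!(!normal_run.in_wen_restart);
}
```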
@@ -424,6 +437,7 @@ impl WindowService {
         completed_data_sets_sender: CompletedDataSetsSender,
         retransmit_sender: Sender<Vec<ShredPayload>>,
         outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
+        in_wen_restart: bool,
     ) -> JoinHandle<()> {
         let handle_error = || {
             inc_new_counter_error!("solana-window-insert-error", 1, 1);
@@ -456,6 +470,7 @@ impl WindowService {
                         &retransmit_sender,
                         &outstanding_requests,
                         &reed_solomon_cache,
+                        in_wen_restart,
                     ) {
                         ws_metrics.record_error(&e);
                         if Self::should_exit_on_error(e, &handle_error) {
@@ -683,7 +698,7 @@ mod test {
             4, // position
             0, // version
         );
-        let mut shreds = vec![shred.clone(), shred.clone(), shred];
+        let mut shreds = vec![shred.clone(), shred.clone(), shred.clone()];
         let repair_meta = RepairMeta { nonce: 0 };
         let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));
         let repair_type = ShredRepairType::Orphan(9);
@@ -693,9 +708,21 @@ mod test {
             .add_request(repair_type, timestamp());
         let repair_meta1 = RepairMeta { nonce };
         let mut repair_infos = vec![None, Some(repair_meta), Some(repair_meta1)];
-        prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests);
+        prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests, false);
         assert_eq!(shreds.len(), 2);
         assert_eq!(repair_infos.len(), 2);
         assert!(repair_infos[0].is_none());
         assert_eq!(repair_infos[1].as_ref().unwrap().nonce, nonce);
+
+        shreds = vec![shred.clone(), shred.clone(), shred];
+        let repair_meta2 = RepairMeta { nonce: 0 };
+        let repair_meta3 = RepairMeta { nonce };
+        repair_infos = vec![None, Some(repair_meta2), Some(repair_meta3)];
+        // In wen_restart, we discard all Turbine shreds and only keep valid repair shreds.
+        prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, &outstanding_requests, true);
+        assert_eq!(shreds.len(), 1);
+        assert_eq!(repair_infos.len(), 1);
+        assert!(repair_infos[0].is_some());
+        assert_eq!(repair_infos[0].as_ref().unwrap().nonce, nonce);
     }
 }
Reviewer: suggestion: s/slots_to_repair_for_wen_restart/wen_restart_repair_slots/ for brevity
Author: Done.