Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wen restart aggregate last voted fork slots #33892

Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
47f8891
Push and aggregate RestartLastVotedForkSlots.
wen-coding Oct 25, 2023
a8d0c08
Fix API and lint errors.
wen-coding Oct 26, 2023
f7b8232
Reduce clutter.
wen-coding Oct 26, 2023
f5f71b4
Put my own LastVotedForkSlots into the aggregate.
wen-coding Oct 26, 2023
630cc70
Merge branch 'solana-labs:master' into wen_restart_aggregate_last_vot…
wen-coding Oct 27, 2023
ce32c03
Write LastVotedForkSlots aggregate progress into local file.
wen-coding Oct 27, 2023
b90185d
Fix typo and name constants.
wen-coding Oct 27, 2023
3c819f0
Fix flaky test.
wen-coding Oct 27, 2023
e21efe3
Clarify the comments.
wen-coding Oct 27, 2023
b24b8db
- Use constant for wait_for_supermajority
wen-coding Nov 8, 2023
a2204f3
Fix delay_after_first_shred and remove loop in wen_restart.
wen-coding Nov 9, 2023
0c1ef0f
Read wen_restart slots inside the loop instead.
wen-coding Nov 9, 2023
b9324c8
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Nov 11, 2023
122314d
Discard turbine shreds while in wen_restart in windows insert rather …
wen-coding Nov 12, 2023
e1252a4
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Nov 14, 2023
31ca285
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Nov 15, 2023
c3ab972
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Nov 16, 2023
8fc2327
Use the new Gossip API.
wen-coding Nov 16, 2023
229f447
Rename slots_to_repair_for_wen_restart and a few others.
wen-coding Nov 16, 2023
bc1b4b5
Rename a few more and list all states.
wen-coding Nov 17, 2023
8743b5c
Pipe exit down to aggregate loop so we can exit early.
wen-coding Nov 17, 2023
4ebbde8
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Nov 17, 2023
0d82a7c
Fix import of RestartLastVotedForkSlots.
wen-coding Nov 17, 2023
5e0a5b1
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Nov 17, 2023
ec21ec1
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Dec 12, 2023
c172c26
Use the new method to generate test bank.
wen-coding Dec 12, 2023
08de626
Make linter happy.
wen-coding Jan 4, 2024
ea4d800
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 4, 2024
4f91be7
Use new bank constructor for tests.
wen-coding Jan 4, 2024
de89a4e
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 20, 2024
1e98324
Fix a bad merge.
wen-coding Jan 20, 2024
777523f
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 21, 2024
1e478e8
- add new const for wen_restart
wen-coding Jan 23, 2024
b0980e4
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 23, 2024
f4acd69
Add initialize and put the main logic into a loop.
wen-coding Jan 23, 2024
bb471c1
Merge branch 'wen_restart_aggregate_last_voted_fork_slots' of https:/…
wen-coding Jan 23, 2024
c45a29b
Change aggregate interface and other fixes.
wen-coding Jan 24, 2024
167b790
Add failure tests and tests for state transition.
wen-coding Jan 24, 2024
e0a070f
Add more tests and add ability to recover from written records in
wen-coding Jan 27, 2024
8be5cd0
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 27, 2024
ddd144e
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 29, 2024
1cfc510
Various name changes.
wen-coding Jan 31, 2024
5b10c6e
We don't really care what type of error is returned.
wen-coding Jan 31, 2024
0620aaf
Wait on expected progress message in proto file instead of sleep.
wen-coding Jan 31, 2024
1ceda56
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Jan 31, 2024
93abe45
Code reorganization and cleanup.
wen-coding Feb 5, 2024
cb1788e
Make linter happy.
wen-coding Feb 5, 2024
72a732e
Add WenRestartError.
wen-coding Feb 5, 2024
4c920cb
Split WenRestartErrors into separate erros per state.
wen-coding Feb 7, 2024
bf71c9b
Revert "Split WenRestartErrors into separate erros per state."
wen-coding Feb 9, 2024
056aef7
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Feb 9, 2024
645452f
Use individual functions when testing for failures.
wen-coding Feb 10, 2024
f46e62a
Move initialization errors into initialize().
wen-coding Feb 28, 2024
e3d0194
Use anyhow instead of thiserror to generate backtrace for error.
wen-coding Feb 28, 2024
ffbb20c
Add missing Cargo.lock.
wen-coding Feb 29, 2024
3b50964
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Feb 29, 2024
59fd5ff
Add error log when last_vote is missing in the tower storage.
wen-coding Feb 29, 2024
e7c320c
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Feb 29, 2024
9ccf5ca
Merge branch 'master' into wen_restart_aggregate_last_voted_fork_slots
wen-coding Mar 1, 2024
021dbe9
Change error log info.
wen-coding Mar 1, 2024
40c0fb6
Change test to match exact error.
wen-coding Mar 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,7 @@ mod test {
ancestor_duplicate_slots_sender,
repair_validators: None,
repair_whitelist,
wen_restart_repair_slots: None,
};

let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
Expand Down
2 changes: 1 addition & 1 deletion core/src/repair/repair_generic_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub fn get_closest_completion(
continue;
}
let slot_meta = slot_meta_cache.get(&path_slot).unwrap().as_ref().unwrap();
let new_repairs = RepairService::generate_repairs_for_slot(
let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick(
blockstore,
path_slot,
slot_meta,
Expand Down
183 changes: 152 additions & 31 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ pub struct RepairInfo {
pub repair_validators: Option<HashSet<Pubkey>>,
// Validators which should be given priority when serving
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
// A given list of slots to repair when in wen_restart
pub wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
}

pub struct RepairSlotRange {
Expand Down Expand Up @@ -397,17 +399,24 @@ impl RepairService {
);
add_votes_elapsed.stop();

let repairs = repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
);
let repairs = match repair_info.wen_restart_repair_slots.clone() {
Some(slots_to_repair) => Self::generate_repairs_for_wen_restart(
blockstore,
MAX_REPAIR_LENGTH,
AshwinSekar marked this conversation as resolved.
Show resolved Hide resolved
&slots_to_repair.read().unwrap(),
),
None => repair_weight.get_best_weighted_repairs(
blockstore,
root_bank.epoch_stakes_map(),
root_bank.epoch_schedule(),
MAX_ORPHANS,
MAX_REPAIR_LENGTH,
MAX_UNKNOWN_LAST_INDEX_REPAIRS,
MAX_CLOSEST_COMPLETION_REPAIRS,
&mut repair_timing,
&mut best_repairs_stats,
),
};

let mut popular_pruned_forks = repair_weight.get_popular_pruned_forks(
root_bank.epoch_stakes_map(),
Expand Down Expand Up @@ -618,32 +627,58 @@ impl RepairService {
}
}

pub fn generate_repairs_for_slot_throttled_by_tick(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, true)
}

pub fn generate_repairs_for_slot_not_throttled_by_tick(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
) -> Vec<ShredRepairType> {
Self::generate_repairs_for_slot(blockstore, slot, slot_meta, max_repairs, false)
}

/// If this slot is missing shreds generate repairs
pub fn generate_repairs_for_slot(
fn generate_repairs_for_slot(
blockstore: &Blockstore,
slot: Slot,
slot_meta: &SlotMeta,
max_repairs: usize,
throttle_requests_by_shred_tick: bool,
carllin marked this conversation as resolved.
Show resolved Hide resolved
) -> Vec<ShredRepairType> {
let defer_repair_threshold_ticks = if throttle_requests_by_shred_tick {
DEFER_REPAIR_THRESHOLD_TICKS
} else {
0
};
if max_repairs == 0 || slot_meta.is_full() {
vec![]
} else if slot_meta.consumed == slot_meta.received {
// check delay time of last shred
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
// System time is not monotonic
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* timestamp().saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(DEFER_REPAIR_THRESHOLD_TICKS)
if throttle_requests_by_shred_tick {
// check delay time of last shred
if let Some(reference_tick) = slot_meta
.received
.checked_sub(1)
.and_then(|index| blockstore.get_data_shred(slot, index).ok()?)
.and_then(|shred| shred::layout::get_reference_tick(&shred).ok())
.map(u64::from)
{
return vec![];
// System time is not monotonic
let ticks_since_first_insert = DEFAULT_TICKS_PER_SECOND
* timestamp().saturating_sub(slot_meta.first_shred_timestamp)
/ 1_000;
if ticks_since_first_insert
< reference_tick.saturating_add(defer_repair_threshold_ticks)
{
return vec![];
}
}
}
vec![ShredRepairType::HighestShred(slot, slot_meta.received)]
Expand All @@ -652,7 +687,7 @@ impl RepairService {
.find_missing_data_indexes(
slot,
slot_meta.first_shred_timestamp,
DEFER_REPAIR_THRESHOLD_TICKS,
defer_repair_threshold_ticks,
slot_meta.consumed,
slot_meta.received,
max_repairs,
Expand All @@ -674,7 +709,7 @@ impl RepairService {
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&slot_meta,
Expand All @@ -689,6 +724,33 @@ impl RepairService {
}
}

pub(crate) fn generate_repairs_for_wen_restart(
blockstore: &Blockstore,
max_repairs: usize,
slots: &Vec<Slot>,
) -> Vec<ShredRepairType> {
let mut repairs: Vec<ShredRepairType> = Vec::new();
for slot in slots {
if let Some(slot_meta) = blockstore.meta(*slot).unwrap() {
// When in wen_restart, turbine is not running, so there is
// no need to wait after first shred.
let new_repairs = Self::generate_repairs_for_slot_not_throttled_by_tick(
blockstore,
*slot,
&slot_meta,
max_repairs - repairs.len(),
);
repairs.extend(new_repairs);
} else {
repairs.push(ShredRepairType::HighestShred(*slot, 0));
}
if repairs.len() >= max_repairs {
break;
}
}
repairs
}

fn get_repair_peers(
cluster_info: Arc<ClusterInfo>,
cluster_slots: Arc<ClusterSlots>,
Expand Down Expand Up @@ -845,7 +907,7 @@ impl RepairService {
..SlotMeta::default()
});

let new_repairs = Self::generate_repairs_for_slot(
let new_repairs = Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&meta,
Expand All @@ -867,7 +929,7 @@ impl RepairService {
// If the slot is full, no further need to repair this slot
None
} else {
Some(Self::generate_repairs_for_slot(
Some(Self::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
&slot_meta,
Expand Down Expand Up @@ -1548,4 +1610,63 @@ mod test {
);
assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
}

#[test]
fn test_generate_repairs_for_wen_restart() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let max_repairs = 3;

let slots: Vec<u64> = vec![2, 3, 5, 7];
let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1;

let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() {
slot_shreds.remove(i);
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
}

let mut slots_to_repair: Vec<Slot> = vec![];

// When slots_to_repair is empty, ignore all and return empty result.
let result = RepairService::generate_repairs_for_wen_restart(
&blockstore,
max_repairs,
&slots_to_repair,
);
assert!(result.is_empty());

// When asked to repair slot with missing shreds and some unknown slot, return correct results.
slots_to_repair = vec![3, 81];
let result = RepairService::generate_repairs_for_wen_restart(
&blockstore,
max_repairs,
&slots_to_repair,
);
assert_eq!(
result,
vec![
ShredRepairType::Shred(3, 1),
ShredRepairType::HighestShred(81, 0),
],
);

// Test that it will not generate more than max_repairs.e().unwrap();
slots_to_repair = vec![2, 82, 7, 83, 84];
let result = RepairService::generate_repairs_for_wen_restart(
&blockstore,
max_repairs,
&slots_to_repair,
);
assert_eq!(result.len(), max_repairs);
assert_eq!(
result,
vec![
ShredRepairType::Shred(2, 0),
ShredRepairType::HighestShred(82, 0),
ShredRepairType::HighestShred(7, 3),
],
carllin marked this conversation as resolved.
Show resolved Hide resolved
);
}
}
2 changes: 1 addition & 1 deletion core/src/repair/repair_weighted_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub fn get_best_repair_shreds(
if let Some(slot_meta) = slot_meta {
match next {
Visit::Unvisited(slot) => {
let new_repairs = RepairService::generate_repairs_for_slot(
let new_repairs = RepairService::generate_repairs_for_slot_throttled_by_tick(
blockstore,
slot,
slot_meta,
Expand Down
3 changes: 3 additions & 0 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl Tvu {
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
Expand Down Expand Up @@ -214,6 +215,7 @@ impl Tvu {
repair_whitelist: tvu_config.repair_whitelist,
cluster_info: cluster_info.clone(),
cluster_slots: cluster_slots.clone(),
wen_restart_repair_slots,
};
WindowService::new(
blockstore.clone(),
Expand Down Expand Up @@ -506,6 +508,7 @@ pub mod tests {
repair_quic_endpoint_sender,
outstanding_repair_requests,
cluster_slots,
None,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);
Expand Down
15 changes: 15 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ use {

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now since we reuse the wait for supermajority code, the
// following threshold should always greater than or equal to
// WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT.
const WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT: u64 =
WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT;

#[derive(Clone, EnumString, EnumVariantNames, Default, IntoStaticStr, Display)]
#[strum(serialize_all = "kebab-case")]
Expand Down Expand Up @@ -1230,6 +1235,11 @@ impl Validator {
};

let in_wen_restart = config.wen_restart_proto_path.is_some() && !waited_for_supermajority;
let wen_restart_repair_slots = if in_wen_restart {
Some(Arc::new(RwLock::new(Vec::new())))
} else {
None
};
let tower = match process_blockstore.process_to_create_tower() {
Ok(tower) => {
info!("Tower state: {:?}", tower);
Expand Down Expand Up @@ -1304,6 +1314,7 @@ impl Validator {
repair_quic_endpoint_sender,
outstanding_repair_requests.clone(),
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
)?;

if in_wen_restart {
Expand All @@ -1313,6 +1324,10 @@ impl Validator {
last_vote,
blockstore.clone(),
cluster_info.clone(),
bank_forks.clone(),
wen_restart_repair_slots.clone(),
WAIT_FOR_WEN_RESTART_SUPERMAJORITY_THRESHOLD_PERCENT,
exit.clone(),
) {
Ok(()) => {
return Err("wen_restart phase one completedy".to_string());
Expand Down
Loading
Loading