Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wen restart: play unfrozen block #4626

Merged
171 changes: 109 additions & 62 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,6 @@ pub(crate) fn find_heaviest_fork(
slots,
blockstore.clone(),
bank_forks.clone(),
root_bank,
&exit,
)?;
info!(
Expand Down Expand Up @@ -593,7 +592,6 @@ pub(crate) fn find_bankhash_of_heaviest_fork(
slots: Vec<Slot>,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
root_bank: Arc<Bank>,
exit: &AtomicBool,
) -> Result<Hash> {
if let Some(hash) = bank_forks
Expand All @@ -604,6 +602,7 @@ pub(crate) fn find_bankhash_of_heaviest_fork(
{
return Ok(hash);
}
let root_bank = bank_forks.read().unwrap().root_bank();
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solReplayTx{i:02}"))
Expand All @@ -618,59 +617,47 @@ pub(crate) fn find_bankhash_of_heaviest_fork(
if exit.load(Ordering::Relaxed) {
return Err(WenRestartError::Exiting.into());
}
let saved_bank;
{
saved_bank = bank_forks.read().unwrap().get(slot);
}
let bank = match saved_bank {
Some(cur_bank) => {
if !cur_bank.is_frozen() {
return Err(WenRestartError::BlockNotFrozenAfterReplay(slot, None).into());
}
cur_bank
}
None => {
let new_bank = Bank::new_from_parent(
parent_bank.clone(),
&leader_schedule_cache
.slot_leader_at(slot, Some(&parent_bank))
.unwrap(),
slot,
let saved_bank = bank_forks.read().unwrap().get_with_scheduler(slot);
let bank_with_scheduler = saved_bank.unwrap_or_else(|| {
let new_bank = Bank::new_from_parent(
parent_bank.clone(),
&leader_schedule_cache
.slot_leader_at(slot, Some(&parent_bank))
.unwrap(),
slot,
);
bank_forks.write().unwrap().insert_from_ledger(new_bank)
});
let bank = if bank_with_scheduler.is_frozen() {
bank_with_scheduler.clone_without_scheduler()
} else {
let mut progress = ConfirmationProgress::new(parent_bank.last_blockhash());
if let Err(e) = process_single_slot(
&blockstore,
&bank_with_scheduler,
&replay_tx_thread_pool,
&opts,
&recyclers,
&mut progress,
None,
None,
None,
None,
&mut timing,
) {
return Err(
WenRestartError::BlockNotFrozenAfterReplay(slot, Some(e.to_string())).into(),
);
let bank_with_scheduler;
{
bank_with_scheduler = bank_forks.write().unwrap().insert_from_ledger(new_bank);
}
let mut progress = ConfirmationProgress::new(parent_bank.last_blockhash());
if let Err(e) = process_single_slot(
&blockstore,
&bank_with_scheduler,
&replay_tx_thread_pool,
&opts,
&recyclers,
&mut progress,
None,
None,
None,
None,
&mut timing,
) {
return Err(WenRestartError::BlockNotFrozenAfterReplay(
slot,
Some(e.to_string()),
)
.into());
}
let cur_bank;
{
cur_bank = bank_forks
.read()
.unwrap()
.get(slot)
.expect("bank should have been just inserted");
}
cur_bank
}
let cur_bank;
{
cur_bank = bank_forks
.read()
.unwrap()
.get(slot)
.expect("bank should have been just inserted");
}
cur_bank
};
parent_bank = bank;
}
Expand Down Expand Up @@ -837,11 +824,7 @@ pub(crate) fn verify_coordinator_heaviest_fork(
blockstore.clone(),
wen_restart_repair_slots.clone(),
)?;
let root_bank;
{
root_bank = bank_forks.read().unwrap().root_bank();
}
let root_slot = root_bank.slot();
let root_slot = bank_forks.read().unwrap().root_bank().slot();
let mut coordinator_heaviest_slot_ancestors: Vec<Slot> =
AncestorIterator::new_inclusive(coordinator_heaviest_slot, &blockstore)
.take_while(|slot| slot >= &root_slot)
Expand Down Expand Up @@ -879,7 +862,6 @@ pub(crate) fn verify_coordinator_heaviest_fork(
coordinator_heaviest_slot_ancestors,
blockstore.clone(),
bank_forks.clone(),
root_bank,
&exit,
)?
} else {
Expand Down Expand Up @@ -3242,7 +3224,6 @@ mod tests {
slots,
test_state.blockstore.clone(),
test_state.bank_forks.clone(),
old_root_bank,
&exit,
)
.unwrap();
Expand Down Expand Up @@ -3636,15 +3617,13 @@ mod tests {
let mut pushed_hash = Hash::default();
// The coordinator always sends its own choice.
let coordinator_slot = last_vote;
let old_root_bank = test_state.bank_forks.read().unwrap().root_bank();
let mut slots = test_state.last_voted_fork_slots.clone();
slots.reverse();
let coordinator_hash = find_bankhash_of_heaviest_fork(
coordinator_slot,
slots,
test_state.blockstore.clone(),
test_state.bank_forks.clone(),
old_root_bank,
&exit,
)
.unwrap();
Expand Down Expand Up @@ -3752,4 +3731,72 @@ mod tests {
assert_eq!(pushed_slot, my_slot);
assert_eq!(pushed_hash, my_hash);
}

fn run_and_check_find_bankhash_of_heaviest_fork(
test_state: &WenRestartTestInitResult,
slots: &[Slot],
slot: Slot,
) {
let exit = Arc::new(AtomicBool::new(false));
assert_eq!(
find_bankhash_of_heaviest_fork(
slot,
slots.to_vec(),
test_state.blockstore.clone(),
test_state.bank_forks.clone(),
&exit,
)
.unwrap(),
test_state
.bank_forks
.read()
.unwrap()
.get(slot)
.unwrap()
.hash()
);
}

#[test]
fn test_find_bankhash_of_heaviest_fork() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
let test_state = wen_restart_test_init(&ledger_path);
let last_vote = test_state.last_voted_fork_slots[0];
let mut slots = test_state.last_voted_fork_slots.clone();
slots.reverse();
run_and_check_find_bankhash_of_heaviest_fork(&test_state, &slots, last_vote);
let new_slot = last_vote + 1;
let _ = insert_slots_into_blockstore(
test_state.blockstore.clone(),
last_vote,
&[new_slot],
TICKS_PER_SLOT,
test_state.last_blockhash,
);
slots.push(new_slot);
run_and_check_find_bankhash_of_heaviest_fork(&test_state, &slots, new_slot);
let slot_full_but_not_replayed = last_vote + 2;
let _ = insert_slots_into_blockstore(
test_state.blockstore.clone(),
last_vote,
&[slot_full_but_not_replayed],
TICKS_PER_SLOT,
test_state.last_blockhash,
);
let new_bank = Bank::new_from_parent(
test_state.bank_forks.read().unwrap().get(new_slot).unwrap(),
&Pubkey::default(),
slot_full_but_not_replayed,
);
let _ = test_state
.bank_forks
.write()
.unwrap()
.insert_from_ledger(new_bank);
run_and_check_find_bankhash_of_heaviest_fork(
&test_state,
&slots,
slot_full_but_not_replayed,
);
}
}
Loading