Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replay: do not recheck duplicate confirmation if already confirmed #1237

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ impl Tower {
}
}

pub(crate) fn is_slot_confirmed(
#[cfg(test)]
fn is_slot_confirmed(
&self,
slot: Slot,
voted_stakes: &VotedStakes,
Expand Down
10 changes: 5 additions & 5 deletions core/src/consensus/progress_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ pub struct ForkStats {
pub vote_threshold: Vec<ThresholdDecision>,
pub is_locked_out: bool,
pub voted_stakes: VotedStakes,
pub is_supermajority_confirmed: bool,
pub duplicate_confirmed_hash: Option<Hash>,
pub computed: bool,
pub lockout_intervals: LockoutIntervals,
pub bank_hash: Option<Hash>,
Expand Down Expand Up @@ -368,15 +368,15 @@ impl ProgressMap {
.and_then(|s| s.fork_stats.my_latest_landed_vote)
}

pub fn set_supermajority_confirmed_slot(&mut self, slot: Slot) {
pub fn set_duplicate_confirmed_hash(&mut self, slot: Slot, hash: Hash) {
let slot_progress = self.get_mut(&slot).unwrap();
slot_progress.fork_stats.is_supermajority_confirmed = true;
slot_progress.fork_stats.duplicate_confirmed_hash = Some(hash);
}

pub fn is_supermajority_confirmed(&self, slot: Slot) -> Option<bool> {
pub fn is_duplicate_confirmed(&self, slot: Slot) -> Option<bool> {
self.progress_map
.get(&slot)
.map(|s| s.fork_stats.is_supermajority_confirmed)
.map(|s| s.fork_stats.duplicate_confirmed_hash.is_some())
}

pub fn get_bank_prev_leader_slot(&self, bank: &Bank) -> Option<Slot> {
Expand Down
163 changes: 50 additions & 113 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,6 @@ enum ForkReplayMode {
Parallel(ThreadPool),
}

#[derive(PartialEq, Eq, Debug)]
enum ConfirmationType {
SupermajorityVoted,
DuplicateConfirmed,
}

enum GenerateVoteTxResult {
// non voting validator, not eligible for refresh
NonVoting,
Expand All @@ -146,31 +140,6 @@ impl GenerateVoteTxResult {
}
}

#[derive(PartialEq, Eq, Debug)]
struct ConfirmedSlot {
slot: Slot,
frozen_hash: Hash,
confirmation_type: ConfirmationType,
}

impl ConfirmedSlot {
fn new_supermajority_voted(slot: Slot, frozen_hash: Hash) -> Self {
Self {
slot,
frozen_hash,
confirmation_type: ConfirmationType::SupermajorityVoted,
}
}

fn new_duplicate_confirmed_slot(slot: Slot, frozen_hash: Hash) -> Self {
Self {
slot,
frozen_hash,
confirmation_type: ConfirmationType::DuplicateConfirmed,
}
}
}

// Implement a destructor for the ReplayStage thread to signal it exited
// even on panics
struct Finalizer {
Expand Down Expand Up @@ -864,16 +833,16 @@ impl ReplayStage {
let mut compute_slot_stats_time = Measure::start("compute_slot_stats_time");
for slot in newly_computed_slot_stats {
let fork_stats = progress.get_fork_stats(slot).unwrap();
let confirmed_slots = Self::confirm_forks(
let duplicate_confirmed_forks = Self::tower_duplicate_confirmed_forks(
&tower,
&fork_stats.voted_stakes,
fork_stats.total_stake,
&progress,
&bank_forks,
);

Self::mark_slots_confirmed(
&confirmed_slots,
Self::mark_slots_duplicate_confirmed(
&duplicate_confirmed_forks,
&blockstore,
&bank_forks,
&mut progress,
Expand Down Expand Up @@ -4117,8 +4086,8 @@ impl ReplayStage {
}

#[allow(clippy::too_many_arguments)]
fn mark_slots_confirmed(
confirmed_slots: &[ConfirmedSlot],
fn mark_slots_duplicate_confirmed(
confirmed_slots: &[(Slot, Hash)],
blockstore: &Blockstore,
bank_forks: &RwLock<BankForks>,
progress: &mut ProgressMap,
Expand All @@ -4131,36 +4100,14 @@ impl ReplayStage {
duplicate_confirmed_slots: &mut DuplicateConfirmedSlots,
) {
let root_slot = bank_forks.read().unwrap().root();
for ConfirmedSlot {
slot,
frozen_hash,
confirmation_type,
} in confirmed_slots.iter()
{
if *confirmation_type == ConfirmationType::SupermajorityVoted {
// This case should be guaranteed as false by confirm_forks()
if let Some(false) = progress.is_supermajority_confirmed(*slot) {
// Because supermajority confirmation will iterate through and update the
// subtree in fork choice, only incur this cost if the slot wasn't already
// confirmed
progress.set_supermajority_confirmed_slot(*slot);
// If the slot was confirmed, then it must be frozen. Otherwise, we couldn't
// have replayed any of its descendants and figured out it was confirmed.
assert!(*frozen_hash != Hash::default());
}
}
for (slot, frozen_hash) in confirmed_slots.iter() {
assert!(*frozen_hash != Hash::default());

if *slot <= root_slot {
continue;
}

match confirmation_type {
ConfirmationType::SupermajorityVoted => (),
ConfirmationType::DuplicateConfirmed => (),
#[allow(unreachable_patterns)]
_ => panic!("programmer error"),
}

progress.set_duplicate_confirmed_hash(*slot, *frozen_hash);
if let Some(prev_hash) = duplicate_confirmed_slots.insert(*slot, *frozen_hash) {
assert_eq!(prev_hash, *frozen_hash);
// Already processed this signal
Expand All @@ -4187,60 +4134,53 @@ impl ReplayStage {
}
}

fn confirm_forks(
fn tower_duplicate_confirmed_forks(
tower: &Tower,
voted_stakes: &VotedStakes,
total_stake: Stake,
progress: &ProgressMap,
bank_forks: &RwLock<BankForks>,
) -> Vec<ConfirmedSlot> {
let mut confirmed_forks = vec![];
) -> Vec<(Slot, Hash)> {
let mut duplicate_confirmed_forks = vec![];
for (slot, prog) in progress.iter() {
if !prog.fork_stats.is_supermajority_confirmed {
let bank = bank_forks
.read()
.unwrap()
.get(*slot)
.expect("bank in progress must exist in BankForks")
.clone();
let duration = prog
.replay_stats
.read()
.unwrap()
.started
.elapsed()
.as_millis();
if bank.is_frozen() && tower.is_slot_confirmed(*slot, voted_stakes, total_stake) {
info!("validator fork confirmed {} {}ms", *slot, duration);
datapoint_info!("validator-confirmation", ("duration_ms", duration, i64));
carllin marked this conversation as resolved.
Show resolved Hide resolved
confirmed_forks
.push(ConfirmedSlot::new_supermajority_voted(*slot, bank.hash()));
} else if bank.is_frozen()
&& tower.is_slot_duplicate_confirmed(*slot, voted_stakes, total_stake)
{
info!(
"validator fork duplicate confirmed {} {}ms",
*slot, duration
);
datapoint_info!(
"validator-duplicate-confirmation",
("duration_ms", duration, i64)
);
confirmed_forks.push(ConfirmedSlot::new_duplicate_confirmed_slot(
*slot,
bank.hash(),
));
} else {
debug!(
"validator fork not confirmed {} {}ms {:?}",
*slot,
duration,
voted_stakes.get(slot)
);
}
if prog.fork_stats.duplicate_confirmed_hash.is_some() {
continue;
}
let bank = bank_forks
.read()
.unwrap()
.get(*slot)
.expect("bank in progress must exist in BankForks");
let duration = prog
.replay_stats
.read()
.unwrap()
.started
.elapsed()
.as_millis();
if !bank.is_frozen() {
continue;
}
if tower.is_slot_duplicate_confirmed(*slot, voted_stakes, total_stake) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

realized the is_slot_duplicate_confirmed detection uses voted_stakes which pops off some slots due to the simulated vote. Probably could improve that a bit by having a separate version of voted_stakes that doesn't simulate the vote

Would also make the

datapoint_info!(
                        "bank_weight",
                        ("slot", bank_slot, i64),
                        ("fork_stake", stats.fork_stake, i64),
                        ("fork_weight", stats.fork_weight(), f64),
                    );

log a bit more accurate

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, actually what do you think about using fork choice for DC? Can address in a future PR.

It seems like it should be fine to aggregate DC amongst children forks? It would also save us from the double iteration, rn we iterate over all the newly frozen banks to grab a voted stakes from each and then iterate over all unrooted slot to see if any voted stakes DC's the unrooted slot.

Instead we could just iterate once over all unrooted slots and check fork choice to see if it's DC.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only downside of using fork choice is it only tracks the latest vote, so if people jump around I think it can miss DC

info!(
"validator fork duplicate confirmed {} {}ms",
*slot, duration
);
datapoint_info!(
"validator-duplicate-confirmation",
("duration_ms", duration, i64)
);
duplicate_confirmed_forks.push((*slot, bank.hash()));
} else {
debug!(
"validator fork not confirmed {} {}ms {:?}",
*slot,
duration,
voted_stakes.get(slot)
);
}
}
confirmed_forks
duplicate_confirmed_forks
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -5552,7 +5492,7 @@ pub(crate) mod tests {
// bank 1, so no slot should be confirmed.
{
let fork_progress = progress.get(&0).unwrap();
let confirmed_forks = ReplayStage::confirm_forks(
let confirmed_forks = ReplayStage::tower_duplicate_confirmed_forks(
&tower,
&fork_progress.fork_stats.voted_stakes,
fork_progress.fork_stats.total_stake,
Expand Down Expand Up @@ -5602,18 +5542,15 @@ pub(crate) mod tests {
assert_eq!(newly_computed, vec![1]);
{
let fork_progress = progress.get(&1).unwrap();
let confirmed_forks = ReplayStage::confirm_forks(
let confirmed_forks = ReplayStage::tower_duplicate_confirmed_forks(
&tower,
&fork_progress.fork_stats.voted_stakes,
fork_progress.fork_stats.total_stake,
&progress,
&bank_forks,
);
// No new stats should have been computed
assert_eq!(
confirmed_forks,
vec![ConfirmedSlot::new_supermajority_voted(0, bank0.hash())]
);
assert_eq!(confirmed_forks, vec![(0, bank0.hash())]);
}

let ancestors = bank_forks.read().unwrap().ancestors();
Expand Down