Skip to content

Commit

Permalink
Make replay stage switch between two fork choice rules
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Jun 9, 2020
1 parent c342cca commit ed9249b
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 74 deletions.
15 changes: 1 addition & 14 deletions core/src/bank_weight_fork_choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{
sync::{Arc, RwLock},
};

#[derive(Default)]
pub struct BankWeightForkChoice {}

impl ForkChoice for BankWeightForkChoice {
Expand Down Expand Up @@ -39,20 +40,6 @@ impl ForkChoice for BankWeightForkChoice {
let ComputedBankState { bank_weight, .. } = computed_bank_stats;
stats.weight = *bank_weight;
stats.fork_weight = stats.weight + parent_weight;

datapoint_info!(
"bank_weight",
("slot", bank_slot, i64),
// u128 too large for influx, convert to hex
("weight", format!("{:X}", stats.weight), String),
);
info!(
"slot_weight: {} {} {} {}",
bank_slot,
stats.weight,
stats.fork_weight,
bank.parent().map(|b| b.slot()).unwrap_or(0)
);
}

// Returns:
Expand Down
3 changes: 3 additions & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ impl Tower {
pub mod test {
use super::*;
use crate::{
bank_weight_fork_choice::BankWeightForkChoice,
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
fork_choice::SelectVoteAndResetForkResult,
Expand Down Expand Up @@ -747,6 +748,7 @@ pub mod test {
.collect();

let _ = ReplayStage::compute_bank_stats(
&my_pubkey,
&ancestors,
&mut frozen_banks,
tower,
Expand All @@ -756,6 +758,7 @@ pub mod test {
&self.bank_forks,
&mut PubkeyReferences::default(),
&mut self.heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(),
);

let vote_bank = self
Expand Down
150 changes: 90 additions & 60 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! The `replay_stage` replays transactions broadcast by the leader.

use crate::{
bank_weight_fork_choice::BankWeightForkChoice,
broadcast_stage::RetransmitSlotsSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
consensus::{ComputedBankState, StakeLockout, SwitchForkDecision, Tower},
fork_choice::SelectVoteAndResetForkResult,
fork_choice::{ForkChoice, SelectVoteAndResetForkResult},
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
progress_map::{ForkProgress, ProgressMap, PropagatedStats},
Expand Down Expand Up @@ -219,6 +220,7 @@ impl ReplayStage {
let root = bank_forks.read().unwrap().root();
let mut heaviest_subtree_fork_choice =
HeaviestSubtreeForkChoice::new_from_frozen_banks(root, &frozen_banks);
let mut bank_weight_fork_choice = BankWeightForkChoice::default();
let heaviest_bank = bank_forks
.read()
.unwrap()
Expand All @@ -228,6 +230,8 @@ impl ReplayStage {
exist in bank_forks",
)
.clone();
let unlock_heaviest_subtree_fork_choice_slot =
Self::get_unlock_heaviest_subtree_fork_choice(heaviest_bank.operating_mode());
let mut tower = Tower::new(&my_pubkey, &vote_account, root, &heaviest_bank);
let mut current_leader = None;
let mut last_reset = Hash::default();
Expand Down Expand Up @@ -295,6 +299,7 @@ impl ReplayStage {
.collect();
let now = Instant::now();
let newly_computed_slot_stats = Self::compute_bank_stats(
&my_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
Expand All @@ -304,6 +309,7 @@ impl ReplayStage {
&bank_forks,
&mut all_pubkeys,
&mut heaviest_subtree_fork_choice,
&mut bank_weight_fork_choice,
);
let compute_bank_stats_elapsed = now.elapsed().as_micros();
for slot in newly_computed_slot_stats {
Expand All @@ -325,8 +331,14 @@ impl ReplayStage {
}
}

let (heaviest_bank, heaviest_bank_on_same_voted_fork) =
Self::select_forks(&tower, &heaviest_subtree_fork_choice, &bank_forks);
let fork_choice: &mut dyn ForkChoice =
if forks_root > unlock_heaviest_subtree_fork_choice_slot {
&mut heaviest_subtree_fork_choice
} else {
&mut bank_weight_fork_choice
};
let (heaviest_bank, heaviest_bank_on_same_voted_fork) = fork_choice
.select_forks(&frozen_banks, &tower, &progress, &ancestors, &bank_forks);

Self::report_memory(&allocated, "select_fork", start);

Expand Down Expand Up @@ -1168,6 +1180,7 @@ impl ReplayStage {
}

pub(crate) fn compute_bank_stats(
my_pubkey: &Pubkey,
ancestors: &HashMap<u64, HashSet<u64>>,
frozen_banks: &mut Vec<Arc<Bank>>,
tower: &Tower,
Expand All @@ -1176,7 +1189,8 @@ impl ReplayStage {
cluster_slots: &ClusterSlots,
bank_forks: &RwLock<BankForks>,
all_pubkeys: &mut PubkeyReferences,
heaviest_subtree_fork_choice: &mut HeaviestSubtreeForkChoice,
heaviest_subtree_fork_choice: &mut dyn ForkChoice,
bank_weight_fork_choice: &mut dyn ForkChoice,
) -> Vec<Slot> {
frozen_banks.sort_by_key(|bank| bank.slot());
let mut new_stats = vec![];
Expand All @@ -1186,41 +1200,58 @@ impl ReplayStage {
// is if this node was the leader for this slot as those banks
// are not replayed in replay_active_banks()
{
let stats = progress
let is_computed = progress
.get_fork_stats_mut(bank_slot)
.expect("All frozen banks must exist in the Progress map");

if !stats.computed {
let ComputedBankState {
stake_lockouts,
total_staked,
lockout_intervals,
pubkey_votes,
..
} = tower.collect_vote_lockouts(
.expect("All frozen banks must exist in the Progress map")
.computed;
if !is_computed {
let computed_bank_state = tower.collect_vote_lockouts(
bank_slot,
bank.vote_accounts().into_iter(),
&ancestors,
all_pubkeys,
);
// Update `heaviest_subtree_fork_choice` to find the best fork to build on
let best_overall_slot = heaviest_subtree_fork_choice.add_votes(
&pubkey_votes,
bank.epoch_stakes_map(),
bank.epoch_schedule(),
heaviest_subtree_fork_choice.compute_bank_stats(
&bank,
tower,
progress,
&computed_bank_state,
);
stats.total_staked = total_staked;

datapoint_info!(
"best_slot",
("slot", bank_slot, i64),
("best_slot", best_overall_slot, i64),
bank_weight_fork_choice.compute_bank_stats(
&bank,
tower,
progress,
&computed_bank_state,
);
let ComputedBankState {
stake_lockouts,
total_staked,
lockout_intervals,
..
} = computed_bank_state;
let stats = progress
.get_fork_stats_mut(bank_slot)
.expect("All frozen banks must exist in the Progress map");
stats.total_staked = total_staked;
stats.stake_lockouts = stake_lockouts;
stats.lockout_intervals = lockout_intervals;
stats.block_height = bank.block_height();
stats.computed = true;
new_stats.push(bank_slot);
datapoint_info!(
"bank_weight",
("slot", bank_slot, i64),
// u128 too large for influx, convert to hex
("weight", format!("{:X}", stats.weight), String),
);
info!(
"{} slot_weight: {} {} {} {}",
my_pubkey,
bank_slot,
stats.weight,
stats.fork_weight,
bank.parent().map(|b| b.slot()).unwrap_or(0)
);
}
}

Expand Down Expand Up @@ -1313,38 +1344,6 @@ impl ReplayStage {
);
}

// Returns:
// 1) The heaviest overall bbank
// 2) The heavest bank on the same fork as the last vote (doesn't require a
// switching proof to vote for)
pub(crate) fn select_forks(
tower: &Tower,
heaviest_subtree_fork_choice: &HeaviestSubtreeForkChoice,
bank_forks: &RwLock<BankForks>,
) -> (Arc<Bank>, Option<Arc<Bank>>) {
let last_vote = tower.last_vote().slots.last().cloned();
let heaviest_slot_on_same_voted_fork = last_vote.map(|last_vote| {
let heaviest_slot_on_same_voted_fork =
heaviest_subtree_fork_choice.best_slot(last_vote).expect("last_vote is a frozen bank so must have been added to heaviest_subtree_fork_choice at time of freezing");
if heaviest_slot_on_same_voted_fork == last_vote {
None
} else {
Some(heaviest_slot_on_same_voted_fork)
}
}).unwrap_or(None);
let heaviest_slot = heaviest_subtree_fork_choice.best_overall_slot();
let r_bank_forks = bank_forks.read().unwrap();
(
r_bank_forks.get(heaviest_slot).unwrap().clone(),
heaviest_slot_on_same_voted_fork.map(|heaviest_slot_on_same_voted_fork| {
r_bank_forks
.get(heaviest_slot_on_same_voted_fork)
.unwrap()
.clone()
}),
)
}

// Given a heaviest bank, `heaviest_bank` and the next votable bank
// `heaviest_bank_on_same_voted_fork` as the validator's last vote, return
// a bank to vote on, a bank to reset to,
Expand Down Expand Up @@ -1748,6 +1747,14 @@ impl ReplayStage {
}
}

pub fn get_unlock_heaviest_subtree_fork_choice(operating_mode: OperatingMode) -> Slot {
match operating_mode {
OperatingMode::Development => std::u64::MAX / 2,
OperatingMode::Stable => std::u64::MAX / 2,
OperatingMode::Preview => std::u64::MAX / 2,
}
}

pub fn join(self) -> thread::Result<()> {
self.commitment_service.join()?;
self.t_replay.join().map(|_| ())
Expand Down Expand Up @@ -2560,6 +2567,7 @@ pub(crate) mod tests {
.collect();
let tower = Tower::new_for_tests(0, 0.67);
let newly_computed = ReplayStage::compute_bank_stats(
&node_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
Expand All @@ -2569,6 +2577,7 @@ pub(crate) mod tests {
&bank_forks,
&mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(),
);
assert_eq!(newly_computed, vec![0]);
// The only vote is in bank 1, and bank_forks does not currently contain
Expand Down Expand Up @@ -2601,6 +2610,7 @@ pub(crate) mod tests {
.cloned()
.collect();
let newly_computed = ReplayStage::compute_bank_stats(
&node_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
Expand All @@ -2610,6 +2620,7 @@ pub(crate) mod tests {
&bank_forks,
&mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(),
);

assert_eq!(newly_computed, vec![1]);
Expand All @@ -2634,6 +2645,7 @@ pub(crate) mod tests {
.cloned()
.collect();
let newly_computed = ReplayStage::compute_bank_stats(
&node_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
Expand All @@ -2643,6 +2655,7 @@ pub(crate) mod tests {
&bank_forks,
&mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(),
);
// No new stats should have been computed
assert!(newly_computed.is_empty());
Expand Down Expand Up @@ -2670,6 +2683,7 @@ pub(crate) mod tests {

let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors();
ReplayStage::compute_bank_stats(
&node_pubkey,
&ancestors,
&mut frozen_banks,
&tower,
Expand All @@ -2679,16 +2693,30 @@ pub(crate) mod tests {
&vote_simulator.bank_forks,
&mut PubkeyReferences::default(),
&mut heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(),
);

assert_eq!(
heaviest_subtree_fork_choice.stake_voted_subtree(1).unwrap(),
heaviest_subtree_fork_choice.stake_voted_subtree(2).unwrap()
);

let (heaviest_bank, _) = ReplayStage::select_forks(
let (heaviest_bank, _) = heaviest_subtree_fork_choice.select_forks(
&frozen_banks,
&tower,
&vote_simulator.progress,
&ancestors,
&vote_simulator.bank_forks,
);

// Should pick the lower of the two equally weighted banks
assert_eq!(heaviest_bank.slot(), 1);

let (heaviest_bank, _) = BankWeightForkChoice::default().select_forks(
&frozen_banks,
&tower,
&heaviest_subtree_fork_choice,
&vote_simulator.progress,
&ancestors,
&vote_simulator.bank_forks,
);

Expand Down Expand Up @@ -2729,6 +2757,7 @@ pub(crate) mod tests {
.collect();

ReplayStage::compute_bank_stats(
&node_pubkey,
&vote_simulator.bank_forks.read().unwrap().ancestors(),
&mut frozen_banks,
&tower,
Expand All @@ -2738,6 +2767,7 @@ pub(crate) mod tests {
&vote_simulator.bank_forks,
&mut PubkeyReferences::default(),
&mut vote_simulator.heaviest_subtree_fork_choice,
&mut BankWeightForkChoice::default(),
);

frozen_banks.sort_by_key(|bank| bank.slot());
Expand Down

0 comments on commit ed9249b

Please sign in to comment.