Skip to content

Commit

Permalink
Use node's latest vote for commitment calc. too (anza-xyz#1964)
Browse files Browse the repository at this point in the history
* Use node's latest vote for commitment calc. too

* Make local_cluster test use finalized

* Update core/src/commitment_service.rs

Co-authored-by: Tyera <teulberg@gmail.com>

* Don't wrap with Option and update tests

---------

Co-authored-by: Tyera Eulberg <tyera@anza.xyz>
Co-authored-by: Tyera <teulberg@gmail.com>
  • Loading branch information
3 people authored Jul 3, 2024
1 parent 0ff5a4b commit 5562989
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 52 deletions.
100 changes: 67 additions & 33 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
bank::Bank,
commitment::{BlockCommitment, BlockCommitmentCache, CommitmentSlots, VOTE_THRESHOLD_SIZE},
},
solana_sdk::clock::Slot,
solana_sdk::{clock::Slot, pubkey::Pubkey},
solana_vote_program::vote_state::VoteState,
std::{
cmp::max,
Expand All @@ -26,14 +26,23 @@ pub struct CommitmentAggregationData {
bank: Arc<Bank>,
root: Slot,
total_stake: Stake,
// The latest local vote state of the node running this service.
// Used for commitment aggregation if the node's vote account is staked.
node_vote_state: (Pubkey, VoteState),
}

impl CommitmentAggregationData {
pub fn new(bank: Arc<Bank>, root: Slot, total_stake: Stake) -> Self {
pub fn new(
bank: Arc<Bank>,
root: Slot,
total_stake: Stake,
node_vote_state: (Pubkey, VoteState),
) -> Self {
Self {
bank,
root,
total_stake,
node_vote_state,
}
}
}
Expand Down Expand Up @@ -139,8 +148,11 @@ impl AggregateCommitmentService {
aggregation_data: CommitmentAggregationData,
ancestors: Vec<u64>,
) -> CommitmentSlots {
let (block_commitment, rooted_stake) =
Self::aggregate_commitment(&ancestors, &aggregation_data.bank);
let (block_commitment, rooted_stake) = Self::aggregate_commitment(
&ancestors,
&aggregation_data.bank,
&aggregation_data.node_vote_state,
);

let highest_super_majority_root =
get_highest_super_majority_root(rooted_stake, aggregation_data.total_stake);
Expand Down Expand Up @@ -173,6 +185,7 @@ impl AggregateCommitmentService {
pub fn aggregate_commitment(
ancestors: &[Slot],
bank: &Bank,
(node_vote_pubkey, node_vote_state): &(Pubkey, VoteState),
) -> (HashMap<Slot, BlockCommitment>, Vec<(Slot, u64)>) {
assert!(!ancestors.is_empty());

Expand All @@ -183,11 +196,17 @@ impl AggregateCommitmentService {

let mut commitment = HashMap::new();
let mut rooted_stake: Vec<(Slot, u64)> = Vec::new();
for (lamports, account) in bank.vote_accounts().values() {
for (pubkey, (lamports, account)) in bank.vote_accounts().iter() {
if *lamports == 0 {
continue;
}
if let Ok(vote_state) = account.vote_state().as_ref() {
let vote_state = if pubkey == node_vote_pubkey {
// Override old vote_state in bank with latest one for my own vote pubkey
Ok(node_vote_state)
} else {
account.vote_state()
};
if let Ok(vote_state) = vote_state {
Self::aggregate_commitment_for_vote_account(
&mut commitment,
&mut rooted_stake,
Expand Down Expand Up @@ -382,8 +401,7 @@ mod tests {
assert_eq!(rooted_stake[0], (root, lamports));
}

#[test]
fn test_aggregate_commitment_validity() {
fn do_test_aggregate_commitment_validity(with_node_vote_state: bool) {
let ancestors = vec![3, 4, 5, 7, 9, 10, 11];
let GenesisConfigInfo {
mut genesis_config, ..
Expand Down Expand Up @@ -447,9 +465,11 @@ mod tests {
let mut vote_state1 = vote_state::from(&vote_account1).unwrap();
process_slot_vote_unchecked(&mut vote_state1, 3);
process_slot_vote_unchecked(&mut vote_state1, 5);
let versioned = VoteStateVersions::new_current(vote_state1);
vote_state::to(&versioned, &mut vote_account1).unwrap();
bank.store_account(&pk1, &vote_account1);
if !with_node_vote_state {
let versioned = VoteStateVersions::new_current(vote_state1.clone());
vote_state::to(&versioned, &mut vote_account1).unwrap();
bank.store_account(&pk1, &vote_account1);
}

let mut vote_state2 = vote_state::from(&vote_account2).unwrap();
process_slot_vote_unchecked(&mut vote_state2, 9);
Expand All @@ -470,8 +490,18 @@ mod tests {
vote_state::to(&versioned, &mut vote_account4).unwrap();
bank.store_account(&pk4, &vote_account4);

let (commitment, rooted_stake) =
AggregateCommitmentService::aggregate_commitment(&ancestors, &bank);
let node_vote_pubkey = if with_node_vote_state {
pk1
} else {
// Use some random pubkey as dummy to suppress the override.
solana_sdk::pubkey::new_rand()
};

let (commitment, rooted_stake) = AggregateCommitmentService::aggregate_commitment(
&ancestors,
&bank,
&(node_vote_pubkey, vote_state1),
);

for a in ancestors {
if a <= 3 {
Expand Down Expand Up @@ -499,17 +529,21 @@ mod tests {
assert_eq!(get_highest_super_majority_root(rooted_stake, 100), 1)
}

#[test]
fn test_aggregate_commitment_validity_with_node_vote_state() {
do_test_aggregate_commitment_validity(true)
}

#[test]
fn test_aggregate_commitment_validity_without_node_vote_state() {
do_test_aggregate_commitment_validity(false);
}

#[test]
fn test_highest_super_majority_root_advance() {
fn get_vote_account_root_slot(vote_pubkey: Pubkey, bank: &Bank) -> Slot {
fn get_vote_state(vote_pubkey: Pubkey, bank: &Bank) -> VoteState {
let vote_account = bank.get_vote_account(&vote_pubkey).unwrap();
let slot = vote_account
.vote_state()
.as_ref()
.unwrap()
.root_slot
.unwrap();
slot
vote_account.vote_state().cloned().unwrap()
}

let block_commitment_cache = RwLock::new(BlockCommitmentCache::new_for_tests());
Expand Down Expand Up @@ -547,10 +581,10 @@ mod tests {
}

let working_bank = bank_forks.read().unwrap().working_bank();
let root = get_vote_account_root_slot(
validator_vote_keypairs.vote_keypair.pubkey(),
&working_bank,
);
let vote_pubkey = validator_vote_keypairs.vote_keypair.pubkey();
let root = get_vote_state(vote_pubkey, &working_bank)
.root_slot
.unwrap();
for x in 0..root {
bank_forks
.write()
Expand Down Expand Up @@ -579,17 +613,16 @@ mod tests {
bank34.process_transaction(&vote33).unwrap();

let working_bank = bank_forks.read().unwrap().working_bank();
let root = get_vote_account_root_slot(
validator_vote_keypairs.vote_keypair.pubkey(),
&working_bank,
);
let vote_state = get_vote_state(vote_pubkey, &working_bank);
let root = vote_state.root_slot.unwrap();
let ancestors = working_bank.status_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
bank: working_bank,
root: 0,
total_stake: 100,
node_vote_state: (vote_pubkey, vote_state.clone()),
},
ancestors,
);
Expand Down Expand Up @@ -628,6 +661,7 @@ mod tests {
bank: working_bank,
root: 1,
total_stake: 100,
node_vote_state: (vote_pubkey, vote_state),
},
ancestors,
);
Expand Down Expand Up @@ -662,17 +696,17 @@ mod tests {
}

let working_bank = bank_forks.read().unwrap().working_bank();
let root = get_vote_account_root_slot(
validator_vote_keypairs.vote_keypair.pubkey(),
&working_bank,
);
let vote_state =
get_vote_state(validator_vote_keypairs.vote_keypair.pubkey(), &working_bank);
let root = vote_state.root_slot.unwrap();
let ancestors = working_bank.status_cache_ancestors();
let _ = AggregateCommitmentService::update_commitment_cache(
&block_commitment_cache,
CommitmentAggregationData {
bank: working_bank,
root: 0,
total_stake: 100,
node_vote_state: (vote_pubkey, vote_state),
},
ancestors,
);
Expand Down
38 changes: 31 additions & 7 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use {
timing::timestamp,
transaction::Transaction,
},
solana_vote_program::vote_state::VoteTransaction,
solana_vote_program::vote_state::{VoteState, VoteTransaction},
std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
Expand Down Expand Up @@ -2406,10 +2406,28 @@ impl ReplayStage {
}

let mut update_commitment_cache_time = Measure::start("update_commitment_cache");
// Send (voted) bank along with the updated vote account state for this node, the vote
// state is always newer than the one in the bank by definition, because banks can't
// contain vote transactions which are voting on its own slot.
//
// It should be acceptable to aggressively use the vote for our own _local view_ of
// commitment aggregation, although it's not guaranteed that the new vote transaction is
// observed by other nodes at this point.
//
// The justification stems from the assumption of the sensible voting behavior from the
// consensus subsystem. That's because it means there would be a slashing possibility
// otherwise.
//
// This behavior isn't significant normally for mainnet-beta, because staked nodes aren't
// servicing RPC requests. However, this eliminates artificial 1-slot delay of the
// `finalized` confirmation if a node is materially staked and servicing RPC requests at
// the same time for development purposes.
let node_vote_state = (*vote_account_pubkey, tower.vote_state.clone());
Self::update_commitment_cache(
bank.clone(),
bank_forks.read().unwrap().root(),
progress.get_fork_stats(bank.slot()).unwrap().total_stake,
node_vote_state,
lockouts_sender,
);
update_commitment_cache_time.stop();
Expand Down Expand Up @@ -2699,11 +2717,15 @@ impl ReplayStage {
bank: Arc<Bank>,
root: Slot,
total_stake: Stake,
node_vote_state: (Pubkey, VoteState),
lockouts_sender: &Sender<CommitmentAggregationData>,
) {
if let Err(e) =
lockouts_sender.send(CommitmentAggregationData::new(bank, root, total_stake))
{
if let Err(e) = lockouts_sender.send(CommitmentAggregationData::new(
bank,
root,
total_stake,
node_vote_state,
)) {
trace!("lockouts_sender failed: {:?}", e);
}
}
Expand Down Expand Up @@ -5281,13 +5303,14 @@ pub(crate) mod tests {

#[test]
fn test_replay_commitment_cache() {
fn leader_vote(vote_slot: Slot, bank: &Bank, pubkey: &Pubkey) {
fn leader_vote(vote_slot: Slot, bank: &Bank, pubkey: &Pubkey) -> (Pubkey, VoteState) {
let mut leader_vote_account = bank.get_account(pubkey).unwrap();
let mut vote_state = vote_state::from(&leader_vote_account).unwrap();
vote_state::process_slot_vote_unchecked(&mut vote_state, vote_slot);
let versioned = VoteStateVersions::new_current(vote_state);
let versioned = VoteStateVersions::new_current(vote_state.clone());
vote_state::to(&versioned, &mut leader_vote_account).unwrap();
bank.store_account(pubkey, &leader_vote_account);
(*pubkey, vote_state)
}

let leader_pubkey = solana_sdk::pubkey::new_rand();
Expand Down Expand Up @@ -5353,11 +5376,12 @@ pub(crate) mod tests {
}

let arc_bank = bank_forks.read().unwrap().get(i).unwrap();
leader_vote(i - 1, &arc_bank, &leader_voting_pubkey);
let node_vote_state = leader_vote(i - 1, &arc_bank, &leader_voting_pubkey);
ReplayStage::update_commitment_cache(
arc_bank.clone(),
0,
leader_lamports,
node_vote_state,
&lockouts_sender,
);
arc_bank.freeze();
Expand Down
15 changes: 3 additions & 12 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1794,12 +1794,9 @@ fn test_validator_saves_tower() {

// Wait for the first new root
let last_replayed_root = loop {
#[allow(deprecated)]
// This test depends on knowing the immediate root, without any delay from the commitment
// service, so the deprecated CommitmentConfig::root() is retained
if let Ok(root) = validator_client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::root())
.get_slot_with_commitment(CommitmentConfig::finalized())
{
trace!("current root: {}", root);
if root > 0 {
Expand All @@ -1826,12 +1823,9 @@ fn test_validator_saves_tower() {

// Wait for a new root, demonstrating the validator was able to make progress from the older `tower1`
let new_root = loop {
#[allow(deprecated)]
// This test depends on knowing the immediate root, without any delay from the commitment
// service, so the deprecated CommitmentConfig::root() is retained
if let Ok(root) = validator_client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::root())
.get_slot_with_commitment(CommitmentConfig::finalized())
{
trace!(
"current root: {}, last_replayed_root: {}",
Expand Down Expand Up @@ -1862,12 +1856,9 @@ fn test_validator_saves_tower() {

// Wait for another new root
let new_root = loop {
#[allow(deprecated)]
// This test depends on knowing the immediate root, without any delay from the commitment
// service, so the deprecated CommitmentConfig::root() is retained
if let Ok(root) = validator_client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::root())
.get_slot_with_commitment(CommitmentConfig::finalized())
{
trace!("current root: {}, last tower root: {}", root, tower3_root);
if root > tower3_root {
Expand Down

0 comments on commit 5562989

Please sign in to comment.