Trigger RPC notifications after block commitment cache update (solana-labs#10077)

* Fixup commitment-aggregation metric

* Trigger notifications after commitment-cache update

* Fixup fn name

* Add single-confirmation commitment level

* Rename to highest_confirmed_slot

* Pass commitment-cache info directly to notifications

* Use match

* Update commitment docs

* Update out of date pubsub docs
CriesofCarrots authored May 18, 2020
1 parent 4ca352a commit bac4aec
Showing 8 changed files with 296 additions and 122 deletions.
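
For orientation before the diff: a minimal, standalone Rust sketch of the scan that the new calculate_highest_confirmed_slot performs in core/src/commitment.rs below. It walks from the newest slot down toward the root and returns the first slot whose cluster confirmation count meets the threshold. The plain HashMap of slot to confirmation count is an assumption for illustration only; the real BlockCommitmentCache derives counts from stake-weighted BlockCommitment entries.

use std::collections::HashMap;

type Slot = u64;

// Illustrative stand-in for the cache: slot -> cluster confirmation count.
fn highest_slot_with_confirmation_count(
    commitment: &HashMap<Slot, usize>,
    root: Slot,
    bank_slot: Slot,
    confirmation_count: usize,
) -> Slot {
    assert!(confirmation_count > 0);
    // Scan from the newest slot down toward the root; the first slot that
    // meets the threshold is the highest confirmed slot.
    for slot in (root..bank_slot).rev() {
        if let Some(&count) = commitment.get(&slot) {
            if count >= confirmation_count {
                return slot;
            }
        }
    }
    // Nothing newer qualifies, so fall back to the root.
    root
}

fn main() {
    let mut commitment = HashMap::new();
    commitment.insert(1, 2); // slot 1: confirmation count 2
    commitment.insert(2, 1); // slot 2: confirmation count 1
    commitment.insert(3, 0); // slot 3: not yet confirmed
    // With at least one confirmation required, the highest qualifying slot is 2,
    // matching the first case in test_highest_confirmed_slot below.
    assert_eq!(highest_slot_with_confirmation_count(&commitment, 0, 5, 1), 2);
}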
177 changes: 168 additions & 9 deletions core/src/commitment.rs
@@ -1,7 +1,7 @@
use crate::consensus::VOTE_THRESHOLD_SIZE;
use crate::{consensus::VOTE_THRESHOLD_SIZE, rpc_subscriptions::RpcSubscriptions};
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_info;
use solana_metrics::datapoint_info;
use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot;
use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY};
@@ -14,6 +14,14 @@ use std::{
time::Duration,
};

#[derive(Default)]
pub struct CacheSlotInfo {
pub current_slot: Slot,
pub node_root: Slot,
pub largest_confirmed_root: Slot,
pub highest_confirmed_slot: Slot,
}

pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1];

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
@@ -53,6 +61,7 @@ pub struct BlockCommitmentCache {
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
root: Slot,
highest_confirmed_slot: Slot,
}

impl std::fmt::Debug for BlockCommitmentCache {
@@ -77,6 +86,7 @@ impl BlockCommitmentCache {
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
root: Slot,
highest_confirmed_slot: Slot,
) -> Self {
Self {
block_commitment,
@@ -85,6 +95,7 @@ impl BlockCommitmentCache {
bank,
blockstore,
root,
highest_confirmed_slot,
}
}

@@ -96,6 +107,7 @@ impl BlockCommitmentCache {
bank: Arc::new(Bank::default()),
blockstore,
root: Slot::default(),
highest_confirmed_slot: Slot::default(),
}
}

@@ -123,6 +135,26 @@ impl BlockCommitmentCache {
self.root
}

pub fn highest_confirmed_slot(&self) -> Slot {
self.highest_confirmed_slot
}

fn highest_slot_with_confirmation_count(&self, confirmation_count: usize) -> Slot {
assert!(confirmation_count > 0 && confirmation_count <= MAX_LOCKOUT_HISTORY);
for slot in (self.root()..self.slot()).rev() {
if let Some(count) = self.get_confirmation_count(slot) {
if count >= confirmation_count {
return slot;
}
}
}
self.root
}

fn calculate_highest_confirmed_slot(&self) -> Slot {
self.highest_slot_with_confirmation_count(1)
}

pub fn get_confirmation_count(&self, slot: Slot) -> Option<usize> {
self.get_lockout_count(slot, VOTE_THRESHOLD_SIZE)
}
@@ -159,6 +191,7 @@ impl BlockCommitmentCache {
largest_confirmed_root: Slot::default(),
bank: Arc::new(Bank::default()),
root: Slot::default(),
highest_confirmed_slot: Slot::default(),
}
}

@@ -177,10 +210,11 @@ impl BlockCommitmentCache {
largest_confirmed_root: root,
bank,
root,
highest_confirmed_slot: root,
}
}

pub(crate) fn set_get_largest_confirmed_root(&mut self, root: Slot) {
pub(crate) fn set_largest_confirmed_root(&mut self, root: Slot) {
self.largest_confirmed_root = root;
}
}
@@ -221,6 +255,7 @@ impl AggregateCommitmentService {
pub fn new(
exit: &Arc<AtomicBool>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
subscriptions: Arc<RpcSubscriptions>,
) -> (Sender<CommitmentAggregationData>, Self) {
let (sender, receiver): (
Sender<CommitmentAggregationData>,
@@ -238,7 +273,7 @@ impl AggregateCommitmentService {
}

if let Err(RecvTimeoutError::Disconnected) =
Self::run(&receiver, &block_commitment_cache, &exit_)
Self::run(&receiver, &block_commitment_cache, &subscriptions, &exit_)
{
break;
}
@@ -251,6 +286,7 @@ impl AggregateCommitmentService {
fn run(
receiver: &Receiver<CommitmentAggregationData>,
block_commitment_cache: &RwLock<BlockCommitmentCache>,
subscriptions: &Arc<RpcSubscriptions>,
exit: &Arc<AtomicBool>,
) -> Result<(), RecvTimeoutError> {
loop {
@@ -283,16 +319,30 @@ impl AggregateCommitmentService {
aggregation_data.bank,
block_commitment_cache.read().unwrap().blockstore.clone(),
aggregation_data.root,
aggregation_data.root,
);
new_block_commitment.highest_confirmed_slot =
new_block_commitment.calculate_highest_confirmed_slot();

let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();

std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
aggregate_commitment_time.stop();
inc_new_counter_info!(
"aggregate-commitment-ms",
aggregate_commitment_time.as_ms() as usize
datapoint_info!(
"block-commitment-cache",
(
"aggregate-commitment-ms",
aggregate_commitment_time.as_ms() as i64,
i64
)
);

subscriptions.notify_subscribers(CacheSlotInfo {
current_slot: w_block_commitment_cache.slot(),
node_root: w_block_commitment_cache.root(),
largest_confirmed_root: w_block_commitment_cache.largest_confirmed_root(),
highest_confirmed_slot: w_block_commitment_cache.highest_confirmed_slot(),
});
}
}

@@ -382,7 +432,7 @@ mod tests {
genesis_utils::{create_genesis_config, GenesisConfigInfo},
get_tmp_ledger_path,
};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::{genesis_config::GenesisConfig, pubkey::Pubkey};
use solana_stake_program::stake_state;
use solana_vote_program::vote_state::{self, VoteStateVersions};

@@ -419,7 +469,7 @@ mod tests {
block_commitment.entry(1).or_insert(cache1);
block_commitment.entry(2).or_insert(cache2);
let block_commitment_cache =
BlockCommitmentCache::new(block_commitment, 0, 50, bank, blockstore, 0);
BlockCommitmentCache::new(block_commitment, 0, 50, bank, blockstore, 0, 0);

assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2));
assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1));
@@ -453,6 +503,7 @@ mod tests {
bank,
blockstore,
0,
0,
);

assert!(block_commitment_cache.is_confirmed_rooted(0));
@@ -476,6 +527,114 @@
assert_eq!(get_largest_confirmed_root(rooted_stake, 10), 1);
}

#[test]
fn test_highest_confirmed_slot() {
let bank = Arc::new(Bank::new(&GenesisConfig::default()));
let bank_slot_5 = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 5));
let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let total_stake = 50;

// Build cache with confirmation_count 2 given total_stake
let mut cache0 = BlockCommitment::default();
cache0.increase_confirmation_stake(1, 5);
cache0.increase_confirmation_stake(2, 40);

// Build cache with confirmation_count 1 given total_stake
let mut cache1 = BlockCommitment::default();
cache1.increase_confirmation_stake(1, 40);
cache1.increase_confirmation_stake(2, 5);

// Build cache with confirmation_count 0 given total_stake
let mut cache2 = BlockCommitment::default();
cache2.increase_confirmation_stake(1, 20);
cache2.increase_confirmation_stake(2, 5);

let mut block_commitment = HashMap::new();
block_commitment.entry(1).or_insert(cache0.clone()); // Slot 1, conf 2
block_commitment.entry(2).or_insert(cache1.clone()); // Slot 2, conf 1
block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0
let block_commitment_cache = BlockCommitmentCache::new(
block_commitment,
0,
total_stake,
bank_slot_5.clone(),
blockstore.clone(),
0,
0,
);

assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 2);

// Build map with multiple slots at conf 1
let mut block_commitment = HashMap::new();
block_commitment.entry(1).or_insert(cache1.clone()); // Slot 1, conf 1
block_commitment.entry(2).or_insert(cache1.clone()); // Slot 2, conf 1
block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0
let block_commitment_cache = BlockCommitmentCache::new(
block_commitment,
0,
total_stake,
bank_slot_5.clone(),
blockstore.clone(),
0,
0,
);

assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 2);

// Build map with slot gaps
let mut block_commitment = HashMap::new();
block_commitment.entry(1).or_insert(cache1.clone()); // Slot 1, conf 1
block_commitment.entry(3).or_insert(cache1.clone()); // Slot 3, conf 1
block_commitment.entry(5).or_insert(cache2.clone()); // Slot 5, conf 0
let block_commitment_cache = BlockCommitmentCache::new(
block_commitment,
0,
total_stake,
bank_slot_5.clone(),
blockstore.clone(),
0,
0,
);

assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 3);

// Build map with no conf 1 slots, but one higher
let mut block_commitment = HashMap::new();
block_commitment.entry(1).or_insert(cache0.clone()); // Slot 1, conf 2
block_commitment.entry(2).or_insert(cache2.clone()); // Slot 2, conf 0
block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0
let block_commitment_cache = BlockCommitmentCache::new(
block_commitment,
0,
total_stake,
bank_slot_5.clone(),
blockstore.clone(),
0,
0,
);

assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 1);

// Build map with no conf 1 or higher slots
let mut block_commitment = HashMap::new();
block_commitment.entry(1).or_insert(cache2.clone()); // Slot 1, conf 0
block_commitment.entry(2).or_insert(cache2.clone()); // Slot 2, conf 0
block_commitment.entry(3).or_insert(cache2.clone()); // Slot 3, conf 0
let block_commitment_cache = BlockCommitmentCache::new(
block_commitment,
0,
total_stake,
bank_slot_5.clone(),
blockstore.clone(),
0,
0,
);

assert_eq!(block_commitment_cache.calculate_highest_confirmed_slot(), 0);
}

#[test]
fn test_aggregate_commitment_for_vote_account_1() {
let ancestors = vec![3, 4, 5, 7, 9, 11];
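The heart of the change is the ordering: AggregateCommitmentService now installs the freshly aggregated cache under the write lock and only then builds a CacheSlotInfo snapshot from what it just wrote, so pubsub notifications can never run ahead of the commitment cache. A rough standalone sketch of that hand-off, using deliberately simplified stand-in types (CommitmentCache, a free notify_subscribers function) rather than the real BlockCommitmentCache and RpcSubscriptions APIs:

use std::sync::{Arc, RwLock};

#[derive(Default)]
struct CommitmentCache {
    slot: u64,
    root: u64,
    largest_confirmed_root: u64,
    highest_confirmed_slot: u64,
}

#[derive(Debug, Default)]
struct CacheSlotInfo {
    current_slot: u64,
    node_root: u64,
    largest_confirmed_root: u64,
    highest_confirmed_slot: u64,
}

fn notify_subscribers(info: CacheSlotInfo) {
    // The real RpcSubscriptions fans this snapshot out to pubsub clients.
    println!("notify: {:?}", info);
}

fn swap_and_notify(cache: &Arc<RwLock<CommitmentCache>>, mut fresh: CommitmentCache) {
    // Install the refreshed cache first, then notify from the data just written,
    // mirroring the std::mem::swap followed by notify_subscribers in the diff above.
    let mut w = cache.write().unwrap();
    std::mem::swap(&mut *w, &mut fresh);
    notify_subscribers(CacheSlotInfo {
        current_slot: w.slot,
        node_root: w.root,
        largest_confirmed_root: w.largest_confirmed_root,
        highest_confirmed_slot: w.highest_confirmed_slot,
    });
}

fn main() {
    let cache = Arc::new(RwLock::new(CommitmentCache::default()));
    swap_and_notify(
        &cache,
        CommitmentCache {
            slot: 7,
            root: 3,
            largest_confirmed_root: 3,
            highest_confirmed_slot: 6,
        },
    );
}

Because the service now notifies with this snapshot, the replay_stage.rs change below can drop its own notify_subscribers call and simply pass the RpcSubscriptions handle into AggregateCommitmentService::new.
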
28 changes: 17 additions & 11 deletions core/src/replay_stage.rs
@@ -176,8 +176,11 @@ impl ReplayStage {
let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap());

// Start the replay stage loop
let (lockouts_sender, commitment_service) =
AggregateCommitmentService::new(&exit, block_commitment_cache.clone());
let (lockouts_sender, commitment_service) = AggregateCommitmentService::new(
&exit,
block_commitment_cache.clone(),
subscriptions.clone(),
);

#[allow(clippy::cognitive_complexity)]
let t_replay = Builder::new()
@@ -353,8 +356,6 @@ impl ReplayStage {

// Vote on a fork
if let Some(ref vote_bank) = vote_bank {
subscriptions
.notify_subscribers(block_commitment_cache.read().unwrap().slot());
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(vote_bank.slot(), Some(vote_bank))
{
@@ -2595,13 +2596,6 @@ pub(crate) mod tests {

let ledger_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let block_commitment_cache = Arc::new(RwLock::new(
BlockCommitmentCache::default_with_blockstore(blockstore),
));
let (lockouts_sender, _) = AggregateCommitmentService::new(
&Arc::new(AtomicBool::new(false)),
block_commitment_cache.clone(),
);

let leader_pubkey = Pubkey::new_rand();
let leader_lamports = 3;
@@ -2622,6 +2616,18 @@
0,
)));

let exit = Arc::new(AtomicBool::new(false));
let block_commitment_cache = Arc::new(RwLock::new(
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache.clone(),
));
let (lockouts_sender, _) =
AggregateCommitmentService::new(&exit, block_commitment_cache.clone(), subscriptions);

assert!(block_commitment_cache
.read()
.unwrap()