Skip to content

Commit

Permalink
Add "last known good" cache for virtual state (kaspanet#399)
Browse files Browse the repository at this point in the history
* make a better estimation of header count (during IBD this will now count for the headers phase)

* rpc service responsiveness: use `transaction_count_sample` for inferring mempool size

* use counters snapshot

* introduce last known good virtual state cache

* move processing counters to consensus core and remove rpc service crate dep on consensus

* extract consensus stats to a single call

* turn some consensus session calls into non-async + fix get_block_dag_info_call

* use saturating_sub instead of checked_sub + unwrap_or_default

* use join over all async calls

---------

Co-authored-by: coderofstuff <114628839+coderofstuff@users.noreply.github.com>

---------

Co-authored-by: coderofstuff <114628839+coderofstuff@users.noreply.github.com>
  • Loading branch information
michaelsutton and coderofstuff authored Jan 21, 2024
1 parent d4ddaf3 commit 401aa16
Show file tree
Hide file tree
Showing 21 changed files with 295 additions and 148 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ kaspa-alloc = { version = "0.13.3", path = "utils/alloc" }
# external
aes = "0.8.3"
ahash = "0.8.6"
arc-swap = "1.6.0"
argon2 = "0.5.2"
async-channel = "2.0.0"
async-std = { version = "1.12.0", features = ['attributes'] }
Expand Down
40 changes: 24 additions & 16 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
api::{BlockValidationFutures, ConsensusApi, DynConsensus},
api::{BlockCount, BlockValidationFutures, ConsensusApi, ConsensusStats, DynConsensus},
block::Block,
block_count::BlockCount,
blockstatus::BlockStatus,
daa_score_timestamp::DaaScoreTimestamp,
errors::consensus::ConsensusResult,
Expand Down Expand Up @@ -187,16 +186,33 @@ impl ConsensusSessionOwned {
self.consensus.calculate_transaction_storage_mass(transaction)
}

pub async fn async_get_virtual_daa_score(&self) -> u64 {
self.clone().spawn_blocking(|c| c.get_virtual_daa_score()).await
pub fn get_virtual_daa_score(&self) -> u64 {
// Accessing cached virtual fields is lock-free and does not require spawn_blocking
self.consensus.get_virtual_daa_score()
}

pub async fn async_get_virtual_bits(&self) -> u32 {
self.clone().spawn_blocking(|c| c.get_virtual_bits()).await
pub fn get_virtual_bits(&self) -> u32 {
// Accessing cached virtual fields is lock-free and does not require spawn_blocking
self.consensus.get_virtual_bits()
}

pub async fn async_get_virtual_past_median_time(&self) -> u64 {
self.clone().spawn_blocking(|c| c.get_virtual_past_median_time()).await
pub fn get_virtual_past_median_time(&self) -> u64 {
// Accessing cached virtual fields is lock-free and does not require spawn_blocking
self.consensus.get_virtual_past_median_time()
}

pub fn get_virtual_parents(&self) -> BlockHashSet {
// Accessing cached virtual fields is lock-free and does not require spawn_blocking
self.consensus.get_virtual_parents()
}

pub fn get_virtual_parents_len(&self) -> usize {
// Accessing cached virtual fields is lock-free and does not require spawn_blocking
self.consensus.get_virtual_parents_len()
}

pub async fn async_get_stats(&self) -> ConsensusStats {
self.clone().spawn_blocking(|c| c.get_stats()).await
}

pub async fn async_get_virtual_merge_depth_root(&self) -> Option<Hash> {
Expand Down Expand Up @@ -238,14 +254,6 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(move |c| c.get_virtual_chain_from_block(hash)).await
}

pub async fn async_get_virtual_parents(&self) -> BlockHashSet {
self.clone().spawn_blocking(|c| c.get_virtual_parents()).await
}

pub async fn async_get_virtual_parents_len(&self) -> usize {
self.clone().spawn_blocking(|c| c.get_virtual_parents_len()).await
}

pub async fn async_get_virtual_utxos(
&self,
from_outpoint: Option<TransactionOutpoint>,
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ include.workspace = true
license.workspace = true

[dependencies]
arc-swap.workspace = true
async-channel.workspace = true
bincode.workspace = true
crossbeam-channel.workspace = true
Expand Down
57 changes: 57 additions & 0 deletions consensus/core/src/api/counters.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
pub struct ProcessingCounters {
pub blocks_submitted: AtomicU64,
pub header_counts: AtomicU64,
pub dep_counts: AtomicU64,
pub mergeset_counts: AtomicU64,
pub body_counts: AtomicU64,
pub txs_counts: AtomicU64,
pub chain_block_counts: AtomicU64,
pub mass_counts: AtomicU64,
}

impl ProcessingCounters {
pub fn snapshot(&self) -> ProcessingCountersSnapshot {
ProcessingCountersSnapshot {
blocks_submitted: self.blocks_submitted.load(Ordering::Relaxed),
header_counts: self.header_counts.load(Ordering::Relaxed),
dep_counts: self.dep_counts.load(Ordering::Relaxed),
mergeset_counts: self.mergeset_counts.load(Ordering::Relaxed),
body_counts: self.body_counts.load(Ordering::Relaxed),
txs_counts: self.txs_counts.load(Ordering::Relaxed),
chain_block_counts: self.chain_block_counts.load(Ordering::Relaxed),
mass_counts: self.mass_counts.load(Ordering::Relaxed),
}
}
}

#[derive(Debug, PartialEq, Eq)]
pub struct ProcessingCountersSnapshot {
pub blocks_submitted: u64,
pub header_counts: u64,
pub dep_counts: u64,
pub mergeset_counts: u64,
pub body_counts: u64,
pub txs_counts: u64,
pub chain_block_counts: u64,
pub mass_counts: u64,
}

impl core::ops::Sub for &ProcessingCountersSnapshot {
type Output = ProcessingCountersSnapshot;

fn sub(self, rhs: Self) -> Self::Output {
Self::Output {
blocks_submitted: self.blocks_submitted.saturating_sub(rhs.blocks_submitted),
header_counts: self.header_counts.saturating_sub(rhs.header_counts),
dep_counts: self.dep_counts.saturating_sub(rhs.dep_counts),
mergeset_counts: self.mergeset_counts.saturating_sub(rhs.mergeset_counts),
body_counts: self.body_counts.saturating_sub(rhs.body_counts),
txs_counts: self.txs_counts.saturating_sub(rhs.txs_counts),
chain_block_counts: self.chain_block_counts.saturating_sub(rhs.chain_block_counts),
mass_counts: self.mass_counts.saturating_sub(rhs.mass_counts),
}
}
}
12 changes: 11 additions & 1 deletion consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;
use crate::{
acceptance_data::AcceptanceData,
block::{Block, BlockTemplate, TemplateBuildMode, TemplateTransactionSelector, VirtualStateApproxId},
block_count::BlockCount,
blockstatus::BlockStatus,
coinbase::MinerData,
daa_score_timestamp::DaaScoreTimestamp,
Expand All @@ -23,6 +22,12 @@ use crate::{
BlockHashSet, BlueWorkType, ChainPath,
};
use kaspa_hashes::Hash;

pub use self::stats::{BlockCount, ConsensusStats};

pub mod counters;
pub mod stats;

pub type BlockValidationFuture = BoxFuture<'static, BlockProcessResult<BlockStatus>>;

/// A struct returned by consensus for block validation processing calls
Expand Down Expand Up @@ -86,6 +91,11 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

/// Returns an aggregation of consensus stats. Designed to be a fast call.
fn get_stats(&self) -> ConsensusStats {
unimplemented!()
}

fn get_virtual_daa_score(&self) -> u64 {
unimplemented!()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,23 @@ impl BlockCount {
Self { block_count, header_count }
}
}

#[derive(Clone, Default)]
pub struct VirtualStateStats {
/// Number of direct parents of virtual
pub num_parents: u32,
pub daa_score: u64,
pub bits: u32,
pub past_median_time: u64,
}

pub struct ConsensusStats {
/// Block and header counts
pub block_counts: BlockCount,

/// Overall number of current DAG tips
pub num_tips: u64,

/// Virtual-related stats
pub virtual_stats: VirtualStateStats,
}
1 change: 0 additions & 1 deletion consensus/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub use kaspa_hashes::Hash;
pub mod acceptance_data;
pub mod api;
pub mod block;
pub mod block_count;
pub mod blockhash;
pub mod blockstatus;
pub mod coinbase;
Expand Down
53 changes: 36 additions & 17 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::{
statuses::StatusesStoreReader,
tips::TipsStoreReader,
utxo_set::{UtxoSetStore, UtxoSetStoreReader},
virtual_state::VirtualStateStoreReader,
DB,
},
},
Expand All @@ -41,9 +40,8 @@ use crate::{
};
use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
api::{BlockValidationFutures, ConsensusApi},
api::{stats::BlockCount, BlockValidationFutures, ConsensusApi, ConsensusStats},
block::{Block, BlockTemplate, TemplateBuildMode, TemplateTransactionSelector, VirtualStateApproxId},
block_count::BlockCount,
blockhash::BlockHashExtensions,
blockstatus::BlockStatus,
coinbase::MinerData,
Expand Down Expand Up @@ -446,22 +444,34 @@ impl ConsensusApi for Consensus {
self.services.mass_calculator.calc_tx_storage_mass(&transaction.as_verifiable())
}

fn get_stats(&self) -> ConsensusStats {
// This method is designed to return stats asap and not depend on locks which
// might take time to acquire
ConsensusStats {
block_counts: self.estimate_block_count(),
// This call acquires the tips store read lock which is expected to be fast. If this
// turns out to be not fast enough then we should maintain an atomic integer holding this value
num_tips: self.get_tips_len() as u64,
virtual_stats: self.lkg_virtual_state.load().as_ref().into(),
}
}

fn get_virtual_daa_score(&self) -> u64 {
self.virtual_stores.read().state.get().unwrap().daa_score
self.lkg_virtual_state.load().daa_score
}

fn get_virtual_bits(&self) -> u32 {
self.virtual_stores.read().state.get().unwrap().bits
self.lkg_virtual_state.load().bits
}

fn get_virtual_past_median_time(&self) -> u64 {
self.virtual_stores.read().state.get().unwrap().past_median_time
self.lkg_virtual_state.load().past_median_time
}

fn get_virtual_merge_depth_root(&self) -> Option<Hash> {
// TODO: consider saving the merge depth root as part of virtual state
let pruning_point = self.pruning_point_store.read().pruning_point().unwrap();
let virtual_state = self.virtual_stores.read().state.get().unwrap();
let virtual_state = self.lkg_virtual_state.load();
let virtual_ghostdag_data = &virtual_state.ghostdag_data;
let root = self.services.depth_manager.calc_merge_depth_root(virtual_ghostdag_data, pruning_point);
if root.is_origin() {
Expand All @@ -477,15 +487,15 @@ impl ConsensusApi for Consensus {
}

fn get_sink(&self) -> Hash {
self.virtual_stores.read().state.get().unwrap().ghostdag_data.selected_parent
self.lkg_virtual_state.load().ghostdag_data.selected_parent
}

fn get_sink_timestamp(&self) -> u64 {
self.headers_store.get_timestamp(self.get_sink()).unwrap()
}

fn get_virtual_state_approx_id(&self) -> VirtualStateApproxId {
self.virtual_stores.read().state.get().unwrap().to_virtual_state_approx_id()
self.lkg_virtual_state.load().to_virtual_state_approx_id()
}

fn get_source(&self) -> Hash {
Expand All @@ -502,8 +512,18 @@ impl ConsensusApi for Consensus {
/// as such, it does not include non-daa blocks, and does not include headers stored as part of the pruning proof.
fn estimate_block_count(&self) -> BlockCount {
// PRUNE SAFETY: node is either archival or source is the pruning point which its header is kept permanently
let count = self.get_virtual_daa_score() - self.get_header(self.get_source()).unwrap().daa_score;
BlockCount { header_count: count, block_count: count }
let source_score = self.headers_store.get_compact_header_data(self.get_source()).unwrap().daa_score;
let virtual_score = self.get_virtual_daa_score();
let header_count = self
.headers_store
.get_compact_header_data(self.get_headers_selected_tip())
.unwrap_option()
.map(|h| h.daa_score)
.unwrap_or(virtual_score)
.max(virtual_score)
- source_score;
let block_count = virtual_score - source_score;
BlockCount { header_count, block_count }
}

fn is_nearly_synced(&self) -> bool {
Expand Down Expand Up @@ -591,11 +611,11 @@ impl ConsensusApi for Consensus {
}

fn get_virtual_parents(&self) -> BlockHashSet {
self.virtual_stores.read().state.get().unwrap().parents.iter().copied().collect()
self.lkg_virtual_state.load().parents.iter().copied().collect()
}

fn get_virtual_parents_len(&self) -> usize {
self.virtual_stores.read().state.get().unwrap().parents.len()
self.lkg_virtual_state.load().parents.len()
}

fn get_virtual_utxos(
Expand Down Expand Up @@ -718,7 +738,7 @@ impl ConsensusApi for Consensus {
fn get_anticone(&self, hash: Hash) -> ConsensusResult<Vec<Hash>> {
let _guard = self.pruning_lock.blocking_read();
self.validate_block_exists(hash)?;
let virtual_state = self.virtual_stores.read().state.get().unwrap();
let virtual_state = self.lkg_virtual_state.load();
Ok(self.services.dag_traversal_manager.anticone(hash, virtual_state.parents.iter().copied(), None)?)
}

Expand Down Expand Up @@ -886,7 +906,7 @@ impl ConsensusApi for Consensus {
self.estimate_network_hashes_per_second_impl(&ghostdag_data, window_size)
}
None => {
let virtual_state = self.virtual_stores.read().state.get().unwrap();
let virtual_state = self.lkg_virtual_state.load();
self.estimate_network_hashes_per_second_impl(&virtual_state.ghostdag_data, window_size)
}
}
Expand All @@ -901,7 +921,6 @@ impl ConsensusApi for Consensus {
}

fn finality_point(&self) -> Hash {
self.virtual_processor
.virtual_finality_point(&self.virtual_stores.read().state.get().unwrap().ghostdag_data, self.pruning_point())
self.virtual_processor.virtual_finality_point(&self.lkg_virtual_state.load().ghostdag_data, self.pruning_point())
}
}
12 changes: 10 additions & 2 deletions consensus/src/consensus/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
tips::DbTipsStore,
utxo_diffs::DbUtxoDiffsStore,
utxo_multisets::DbUtxoMultisetsStore,
virtual_state::VirtualStores,
virtual_state::{LkgVirtualState, VirtualStores},
DB,
},
processes::{ghostdag::ordering::SortableBlock, reachability::inquirer as reachability, relations},
Expand Down Expand Up @@ -66,6 +66,11 @@ pub struct ConsensusStorage {
// Block window caches
pub block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
pub block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// "Last Known Good" caches
/// The "last known good" virtual state. To be used by any logic which does not want to wait
/// for a possible virtual state write to complete but can rather settle with the last known state
pub lkg_virtual_state: LkgVirtualState,
}

impl ConsensusStorage {
Expand Down Expand Up @@ -226,7 +231,9 @@ impl ConsensusStorage {
let block_window_cache_for_past_median_time = Arc::new(BlockWindowCacheStore::new(median_window_builder.build()));

// Virtual stores
let virtual_stores = Arc::new(RwLock::new(VirtualStores::new(db.clone(), utxo_set_builder.build())));
let lkg_virtual_state = LkgVirtualState::default();
let virtual_stores =
Arc::new(RwLock::new(VirtualStores::new(db.clone(), lkg_virtual_state.clone(), utxo_set_builder.build())));

// Ensure that reachability stores are initialized
reachability::init(reachability_store.write().deref_mut()).unwrap();
Expand Down Expand Up @@ -256,6 +263,7 @@ impl ConsensusStorage {
utxo_multisets_store,
block_window_cache_for_difficulty,
block_window_cache_for_past_median_time,
lkg_virtual_state,
})
}
}
Loading

0 comments on commit 401aa16

Please sign in to comment.