Various miscellaneous changes for next release
0xA001113 committed Oct 22, 2024
1 parent d756c45 commit deb1a8b
Showing 13 changed files with 73 additions and 66 deletions.
4 changes: 4 additions & 0 deletions consensus/core/src/api/counters.rs
@@ -9,6 +9,7 @@ pub struct ProcessingCounters {
pub body_counts: AtomicU64,
pub txs_counts: AtomicU64,
pub chain_block_counts: AtomicU64,
pub chain_disqualified_counts: AtomicU64,
pub mass_counts: AtomicU64,
}

@@ -22,6 +23,7 @@ impl ProcessingCounters {
body_counts: self.body_counts.load(Ordering::Relaxed),
txs_counts: self.txs_counts.load(Ordering::Relaxed),
chain_block_counts: self.chain_block_counts.load(Ordering::Relaxed),
chain_disqualified_counts: self.chain_disqualified_counts.load(Ordering::Relaxed),
mass_counts: self.mass_counts.load(Ordering::Relaxed),
}
}
@@ -36,6 +38,7 @@ pub struct ProcessingCountersSnapshot {
pub body_counts: u64,
pub txs_counts: u64,
pub chain_block_counts: u64,
pub chain_disqualified_counts: u64,
pub mass_counts: u64,
}

@@ -51,6 +54,7 @@ impl core::ops::Sub for &ProcessingCountersSnapshot {
body_counts: self.body_counts.saturating_sub(rhs.body_counts),
txs_counts: self.txs_counts.saturating_sub(rhs.txs_counts),
chain_block_counts: self.chain_block_counts.saturating_sub(rhs.chain_block_counts),
chain_disqualified_counts: self.chain_disqualified_counts.saturating_sub(rhs.chain_disqualified_counts),
mass_counts: self.mass_counts.saturating_sub(rhs.mass_counts),
}
}
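The new `chain_disqualified_counts` field follows the same pattern as the existing counters: a relaxed `AtomicU64` loaded into a plain-`u64` snapshot, with per-interval deltas computed via `saturating_sub`. A minimal, self-contained sketch of that pattern (toy type names, not the crate's actual structs):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Toy version of the counters/snapshot/delta pattern above; names are illustrative only.
#[derive(Default)]
struct Counters {
    chain_block_counts: AtomicU64,
    chain_disqualified_counts: AtomicU64,
}

#[derive(Clone, Copy, Debug)]
struct Snapshot {
    chain_block_counts: u64,
    chain_disqualified_counts: u64,
}

impl Counters {
    fn snapshot(&self) -> Snapshot {
        Snapshot {
            chain_block_counts: self.chain_block_counts.load(Ordering::Relaxed),
            chain_disqualified_counts: self.chain_disqualified_counts.load(Ordering::Relaxed),
        }
    }
}

impl core::ops::Sub for &Snapshot {
    type Output = Snapshot;

    fn sub(self, rhs: Self) -> Snapshot {
        // saturating_sub keeps deltas at zero even if a counter went backwards between snapshots.
        Snapshot {
            chain_block_counts: self.chain_block_counts.saturating_sub(rhs.chain_block_counts),
            chain_disqualified_counts: self.chain_disqualified_counts.saturating_sub(rhs.chain_disqualified_counts),
        }
    }
}

fn main() {
    let counters = Counters::default();
    let before = counters.snapshot();
    counters.chain_block_counts.fetch_add(10, Ordering::Relaxed);
    counters.chain_disqualified_counts.fetch_add(2, Ordering::Relaxed);
    let delta = &counters.snapshot() - &before;
    assert_eq!(delta.chain_disqualified_counts, 2);
    println!("delta: {delta:?}");
}
```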
1 change: 1 addition & 0 deletions consensus/core/src/errors/block.rs
@@ -148,6 +148,7 @@ pub enum RuleError {
#[error("DAA window data has only {0} entries")]
InsufficientDaaWindowSize(usize),

/// Currently this error is never created because it is impossible to submit such a block
#[error("cannot add block body to a pruned block")]
PrunedBlock,
}
@@ -15,29 +15,11 @@ use std::sync::Arc;

impl BlockBodyProcessor {
pub fn validate_body_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
self.check_block_is_not_pruned(block)?;
self.check_parent_bodies_exist(block)?;
self.check_coinbase_blue_score_and_subsidy(block)?;
self.check_block_transactions_in_context(block)
}

fn check_block_is_not_pruned(self: &Arc<Self>, _block: &Block) -> BlockProcessResult<()> {
match self.statuses_store.read().get(_block.hash()).unwrap_option() {
Some(_) => {
let Some(pp) = self.pruning_point_store.read().pruning_point().unwrap_option() else {
return Ok(());
};

if self.reachability_service.is_dag_ancestor_of(_block.hash(), pp) {
Err(RuleError::PrunedBlock)
} else {
Ok(())
}
}
None => Ok(()),
}
}

fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let (pmt, _) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?;
for tx in block.transactions.iter() {
@@ -50,12 +32,6 @@ impl BlockBodyProcessor {
}

fn check_parent_bodies_exist(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
// TODO: Skip this check for blocks in PP anticone that comes as part of the pruning proof.

if block.header.direct_parents().len() == 1 && block.header.direct_parents()[0] == self.genesis.hash {
return Ok(());
}

let statuses_read_guard = self.statuses_store.read();
let missing: Vec<Hash> = block
.header
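With the pruned-block check and the genesis-parent shortcut removed, body validation in context starts directly at `check_parent_bodies_exist`, which collects every direct parent whose body is not yet stored and rejects the block if any are missing. A hedged sketch of that kind of check against a toy status map (hypothetical types, not the processor's actual stores):

```rust
use std::collections::HashMap;

// Toy status map standing in for the statuses store; names are illustrative only.
#[derive(Clone, Copy)]
enum Status {
    HeaderOnly,
    HasBody,
}

// Collect every parent whose body is missing and fail if any are found.
fn check_parent_bodies_exist(
    parents: &[&'static str],
    statuses: &HashMap<&'static str, Status>,
) -> Result<(), Vec<&'static str>> {
    let missing: Vec<&'static str> = parents
        .iter()
        .copied()
        .filter(|p| !matches!(statuses.get(p), Some(Status::HasBody)))
        .collect();
    if missing.is_empty() {
        Ok(())
    } else {
        Err(missing)
    }
}

fn main() {
    let statuses = HashMap::from([("p1", Status::HasBody), ("p2", Status::HeaderOnly)]);
    assert!(check_parent_bodies_exist(&["p1"], &statuses).is_ok());
    assert_eq!(check_parent_bodies_exist(&["p1", "p2"], &statuses), Err(vec!["p2"]));
}
```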
4 changes: 1 addition & 3 deletions consensus/src/pipeline/body_processor/processor.rs
@@ -205,8 +205,7 @@ impl BlockBodyProcessor {
// transactions that fits the merkle root.
// PrunedBlock - PrunedBlock is an error that rejects a block body and
// not the block as a whole, so we shouldn't mark it as invalid.
// TODO: implement the last part.
if !matches!(e, RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_)) {
if !matches!(e, RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_) | RuleError::PrunedBlock) {
self.statuses_store.write().set(block.hash(), BlockStatus::StatusInvalid).unwrap();
}
return Err(e);
@@ -230,7 +229,6 @@ impl BlockBodyProcessor {
fn validate_body(self: &Arc<BlockBodyProcessor>, block: &Block, is_trusted: bool) -> BlockProcessResult<u64> {
let mass = self.validate_body_in_isolation(block)?;
if !is_trusted {
// TODO: Check that it's safe to skip this check if the block is trusted.
self.validate_body_in_context(block)?;
}
Ok(mass)
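The widened `matches!` arm means `PrunedBlock`, like a bad merkle root or missing parents, now rejects only the submitted body and no longer marks the block itself `StatusInvalid`. A small sketch of that classification with a stand-in error enum (not the real `RuleError`):

```rust
// Stand-in error enum for illustration; the real RuleError has many more variants.
#[derive(Debug)]
enum RuleError {
    BadMerkleRoot(u64, u64),
    MissingParents(Vec<u64>),
    PrunedBlock,
    Other(&'static str),
}

/// Returns true when the error should mark the block StatusInvalid.
/// Body-level rejections reject only the body submission, not the block as a whole.
fn marks_block_invalid(e: &RuleError) -> bool {
    !matches!(
        e,
        RuleError::BadMerkleRoot(_, _) | RuleError::MissingParents(_) | RuleError::PrunedBlock
    )
}

fn main() {
    assert!(!marks_block_invalid(&RuleError::PrunedBlock));
    assert!(!marks_block_invalid(&RuleError::BadMerkleRoot(1, 2)));
    assert!(!marks_block_invalid(&RuleError::MissingParents(vec![1, 2])));
    assert!(marks_block_invalid(&RuleError::Other("bad coinbase subsidy")));
}
```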
3 changes: 0 additions & 3 deletions consensus/src/pipeline/header_processor/processor.rs
@@ -308,8 +308,6 @@ impl HeaderProcessor {

// Runs partial header validation for trusted blocks (currently validates only header-in-isolation and computes GHOSTDAG).
fn validate_trusted_header(&self, header: &Arc<Header>) -> BlockProcessResult<HeaderProcessingContext> {
// TODO: For now we skip most validations for trusted blocks, but in the future we should
// employ some validations to avoid spam etc.
let block_level = self.validate_header_in_isolation(header)?;
let mut ctx = self.build_processing_context(header, block_level);
self.ghostdag(&mut ctx);
@@ -407,7 +405,6 @@ impl HeaderProcessor {
&& reachability::is_chain_ancestor_of(&staging, pp, ctx.hash).unwrap()
{
// Hint reachability about the new tip.
// TODO: identify a disqualified hst and make sure to use sink instead
reachability::hint_virtual_selected_parent(&mut staging, ctx.hash).unwrap();
hst_write.set_batch(&mut batch, SortableBlock::new(ctx.hash, header.blue_work)).unwrap();
}
9 changes: 8 additions & 1 deletion consensus/src/pipeline/monitor.rs
@@ -5,7 +5,7 @@ use spectre_core::{
service::{AsyncService, AsyncServiceFuture},
tick::{TickReason, TickService},
},
trace,
trace, warn,
};
use std::{
sync::Arc,
@@ -62,6 +62,13 @@ impl ConsensusMonitor {
if delta.body_counts != 0 { delta.mass_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
);

if delta.chain_disqualified_counts > 0 {
warn!(
"Consensus detected UTXO-invalid blocks which are disqualified from the virtual selected chain (possibly due to inheritance): {} disqualified vs. {} valid chain blocks",
delta.chain_disqualified_counts, delta.chain_block_counts
);
}

last_snapshot = snapshot;
last_log_time = now;
}
16 changes: 11 additions & 5 deletions consensus/src/pipeline/virtual_processor/processor.rs
@@ -383,10 +383,12 @@ impl VirtualStateProcessor {

// Walk back up to the new virtual selected parent candidate
let mut chain_block_counter = 0;
let mut chain_disqualified_counter = 0;
for (selected_parent, current) in self.reachability_service.forward_chain_iterator(split_point, to, true).tuple_windows() {
if selected_parent != diff_point {
// This indicates that the selected parent is disqualified, propagate up and continue
self.statuses_store.write().set(current, StatusDisqualifiedFromChain).unwrap();
chain_disqualified_counter += 1;
continue;
}

@@ -416,6 +418,7 @@ impl VirtualStateProcessor {
if let Err(rule_error) = res {
info!("Block {} is disqualified from virtual chain: {}", current, rule_error);
self.statuses_store.write().set(current, StatusDisqualifiedFromChain).unwrap();
chain_disqualified_counter += 1;
} else {
debug!("VIRTUAL PROCESSOR, UTXO validated for {current}");

Expand All @@ -434,6 +437,9 @@ impl VirtualStateProcessor {
}
// Report counters
self.counters.chain_block_counts.fetch_add(chain_block_counter, Ordering::Relaxed);
if chain_disqualified_counter > 0 {
self.counters.chain_disqualified_counts.fetch_add(chain_disqualified_counter, Ordering::Relaxed);
}

diff_point
}
@@ -559,7 +565,7 @@ impl VirtualStateProcessor {
finality_point: Hash,
pruning_point: Hash,
) -> (Hash, VecDeque<Hash>) {
// TODO: tests
// TODO (relaxed): additional tests

let mut heap = tips
.into_iter()
@@ -621,7 +627,7 @@ impl VirtualStateProcessor {
mut candidates: VecDeque<Hash>,
pruning_point: Hash,
) -> (Vec<Hash>, GhostdagData) {
// TODO: tests
// TODO (relaxed): additional tests

// Mergeset increasing might traverse DAG areas which are below the finality point and which theoretically
// can borderline with pruned data, hence we acquire the prune lock to ensure data consistency. Note that
@@ -670,7 +676,7 @@ impl VirtualStateProcessor {
MergesetIncreaseResult::Rejected { new_candidate } => {
// If we already have a candidate in the past of new candidate then skip.
if self.reachability_service.is_any_dag_ancestor(&mut candidates.iter().copied(), new_candidate) {
continue; // TODO: not sure this test is needed if candidates invariant as antichain is kept
continue; // TODO (optimization): not sure this check is needed if candidates invariant as antichain is kept
}
// Remove all candidates which are in the future of the new candidate
candidates.retain(|&h| !self.reachability_service.is_dag_ancestor_of(new_candidate, h));
@@ -860,7 +866,7 @@ impl VirtualStateProcessor {
build_mode: TemplateBuildMode,
) -> Result<BlockTemplate, RuleError> {
//
// TODO: tests
// TODO (relaxed): additional tests
//

// We call for the initial tx batch before acquiring the virtual read lock,
@@ -1048,7 +1054,7 @@ impl VirtualStateProcessor {
);
}

// TODO: rename to reflect finalizing pruning point utxoset state and importing *to* virtual utxoset
/// Finalizes the pruning point utxoset state and imports the pruning point utxoset *to* virtual utxoset
pub fn import_pruning_point_utxo_set(
&self,
new_pruning_point: Hash,
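One small pattern worth noting in this file's changes: `chain_disqualified_counter` is accumulated locally during the chain walk and published with a single `fetch_add` at the end, guarded by `> 0` so the shared atomic is untouched on the common, all-valid path. A minimal sketch of that batching idea:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical sketch: count locally during a batch, publish once at the end.
fn process_batch(items: &[bool], disqualified_total: &AtomicU64) {
    let mut disqualified = 0u64;
    for &is_disqualified in items {
        if is_disqualified {
            disqualified += 1;
        }
    }
    // Only touch the shared counter when there is something to report.
    if disqualified > 0 {
        disqualified_total.fetch_add(disqualified, Ordering::Relaxed);
    }
}

fn main() {
    let total = AtomicU64::new(0);
    process_batch(&[false, true, false, true], &total);
    process_batch(&[false, false], &total); // no atomic write on this call
    assert_eq!(total.load(Ordering::Relaxed), 2);
}
```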
@@ -267,7 +267,6 @@ impl VirtualStateProcessor {
for i in 0..mutable_tx.tx.inputs.len() {
if mutable_tx.entries[i].is_some() {
// We prefer a previously populated entry if such exists
// TODO: consider re-checking the utxo view to get the most up-to-date entry (since DAA score can change)
continue;
}
if let Some(entry) = utxo_view.get(&mutable_tx.tx.inputs[i].previous_outpoint) {
4 changes: 2 additions & 2 deletions crypto/txscript/src/caches.rs
@@ -86,8 +86,8 @@ impl core::ops::Sub for &TxScriptCacheCountersSnapshot {

fn sub(self, rhs: Self) -> Self::Output {
Self::Output {
insert_counts: self.insert_counts.checked_sub(rhs.insert_counts).unwrap_or_default(),
get_counts: self.get_counts.checked_sub(rhs.get_counts).unwrap_or_default(),
insert_counts: self.insert_counts.saturating_sub(rhs.insert_counts),
get_counts: self.get_counts.saturating_sub(rhs.get_counts),
}
}
}
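For unsigned counters the replaced `checked_sub(...).unwrap_or_default()` and the new `saturating_sub(...)` are equivalent, both clamping at zero on underflow, so the change is purely a switch to the more direct idiom. A quick check:

```rust
fn main() {
    let (a, b): (u64, u64) = (3, 7);
    // Both forms clamp at zero instead of panicking or wrapping on underflow.
    assert_eq!(a.checked_sub(b).unwrap_or_default(), 0);
    assert_eq!(a.saturating_sub(b), 0);
    // And they agree whenever no underflow occurs.
    assert_eq!(b.checked_sub(a).unwrap_or_default(), b.saturating_sub(a));
}
```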
23 changes: 14 additions & 9 deletions mining/src/lib.rs
@@ -30,6 +30,7 @@ pub struct MiningCounters {
pub low_priority_tx_counts: AtomicU64,
pub block_tx_counts: AtomicU64,
pub tx_accepted_counts: AtomicU64,
pub tx_evicted_counts: AtomicU64,
pub input_counts: AtomicU64,
pub output_counts: AtomicU64,

@@ -48,6 +49,7 @@ impl Default for MiningCounters {
low_priority_tx_counts: Default::default(),
block_tx_counts: Default::default(),
tx_accepted_counts: Default::default(),
tx_evicted_counts: Default::default(),
input_counts: Default::default(),
output_counts: Default::default(),
ready_txs_sample: Default::default(),
@@ -66,6 +68,7 @@ impl MiningCounters {
low_priority_tx_counts: self.low_priority_tx_counts.load(Ordering::Relaxed),
block_tx_counts: self.block_tx_counts.load(Ordering::Relaxed),
tx_accepted_counts: self.tx_accepted_counts.load(Ordering::Relaxed),
tx_evicted_counts: self.tx_evicted_counts.load(Ordering::Relaxed),
input_counts: self.input_counts.load(Ordering::Relaxed),
output_counts: self.output_counts.load(Ordering::Relaxed),
ready_txs_sample: self.ready_txs_sample.load(Ordering::Relaxed),
@@ -101,6 +104,7 @@ pub struct MempoolCountersSnapshot {
pub low_priority_tx_counts: u64,
pub block_tx_counts: u64,
pub tx_accepted_counts: u64,
pub tx_evicted_counts: u64,
pub input_counts: u64,
pub output_counts: u64,
pub ready_txs_sample: u64,
@@ -151,13 +155,14 @@ impl core::ops::Sub for &MempoolCountersSnapshot {

fn sub(self, rhs: Self) -> Self::Output {
Self::Output {
elapsed_time: self.elapsed_time.checked_sub(rhs.elapsed_time).unwrap_or_default(),
high_priority_tx_counts: self.high_priority_tx_counts.checked_sub(rhs.high_priority_tx_counts).unwrap_or_default(),
low_priority_tx_counts: self.low_priority_tx_counts.checked_sub(rhs.low_priority_tx_counts).unwrap_or_default(),
block_tx_counts: self.block_tx_counts.checked_sub(rhs.block_tx_counts).unwrap_or_default(),
tx_accepted_counts: self.tx_accepted_counts.checked_sub(rhs.tx_accepted_counts).unwrap_or_default(),
input_counts: self.input_counts.checked_sub(rhs.input_counts).unwrap_or_default(),
output_counts: self.output_counts.checked_sub(rhs.output_counts).unwrap_or_default(),
elapsed_time: self.elapsed_time.saturating_sub(rhs.elapsed_time),
high_priority_tx_counts: self.high_priority_tx_counts.saturating_sub(rhs.high_priority_tx_counts),
low_priority_tx_counts: self.low_priority_tx_counts.saturating_sub(rhs.low_priority_tx_counts),
block_tx_counts: self.block_tx_counts.saturating_sub(rhs.block_tx_counts),
tx_accepted_counts: self.tx_accepted_counts.saturating_sub(rhs.tx_accepted_counts),
tx_evicted_counts: self.tx_evicted_counts.saturating_sub(rhs.tx_evicted_counts),
input_counts: self.input_counts.saturating_sub(rhs.input_counts),
output_counts: self.output_counts.saturating_sub(rhs.output_counts),
ready_txs_sample: (self.ready_txs_sample + rhs.ready_txs_sample) / 2,
txs_sample: (self.txs_sample + rhs.txs_sample) / 2,
orphans_sample: (self.orphans_sample + rhs.orphans_sample) / 2,
@@ -177,8 +182,8 @@ impl core::ops::Sub for &P2pTxCountSample {

fn sub(self, rhs: Self) -> Self::Output {
Self::Output {
elapsed_time: self.elapsed_time.checked_sub(rhs.elapsed_time).unwrap_or_default(),
low_priority_tx_counts: self.low_priority_tx_counts.checked_sub(rhs.low_priority_tx_counts).unwrap_or_default(),
elapsed_time: self.elapsed_time.saturating_sub(rhs.elapsed_time),
low_priority_tx_counts: self.low_priority_tx_counts.saturating_sub(rhs.low_priority_tx_counts),
}
}
}
36 changes: 22 additions & 14 deletions mining/src/mempool/validate_and_insert_transaction.rs
@@ -1,3 +1,5 @@
use std::sync::atomic::Ordering;

use crate::mempool::{
errors::{RuleError, RuleResult},
model::{
@@ -84,21 +86,27 @@ impl Mempool {
// Before adding the transaction, check if there is room in the pool
let transaction_size = transaction.mempool_estimated_bytes();
let txs_to_remove = self.transaction_pool.limit_transaction_count(&transaction, transaction_size)?;
for x in txs_to_remove.iter() {
self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str())?;
// self.transaction_pool.limit_transaction_count(&transaction) returns the
// smallest prefix of `ready_transactions` (sorted by ascending fee-rate)
// that makes enough room for `transaction`, but since each call to `self.remove_transaction`
// also removes all transactions dependant on `x` we might already have sufficient space, so
// we constantly check the break condition.
//
// Note that self.transaction_pool.len() < self.config.maximum_transaction_count means we have
// at least one available slot in terms of the count limit
if self.transaction_pool.len() < self.config.maximum_transaction_count
&& self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit
{
break;
if !txs_to_remove.is_empty() {
let transaction_pool_len_before = self.transaction_pool.len();
for x in txs_to_remove.iter() {
self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str())?;
// self.transaction_pool.limit_transaction_count(&transaction) returns the
// smallest prefix of `ready_transactions` (sorted by ascending fee-rate)
// that makes enough room for `transaction`, but since each call to `self.remove_transaction`
// also removes all transactions dependant on `x` we might already have sufficient space, so
// we constantly check the break condition.
//
// Note that self.transaction_pool.len() < self.config.maximum_transaction_count means we have
// at least one available slot in terms of the count limit
if self.transaction_pool.len() < self.config.maximum_transaction_count
&& self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit
{
break;
}
}
self.counters
.tx_evicted_counts
.fetch_add(transaction_pool_len_before.saturating_sub(self.transaction_pool.len()) as u64, Ordering::Relaxed);
}

assert!(
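The reworked eviction path above removes low fee-rate transactions one at a time, breaks out as soon as both the count limit and the size limit leave room for the incoming transaction, and then records how many entries actually left the pool (dependants included) in the new `tx_evicted_counts` counter. A hedged, self-contained sketch of that control flow with a toy pool (hypothetical types, not the mempool's real API):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Toy stand-ins for the mempool structures referenced above; names are illustrative only.
struct ToyPool {
    txs: Vec<(&'static str, usize)>, // (id, estimated size in bytes)
    max_count: usize,
    max_bytes: usize,
}

impl ToyPool {
    fn len(&self) -> usize {
        self.txs.len()
    }
    fn estimated_size(&self) -> usize {
        self.txs.iter().map(|&(_, size)| size).sum()
    }
    fn remove(&mut self, id: &str) {
        // In the real pool this also removes transactions depending on `id`.
        self.txs.retain(|(tx_id, _)| *tx_id != id);
    }
}

fn make_room(pool: &mut ToyPool, incoming_size: usize, to_remove: &[&'static str], evicted: &AtomicU64) {
    if to_remove.is_empty() {
        return;
    }
    let len_before = pool.len();
    for id in to_remove {
        pool.remove(id);
        // Stop early once both the count and the size limits leave enough room.
        if pool.len() < pool.max_count && pool.estimated_size() + incoming_size <= pool.max_bytes {
            break;
        }
    }
    // Record how many entries actually left the pool, dependants included.
    evicted.fetch_add((len_before - pool.len()) as u64, Ordering::Relaxed);
}

fn main() {
    let mut pool = ToyPool { txs: vec![("a", 400), ("b", 300), ("c", 300)], max_count: 3, max_bytes: 1000 };
    let evicted = AtomicU64::new(0);
    make_room(&mut pool, 350, &["a", "b"], &evicted);
    // Removing "a" already frees enough space, so only one eviction is counted.
    assert_eq!(evicted.load(Ordering::Relaxed), 1);
}
```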
6 changes: 6 additions & 0 deletions mining/src/monitor.rs
@@ -69,6 +69,12 @@ impl MiningMonitor {
let feerate_estimations = self.mining_manager.clone().get_realtime_feerate_estimations().await;
debug!("Realtime feerate estimations: {}", feerate_estimations);
}
if delta.tx_evicted_counts > 0 {
info!(
"Mempool stats: {} transactions were evicted from the mempool in favor of incoming higher feerate transactions",
delta.tx_evicted_counts
);
}
if tx_script_cache_snapshot != last_tx_script_cache_snapshot {
debug!(
"UTXO set stats: {} spent, {} created ({} signatures validated, {} cache hits, {:.2} hit ratio)",