Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make transaction selector iterative and reduce consensus<->mempool sync gap #5

Merged
merged 8 commits into from
Sep 22, 2023
9 changes: 7 additions & 2 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use crate::{
acceptance_data::AcceptanceData,
block::{Block, BlockTemplate},
block::{Block, BlockTemplate, TemplateBuildMode, TemplateTransactionSelector},
block_count::BlockCount,
blockstatus::BlockStatus,
coinbase::MinerData,
Expand All @@ -27,7 +27,12 @@ pub type BlockValidationFuture = BoxFuture<'static, BlockProcessResult<BlockStat
/// Abstracts the consensus external API
#[allow(unused_variables)]
pub trait ConsensusApi: Send + Sync {
fn build_block_template(&self, miner_data: MinerData, txs: Vec<Transaction>) -> Result<BlockTemplate, RuleError> {
fn build_block_template(
&self,
miner_data: MinerData,
tx_selector: Box<dyn TemplateTransactionSelector>,
build_mode: TemplateBuildMode,
) -> Result<BlockTemplate, RuleError> {
unimplemented!()
}

Expand Down
34 changes: 33 additions & 1 deletion consensus/core/src/block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::sync::Arc;

use crate::{coinbase::MinerData, header::Header, tx::Transaction};
use crate::{
coinbase::MinerData,
header::Header,
tx::{Transaction, TransactionId},
};
use kaspa_hashes::Hash;

/// A mutable block structure where header and transactions within can still be mutated.
Expand Down Expand Up @@ -64,6 +68,34 @@ impl Block {
}
}

/// An abstraction for a recallable transaction selector with persistent state
pub trait TemplateTransactionSelector {
/// Expected to return a batch of transactions which were not previously selected.
/// The batch will typically contain sufficient transactions to fill the block
/// mass (along with the previously unrejected txs), or will drain the selector
fn select_transactions(&mut self) -> Vec<Transaction>;

/// Should be used to report invalid transactions obtained from the *most recent*
/// `select_transactions` call. Implementors should use this call to internally
/// track the selection state and discard the rejected tx from internal occupation calculations
fn reject_selection(&mut self, tx_id: TransactionId);

/// Determine whether this was an overall successful selection episode
fn is_successful(&self) -> bool;
}

/// Block template build mode
#[derive(Clone, Copy, Debug)]
pub enum TemplateBuildMode {
/// Block template build can possibly fail if `TemplateTransactionSelector::is_successful` deems the operation unsuccessful.
///
/// In such a case, the build fails with `BlockRuleError::InvalidTransactionsInNewBlock`.
Standard,

/// Block template build always succeeds. The built block contains only the validated transactions.
Infallible,
}

/// A block template for miners.
#[derive(Debug, Clone)]
pub struct BlockTemplate {
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/errors/block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{collections::HashMap, fmt::Display};

use crate::{
constants,
Expand Down Expand Up @@ -140,7 +140,7 @@ pub enum RuleError {
InvalidTransactionsInUtxoContext(usize, usize),

#[error("invalid transactions in new block template")]
InvalidTransactionsInNewBlock(Vec<(TransactionId, TxRuleError)>),
InvalidTransactionsInNewBlock(HashMap<TransactionId, TxRuleError>),

#[error("DAA window data has only {0} entries")]
InsufficientDaaWindowSize(usize),
Expand Down
11 changes: 8 additions & 3 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
api::{BlockValidationFuture, ConsensusApi},
block::{Block, BlockTemplate},
block::{Block, BlockTemplate, TemplateBuildMode, TemplateTransactionSelector},
block_count::BlockCount,
blockhash::BlockHashExtensions,
blockstatus::BlockStatus,
Expand Down Expand Up @@ -353,8 +353,13 @@ impl Consensus {
}

impl ConsensusApi for Consensus {
fn build_block_template(&self, miner_data: MinerData, txs: Vec<Transaction>) -> Result<BlockTemplate, RuleError> {
self.virtual_processor.build_block_template(miner_data, txs)
fn build_block_template(
&self,
miner_data: MinerData,
tx_selector: Box<dyn TemplateTransactionSelector>,
build_mode: TemplateBuildMode,
) -> Result<BlockTemplate, RuleError> {
self.virtual_processor.build_block_template(miner_data, tx_selector, build_mode)
}

fn validate_and_insert_block(&self, block: Block) -> BlockValidationFuture {
Expand Down
64 changes: 54 additions & 10 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::{
};
use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
block::{BlockTemplate, MutableBlock},
block::{BlockTemplate, MutableBlock, TemplateBuildMode, TemplateTransactionSelector},
blockstatus::BlockStatus::{StatusDisqualifiedFromChain, StatusUTXOValid},
coinbase::MinerData,
config::genesis::GenesisBlock,
Expand Down Expand Up @@ -88,7 +88,7 @@ use rayon::{
use rocksdb::WriteBatch;
use std::{
cmp::min,
collections::{BinaryHeap, VecDeque},
collections::{BinaryHeap, HashMap, VecDeque},
ops::Deref,
sync::{atomic::Ordering, Arc},
};
Expand Down Expand Up @@ -787,14 +787,59 @@ impl VirtualStateProcessor {
Ok(())
}

pub fn build_block_template(&self, miner_data: MinerData, txs: Vec<Transaction>) -> Result<BlockTemplate, RuleError> {
pub fn build_block_template(
&self,
miner_data: MinerData,
mut tx_selector: Box<dyn TemplateTransactionSelector>,
build_mode: TemplateBuildMode,
) -> Result<BlockTemplate, RuleError> {
//
// TODO: tests
//

// We call for the initial tx batch before acquiring the virtual read lock,
// optimizing for the common case where all txs are valid. Following selection calls
// are called within the lock in order to preserve validness of already validated txs
let mut txs = tx_selector.select_transactions();

let virtual_read = self.virtual_stores.read();
let virtual_state = virtual_read.state.get().unwrap();
let virtual_utxo_view = &virtual_read.utxo_set;

// Validate the transactions in virtual's utxo context
self.validate_block_template_transactions(&txs, &virtual_state, virtual_utxo_view)?;
let mut invalid_transactions = HashMap::new();
for tx in txs.iter() {
if let Err(e) = self.validate_block_template_transaction(tx, &virtual_state, virtual_utxo_view) {
invalid_transactions.insert(tx.id(), e);
tx_selector.reject_selection(tx.id());
}
}

let mut has_rejections = !invalid_transactions.is_empty();
if has_rejections {
txs.retain(|tx| !invalid_transactions.contains_key(&tx.id()));
}

while has_rejections {
has_rejections = false;
let next_batch = tx_selector.select_transactions(); // Note that once next_batch is empty the loop will exit
for tx in next_batch {
if let Err(e) = self.validate_block_template_transaction(&tx, &virtual_state, virtual_utxo_view) {
invalid_transactions.insert(tx.id(), e);
tx_selector.reject_selection(tx.id());
has_rejections = true;
} else {
txs.push(tx);
}
}
}

// Check whether this was an overall successful selection episode. We pass this decision
// to the selector implementation which has the broadest picture and can use mempool config
// and context
match (build_mode, tx_selector.is_successful()) {
(TemplateBuildMode::Standard, false) => return Err(RuleError::InvalidTransactionsInNewBlock(invalid_transactions)),
(TemplateBuildMode::Standard, true) | (TemplateBuildMode::Infallible, _) => {}
}

// At this point we can safely drop the read lock
drop(virtual_read);
Expand All @@ -803,18 +848,17 @@ impl VirtualStateProcessor {
self.build_block_template_from_virtual_state(virtual_state, miner_data, txs)
}

pub fn validate_block_template_transactions(
pub(crate) fn validate_block_template_transactions(
&self,
txs: &[Transaction],
virtual_state: &VirtualState,
utxo_view: &impl UtxoView,
) -> Result<(), RuleError> {
tiram88 marked this conversation as resolved.
Show resolved Hide resolved
// Search for invalid transactions. This can happen since the mining manager calling this function is not atomically in sync with virtual state
// TODO: process transactions in parallel
let mut invalid_transactions = Vec::new();
// Search for invalid transactions
let mut invalid_transactions = HashMap::new();
for tx in txs.iter() {
if let Err(e) = self.validate_block_template_transaction(tx, virtual_state, utxo_view) {
invalid_transactions.push((tx.id(), e))
invalid_transactions.insert(tx.id(), e);
}
}
if !invalid_transactions.is_empty() {
Expand Down
37 changes: 34 additions & 3 deletions consensus/src/pipeline/virtual_processor/tests.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
use crate::{consensus::test_consensus::TestConsensus, model::services::reachability::ReachabilityService};
use kaspa_consensus_core::{
api::ConsensusApi,
block::{Block, BlockTemplate, MutableBlock},
block::{Block, BlockTemplate, MutableBlock, TemplateBuildMode, TemplateTransactionSelector},
blockhash,
blockstatus::BlockStatus,
coinbase::MinerData,
config::{params::MAINNET_PARAMS, ConfigBuilder},
tx::{ScriptPublicKey, ScriptVec},
tx::{ScriptPublicKey, ScriptVec, Transaction},
BlockHashSet,
};
use kaspa_hashes::Hash;
use std::{collections::VecDeque, thread::JoinHandle};

struct OnetimeTxSelector {
txs: Option<Vec<Transaction>>,
}

impl OnetimeTxSelector {
fn new(txs: Vec<Transaction>) -> Self {
Self { txs: Some(txs) }
}
}

impl TemplateTransactionSelector for OnetimeTxSelector {
fn select_transactions(&mut self) -> Vec<Transaction> {
self.txs.take().unwrap()
}

fn reject_selection(&mut self, _tx_id: kaspa_consensus_core::tx::TransactionId) {
unimplemented!()
}

fn is_successful(&self) -> bool {
true
}
}

struct TestContext {
consensus: TestConsensus,
join_handles: Vec<JoinHandle<()>>,
Expand Down Expand Up @@ -78,7 +102,14 @@ impl TestContext {
}

pub fn build_block_template(&self, nonce: u64, timestamp: u64) -> BlockTemplate {
let mut t = self.consensus.build_block_template(self.miner_data.clone(), Default::default()).unwrap();
let mut t = self
.consensus
.build_block_template(
self.miner_data.clone(),
Box::new(OnetimeTxSelector::new(Default::default())),
TemplateBuildMode::Standard,
)
.unwrap();
t.block.header.timestamp = timestamp;
t.block.header.nonce = nonce;
t.block.header.finalize();
Expand Down
34 changes: 10 additions & 24 deletions mining/src/block_template/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use super::{errors::BuilderResult, policy::Policy};
use crate::{block_template::selector::TransactionsSelector, model::candidate_tx::CandidateTransaction};
use kaspa_consensus_core::{
api::ConsensusApi,
block::BlockTemplate,
block::{BlockTemplate, TemplateBuildMode},
coinbase::MinerData,
merkle::calc_hash_merkle_root,
tx::{TransactionId, COINBASE_TRANSACTION_INDEX},
tx::COINBASE_TRANSACTION_INDEX,
};
use kaspa_core::{
debug,
Expand All @@ -14,15 +14,12 @@ use kaspa_core::{

pub(crate) struct BlockTemplateBuilder {
policy: Policy,
selector: TransactionsSelector,
}

impl BlockTemplateBuilder {
pub(crate) fn new(max_block_mass: u64, transactions: Vec<CandidateTransaction>) -> Self {
let _sw = Stopwatch::<50>::with_threshold("BlockTemplateBuilder::new");
pub(crate) fn new(max_block_mass: u64) -> Self {
let policy = Policy::new(max_block_mass);
let selector = TransactionsSelector::new(policy.clone(), transactions);
Self { policy, selector }
Self { policy }
}

/// BuildBlockTemplate creates a block template for a miner to consume
Expand Down Expand Up @@ -89,27 +86,16 @@ impl BlockTemplateBuilder {
/// | <= policy.BlockMinSize) | |
/// ----------------------------------- --
pub(crate) fn build_block_template(
&mut self,
&self,
consensus: &dyn ConsensusApi,
miner_data: &MinerData,
transactions: Vec<CandidateTransaction>,
build_mode: TemplateBuildMode,
) -> BuilderResult<BlockTemplate> {
let _sw = Stopwatch::<20>::with_threshold("build_block_template op");
debug!("Considering {} transactions for a new block template", self.selector.len());
let block_txs = self.selector.select_transactions();
Ok(consensus.build_block_template(miner_data.clone(), block_txs)?)
}

pub(crate) fn update_transactions(&mut self, transactions: Vec<CandidateTransaction>) {
let selector = TransactionsSelector::new(self.policy.clone(), transactions);
self.selector = selector;
}

pub(crate) fn reject_transaction(&mut self, transaction_id: TransactionId) {
self.selector.reject(transaction_id);
}

pub(crate) fn candidates_len(&self) -> usize {
self.selector.len()
debug!("Considering {} transactions for a new block template", transactions.len());
let selector = Box::new(TransactionsSelector::new(self.policy.clone(), transactions));
Ok(consensus.build_block_template(miner_data.clone(), selector, build_mode)?)
}

/// modify_block_template clones an existing block template, modifies it to the requested coinbase data and updates the timestamp
Expand Down
17 changes: 7 additions & 10 deletions mining/src/block_template/model/tx.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
pub(crate) struct SelectableTransaction {
pub(crate) gas_limit: u64,
pub(crate) p: f64,

/// Has this candidate been rejected by the consensus?
pub(crate) is_rejected: bool,
}

impl SelectableTransaction {
pub(crate) fn new(tx_value: f64, gas_limit: u64, alpha: i32) -> Self {
Self { gas_limit, p: tx_value.powi(alpha), is_rejected: false }
Self { gas_limit, p: tx_value.powi(alpha) }
}
}

Expand All @@ -22,6 +19,7 @@ pub(crate) struct Candidate {

/// Range start in the candidate list total_p space
pub(crate) start: f64,

/// Range end in the candidate list total_p space
pub(crate) end: f64,

Expand All @@ -35,6 +33,7 @@ impl Candidate {
}
}

#[derive(Default)]
pub(crate) struct CandidateList {
pub(crate) candidates: Vec<Candidate>,
pub(crate) total_p: f64,
Expand All @@ -45,12 +44,10 @@ impl CandidateList {
let mut candidates = Vec::with_capacity(selectable_txs.len());
let mut total_p = 0.0;
selectable_txs.iter().enumerate().for_each(|(i, tx)| {
if !tx.is_rejected {
let current_p = tx.p;
let candidate = Candidate::new(i, total_p, total_p + current_p);
candidates.push(candidate);
total_p += current_p;
}
let current_p = tx.p;
let candidate = Candidate::new(i, total_p, total_p + current_p);
candidates.push(candidate);
total_p += current_p;
});
Self { candidates, total_p }
}
Expand Down
Loading