diff --git a/Cargo.lock b/Cargo.lock index 0577d9788..c33869fcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,12 +119,6 @@ version = "3.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" -[[package]] -name = "bytecount" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" - [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -136,37 +130,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "camino" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88ad0e1e3e88dd237a156ab9f571021b8a158caa0ae44b1968a241efb5144c1e" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo-platform" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver", - "serde", - "serde_json", -] - [[package]] name = "cast" version = "0.3.0" @@ -268,11 +231,11 @@ dependencies = [ "flate2", "futures-util", "hashes", + "indexmap", "itertools", "kaspa-core", "math", "merkle", - "moka", "parking_lot", "pow", "rand", @@ -436,15 +399,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" -[[package]] -name = "error-chain" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "faster-hex" version = "0.6.1" @@ -512,7 +466,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -597,7 +551,7 @@ dependencies = [ "proc-macro2", "quote", "syn", - "uuid 0.8.2", + "uuid", ] [[package]] @@ -630,15 +584,6 @@ dependencies = [ "libc", ] -[[package]] -name = "js-sys" -version = "0.3.60" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" -dependencies = [ - "wasm-bindgen", -] - [[package]] name = "kaspa-core" version = "0.1.0" @@ -784,15 +729,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "mach" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" -dependencies = [ - "libc", -] - [[package]] name = "math" version = "0.1.0" @@ -840,28 +776,6 @@ dependencies = [ "adler", ] -[[package]] -name = "moka" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8256cf9fa396576521e5e94affadb95818ec48f5dbedca36714387688aea29" -dependencies = [ - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "num_cpus", - "once_cell", - "parking_lot", - "quanta", - "scheduled-thread-pool", - "skeptic", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid 1.1.2", -] - [[package]] name = "muhash" version = "0.1.0" @@ 
-1022,33 +936,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "pulldown-cmark" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" -dependencies = [ - "bitflags", - "memchr", - "unicase", -] - -[[package]] -name = "quanta" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" -dependencies = [ - "crossbeam-utils", - "libc", - "mach", - "once_cell", - "raw-cpuid", - "wasi 0.10.2+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - [[package]] name = "quote" version = "1.0.21" @@ -1098,15 +985,6 @@ dependencies = [ "rand", ] -[[package]] -name = "raw-cpuid" -version = "10.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" -dependencies = [ - "bitflags", -] - [[package]] name = "rayon" version = "1.5.3" @@ -1195,30 +1073,12 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "977a7519bff143a44f842fd07e80ad1329295bd71686457f18e496736f4bf9bf" -dependencies = [ - "parking_lot", -] - [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "semver" -version = "1.0.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" -dependencies = [ - "serde", -] - [[package]] name = "serde" version = "1.0.145" @@ -1277,21 +1137,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "smallvec" version = "1.9.0" @@ -1309,12 +1154,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tempfile" version = "3.3.0" @@ -1389,27 +1228,12 @@ dependencies = [ "syn", ] -[[package]] -name = "triomphe" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" - [[package]] name = "typenum" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-ident" version = "1.0.4" @@ -1425,15 +1249,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "uuid" -version = "1.1.2" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" -dependencies = [ - "getrandom", -] - [[package]] name = "vcpkg" version = "0.2.15" @@ -1457,12 +1272,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "wasi" -version = "0.10.2+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1525,16 +1334,6 @@ version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" -[[package]] -name = "web-sys" -version = "0.3.60" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 0617ef2f4..410f9ee78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,8 +43,11 @@ rand_chacha = "0.3" rayon = "1" tempfile = "3.3" serde = { version = "1", features = ["derive", "rc"] } -futures-util = { version = "0.3", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3", default-features = false, features = [ + "alloc", +] } bincode = { version = "1", default-features = false } tokio = { version = "1", features = ["sync"] } wasm-bindgen = { version = "0.2", features = ["serde-serialize"] } criterion = { version = "0.4", default-features = false } +indexmap = "1.9.1" diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index e85fb0ed0..39d6419f1 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -23,9 +23,10 @@ tokio.workspace = true bincode.workspace = true tempfile.workspace = true rayon.workspace = true +rand.workspace = true +indexmap.workspace = true rocksdb = "0.19" -moka = "0.9" parking_lot = "0.12" crossbeam-channel = "0.5" diff --git a/consensus/pow/src/lib.rs b/consensus/pow/src/lib.rs index 33161ee05..ee5b4ec13 100644 --- a/consensus/pow/src/lib.rs +++ b/consensus/pow/src/lib.rs @@ -42,9 +42,9 @@ impl State { #[inline] #[must_use] - pub fn check_pow(&self, nonce: u64) -> bool { + pub fn check_pow(&self, nonce: u64) -> (bool, Uint256) { let pow = self.calculate_pow(nonce); // The pow hash must be less or equal than the claimed target. 
- pow <= self.target + (pow <= self.target, pow) } } diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 9ab131a25..c8696eb62 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -11,6 +11,7 @@ use crate::{ depth::DbDepthStore, ghostdag::DbGhostdagStore, headers::DbHeadersStore, + past_pruning_points::DbPastPruningPointsStore, pruning::DbPruningStore, reachability::DbReachabilityStore, relations::DbRelationsStore, @@ -28,8 +29,9 @@ use crate::{ }, processes::{ block_at_depth::BlockDepthManager, coinbase::CoinbaseManager, dagtraversalmanager::DagTraversalManager, - difficulty::DifficultyManager, ghostdag::protocol::GhostdagManager, mass::MassCalculator, - pastmediantime::PastMedianTimeManager, reachability::inquirer as reachability, transaction_validator::TransactionValidator, + difficulty::DifficultyManager, ghostdag::protocol::GhostdagManager, mass::MassCalculator, parents_builder::ParentsManager, + pastmediantime::PastMedianTimeManager, pruning::PruningManager, reachability::inquirer as reachability, + transaction_validator::TransactionValidator, }, }; use consensus_core::block::Block; @@ -79,6 +81,7 @@ pub struct Consensus { >, pub(super) past_median_time_manager: PastMedianTimeManager, pub(super) coinbase_manager: CoinbaseManager, + pub(super) pruning_manager: PruningManager, // Counters pub counters: Arc, @@ -86,15 +89,17 @@ pub struct Consensus { impl Consensus { pub fn new(db: Arc, params: &Params) -> Self { - let statuses_store = Arc::new(RwLock::new(DbStatusesStore::new(db.clone(), 100000))); - let relations_store = Arc::new(RwLock::new(DbRelationsStore::new(db.clone(), 100000))); - let reachability_store = Arc::new(RwLock::new(DbReachabilityStore::new(db.clone(), 100000))); + const CACHE_SIZE: u64 = 100_000; + let statuses_store = Arc::new(RwLock::new(DbStatusesStore::new(db.clone(), CACHE_SIZE))); + let relations_store = Arc::new(RwLock::new(DbRelationsStore::new(db.clone(), CACHE_SIZE))); + let reachability_store = Arc::new(RwLock::new(DbReachabilityStore::new(db.clone(), CACHE_SIZE))); let pruning_store = Arc::new(RwLock::new(DbPruningStore::new(db.clone()))); - let ghostdag_store = Arc::new(DbGhostdagStore::new(db.clone(), 100000)); - let daa_store = Arc::new(DbDaaStore::new(db.clone(), 100000)); - let headers_store = Arc::new(DbHeadersStore::new(db.clone(), 100000)); - let depth_store = Arc::new(DbDepthStore::new(db.clone(), 100000)); - let block_transactions_store = Arc::new(DbBlockTransactionsStore::new(db.clone(), 100000)); + let ghostdag_store = Arc::new(DbGhostdagStore::new(db.clone(), CACHE_SIZE)); + let daa_store = Arc::new(DbDaaStore::new(db.clone(), CACHE_SIZE)); + let headers_store = Arc::new(DbHeadersStore::new(db.clone(), CACHE_SIZE)); + let depth_store = Arc::new(DbDepthStore::new(db.clone(), CACHE_SIZE)); + let block_transactions_store = Arc::new(DbBlockTransactionsStore::new(db.clone(), CACHE_SIZE)); + let past_pruning_points_store = Arc::new(DbPastPruningPointsStore::new(db.clone(), CACHE_SIZE)); let block_window_cache_for_difficulty = Arc::new(BlockWindowCacheStore::new(2000)); let block_window_cache_for_past_median_time = Arc::new(BlockWindowCacheStore::new(2000)); @@ -150,6 +155,24 @@ impl Consensus { params.coinbase_payload_script_public_key_max_len, ); + let pruning_manager = PruningManager::new( + params.pruning_depth, + params.finality_depth, + params.genesis_hash, + reachability_service.clone(), + ghostdag_store.clone(), + headers_store.clone(), + past_pruning_points_store.clone(), + ); + + 
let parents_manager = ParentsManager::new( + params.max_block_level, + params.genesis_hash, + headers_store.clone(), + reachability_service.clone(), + relations_store.clone(), + ); + let (sender, receiver): (Sender, Receiver) = unbounded(); let (body_sender, body_receiver): (Sender, Receiver) = unbounded(); let (virtual_sender, virtual_receiver): (Sender, Receiver) = unbounded(); @@ -177,6 +200,8 @@ impl Consensus { dag_traversal_manager.clone(), difficulty_manager.clone(), depth_manager, + pruning_manager.clone(), + parents_manager, counters.clone(), )); @@ -197,8 +222,17 @@ impl Consensus { params.genesis_hash, )); - let virtual_processor = - Arc::new(VirtualStateProcessor::new(virtual_receiver, db.clone(), statuses_store.clone(), reachability_service.clone())); + let virtual_processor = Arc::new(VirtualStateProcessor::new( + virtual_receiver, + db.clone(), + params.genesis_hash, + statuses_store.clone(), + pruning_store.clone(), + ghostdag_store.clone(), + past_pruning_points_store, + reachability_service.clone(), + pruning_manager.clone(), + )); Self { db, @@ -227,6 +261,7 @@ impl Consensus { ), past_median_time_manager, coinbase_manager, + pruning_manager, counters, } @@ -238,6 +273,7 @@ impl Consensus { // Ensure that genesis was processed self.header_processor.process_genesis_if_needed(); + self.virtual_processor.process_genesis_if_needed(); // Spawn the asynchronous processors. let header_processor = self.header_processor.clone(); diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs index 2dbb02c0b..b858b5540 100644 --- a/consensus/src/consensus/test_consensus.rs +++ b/consensus/src/consensus/test_consensus.rs @@ -43,22 +43,18 @@ impl TestConsensus { pub fn build_header_with_parents(&self, hash: Hash, parents: Vec) -> Header { let mut header = header_from_precomputed_hash(hash, parents); - let mut ctx: HeaderProcessingContext = - HeaderProcessingContext::new(hash, &header, self.consensus.pruning_store.read().pruning_point().unwrap()); + let mut ctx = HeaderProcessingContext::new(hash, &header, self.consensus.pruning_store.read().get().unwrap()); self.consensus.ghostdag_manager.add_block(&mut ctx, hash); let ghostdag_data = ctx.ghostdag_data.unwrap(); + header.pruning_point = + self.consensus.pruning_manager.expected_header_pruning_point(ghostdag_data.to_compact(), ctx.pruning_info); let window = self.consensus.dag_traversal_manager.block_window(ghostdag_data.clone(), self.params.difficulty_window_size); - let mut window_hashes = window.iter().map(|item| item.0.hash); - let (daa_score, _) = self.consensus.difficulty_manager.calc_daa_score_and_added_blocks(&mut window_hashes, &ghostdag_data); - header.bits = self.consensus.difficulty_manager.calculate_difficulty_bits(&window); - header.daa_score = daa_score; - header.timestamp = self.consensus.past_median_time_manager.calc_past_median_time(ghostdag_data.clone()).0 + 1; header.blue_score = ghostdag_data.blue_score; header.blue_work = ghostdag_data.blue_work; @@ -72,12 +68,11 @@ impl TestConsensus { pub fn build_block_with_parents_and_transactions(&self, hash: Hash, parents: Vec, txs: Vec) -> Block { let mut header = self.build_header_with_parents(hash, parents); - let mut cb_payload: Vec = vec![]; - cb_payload.append(&mut header.blue_score.to_le_bytes().to_vec()); - cb_payload.append(&mut (self.consensus.coinbase_manager.calc_block_subsidy(header.daa_score)).to_le_bytes().to_vec()); // Subsidy - cb_payload.append(&mut (0_u16).to_le_bytes().to_vec()); // Script public key version - 
cb_payload.append(&mut (0_u8).to_le_bytes().to_vec()); // Script public key length - cb_payload.append(&mut vec![]); // Script public key + let cb_payload: Vec<u8> = header.blue_score.to_le_bytes().iter().copied() // Blue score + .chain(self.consensus.coinbase_manager.calc_block_subsidy(header.daa_score).to_le_bytes().iter().copied()) // Subsidy + .chain((0_u16).to_le_bytes().iter().copied()) // Script public key version + .chain((0_u8).to_le_bytes().iter().copied()) // Script public key length + .collect(); let cb = Transaction::new(TX_VERSION, vec![], vec![], 0, SUBNETWORK_ID_COINBASE, 0, cb_payload, 0); let final_txs = vec![vec![cb], txs].concat(); diff --git a/consensus/src/errors.rs b/consensus/src/errors.rs index 78fcd4079..a08faae83 100644 --- a/consensus/src/errors.rs +++ b/consensus/src/errors.rs @@ -1,3 +1,5 @@ +use std::fmt::Display; + use crate::{ constants, processes::{coinbase::CoinbaseError, transaction_validator::errors::TxRuleError}, @@ -7,8 +9,25 @@ use consensus_core::{ BlueWorkType, }; use hashes::Hash; +use itertools::Itertools; use thiserror::Error; +#[derive(Clone, Debug)] +pub struct VecDisplay<T: Display>(pub Vec<T>); +impl<T: Display> Display for VecDisplay<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[{}]", self.0.iter().map(|item| item.to_string()).join(", ")) + } +} +#[derive(Clone, Debug)] +pub struct TwoDimVecDisplay<T: Display + Clone>(pub Vec<Vec<T>>); +impl<T: Display + Clone> Display for TwoDimVecDisplay<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[\n\t{}\n]", self.0.iter().cloned().map(|item| VecDisplay(item).to_string()).join(", \n\t")) + } +} + #[derive(Error, Debug, Clone)] pub enum RuleError { #[error("wrong block version: got {0} but expected {}", constants::BLOCK_VERSION)] @@ -103,6 +122,12 @@ #[error("block has invalid proof-of-work")] InvalidPoW, + + #[error("Expected header pruning point is {0} but got {1}")] + WrongHeaderPruningPoint(Hash, Hash), + + #[error("Expected indirect parents {0} but got {1}")] + UnexpectedIndirectParents(TwoDimVecDisplay<Hash>, TwoDimVecDisplay<Hash>), } pub type BlockProcessResult<T> = std::result::Result<T, RuleError>; diff --git a/consensus/src/model/services/reachability.rs b/consensus/src/model/services/reachability.rs index d8802ef6a..ade969561 100644 --- a/consensus/src/model/services/reachability.rs +++ b/consensus/src/model/services/reachability.rs @@ -5,12 +5,15 @@ use consensus_core::blockhash; use parking_lot::RwLock; use crate::model::stores::reachability::ReachabilityStoreReader; -use crate::processes::reachability::{inquirer, ReachabilityError, Result}; +use crate::processes::reachability::{inquirer, Result}; use hashes::Hash; pub trait ReachabilityService { fn is_chain_ancestor_of(&self, this: Hash, queried: Hash) -> bool; + fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result<bool>; fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool; + fn is_dag_ancestor_of_any(&self, this: Hash, queried: &mut impl Iterator<Item = Hash>) -> bool; + fn is_any_dag_ancestor(&self, list: &mut impl Iterator<Item = Hash>, queried: Hash) -> bool; fn get_next_chain_ancestor(&self, descendant: Hash, ancestor: Hash) -> Hash; } @@ -32,11 +35,26 @@ impl<T: ReachabilityStoreReader + ?Sized> ReachabilityService for MTReachability inquirer::is_chain_ancestor_of(read_guard.deref(), this, queried).unwrap() } + fn is_dag_ancestor_of_result(&self, this: Hash, queried: Hash) -> Result<bool> { + let read_guard = self.store.read(); + inquirer::is_dag_ancestor_of(read_guard.deref(), this, queried) + } + fn is_dag_ancestor_of(&self, this: Hash, queried: Hash) -> bool { let read_guard
= self.store.read(); inquirer::is_dag_ancestor_of(read_guard.deref(), this, queried).unwrap() } + fn is_any_dag_ancestor(&self, list: &mut impl Iterator<Item = Hash>, queried: Hash) -> bool { + let read_guard = self.store.read(); + list.any(|hash| inquirer::is_dag_ancestor_of(read_guard.deref(), hash, queried).unwrap()) + } + + fn is_dag_ancestor_of_any(&self, this: Hash, queried: &mut impl Iterator<Item = Hash>) -> bool { + let read_guard = self.store.read(); + queried.any(|hash| inquirer::is_dag_ancestor_of(read_guard.deref(), this, hash).unwrap()) + } + fn get_next_chain_ancestor(&self, descendant: Hash, ancestor: Hash) -> Hash { let read_guard = self.store.read(); inquirer::get_next_chain_ancestor(read_guard.deref(), descendant, ancestor).unwrap() @@ -50,13 +68,8 @@ impl<T: ReachabilityStoreReader + ?Sized> MTReachabilityService<T> { /// To skip `from_ancestor` simply apply `skip(1)`. /// /// The caller is expected to verify that `from_ancestor` is indeed a chain ancestor of - /// `to_descendant`, otherwise an error will be returned. - pub fn forward_chain_iterator( - &self, - from_ancestor: Hash, - to_descendant: Hash, - inclusive: bool, - ) -> impl Iterator<Item = Result<Hash>> { + /// `to_descendant`, otherwise the function will panic. + pub fn forward_chain_iterator(&self, from_ancestor: Hash, to_descendant: Hash, inclusive: bool) -> impl Iterator<Item = Hash> { ForwardChainIterator::new(self.store.clone(), from_ancestor, to_descendant, inclusive) }
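A minimal caller sketch (an aside, not part of the diff), assuming a `service: MTReachabilityService<DbReachabilityStore>` and two stored hashes `tip` and `ancestor` (hypothetical names). Since the iterators now yield `Hash` directly, the per-item unwrapping required by the old `Result<Hash>` items disappears:

// Walk the selected chain from `tip` down to `ancestor` (inclusive).
// A store error now panics inside the iterator instead of yielding Err.
let chain: Vec<Hash> = service.backward_chain_iterator(tip, ancestor, true).collect();
// Walk from `tip` down to virtual genesis (blockhash::ORIGIN), exclusive.
for hash in service.default_chain_iterator(tip) {
    println!("selected-chain block: {hash}");
}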
@@ -66,19 +79,14 @@ impl<T: ReachabilityStoreReader + ?Sized> MTReachabilityService<T> { /// To skip `from_descendant` simply apply `skip(1)`. /// /// The caller is expected to verify that `to_ancestor` is indeed a chain ancestor of - /// `from_descendant`, otherwise the iterator will eventually return an error. - pub fn backward_chain_iterator( - &self, - from_descendant: Hash, - to_ancestor: Hash, - inclusive: bool, - ) -> impl Iterator<Item = Result<Hash>> { + /// `from_descendant`, otherwise the function will panic. + pub fn backward_chain_iterator(&self, from_descendant: Hash, to_ancestor: Hash, inclusive: bool) -> impl Iterator<Item = Hash> { BackwardChainIterator::new(self.store.clone(), from_descendant, to_ancestor, inclusive) } /// Returns the default chain iterator, walking from `from` backward down the /// selected chain until `virtual genesis` (aka `blockhash::ORIGIN`; exclusive) - pub fn default_chain_iterator(&self, from: Hash) -> impl Iterator<Item = Result<Hash>> { + pub fn default_chain_iterator(&self, from: Hash) -> impl Iterator<Item = Hash> { BackwardChainIterator::new(self.store.clone(), from, blockhash::ORIGIN, false) } } @@ -103,30 +111,23 @@ impl<T: ReachabilityStoreReader + ?Sized> BackwardChainIterator<T> { } impl<T: ReachabilityStoreReader + ?Sized> Iterator for BackwardChainIterator<T> { - type Item = Result<Hash>; + type Item = Hash; fn next(&mut self) -> Option<Self::Item> { if let Some(current) = self.current { if current == self.ancestor { if self.inclusive { self.current = None; - Some(Ok(current)) + Some(current) } else { self.current = None; None } } else { debug_assert_ne!(current, blockhash::NONE); - match self.store.read().get_parent(current) { - Ok(next) => { - self.current = Some(next); - Some(Ok(current)) - } - Err(e) => { - self.current = None; - Some(Err(ReachabilityError::StoreError(e))) - } - } + let next = self.store.read().get_parent(current).unwrap(); + self.current = Some(next); + Some(current) } } else { None @@ -148,29 +149,22 @@ impl<T: ReachabilityStoreReader + ?Sized> ForwardChainIterator<T> { } impl<T: ReachabilityStoreReader + ?Sized> Iterator for ForwardChainIterator<T> { - type Item = Result<Hash>; + type Item = Hash; fn next(&mut self) -> Option<Self::Item> { if let Some(current) = self.current { if current == self.descendant { if self.inclusive { self.current = None; - Some(Ok(current)) + Some(current) } else { self.current = None; None } } else { - match inquirer::get_next_chain_ancestor(self.store.read().deref(), self.descendant, current) { - Ok(next) => { - self.current = Some(next); - Some(Ok(current)) - } - Err(e) => { - self.current = None; - Some(Err(e)) - } - } + let next = inquirer::get_next_chain_ancestor(self.store.read().deref(), self.descendant, current).unwrap(); + self.current = Some(next); + Some(current) } } else { None @@ -213,19 +207,19 @@ mod tests { // Assert let expected_hashes = [2u64, 3, 5, 6].map(Hash::from); - assert!(expected_hashes.iter().cloned().eq(iter.map(|r| r.unwrap()))); + assert!(expected_hashes.iter().cloned().eq(iter)); // Inclusive let iter = service.forward_chain_iterator(2.into(), 10.into(), true); // Assert let expected_hashes = [2u64, 3, 5, 6, 10].map(Hash::from); - assert!(expected_hashes.iter().cloned().eq(iter.map(|r| r.unwrap()))); + assert!(expected_hashes.iter().cloned().eq(iter)); // Compare backward to reversed forward - let forward_iter = service.forward_chain_iterator(2.into(), 10.into(), true).map(|r| r.unwrap()); - let backward_iter: Result<Vec<Hash>> = service.backward_chain_iterator(10.into(), 2.into(), true).collect(); - assert!(forward_iter.eq(backward_iter.unwrap().iter().cloned().rev())) + let forward_iter = service.forward_chain_iterator(2.into(), 10.into(), true); + let backward_iter: Vec<Hash> = service.backward_chain_iterator(10.into(), 2.into(), true).collect(); + assert!(forward_iter.eq(backward_iter.iter().cloned().rev())) } #[test] @@ -238,28 +232,13 @@ let service = MTReachabilityService::new(Arc::new(RwLock::new(store))); // Asserts - assert!([1u64, 2] - .map(Hash::from) - .iter() - .cloned() - .eq(service.forward_chain_iterator(1.into(), 2.into(), true).map(|r| r.unwrap()))); - - assert!([1u64] - .map(Hash::from) - .iter() - .cloned() - .eq(service.forward_chain_iterator(1.into(), 2.into(), false).map(|r| r.unwrap())));
- - assert!([2u64, 1] - .map(Hash::from) - .iter() - .cloned() - .eq(service.backward_chain_iterator(2.into(), root, true).map(|r| r.unwrap()))); - - assert!([2u64].map(Hash::from).iter().cloned().eq(service.backward_chain_iterator(2.into(), root, false).map(|r| r.unwrap()))); - - assert!(std::iter::once_with(|| root).eq(service.backward_chain_iterator(root, root, true).map(|r| r.unwrap()))); - - assert!(std::iter::empty::<Hash>().eq(service.backward_chain_iterator(root, root, false).map(|r| r.unwrap()))); + assert!([1u64, 2].map(Hash::from).iter().cloned().eq(service.forward_chain_iterator(1.into(), 2.into(), true))); + assert!([1u64].map(Hash::from).iter().cloned().eq(service.forward_chain_iterator(1.into(), 2.into(), false))); + assert!([2u64, 1].map(Hash::from).iter().cloned().eq(service.backward_chain_iterator(2.into(), root, true))); + assert!([2u64].map(Hash::from).iter().cloned().eq(service.backward_chain_iterator(2.into(), root, false))); + assert!(std::iter::once(root).eq(service.backward_chain_iterator(root, root, true))); + assert!(std::iter::empty::<Hash>().eq(service.backward_chain_iterator(root, root, false))); + assert!(std::iter::once(root).eq(service.forward_chain_iterator(root, root, true))); + assert!(std::iter::empty::<Hash>().eq(service.forward_chain_iterator(root, root, false))); } } diff --git a/consensus/src/model/stores/block_window_cache.rs b/consensus/src/model/stores/block_window_cache.rs index ab655d2fe..d3e2f10ab 100644 --- a/consensus/src/model/stores/block_window_cache.rs +++ b/consensus/src/model/stores/block_window_cache.rs @@ -1,10 +1,11 @@ -use moka::sync::Cache; use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc}; use crate::processes::ghostdag::ordering::SortableBlock; use hashes::Hash; +use super::caching::Cache; + pub type BlockWindowHeap = BinaryHeap<Reverse<SortableBlock>>; /// Reader API for `BlockWindowCacheStore`. diff --git a/consensus/src/model/stores/caching.rs b/consensus/src/model/stores/caching.rs index 8d9f446a1..85253333b 100644 --- a/consensus/src/model/stores/caching.rs +++ b/consensus/src/model/stores/caching.rs @@ -1,8 +1,10 @@ use super::{errors::StoreError, DB}; -use moka::sync::Cache; +use indexmap::IndexMap; +use parking_lot::RwLock; +use rand::Rng; use rocksdb::WriteBatch; use serde::{de::DeserializeOwned, Serialize}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; const SEP: u8 = b'/'; @@ -22,16 +24,46 @@ impl AsRef<[u8]> for DbKey { } } +#[derive(Clone)] +pub struct Cache<TKey: Clone + std::hash::Hash + Eq + Send + Sync, TData: Clone + Send + Sync> { + map: Arc<RwLock<IndexMap<TKey, TData>>>, // We use IndexMap and not HashMap, because it makes it cheaper to remove a random element when the cache is full. + size: usize, +} + +impl<TKey: Clone + std::hash::Hash + Eq + Send + Sync, TData: Clone + Send + Sync> Cache<TKey, TData> { + pub fn new(size: u64) -> Self { + Self { map: Arc::new(RwLock::new(IndexMap::with_capacity(size as usize))), size: size as usize } + } + + pub fn get(&self, key: &TKey) -> Option<TData> { + self.map.read().get(key).cloned() + } + + pub fn contains_key(&self, key: &TKey) -> bool { + self.map.read().contains_key(key) + } + + pub fn insert(&self, key: TKey, data: TData) { + if self.size == 0 { + return; + } + + let mut write_guard = self.map.write(); + if write_guard.len() == self.size { + write_guard.swap_remove_index(rand::thread_rng().gen_range(0..self.size)); + } + write_guard.insert(key, data); + } +} +
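An aside, not part of the diff: a sketch of how the random-eviction `Cache` above behaves, using hypothetical key/value types. When the map is at capacity, `swap_remove_index` evicts a uniformly random entry in O(1) by swapping it with the last slot, which is what `IndexMap` buys over `HashMap` here:

// Hypothetical standalone usage of the Cache defined above.
let cache: Cache<u64, &str> = Cache::new(2);
cache.insert(1, "a");
cache.insert(2, "b");
// The cache is now full: this insert first swap-removes one of the two
// existing entries at a random index, then inserts the new pair.
cache.insert(3, "c");
assert!(cache.contains_key(&3));

/// A concurrent DB store with typed caching.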
#[derive(Clone)] pub struct CachedDbAccess<TKey, TData> where - TKey: std::hash::Hash + Eq + Send + Sync + 'static, - TData: Clone + Send + Sync + 'static, + TKey: Clone + std::hash::Hash + Eq + Send + Sync, + TData: Clone + Send + Sync, { db: Arc<DB>, - // The moka cache type supports shallow cloning and manages - // ref counting internally, so no need for Arc cache: Cache<TKey, Arc<TData>>, // DB bucket/path (TODO: eventually this must become dynamic in @@ -41,8 +73,8 @@ where - TKey: std::hash::Hash + Eq + Send + Sync + 'static, - TData: Clone + Send + Sync + 'static, + TKey: Clone + std::hash::Hash + Eq + Send + Sync, + TData: Clone + Send + Sync, { pub fn new(db: Arc<DB>, cache_size: u64, prefix: &'static [u8]) -> Self { Self { db, cache: Cache::new(cache_size), prefix } @@ -106,12 +138,10 @@ where #[derive(Clone)] pub struct CachedDbAccessForCopy<TKey, TData> where - TKey: std::hash::Hash + Eq + Send + Sync + 'static, - TData: Clone + Copy + Send + Sync + 'static, + TKey: Clone + std::hash::Hash + Eq + Send + Sync, + TData: Clone + Copy + Send + Sync, { db: Arc<DB>, - // The moka cache type supports shallow cloning and manages - // ref counting internally, so no need for Arc cache: Cache<TKey, TData>, // DB bucket/path (TODO: eventually this must become dynamic in @@ -121,8 +151,8 @@ where - TKey: std::hash::Hash + Eq + Send + Sync + 'static, - TData: Clone + Copy + Send + Sync + 'static, + TKey: Clone + std::hash::Hash + Eq + Send + Sync, + TData: Clone + Copy + Send + Sync, { pub fn new(db: Arc<DB>, cache_size: u64, prefix: &'static [u8]) -> Self { Self { db, cache: Cache::new(cache_size), prefix } @@ -192,11 +222,11 @@ impl<T> CachedDbItem<T> { where T: Copy + DeserializeOwned, { - if let Some(root) = *self.cached_item.read().unwrap() { + if let Some(root) = *self.cached_item.read() { Ok(root) } else if let Some(slice) = self.db.get_pinned(self.key)? 
{ let item: T = bincode::deserialize(&slice)?; - *self.cached_item.write().unwrap() = Some(item); + *self.cached_item.write() = Some(item); Ok(item) } else { Err(StoreError::KeyNotFound(String::from_utf8(Vec::from(self.key)).unwrap())) } } @@ -207,7 +237,7 @@ where T: Copy + Serialize, // Copy can be relaxed to Clone if needed by new usages { - *self.cached_item.write().unwrap() = Some(*item); + *self.cached_item.write() = Some(*item); let bin_data = bincode::serialize(&item)?; self.db.put(self.key, bin_data)?; Ok(()) @@ -217,7 +247,7 @@ where T: Copy + Serialize, { - *self.cached_item.write().unwrap() = Some(*item); + *self.cached_item.write() = Some(*item); let bin_data = bincode::serialize(&item)?; batch.put(self.key, bin_data); Ok(()) diff --git a/consensus/src/model/stores/ghostdag.rs b/consensus/src/model/stores/ghostdag.rs index 26795e7f3..05a956e33 100644 --- a/consensus/src/model/stores/ghostdag.rs +++ b/consensus/src/model/stores/ghostdag.rs @@ -1,5 +1,6 @@ use crate::processes::ghostdag::ordering::SortableBlock; +use super::caching::CachedDbAccessForCopy; use super::{caching::CachedDbAccess, errors::StoreError, DB}; use consensus_core::{blockhash::BlockHashes, BlueWorkType}; use hashes::Hash; @@ -22,6 +23,13 @@ pub struct GhostdagData { pub blues_anticone_sizes: HashKTypeMap, } +#[derive(Clone, Serialize, Deserialize, Copy)] +pub struct CompactGhostdagData { + pub blue_score: u64, + pub blue_work: BlueWorkType, + pub selected_parent: Hash, +} + impl GhostdagData { pub fn new( blue_score: u64, @@ -115,6 +123,10 @@ impl GhostdagData { pub fn unordered_mergeset(&self) -> impl Iterator<Item = Hash> + '_ { self.mergeset_blues.iter().cloned().chain(self.mergeset_reds.iter().cloned()) } + + pub fn to_compact(&self) -> CompactGhostdagData { + CompactGhostdagData { blue_score: self.blue_score, blue_work: self.blue_work, selected_parent: self.selected_parent } + } } impl GhostdagData { @@ -163,6 +175,8 @@ pub trait GhostdagStoreReader { /// Returns full block data for the requested hash fn get_data(&self, hash: Hash) -> Result<Arc<GhostdagData>, StoreError>; + fn get_compact_data(&self, hash: Hash) -> Result<CompactGhostdagData, StoreError>; + /// Check if the store contains data for the requested hash fn has(&self, hash: Hash) -> Result<bool, StoreError>; } @@ -176,6 +190,7 @@ pub trait GhostdagStore: GhostdagStoreReader { } const STORE_PREFIX: &[u8] = b"block-ghostdag-data"; +const COMPACT_STORE_PREFIX: &[u8] = b"compact-block-ghostdag-data";
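An aside, not part of the diff: a sketch of the two-tier read pattern the compact store enables, with hypothetical names (`store: DbGhostdagStore`, `block_hash`, `threshold`). Callers that only need `blue_score`, `blue_work` or `selected_parent` can hit the compact column and avoid deserializing the full mergeset vectors:

// Cheap read: served from the "compact-block-ghostdag-data" column.
let compact = store.get_compact_data(block_hash)?;
if compact.blue_score >= threshold {
    // Expensive read, performed only when the mergeset is actually needed.
    let full = store.get_data(block_hash)?;
    for merged in full.unordered_mergeset() {
        // ... process each merged block hash
    }
}

/// A DB + cache implementation of `GhostdagStore` trait, with concurrency support.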
#[derive(Clone)] @@ -183,11 +198,16 @@ pub struct DbGhostdagStore { raw_db: Arc, // `CachedDbAccess` is shallow cloned so no need to wrap with Arc cached_access: CachedDbAccess, + compact_cached_access: CachedDbAccessForCopy, } impl DbGhostdagStore { pub fn new(db: Arc, cache_size: u64) -> Self { - Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccess::new(db, cache_size, STORE_PREFIX) } + Self { + raw_db: Arc::clone(&db), + cached_access: CachedDbAccess::new(db.clone(), cache_size, STORE_PREFIX), + compact_cached_access: CachedDbAccessForCopy::new(db, cache_size, COMPACT_STORE_PREFIX), + } } pub fn clone_with_new_cache(&self, cache_size: u64) -> Self { @@ -199,6 +219,11 @@ impl DbGhostdagStore { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } self.cached_access.write_batch(batch, hash, data)?; + self.compact_cached_access.write_batch( + batch, + hash, + CompactGhostdagData { blue_score: data.blue_score, blue_work: data.blue_work, selected_parent: data.selected_parent }, + )?; Ok(()) } } @@ -232,6 +257,10 @@ impl GhostdagStoreReader for DbGhostdagStore { self.cached_access.read(hash) } + fn get_compact_data(&self, hash: Hash) -> Result { + self.compact_cached_access.read(hash) + } + fn has(&self, hash: Hash) -> Result { self.cached_access.has(hash) } @@ -243,6 +272,13 @@ impl GhostdagStore for DbGhostdagStore { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } self.cached_access.write(hash, &data)?; + if self.compact_cached_access.has(hash)? { + return Err(StoreError::KeyAlreadyExists(hash.to_string())); + } + self.compact_cached_access.write( + hash, + CompactGhostdagData { blue_score: data.blue_score, blue_work: data.blue_work, selected_parent: data.selected_parent }, + )?; Ok(()) } } @@ -353,6 +389,10 @@ impl GhostdagStoreReader for MemoryGhostdagStore { fn has(&self, hash: Hash) -> Result { Ok(self.blue_score_map.borrow().contains_key(&hash)) } + + fn get_compact_data(&self, hash: Hash) -> Result { + Ok(self.get_data(hash)?.to_compact()) + } } #[cfg(test)] diff --git a/consensus/src/model/stores/headers.rs b/consensus/src/model/stores/headers.rs index 0c1f04649..67d44d167 100644 --- a/consensus/src/model/stores/headers.rs +++ b/consensus/src/model/stores/headers.rs @@ -12,15 +12,23 @@ use serde::{Deserialize, Serialize}; pub trait HeaderStoreReader { fn get_daa_score(&self, hash: Hash) -> Result; + fn get_blue_score(&self, hash: Hash) -> Result; fn get_timestamp(&self, hash: Hash) -> Result; fn get_bits(&self, hash: Hash) -> Result; fn get_header(&self, hash: Hash) -> Result, StoreError>; + fn get_header_with_block_level(&self, hash: Hash) -> Result, StoreError>; fn get_compact_header_data(&self, hash: Hash) -> Result; } +#[derive(Clone, Serialize, Deserialize)] +pub struct HeaderWithBlockLevel { + pub header: Arc
<Header>, + pub block_level: u8, +} pub trait HeaderStore: HeaderStoreReader { // This is append only - fn insert(&self, hash: Hash, header: Arc<Header>
) -> Result<(), StoreError>; + fn insert(&self, hash: Hash, header: Arc<Header>
, block_level: u8) -> Result<(), StoreError>; } const HEADERS_STORE_PREFIX: &[u8] = b"headers"; @@ -31,6 +39,7 @@ pub struct CompactHeaderData { pub daa_score: u64, pub timestamp: u64, pub bits: u32, + pub blue_score: u64, } /// A DB + cache implementation of `HeaderStore` trait, with concurrency support. @@ -39,7 +48,7 @@ pub struct DbHeadersStore { raw_db: Arc<DB>, // `CachedDbAccess` is shallow cloned so no need to wrap with Arc cached_compact_headers_access: CachedDbAccessForCopy<Hash, CompactHeaderData>, - cached_headers_access: CachedDbAccess<Hash, Header>, + cached_headers_access: CachedDbAccess<Hash, HeaderWithBlockLevel>, } impl DbHeadersStore { @@ -55,15 +64,24 @@ Self::new(Arc::clone(&self.raw_db), cache_size) } - pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, header: Arc<Header>
) -> Result<(), StoreError> { + pub fn insert_batch(&self, batch: &mut WriteBatch, hash: Hash, header: Arc<Header>
, block_level: u8) -> Result<(), StoreError> { if self.cached_headers_access.has(hash)? { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } - self.cached_headers_access.write_batch(batch, hash, &header)?; + self.cached_headers_access.write_batch( + batch, + hash, + &Arc::new(HeaderWithBlockLevel { header: header.clone(), block_level }), + )?; self.cached_compact_headers_access.write_batch( batch, hash, - CompactHeaderData { daa_score: header.daa_score, timestamp: header.timestamp, bits: header.bits }, + CompactHeaderData { + daa_score: header.daa_score, + timestamp: header.timestamp, + bits: header.bits, + blue_score: header.blue_score, + }, )?; Ok(()) } @@ -71,46 +89,69 @@ impl DbHeadersStore { impl HeaderStoreReader for DbHeadersStore { fn get_daa_score(&self, hash: Hash) -> Result { - if let Some(header) = self.cached_headers_access.read_from_cache(hash) { - return Ok(header.daa_score); + if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) { + return Ok(header_with_block_level.header.daa_score); } Ok(self.cached_compact_headers_access.read(hash)?.daa_score) } + fn get_blue_score(&self, hash: Hash) -> Result { + if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) { + return Ok(header_with_block_level.header.blue_score); + } + Ok(self.cached_compact_headers_access.read(hash)?.blue_score) + } + fn get_timestamp(&self, hash: Hash) -> Result { - if let Some(header) = self.cached_headers_access.read_from_cache(hash) { - return Ok(header.timestamp); + if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) { + return Ok(header_with_block_level.header.timestamp); } Ok(self.cached_compact_headers_access.read(hash)?.timestamp) } fn get_bits(&self, hash: Hash) -> Result { - if let Some(header) = self.cached_headers_access.read_from_cache(hash) { - return Ok(header.bits); + if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) { + return Ok(header_with_block_level.header.bits); } Ok(self.cached_compact_headers_access.read(hash)?.bits) } fn get_header(&self, hash: Hash) -> Result, StoreError> { + Ok(self.cached_headers_access.read(hash)?.header.clone()) + } + + fn get_header_with_block_level(&self, hash: Hash) -> Result, StoreError> { self.cached_headers_access.read(hash) } fn get_compact_header_data(&self, hash: Hash) -> Result { - if let Some(header) = self.cached_headers_access.read_from_cache(hash) { - return Ok(CompactHeaderData { daa_score: header.daa_score, timestamp: header.timestamp, bits: header.bits }); + if let Some(header_with_block_level) = self.cached_headers_access.read_from_cache(hash) { + return Ok(CompactHeaderData { + daa_score: header_with_block_level.header.daa_score, + timestamp: header_with_block_level.header.timestamp, + bits: header_with_block_level.header.bits, + blue_score: header_with_block_level.header.blue_score, + }); } self.cached_compact_headers_access.read(hash) } } impl HeaderStore for DbHeadersStore { - fn insert(&self, hash: Hash, header: Arc
) -> Result<(), StoreError> { + fn insert(&self, hash: Hash, header: Arc<Header>
, block_level: u8) -> Result<(), StoreError> { if self.cached_headers_access.has(hash)? { return Err(StoreError::KeyAlreadyExists(hash.to_string())); } - self.cached_compact_headers_access - .write(hash, CompactHeaderData { daa_score: header.daa_score, timestamp: header.timestamp, bits: header.bits })?; - self.cached_headers_access.write(hash, &header)?; + self.cached_compact_headers_access.write( + hash, + CompactHeaderData { + daa_score: header.daa_score, + timestamp: header.timestamp, + bits: header.bits, + blue_score: header.blue_score, + }, + )?; + self.cached_headers_access.write(hash, &Arc::new(HeaderWithBlockLevel { header, block_level }))?; Ok(()) } } diff --git a/consensus/src/model/stores/mod.rs b/consensus/src/model/stores/mod.rs index 747f109dd..e396f4dca 100644 --- a/consensus/src/model/stores/mod.rs +++ b/consensus/src/model/stores/mod.rs @@ -6,6 +6,7 @@ pub mod depth; pub mod errors; pub mod ghostdag; pub mod headers; +pub mod past_pruning_points; pub mod pruning; pub mod reachability; pub mod relations; diff --git a/consensus/src/model/stores/past_pruning_points.rs b/consensus/src/model/stores/past_pruning_points.rs new file mode 100644 index 000000000..8a3c46a93 --- /dev/null +++ b/consensus/src/model/stores/past_pruning_points.rs @@ -0,0 +1,83 @@ +use std::{fmt::Display, sync::Arc}; + +use super::{ + caching::CachedDbAccessForCopy, + errors::{StoreError, StoreResult}, + DB, +}; +use hashes::Hash; +use rocksdb::WriteBatch; + +#[derive(PartialEq, Eq, Clone, Copy, Hash)] +pub struct Key([u8; 8]); + +impl From for Key { + fn from(value: u64) -> Self { + Self(value.to_le_bytes()) // TODO: Consider using big-endian for future ordering. + } +} + +impl AsRef<[u8]> for Key { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl Display for Key { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", u64::from_le_bytes(self.0)) + } +} + +pub trait PastPruningPointsStoreReader { + fn get(&self, index: u64) -> StoreResult; +} + +pub trait PastPruningPointsStore: PastPruningPointsStoreReader { + // This is append only + fn insert(&self, index: u64, pruning_point: Hash) -> StoreResult<()>; +} + +const STORE_PREFIX: &[u8] = b"past-pruning-points"; + +/// A DB + cache implementation of `PastPruningPointsStore` trait, with concurrency support. +#[derive(Clone)] +pub struct DbPastPruningPointsStore { + raw_db: Arc, + // `CachedDbAccess` is shallow cloned so no need to wrap with Arc + cached_access: CachedDbAccessForCopy, +} + +impl DbPastPruningPointsStore { + pub fn new(db: Arc, cache_size: u64) -> Self { + Self { raw_db: Arc::clone(&db), cached_access: CachedDbAccessForCopy::new(Arc::clone(&db), cache_size, STORE_PREFIX) } + } + + pub fn clone_with_new_cache(&self, cache_size: u64) -> Self { + Self::new(Arc::clone(&self.raw_db), cache_size) + } + + pub fn insert_batch(&self, batch: &mut WriteBatch, index: u64, pruning_point: Hash) -> Result<(), StoreError> { + if self.cached_access.has(index.into())? { + return Err(StoreError::KeyAlreadyExists(index.to_string())); + } + self.cached_access.write_batch(batch, index.into(), pruning_point)?; + Ok(()) + } +} + +impl PastPruningPointsStoreReader for DbPastPruningPointsStore { + fn get(&self, index: u64) -> StoreResult { + self.cached_access.read(index.into()) + } +} + +impl PastPruningPointsStore for DbPastPruningPointsStore { + fn insert(&self, index: u64, pruning_point: Hash) -> StoreResult<()> { + if self.cached_access.has(index.into())? 
{ + return Err(StoreError::KeyAlreadyExists(index.to_string())); + } + self.cached_access.write(index.into(), pruning_point)?; + Ok(()) + } +} diff --git a/consensus/src/model/stores/pruning.rs b/consensus/src/model/stores/pruning.rs index e497c7088..8b7ce248c 100644 --- a/consensus/src/model/stores/pruning.rs +++ b/consensus/src/model/stores/pruning.rs @@ -2,14 +2,40 @@ use std::sync::Arc; use super::{caching::CachedDbItem, errors::StoreResult, DB}; use hashes::Hash; +use rocksdb::WriteBatch; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Serialize, Deserialize)] +pub struct PruningPointInfo { + pub pruning_point: Hash, + pub candidate: Hash, + pub index: u64, +} + +impl PruningPointInfo { + pub fn new(pruning_point: Hash, candidate: Hash, index: u64) -> Self { + Self { pruning_point, candidate, index } + } + + pub fn from_genesis(genesis_hash: Hash) -> Self { + Self { pruning_point: genesis_hash, candidate: genesis_hash, index: 0 } + } + + pub fn decompose(self) -> (Hash, Hash, u64) { + (self.pruning_point, self.candidate, self.index) + } +} /// Reader API for `PruningStore`. pub trait PruningStoreReader { fn pruning_point(&self) -> StoreResult; + fn pruning_point_candidate(&self) -> StoreResult; + fn pruning_point_index(&self) -> StoreResult; + fn get(&self) -> StoreResult; } pub trait PruningStore: PruningStoreReader { - fn set(&mut self, hash: Hash) -> StoreResult<()>; + fn set(&mut self, pruning_point: Hash, candidate: Hash, index: u64) -> StoreResult<()>; } const STORE_PREFIX: &[u8] = b"pruning"; @@ -18,29 +44,45 @@ const STORE_PREFIX: &[u8] = b"pruning"; #[derive(Clone)] pub struct DbPruningStore { raw_db: Arc, - pruning_point: CachedDbItem, + cached_access: CachedDbItem, } const PRUNING_POINT_KEY: &[u8] = b"pruning-point"; impl DbPruningStore { pub fn new(db: Arc) -> Self { - Self { raw_db: Arc::clone(&db), pruning_point: CachedDbItem::new(db, PRUNING_POINT_KEY) } + Self { raw_db: Arc::clone(&db), cached_access: CachedDbItem::new(db.clone(), PRUNING_POINT_KEY) } } pub fn clone_with_new_cache(&self) -> Self { Self::new(Arc::clone(&self.raw_db)) } + + pub fn set_batch(&mut self, batch: &mut WriteBatch, pruning_point: Hash, candidate: Hash, index: u64) -> StoreResult<()> { + self.cached_access.write_batch(batch, &PruningPointInfo { pruning_point, candidate, index }) + } } impl PruningStoreReader for DbPruningStore { fn pruning_point(&self) -> StoreResult { - self.pruning_point.read() + Ok(self.cached_access.read()?.pruning_point) + } + + fn pruning_point_candidate(&self) -> StoreResult { + Ok(self.cached_access.read()?.candidate) + } + + fn pruning_point_index(&self) -> StoreResult { + Ok(self.cached_access.read()?.index) + } + + fn get(&self) -> StoreResult { + self.cached_access.read() } } impl PruningStore for DbPruningStore { - fn set(&mut self, hash: Hash) -> StoreResult<()> { - self.pruning_point.write(&hash) + fn set(&mut self, pruning_point: Hash, candidate: Hash, index: u64) -> StoreResult<()> { + self.cached_access.write(&PruningPointInfo { pruning_point, candidate, index }) } } diff --git a/consensus/src/params.rs b/consensus/src/params.rs index 21fe6169f..7e911ac27 100644 --- a/consensus/src/params.rs +++ b/consensus/src/params.rs @@ -15,6 +15,7 @@ pub struct Params { pub mergeset_size_limit: u64, pub merge_depth: u64, pub finality_depth: u64, + pub pruning_depth: u64, pub coinbase_payload_script_public_key_max_len: u8, pub max_coinbase_payload_len: usize, pub max_tx_inputs: usize, @@ -28,6 +29,7 @@ pub struct Params { pub deflationary_phase_daa_score: 
u64, pub pre_deflationary_phase_base_subsidy: u64, pub skip_proof_of_work: bool, + pub max_block_level: u8, } impl Params { @@ -53,6 +55,7 @@ pub const MAINNET_PARAMS: Params = Params { mergeset_size_limit: (DEFAULT_GHOSTDAG_K as u64) * 10, merge_depth: 3600, finality_depth: 86400, + pruning_depth: 185798, coinbase_payload_script_public_key_max_len: 150, max_coinbase_payload_len: 204, @@ -78,6 +81,48 @@ // Three days in seconds = 3 * 24 * 60 * 60 = 259200 deflationary_phase_daa_score: 15778800 - 259200, pre_deflationary_phase_base_subsidy: 50000000000, + skip_proof_of_work: false, + max_block_level: 225, +}; + +pub const DEVNET_PARAMS: Params = Params { + genesis_hash: Hash::from_bytes([1u8; HASH_SIZE]), // TODO: Use real mainnet genesis here + ghostdag_k: DEFAULT_GHOSTDAG_K, + timestamp_deviation_tolerance: 132, + target_time_per_block: 1000, + max_block_parents: 10, + difficulty_window_size: 2641, + genesis_timestamp: 0, // TODO: Use real value + genesis_bits: 0x207fffff, // TODO: Use real value + mergeset_size_limit: (DEFAULT_GHOSTDAG_K as u64) * 10, + merge_depth: 3600, + finality_depth: 86400, + pruning_depth: 185798, + coinbase_payload_script_public_key_max_len: 150, + max_coinbase_payload_len: 204, + + // This is technically a soft fork from the Go implementation since kaspad's consensus doesn't + // check these rules, but in practice it's enforced by the network layer that limits the message + // size to 1 GB. + // These values should be lowered to more reasonable amounts on the next planned HF/SF. + max_tx_inputs: 1_000_000_000, + max_tx_outputs: 1_000_000_000, + max_signature_script_len: 1_000_000_000, + max_script_public_key_len: 1_000_000_000, + mass_per_tx_byte: 1, + mass_per_script_pub_key_byte: 10, + mass_per_sig_op: 1000, + max_block_mass: 500_000, + + // deflationary_phase_daa_score is the DAA score after which the pre-deflationary period + // switches to the deflationary period. 
This number is calculated as follows: + // We define a year as 365.25 days + // Half a year in seconds = 365.25 / 2 * 24 * 60 * 60 = 15778800 + // The network was down for three days shortly after launch + // Three days in seconds = 3 * 24 * 60 * 60 = 259200 + deflationary_phase_daa_score: 15778800 - 259200, + pre_deflationary_phase_base_subsidy: 50000000000, skip_proof_of_work: false, + max_block_level: 250, }; diff --git a/consensus/src/pipeline/header_processor/post_pow_validation.rs b/consensus/src/pipeline/header_processor/post_pow_validation.rs index baa06b05c..51336b87f 100644 --- a/consensus/src/pipeline/header_processor/post_pow_validation.rs +++ b/consensus/src/pipeline/header_processor/post_pow_validation.rs @@ -1,8 +1,9 @@ use super::{HeaderProcessingContext, HeaderProcessor}; -use crate::errors::{BlockProcessResult, RuleError}; +use crate::errors::{BlockProcessResult, RuleError, TwoDimVecDisplay}; use crate::model::services::reachability::ReachabilityService; use consensus_core::header::Header; use hashes::Hash; +use std::collections::HashSet; use std::sync::Arc; impl HeaderProcessor { @@ -69,7 +70,23 @@ impl HeaderProcessor { ctx: &mut HeaderProcessingContext, header: &Header, ) -> BlockProcessResult<()> { - // TODO: Implement this + let expected_block_parents = self.parents_manager.calc_block_parents(ctx.pruning_point(), header.direct_parents()); + if header.parents_by_level.len() != expected_block_parents.len() + || !expected_block_parents.iter().enumerate().all(|(block_level, expected_level_parents)| { + let header_level_parents = &header.parents_by_level[block_level]; + if header_level_parents.len() != expected_level_parents.len() { + return false; + } + + let expected_set = HashSet::<&Hash>::from_iter(expected_level_parents); + header_level_parents.iter().all(|header_parent| expected_set.contains(header_parent)) + }) + { + return Err(RuleError::UnexpectedIndirectParents( + TwoDimVecDisplay(expected_block_parents), + TwoDimVecDisplay(header.parents_by_level.clone()), + )); + }; Ok(()) } @@ -78,7 +95,11 @@ impl HeaderProcessor { ctx: &mut HeaderProcessingContext, header: &Header, ) -> BlockProcessResult<()> { - // TODO: Implement this + let expected = + self.pruning_manager.expected_header_pruning_point(ctx.ghostdag_data.as_ref().unwrap().to_compact(), ctx.pruning_info); + if expected != header.pruning_point { + return Err(RuleError::WrongHeaderPruningPoint(expected, header.pruning_point)); + } Ok(()) } @@ -88,8 +109,8 @@ impl HeaderProcessor { header: &Header, ) -> BlockProcessResult<()> { let gd_data = ctx.ghostdag_data.as_ref().unwrap(); - let merge_depth_root = self.depth_manager.calc_merge_depth_root(gd_data, ctx.pruning_point); - let finality_point = self.depth_manager.calc_finality_point(gd_data, ctx.pruning_point); + let merge_depth_root = self.depth_manager.calc_merge_depth_root(gd_data, ctx.pruning_point()); + let finality_point = self.depth_manager.calc_finality_point(gd_data, ctx.pruning_point()); let non_bounded_merge_depth_violating_blues: Vec = self.depth_manager.non_bounded_merge_depth_violating_blues(gd_data, merge_depth_root).collect(); diff --git a/consensus/src/pipeline/header_processor/pre_pow_validation.rs b/consensus/src/pipeline/header_processor/pre_pow_validation.rs index c299e8fe5..15f17507f 100644 --- a/consensus/src/pipeline/header_processor/pre_pow_validation.rs +++ b/consensus/src/pipeline/header_processor/pre_pow_validation.rs @@ -1,9 +1,8 @@ use super::*; use crate::errors::{BlockProcessResult, RuleError}; use 
crate::model::services::reachability::ReachabilityService; -use crate::model::stores::errors::StoreResultExtensions; -use crate::model::stores::pruning::PruningStoreReader; use consensus_core::header::Header; +use std::cmp::max; use std::sync::Arc; impl HeaderProcessor { @@ -17,7 +16,7 @@ impl HeaderProcessor { } self.check_pruning_violation(ctx, header)?; - self.check_pow(ctx, header)?; + self.check_pow_and_calc_block_level(ctx, header)?; self.check_difficulty_and_daa_score(ctx, header)?; Ok(()) } @@ -27,33 +26,30 @@ impl HeaderProcessor { ctx: &mut HeaderProcessingContext, header: &Header, ) -> BlockProcessResult<()> { - match self.pruning_store.read().pruning_point().unwrap_option() { - None => Ok(()), // It implictly means that genesis is the pruning point - so no violation can exist - Some(pruning_point) => { - let non_pruned_parents = ctx.get_non_pruned_parents(); - if non_pruned_parents.is_empty() { - return Ok(()); - } - - if non_pruned_parents - .iter() - .cloned() - .any(|parent| !self.reachability_service.is_dag_ancestor_of(pruning_point, parent)) - { - return Err(RuleError::PruningViolation(pruning_point)); - } + let non_pruned_parents = ctx.get_non_pruned_parents(); + if non_pruned_parents.is_empty() { + return Ok(()); + } - Ok(()) - } + // We check that the new block is in the future of the pruning point by verifying that at least + // one of its parents is in the pruning point future (or the pruning point itself). Otherwise, + // the Prunality proof implies that the block can be discarded. + if !self.reachability_service.is_dag_ancestor_of_any(ctx.pruning_point(), &mut non_pruned_parents.iter().copied()) { + return Err(RuleError::PruningViolation(ctx.pruning_point())); } + Ok(()) } - fn check_pow(self: &Arc, ctx: &mut HeaderProcessingContext, header: &Header) -> BlockProcessResult<()> { - if self.skip_proof_of_work { - return Ok(()); - } + fn check_pow_and_calc_block_level( + self: &Arc, + ctx: &mut HeaderProcessingContext, + header: &Header, + ) -> BlockProcessResult<()> { let state = pow::State::new(header); - if state.check_pow(header.nonce) { + let (passed, pow) = state.check_pow(header.nonce); + if passed || self.skip_proof_of_work { + let signed_block_level = self.max_block_level as i64 - pow.bits() as i64; + ctx.block_level = Some(max(signed_block_level, 0) as u8); Ok(()) } else { Err(RuleError::InvalidPoW) diff --git a/consensus/src/pipeline/header_processor/processor.rs b/consensus/src/pipeline/header_processor/processor.rs index 67ca87fd1..29a8eaca6 100644 --- a/consensus/src/pipeline/header_processor/processor.rs +++ b/consensus/src/pipeline/header_processor/processor.rs @@ -9,7 +9,8 @@ use crate::{ errors::StoreResultExtensions, ghostdag::{DbGhostdagStore, GhostdagData}, headers::DbHeadersStore, - pruning::{DbPruningStore, PruningStore, PruningStoreReader}, + past_pruning_points::DbPastPruningPointsStore, + pruning::{DbPruningStore, PruningPointInfo, PruningStore, PruningStoreReader}, reachability::{DbReachabilityStore, StagingReachabilityStore}, relations::{DbRelationsStore, RelationsStoreBatchExtensions}, statuses::{ @@ -23,7 +24,8 @@ use crate::{ pipeline::deps_manager::{BlockTask, BlockTaskDependencyManager}, processes::{ block_at_depth::BlockDepthManager, dagtraversalmanager::DagTraversalManager, difficulty::DifficultyManager, - ghostdag::protocol::GhostdagManager, pastmediantime::PastMedianTimeManager, reachability::inquirer as reachability, + ghostdag::protocol::GhostdagManager, parents_builder::ParentsManager, pastmediantime::PastMedianTimeManager, + 
pruning::PruningManager, reachability::inquirer as reachability, }, test_helpers::header_from_precomputed_hash, }; @@ -42,7 +44,7 @@ use super::super::ProcessingCounters; pub struct HeaderProcessingContext<'a> { pub hash: Hash, pub header: &'a Header, - pub pruning_point: Hash, + pub pruning_info: PruningPointInfo, // Staging data pub ghostdag_data: Option>, @@ -51,17 +53,18 @@ pub struct HeaderProcessingContext<'a> { pub daa_added_blocks: Option>, pub merge_depth_root: Option, pub finality_point: Option, + pub block_level: Option, // Cache non_pruned_parents: Option, } impl<'a> HeaderProcessingContext<'a> { - pub fn new(hash: Hash, header: &'a Header, pruning_point: Hash) -> Self { + pub fn new(hash: Hash, header: &'a Header, pruning_info: PruningPointInfo) -> Self { Self { hash, header, - pruning_point, + pruning_info, ghostdag_data: None, non_pruned_parents: None, block_window_for_difficulty: None, @@ -69,6 +72,7 @@ impl<'a> HeaderProcessingContext<'a> { block_window_for_past_median_time: None, merge_depth_root: None, finality_point: None, + block_level: None, } } @@ -81,6 +85,10 @@ impl<'a> HeaderProcessingContext<'a> { self.non_pruned_parents = Some(non_pruned_parents.clone()); non_pruned_parents } + + pub fn pruning_point(&self) -> Hash { + self.pruning_info.pruning_point + } } pub struct HeaderProcessor { @@ -97,6 +105,7 @@ pub struct HeaderProcessor { pub(super) mergeset_size_limit: u64, pub(super) genesis_bits: u32, pub(super) skip_proof_of_work: bool, + pub(super) max_block_level: u8, // DB db: Arc, @@ -125,6 +134,8 @@ pub struct HeaderProcessor { pub(super) past_median_time_manager: PastMedianTimeManager, pub(super) depth_manager: BlockDepthManager, pub(super) reachability_service: MTReachabilityService, + pub(super) pruning_manager: PruningManager, + pub(super) parents_manager: ParentsManager, // Dependency manager task_manager: BlockTaskDependencyManager, @@ -156,6 +167,8 @@ impl HeaderProcessor { dag_traversal_manager: DagTraversalManager, difficulty_manager: DifficultyManager, depth_manager: BlockDepthManager, + pruning_manager: PruningManager, + parents_manager: ParentsManager, counters: Arc, ) -> Self { Self { @@ -187,6 +200,8 @@ impl HeaderProcessor { reachability_service, past_median_time_manager, depth_manager, + pruning_manager, + parents_manager, task_manager: BlockTaskDependencyManager::new(), counters, timestamp_deviation_tolerance: params.timestamp_deviation_tolerance, @@ -195,6 +210,7 @@ impl HeaderProcessor { mergeset_size_limit: params.mergeset_size_limit, genesis_bits: params.genesis_bits, skip_proof_of_work: params.skip_proof_of_work, + max_block_level: params.max_block_level, } } @@ -257,17 +273,12 @@ impl HeaderProcessor { } // Create processing context - let mut ctx = HeaderProcessingContext::new(header.hash, header, self.pruning_store.read().pruning_point().unwrap()); + let mut ctx = HeaderProcessingContext::new(header.hash, header, self.pruning_store.read().get().unwrap()); - // Run GHOSTDAG for the new header + // Run all header validations for the new header self.pre_ghostdag_validation(&mut ctx, header)?; self.ghostdag_manager.add_block(&mut ctx, header.hash); // TODO: Run GHOSTDAG for all block levels - - // - // TODO: imp all remaining header validation and processing steps :) - // self.pre_pow_validation(&mut ctx, header)?; - if let Err(e) = self.post_pow_validation(&mut ctx, header) { self.statuses_store.write().set(ctx.hash, StatusInvalid).unwrap(); return Err(e); @@ -292,7 +303,7 @@ impl HeaderProcessor { 
@@ -292,7 +303,7 @@ impl HeaderProcessor {
         self.block_window_cache_for_difficulty.insert(ctx.hash, Arc::new(ctx.block_window_for_difficulty.unwrap()));
         self.block_window_cache_for_past_median_time.insert(ctx.hash, Arc::new(ctx.block_window_for_past_median_time.unwrap()));
         self.daa_store.insert_batch(&mut batch, ctx.hash, Arc::new(ctx.daa_added_blocks.unwrap())).unwrap();
-        self.headers_store.insert_batch(&mut batch, ctx.hash, Arc::new(ctx.header.clone())).unwrap();
+        self.headers_store.insert_batch(&mut batch, ctx.hash, Arc::new(ctx.header.clone()), ctx.block_level.unwrap()).unwrap();
         self.depth_store.insert_batch(&mut batch, ctx.hash, ctx.merge_depth_root.unwrap(), ctx.finality_point.unwrap()).unwrap();

         // Create staging reachability store. We use an upgradable read here to avoid concurrent
@@ -315,8 +326,11 @@ impl HeaderProcessor {
         // Non-append only stores need to use write locks.
         // Note we need to keep the lock write guards until the batch is written.
-        let relations_write_guard =
-            self.relations_store.insert_batch(&mut batch, header.hash, BlockHashes::new(header.direct_parents().clone())).unwrap();
+        let relations_write_guard = if header.direct_parents().is_empty() {
+            self.relations_store.insert_batch(&mut batch, header.hash, BlockHashes::new(vec![ORIGIN])).unwrap()
+        } else {
+            self.relations_store.insert_batch(&mut batch, header.hash, BlockHashes::new(header.direct_parents().clone())).unwrap()
+        };

         let statuses_write_guard = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap();

@@ -339,16 +353,23 @@
             return;
         }

-        self.pruning_store.write().set(self.genesis_hash).unwrap();
+        {
+            let mut batch = WriteBatch::default();
+            let write_guard = self.relations_store.insert_batch(&mut batch, ORIGIN, BlockHashes::new(vec![]));
+            self.db.write(batch).unwrap();
+        }
+
+        self.pruning_store.write().set(self.genesis_hash, self.genesis_hash, 0).unwrap();
         let mut header = header_from_precomputed_hash(self.genesis_hash, vec![]); // TODO
         header.bits = self.genesis_bits;
-        let mut ctx = HeaderProcessingContext::new(self.genesis_hash, &header, ORIGIN);
+        let mut ctx = HeaderProcessingContext::new(self.genesis_hash, &header, PruningPointInfo::from_genesis(self.genesis_hash));
         self.ghostdag_manager.add_genesis_if_needed(&mut ctx);
         ctx.block_window_for_difficulty = Some(Default::default());
         ctx.block_window_for_past_median_time = Some(Default::default());
         ctx.daa_added_blocks = Some(Default::default());
         ctx.merge_depth_root = Some(ORIGIN);
         ctx.finality_point = Some(ORIGIN);
+        ctx.block_level = Some(self.max_block_level);
         self.commit_header(ctx, &header);
     }
 }
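
Note the new `ORIGIN` fallback above: parentless headers (genesis) are now attached to the reserved `ORIGIN` sentinel, so every block has at least one edge in the relations store. A small sketch of that convention; the helper function is hypothetical, only `ORIGIN` and the fallback rule come from the diff:

```rust
use consensus_core::blockhash::ORIGIN; // reserved "virtual genesis" sentinel hash
use hashes::Hash;

// Sketch: normalize a header's parent list for the relations store, mirroring
// the fallback introduced above (not the actual store API).
fn relations_parents(direct_parents: &[Hash]) -> Vec<Hash> {
    if direct_parents.is_empty() {
        vec![ORIGIN] // genesis hangs off ORIGIN instead of having no edges
    } else {
        direct_parents.to_vec()
    }
}
```
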
diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs
index 5628a57cc..185005dee 100644
--- a/consensus/src/pipeline/virtual_processor/processor.rs
+++ b/consensus/src/pipeline/virtual_processor/processor.rs
@@ -3,17 +3,29 @@ use crate::{
     model::{
         services::reachability::MTReachabilityService,
         stores::{
+            errors::StoreResultExtensions,
+            ghostdag::{DbGhostdagStore, GhostdagStoreReader},
+            headers::DbHeadersStore,
+            past_pruning_points::DbPastPruningPointsStore,
+            past_pruning_points::PastPruningPointsStore,
+            pruning::{DbPruningStore, PruningStore, PruningStoreReader},
             reachability::DbReachabilityStore,
-            statuses::{BlockStatus, DbStatusesStore},
+            statuses::{BlockStatus, DbStatusesStore, StatusesStore, StatusesStoreReader},
             DB,
         },
     },
     pipeline::deps_manager::BlockTask,
+    processes::pruning::PruningManager,
 };
-use consensus_core::block::Block;
+use consensus_core::{block::Block, blockhash::VIRTUAL};
 use crossbeam_channel::Receiver;
-use parking_lot::RwLock;
-use std::sync::Arc;
+use hashes::Hash;
+use parking_lot::{RwLock, RwLockUpgradableReadGuard};
+use rocksdb::WriteBatch;
+use std::sync::{
+    atomic::{self, AtomicBool},
+    Arc,
+};

 pub struct VirtualStateProcessor {
     // Channels
@@ -22,21 +34,48 @@ pub struct VirtualStateProcessor {
     // DB
     db: Arc<DB>,

+    // Config
+    genesis_hash: Hash,
+
     // Stores
     pub(super) statuses_store: Arc<RwLock<DbStatusesStore>>,
+    pruning_store: Arc<RwLock<DbPruningStore>>,
+    past_pruning_points_store: Arc<DbPastPruningPointsStore>,
+    ghostdag_store: Arc<DbGhostdagStore>,

     // Managers and services
     pub(super) reachability_service: MTReachabilityService<DbReachabilityStore>,
+    pub(super) pruning_manager: PruningManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore>,
+
+    is_updating_pruning_point_or_candidate: AtomicBool,
 }

 impl VirtualStateProcessor {
     pub fn new(
         receiver: Receiver<BlockTask>,
         db: Arc<DB>,
+        genesis_hash: Hash,
         statuses_store: Arc<RwLock<DbStatusesStore>>,
+        pruning_store: Arc<RwLock<DbPruningStore>>,
+        ghostdag_store: Arc<DbGhostdagStore>,
+        past_pruning_points_store: Arc<DbPastPruningPointsStore>,
         reachability_service: MTReachabilityService<DbReachabilityStore>,
+        pruning_manager: PruningManager<DbGhostdagStore, DbReachabilityStore, DbHeadersStore, DbPastPruningPointsStore>,
     ) -> Self {
-        Self { receiver, db, statuses_store, reachability_service }
+        Self {
+            receiver,
+            db,
+            genesis_hash,
+
+            statuses_store,
+            reachability_service,
+            is_updating_pruning_point_or_candidate: false.into(),
+            pruning_store,
+            ghostdag_store,
+            past_pruning_points_store,
+
+            pruning_manager,
+        }
     }

     pub fn worker(self: &Arc<VirtualStateProcessor>) {
@@ -57,4 +96,51 @@ impl VirtualStateProcessor {
     fn resolve_virtual(self: &Arc<VirtualStateProcessor>, block: &Block) -> BlockProcessResult<BlockStatus> {
         Ok(BlockStatus::StatusUTXOPendingVerification)
     }
+
+    fn maybe_update_pruning_point_and_candidate(self: &Arc<VirtualStateProcessor>) {
+        if self
+            .is_updating_pruning_point_or_candidate
+            .compare_exchange(false, true, atomic::Ordering::Acquire, atomic::Ordering::Relaxed)
+            .is_err()
+        {
+            return;
+        }
+
+        {
+            let pruning_read_guard = self.pruning_store.upgradable_read();
+            let current_pp = pruning_read_guard.pruning_point().unwrap();
+            let current_pp_candidate = pruning_read_guard.pruning_point_candidate().unwrap();
+            let virtual_gd = self.ghostdag_store.get_compact_data(VIRTUAL).unwrap();
+            let (new_pruning_point, new_candidate) = self.pruning_manager.next_pruning_point_and_candidate_by_block_hash(
+                virtual_gd,
+                None,
+                current_pp_candidate,
+                current_pp,
+            );
+
+            if new_pruning_point != current_pp {
+                let mut batch = WriteBatch::default();
+                let new_pp_index = pruning_read_guard.pruning_point_index().unwrap() + 1;
+                let mut write_guard = RwLockUpgradableReadGuard::upgrade(pruning_read_guard);
+                write_guard.set_batch(&mut batch, new_pruning_point, new_candidate, new_pp_index).unwrap();
+                self.past_pruning_points_store.insert_batch(&mut batch, new_pp_index, new_pruning_point).unwrap();
+                self.db.write(batch).unwrap();
+                // TODO: Move PP UTXO etc
+            } else if new_candidate != current_pp_candidate {
+                let pp_index = pruning_read_guard.pruning_point_index().unwrap();
+                let mut write_guard = RwLockUpgradableReadGuard::upgrade(pruning_read_guard);
+                write_guard.set(new_pruning_point, new_candidate, pp_index).unwrap();
+            }
+        }
+
+        self.is_updating_pruning_point_or_candidate.store(false, atomic::Ordering::Release);
+    }
+
+    pub fn process_genesis_if_needed(self: &Arc<VirtualStateProcessor>) {
+        if let Some(BlockStatus::StatusUTXOValid) = self.statuses_store.read().get(self.genesis_hash).unwrap_option() {
+            return;
+        }
+        self.past_pruning_points_store.insert(0, self.genesis_hash).unwrap();
+        self.statuses_store.write().set(self.genesis_hash, BlockStatus::StatusUTXOValid).unwrap();
+    }
 }
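
`maybe_update_pruning_point_and_candidate` uses an `AtomicBool` with `compare_exchange` as a non-blocking "single flight" guard: if another thread is already recalculating, the call returns immediately instead of queueing. A minimal self-contained sketch of the same pattern (names are illustrative):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct Worker {
    busy: AtomicBool,
}

impl Worker {
    // Returns immediately if another thread already holds the flag; otherwise
    // flips it, runs the job, and releases it. The Acquire/Release pair orders
    // the flag with the protected work, as in the processor above.
    fn run_once<F: FnOnce()>(&self, job: F) {
        if self.busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
            return; // someone else is already doing the update
        }
        job();
        self.busy.store(false, Ordering::Release);
    }
}

fn main() {
    let w = Worker { busy: AtomicBool::new(false) };
    w.run_once(|| println!("recalculating pruning point"));
}
```
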
diff --git a/consensus/src/processes/block_at_depth.rs b/consensus/src/processes/block_at_depth.rs
index b93c88007..839946912 100644
--- a/consensus/src/processes/block_at_depth.rs
+++ b/consensus/src/processes/block_at_depth.rs
@@ -69,7 +69,6 @@ impl BlockDepthManager
         let required_blue_score = ghostdag_data.blue_score - depth;

         for chain_block in self.reachability_service.forward_chain_iterator(current, ghostdag_data.selected_parent, true) {
-            let chain_block = chain_block.unwrap();
             if self.ghostdag_store.get_blue_score(chain_block).unwrap() >= required_blue_score {
                 break;
             }
diff --git a/consensus/src/processes/mod.rs b/consensus/src/processes/mod.rs
index c45786b2e..2c805f936 100644
--- a/consensus/src/processes/mod.rs
+++ b/consensus/src/processes/mod.rs
@@ -4,6 +4,8 @@ pub mod dagtraversalmanager;
 pub mod difficulty;
 pub mod ghostdag;
 pub mod mass;
+pub mod parents_builder;
 pub mod pastmediantime;
+pub mod pruning;
 pub mod reachability;
 pub mod transaction_validator;
diff --git a/consensus/src/processes/parents_builder.rs b/consensus/src/processes/parents_builder.rs
new file mode 100644
index 000000000..8c8d30813
--- /dev/null
+++ b/consensus/src/processes/parents_builder.rs
@@ -0,0 +1,187 @@
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
+
+use consensus_core::{blockhash::ORIGIN, header::Header};
+use hashes::Hash;
+use itertools::Itertools;
+use parking_lot::RwLock;
+
+use crate::{
+    model::{
+        services::reachability::{MTReachabilityService, ReachabilityService},
+        stores::{
+            errors::StoreError, headers::HeaderStoreReader, reachability::ReachabilityStoreReader, relations::RelationsStoreReader,
+        },
+    },
+    processes::reachability::ReachabilityError,
+};
+
+#[derive(Clone)]
+pub struct ParentsManager<T: HeaderStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader> {
+    max_block_level: u8,
+    genesis_hash: Hash,
+
+    headers_store: Arc<T>,
+    reachability_service: MTReachabilityService<U>,
+    relations_store: Arc<RwLock<V>>,
+}
+
+impl<T: HeaderStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader> ParentsManager<T, U, V> {
+    pub fn new(
+        max_block_level: u8,
+        genesis_hash: Hash,
+        headers_store: Arc<T>,
+        reachability_service: MTReachabilityService<U>,
+        relations_store: Arc<RwLock<V>>,
+    ) -> Self {
+        Self { max_block_level, genesis_hash, headers_store, reachability_service, relations_store }
+    }
+
+    pub fn calc_block_parents(&self, pruning_point: Hash, direct_parents: &[Hash]) -> Vec<Vec<Hash>> {
+        let mut direct_parent_headers =
+            direct_parents.iter().copied().map(|parent| self.headers_store.get_header_with_block_level(parent).unwrap()).collect_vec();
+
+        // The first candidates to be added should be from a parent in the future of the pruning
+        // point, so later on we'll know that every block that doesn't have reachability data
+        // (i.e. pruned) is necessarily in the past of the current candidates and cannot be
+        // considered as a valid candidate.
+        // This is why we reorder the direct parent headers so that the first one is
+        // in the future of the pruning point.
+        let first_parent_in_future_of_pruning_point_index = direct_parents
+            .iter()
+            .copied()
+            .position(|parent| self.reachability_service.is_dag_ancestor_of(pruning_point, parent))
+            .expect("at least one of the parents is expected to be in the future of the pruning point");
+        direct_parent_headers.swap(0, first_parent_in_future_of_pruning_point_index);
+
+        let mut candidates_by_level_to_reference_blocks_map = (0..self.max_block_level + 1).map(|_| HashMap::new()).collect_vec();
+        // Direct parents are guaranteed to be in one another's anticone, so add each of them to
+        // all the block levels it occupies.
+        for direct_parent_header in direct_parent_headers.iter() {
+            for level in 0..direct_parent_header.block_level + 1 {
+                candidates_by_level_to_reference_blocks_map[level as usize]
+                    .insert(direct_parent_header.header.hash, vec![direct_parent_header.header.hash]);
+            }
+        }
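
Seeding works level by level: a parent whose block level is `L` is by definition also a valid parent at every level below `L`, so it enters the candidate maps for levels `0..=L`, initially serving as its own reference block. A self-contained sketch of this seeding step (with `u64` standing in for `hashes::Hash`):

```rust
use std::collections::HashMap;

type Hash = u64; // stand-in for hashes::Hash in this sketch

// Seed one candidate map per level; a parent of block level `lvl` occupies
// levels 0..=lvl, initially acting as its own reference block.
fn seed_candidates(max_block_level: u8, parents: &[(Hash, u8)]) -> Vec<HashMap<Hash, Vec<Hash>>> {
    let mut by_level: Vec<HashMap<Hash, Vec<Hash>>> =
        (0..=max_block_level).map(|_| HashMap::new()).collect();
    for &(hash, level) in parents {
        for l in 0..=level {
            by_level[l as usize].insert(hash, vec![hash]);
        }
    }
    by_level
}

fn main() {
    // A level-2 parent appears at levels 0, 1 and 2; a level-0 parent only at level 0.
    let maps = seed_candidates(3, &[(7, 2), (9, 0)]);
    assert_eq!(maps[0].len(), 2);
    assert_eq!(maps[2].len(), 1);
    assert!(maps[3].is_empty());
}
```
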
+
+        let origin_children = self.relations_store.read().get_children(ORIGIN).unwrap();
+        let origin_children_headers =
+            origin_children.iter().copied().map(|parent| self.headers_store.get_header(parent).unwrap()).collect_vec();
+
+        for direct_parent_header in direct_parent_headers {
+            for (block_level, direct_parent_level_parents) in self.parents(&direct_parent_header.header).enumerate() {
+                let is_empty_level = candidates_by_level_to_reference_blocks_map[block_level].is_empty();
+
+                for parent in direct_parent_level_parents.iter().copied() {
+                    let mut is_in_future_origin_children = false;
+                    for child in origin_children.iter().copied() {
+                        match self.reachability_service.is_dag_ancestor_of_result(child, parent) {
+                            Ok(is_in_future_of_child) => {
+                                if is_in_future_of_child {
+                                    is_in_future_origin_children = true;
+                                    break;
+                                }
+                            }
+                            Err(ReachabilityError::StoreError(e)) => {
+                                if let StoreError::KeyNotFound(_) = e {
+                                    break;
+                                } else {
+                                    panic!("Unexpected store error: {:?}", e)
+                                }
+                            }
+                            Err(err) => panic!("Unexpected reachability error: {:?}", err),
+                        }
+                    }
+
+                    // Reference blocks are the blocks that are used in reachability queries to check if
+                    // a candidate is in the future of another candidate. In most cases this is just the
+                    // block itself, but when a block doesn't have reachability data we need
+                    // to use some blocks in its future as references instead.
+                    // If we make sure to add a parent in the future of the pruning point first, we can
+                    // know that any pruned candidate that is in the past of some blocks in the pruning
+                    // point anticone must be a parent (at the relevant level) of one of
+                    // the virtual genesis children in the pruning point anticone. So we can check which
+                    // virtual genesis children have this block as a parent and use those blocks as
+                    // reference blocks.
+                    let reference_blocks = if is_in_future_origin_children {
+                        vec![parent]
+                    } else {
+                        let mut reference_blocks = Vec::with_capacity(origin_children.len());
+                        for child_header in origin_children_headers.iter() {
+                            if self.parents_at_level(child_header, block_level as u8).contains(&parent) {
+                                reference_blocks.push(child_header.hash);
+                            }
+                        }
+                        reference_blocks
+                    };
+
+                    if is_empty_level {
+                        candidates_by_level_to_reference_blocks_map[block_level].insert(parent, reference_blocks);
+                        continue;
+                    }
+
+                    if !is_in_future_origin_children {
+                        continue;
+                    }
+
+                    let mut to_remove = HashSet::new();
+                    for (candidate, candidate_references) in candidates_by_level_to_reference_blocks_map[block_level].iter() {
+                        if self.reachability_service.is_any_dag_ancestor(&mut candidate_references.iter().copied(), parent) {
+                            to_remove.insert(*candidate);
+                        }
+                    }
+
+                    for hash in to_remove.iter() {
+                        candidates_by_level_to_reference_blocks_map[block_level].remove(hash);
+                    }
+
+                    let is_ancestor_of_any_candidate =
+                        candidates_by_level_to_reference_blocks_map[block_level].iter().any(|(_, candidate_references)| {
+                            self.reachability_service.is_dag_ancestor_of_any(parent, &mut candidate_references.iter().copied())
+                        });
+
+                    // We should add the block as a candidate if it's in the future of another candidate
+                    // or in the anticone of all candidates.
+                    if !is_ancestor_of_any_candidate || !to_remove.is_empty() {
+                        candidates_by_level_to_reference_blocks_map[block_level].insert(parent, reference_blocks);
+                    }
+                }
+            }
+        }
+
+        let mut parents = Vec::with_capacity(self.max_block_level as usize);
+        for (block_level, reference_blocks_map) in candidates_by_level_to_reference_blocks_map.iter().enumerate() {
+            if block_level > 0 && reference_blocks_map.contains_key(&self.genesis_hash) && reference_blocks_map.len() == 1 {
+                break;
+            }
+
+            let level_blocks = reference_blocks_map.keys().copied().collect_vec();
+            parents.push(level_blocks);
+        }
+
+        parents
+    }
+
+    pub fn parents<'a>(&'a self, header: &'a Header) -> impl ExactSizeIterator<Item = &'a [Hash]> + 'a {
+        (0..self.max_block_level).map(move |level| self.parents_at_level(header, level))
+    }
+
+    pub fn parents_at_level<'a>(&'a self, header: &'a Header, level: u8) -> &'a [Hash] {
+        if header.direct_parents().is_empty() {
+            // This is genesis, which has no parents at any level
+            &[]
+        } else if header.parents_by_level.len() > level as usize {
+            &header.parents_by_level[level as usize][..]
+        } else {
+            std::slice::from_ref(&self.genesis_hash)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // TODO: add unit-tests for calc_block_parents
+}
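
`parents_at_level` encodes the header's compressed parent representation: an explicit per-level vector wins, any level beyond what the header stores falls back to `[genesis]`, and genesis itself has no parents. A standalone sketch of the same lookup rule (stand-in types, not the crate's API):

```rust
type Hash = [u8; 32]; // stand-in for hashes::Hash

// Mirror of the lookup rule above: an explicit per-level vector wins; a level
// beyond what the header stores falls back to [genesis]; genesis has none.
fn parents_at_level<'a>(
    parents_by_level: &'a [Vec<Hash>],
    genesis_hash: &'a Hash,
    is_genesis: bool,
    level: u8,
) -> &'a [Hash] {
    if is_genesis {
        &[]
    } else if (level as usize) < parents_by_level.len() {
        &parents_by_level[level as usize]
    } else {
        std::slice::from_ref(genesis_hash)
    }
}

fn main() {
    let genesis = [0u8; 32];
    let by_level = vec![vec![[1u8; 32]], vec![[2u8; 32]]];
    assert_eq!(parents_at_level(&by_level, &genesis, false, 1), &[[2u8; 32]]);
    assert_eq!(parents_at_level(&by_level, &genesis, false, 7), std::slice::from_ref(&genesis));
    assert!(parents_at_level(&by_level, &genesis, true, 0).is_empty());
}
```
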
diff --git a/consensus/src/processes/pruning.rs b/consensus/src/processes/pruning.rs
new file mode 100644
index 000000000..6d95ee90c
--- /dev/null
+++ b/consensus/src/processes/pruning.rs
@@ -0,0 +1,174 @@
+use std::sync::Arc;
+
+use crate::model::{
+    services::reachability::{MTReachabilityService, ReachabilityService},
+    stores::{
+        errors::StoreError,
+        ghostdag::{CompactGhostdagData, GhostdagStoreReader},
+        headers::HeaderStoreReader,
+        past_pruning_points::PastPruningPointsStoreReader,
+        pruning::PruningPointInfo,
+        reachability::ReachabilityStoreReader,
+    },
+};
+use hashes::Hash;
+
+use super::reachability::ReachabilityError;
+
+#[derive(Clone)]
+pub struct PruningManager<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V: PastPruningPointsStoreReader> {
+    pruning_depth: u64,
+    finality_depth: u64,
+    genesis_hash: Hash,
+
+    reachability_service: MTReachabilityService<T>,
+    ghostdag_store: Arc<S>,
+    headers_store: Arc<U>,
+    past_pruning_points_store: Arc<V>,
+}
+
+impl<S: GhostdagStoreReader, T: ReachabilityStoreReader, U: HeaderStoreReader, V: PastPruningPointsStoreReader>
+    PruningManager<S, T, U, V>
+{
+    pub fn new(
+        pruning_depth: u64,
+        finality_depth: u64,
+        genesis_hash: Hash,
+        reachability_service: MTReachabilityService<T>,
+        ghostdag_store: Arc<S>,
+        headers_store: Arc<U>,
+        past_pruning_points_store: Arc<V>,
+    ) -> Self {
+        Self {
+            pruning_depth,
+            finality_depth,
+            genesis_hash,
+            reachability_service,
+            ghostdag_store,
+            headers_store,
+            past_pruning_points_store,
+        }
+    }
+
+    pub fn next_pruning_point_and_candidate_by_block_hash(
+        &self,
+        ghostdag_data: CompactGhostdagData,
+        suggested_low_hash: Option<Hash>,
+        current_candidate: Hash,
+        current_pruning_point: Hash,
+    ) -> (Hash, Hash) {
+        let low_hash = match suggested_low_hash {
+            Some(suggested) => {
+                if !self.reachability_service.is_chain_ancestor_of(suggested, current_candidate) {
+                    assert!(self.reachability_service.is_chain_ancestor_of(current_candidate, suggested));
+                    suggested
+                } else {
+                    current_candidate
+                }
+            }
+            None => current_candidate,
+        };
+
+        let mut new_pruning_point = current_pruning_point;
+        let mut new_pruning_point_bs = self.ghostdag_store.get_blue_score(new_pruning_point).unwrap();
+        let mut new_candidate = current_candidate;
+
+        for selected_child in self.reachability_service.forward_chain_iterator(low_hash, ghostdag_data.selected_parent, true) {
+            let selected_child_bs = self.ghostdag_store.get_blue_score(selected_child).unwrap();
+
+            if ghostdag_data.blue_score - selected_child_bs < self.pruning_depth {
+                break;
+            }
+
+            new_candidate = selected_child;
+            let new_candidate_bs = selected_child_bs;
+
+            if self.finality_score(new_candidate_bs) > self.finality_score(new_pruning_point_bs) {
+                new_pruning_point = new_candidate;
+                new_pruning_point_bs = new_candidate_bs;
+            }
+        }
+
+        (new_pruning_point, new_candidate)
+    }
+
+    // finality_score is the number of finality intervals passed up to
+    // the given blue score.
+    fn finality_score(&self, blue_score: u64) -> u64 {
+        blue_score / self.finality_depth
+    }
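
`finality_score` buckets a blue score into finality intervals, and `next_pruning_point_and_candidate_by_block_hash` only advances the pruning point when a candidate lands in a strictly later bucket than the current pruning point (while also sitting at least `pruning_depth` below the tip). A worked example with an assumed `finality_depth` of 100 (the real value comes from `Params`):

```rust
// Worked example of the bucketing rule above.
fn finality_score(blue_score: u64, finality_depth: u64) -> u64 {
    blue_score / finality_depth
}

fn main() {
    assert_eq!(finality_score(99, 100), 0);
    assert_eq!(finality_score(180, 100), 1);
    // A candidate at blue score 250 has crossed into a later finality interval
    // than a pruning point at 180 (2 > 1), so it can become the new pruning point.
    assert_eq!(finality_score(250, 100), 2);
}
```
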
+
+    pub fn expected_header_pruning_point(&self, ghostdag_data: CompactGhostdagData, pruning_info: PruningPointInfo) -> Hash {
+        if ghostdag_data.selected_parent == self.genesis_hash {
+            return self.genesis_hash;
+        }
+
+        let (current_pruning_point, current_candidate, current_pruning_point_index) = pruning_info.decompose();
+
+        let sp_header_pp = self.headers_store.get_header(ghostdag_data.selected_parent).unwrap().pruning_point;
+        let sp_header_pp_blue_score = self.headers_store.get_blue_score(sp_header_pp).unwrap();
+
+        // If the block doesn't have the pruning point in its selected chain, we know for sure that it can't trigger a pruning point
+        // change (we check the selected parent to take care of the case where the block is the virtual, which doesn't have reachability data).
+        let has_pruning_point_in_its_selected_chain =
+            self.reachability_service.is_chain_ancestor_of(current_pruning_point, ghostdag_data.selected_parent);
+
+        // Note: the pruning point from the POV of the current block is the first block in its selected chain that is at depth
+        // self.pruning_depth from it and whose finality score is greater than that of the previous pruning point. So whenever
+        // the diff between (finality_score(sp_header_pp.blue_score) + 1) * finality_depth and the current block's blue score
+        // is less than self.pruning_depth, we know for sure that this block didn't trigger a pruning point change.
+        let min_required_blue_score_for_next_pruning_point = (self.finality_score(sp_header_pp_blue_score) + 1) * self.finality_depth;
+        let next_or_current_pp = if has_pruning_point_in_its_selected_chain
+            && min_required_blue_score_for_next_pruning_point + self.pruning_depth <= ghostdag_data.blue_score
+        {
+            let suggested_low_hash = match self.reachability_service.is_dag_ancestor_of_result(current_pruning_point, sp_header_pp) {
+                Ok(is_in_future_of_current_pruning_point) => {
+                    if is_in_future_of_current_pruning_point {
+                        Some(sp_header_pp)
+                    } else {
+                        None
+                    }
+                }
+                Err(ReachabilityError::StoreError(e)) => {
+                    if let StoreError::KeyNotFound(_) = e {
+                        None
+                    } else {
+                        panic!("Unexpected store error: {:?}", e)
+                    }
+                }
+                Err(err) => panic!("Unexpected reachability error: {:?}", err),
+            };
+            let (next_or_current_pp, _) = self.next_pruning_point_and_candidate_by_block_hash(
+                ghostdag_data,
+                suggested_low_hash,
+                current_candidate,
+                current_pruning_point,
+            );
+            next_or_current_pp
+        } else {
+            sp_header_pp
+        };
+
+        if self.is_pruning_point_in_pruning_depth(ghostdag_data.blue_score, next_or_current_pp) {
+            return next_or_current_pp;
+        }
+
+        for i in (0..=current_pruning_point_index).rev() {
+            let past_pp = self.past_pruning_points_store.get(i).unwrap();
+            if self.is_pruning_point_in_pruning_depth(ghostdag_data.blue_score, past_pp) {
+                return past_pp;
+            }
+        }
+
+        self.genesis_hash
+    }
+
+    fn is_pruning_point_in_pruning_depth(&self, pov_blue_score: u64, pruning_point: Hash) -> bool {
+        let pp_bs = self.headers_store.get_blue_score(pruning_point).unwrap();
+        pov_blue_score >= pp_bs + self.pruning_depth
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // TODO: add unit-tests for next_pruning_point_and_candidate_by_block_hash and expected_header_pruning_point
+}
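
The tail of `expected_header_pruning_point` walks the past pruning points from newest to oldest and returns the first one still at least `pruning_depth` below the POV block, defaulting to genesis. A simplified sketch of that scan; `past_pps` stands in for the past-pruning-points store, and the indices and blue scores are illustrative:

```rust
// Sketch of the fallback scan: return the index of the newest past pruning
// point deep enough below the POV block, or the genesis index if none is.
fn first_in_depth(
    pov_blue_score: u64,
    pruning_depth: u64,
    past_pps: &[(u64 /* index */, u64 /* blue score */)],
    genesis_index: u64,
) -> u64 {
    for &(index, pp_blue_score) in past_pps.iter().rev() {
        if pov_blue_score >= pp_blue_score + pruning_depth {
            return index; // first past pruning point within pruning depth
        }
    }
    genesis_index // nothing deep enough: fall back to genesis
}

fn main() {
    // POV blue score 1000, pruning depth 400: indices 2 (bs 700) and 1 (bs 650)
    // are too shallow, index 0 (bs 300) qualifies.
    assert_eq!(first_in_depth(1000, 400, &[(0, 300), (1, 650), (2, 700)], 0), 0);
}
```
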
diff --git a/consensus/tests/integration_tests.rs b/consensus/tests/integration_tests.rs
index 70eb809bb..52bf355b3 100644
--- a/consensus/tests/integration_tests.rs
+++ b/consensus/tests/integration_tests.rs
@@ -8,7 +8,7 @@
 use consensus::errors::{BlockProcessResult, RuleError};
 use consensus::model::stores::ghostdag::{GhostdagStoreReader, KType as GhostdagKType};
 use consensus::model::stores::reachability::DbReachabilityStore;
 use consensus::model::stores::statuses::BlockStatus;
-use consensus::params::MAINNET_PARAMS;
+use consensus::params::{Params, DEVNET_PARAMS, MAINNET_PARAMS};
 use consensus::processes::reachability::tests::{DagBlock, DagBuilder, StoreValidationExtensions};
 use consensus_core::block::Block;
 use consensus_core::header::Header;
@@ -147,7 +147,7 @@ fn test_noattack_json() {
 async fn consensus_sanity_test() {
     let genesis_child: Hash = 2.into();

-    let consensus = TestConsensus::create_from_temp_db(&MAINNET_PARAMS);
+    let consensus = TestConsensus::create_from_temp_db(&MAINNET_PARAMS.clone_with_skip_pow());
     let wait_handles = consensus.init();

     consensus
@@ -406,8 +406,8 @@
 #[tokio::test]
 async fn incest_test() {
-    let params = &MAINNET_PARAMS;
-    let consensus = TestConsensus::create_from_temp_db(params);
+    let params = MAINNET_PARAMS.clone_with_skip_pow();
+    let consensus = TestConsensus::create_from_temp_db(&params);
     let wait_handles = consensus.init();
     let block = consensus.build_block_with_parents(1.into(), vec![params.genesis_hash]);
     consensus.validate_and_insert_block(Arc::new(block)).await.unwrap();
@@ -631,17 +631,82 @@
 struct RPCBlockVerboseData {
     Hash: String,
 }

+#[allow(non_snake_case)]
+#[derive(Deserialize, Debug)]
+struct KaspadGoParams {
+    K: GhostdagKType,
+    TimestampDeviationTolerance: u64,
+    TargetTimePerBlock: u64,
+    MaxBlockParents: u8,
+    DifficultyAdjustmentWindowSize: usize,
+    MergeSetSizeLimit: u64,
+    MergeDepth: u64,
+    FinalityDuration: u64,
+    CoinbasePayloadScriptPublicKeyMaxLength: u8,
+    MaxCoinbasePayloadLength: usize,
+    MassPerTxByte: u64,
+    MassPerSigOp: u64,
+    MassPerScriptPubKeyByte: u64,
+    MaxBlockMass: u64,
+    DeflationaryPhaseDaaScore: u64,
+    PreDeflationaryPhaseBaseSubsidy: u64,
+    SkipProofOfWork: bool,
+    MaxBlockLevel: u8,
+}
+
+impl KaspadGoParams {
+    fn into_params(self, genesis_header: &Header) -> Params {
+        let finality_depth = self.FinalityDuration / self.TargetTimePerBlock;
+        Params {
+            genesis_hash: genesis_header.hash,
+            ghostdag_k: self.K,
+            timestamp_deviation_tolerance: self.TimestampDeviationTolerance,
+            target_time_per_block: self.TargetTimePerBlock / 1_000_000,
+            max_block_parents: self.MaxBlockParents,
+            difficulty_window_size: self.DifficultyAdjustmentWindowSize,
+            genesis_timestamp: genesis_header.timestamp,
+            genesis_bits: genesis_header.bits,
+            mergeset_size_limit: self.MergeSetSizeLimit,
+            merge_depth: self.MergeDepth,
+            finality_depth,
+            pruning_depth: 2 * finality_depth + 4 * self.MergeSetSizeLimit * self.K as u64 + 2 * self.K as u64 + 2,
+            coinbase_payload_script_public_key_max_len: self.CoinbasePayloadScriptPublicKeyMaxLength,
+            max_coinbase_payload_len: self.MaxCoinbasePayloadLength,
+            max_tx_inputs: MAINNET_PARAMS.max_tx_inputs,
+            max_tx_outputs: MAINNET_PARAMS.max_tx_outputs,
+            max_signature_script_len: MAINNET_PARAMS.max_signature_script_len,
+            max_script_public_key_len: MAINNET_PARAMS.max_script_public_key_len,
+            mass_per_tx_byte: self.MassPerTxByte,
+            mass_per_script_pub_key_byte: self.MassPerScriptPubKeyByte,
+            mass_per_sig_op: self.MassPerSigOp,
+            max_block_mass: self.MaxBlockMass,
+            deflationary_phase_daa_score: self.DeflationaryPhaseDaaScore,
+            pre_deflationary_phase_base_subsidy: self.PreDeflationaryPhaseBaseSubsidy,
+            skip_proof_of_work: self.SkipProofOfWork,
+            max_block_level: self.MaxBlockLevel,
+        }
+    }
+}
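
`into_params` derives `pruning_depth` from the other parameters instead of reading it from the JSON. A worked instance of the same formula with made-up round numbers (the real inputs come from the kaspad-go params line):

```rust
// pruning_depth = 2 * finality_depth + 4 * mergeset_size_limit * K + 2 * K + 2
fn pruning_depth(finality_depth: u64, mergeset_size_limit: u64, k: u64) -> u64 {
    2 * finality_depth + 4 * mergeset_size_limit * k + 2 * k + 2
}

fn main() {
    // Illustrative numbers only: finality_depth = 86_400, mergeset limit = 180, K = 18.
    // 172_800 + 12_960 + 36 + 2 = 185_798
    assert_eq!(pruning_depth(86_400, 180, 18), 185_798);
}
```
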

 #[tokio::test]
 async fn json_test() {
     let file = File::open("tests/testdata/json_test.json.gz").unwrap();
     let decoder = GzDecoder::new(file);
     let mut lines = BufReader::new(decoder).lines();
-    let first_line = lines.next().unwrap();
-    let genesis = json_line_to_block(first_line.unwrap());
-    let mut params = MAINNET_PARAMS;
-    params.genesis_bits = genesis.header.bits;
-    params.genesis_hash = genesis.header.hash;
-    params.genesis_timestamp = genesis.header.timestamp;
+    let first_line = lines.next().unwrap().unwrap();
+    let go_params_res: Result<KaspadGoParams, serde_json::Error> = serde_json::from_str(&first_line);
+    let params = if let Ok(go_params) = go_params_res {
+        let second_line = lines.next().unwrap().unwrap();
+        let genesis = json_line_to_block(second_line);
+        go_params.into_params(&genesis.header)
+    } else {
+        let genesis = json_line_to_block(first_line);
+        let mut params = DEVNET_PARAMS;
+        params.genesis_bits = genesis.header.bits;
+        params.genesis_hash = genesis.header.hash;
+        params.genesis_timestamp = genesis.header.timestamp;
+        params
+    };

     let consensus = TestConsensus::create_from_temp_db(&params);
     let wait_handles = consensus.init();
@@ -676,7 +741,7 @@ async fn json_concurrency_test() {
     let mut lines = io::BufReader::new(decoder).lines();
     let first_line = lines.next().unwrap();
     let genesis = json_line_to_block(first_line.unwrap());
-    let mut params = MAINNET_PARAMS;
+    let mut params = DEVNET_PARAMS;
     params.genesis_bits = genesis.header.bits;
     params.genesis_hash = genesis.header.hash;
     params.genesis_timestamp = genesis.header.timestamp;
diff --git a/consensus/tests/testdata/json_test_with_custom_pruning_depth.json.gz b/consensus/tests/testdata/json_test_with_custom_pruning_depth.json.gz
new file mode 100644
index 000000000..2ff39edce
Binary files /dev/null and b/consensus/tests/testdata/json_test_with_custom_pruning_depth.json.gz differ