From 1eebd4d20a251b39615a88ccaef5a6cb268fdef1 Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Wed, 10 Apr 2024 22:13:15 +0300 Subject: [PATCH] Fix for the pruning proof rebuild issue (issue #444) (#449) * add a strict assertion which should catch the pruning bug before actual data is pruned * possible fix: add `block_at_depth_2m` as an additional traversal root * rollback: rollback the previous fix since it's not the root cause * add additional dbg info to assertion * bug fix: write level relations for trusted blocks (blocks in the pruning point anticone of a newly synced node) * enable mainnet mining by default * simplify kip 9 beta condition + more mass tests * set default tracked addresses to 1M * fix tracker prealloc property + adds compile time assertion for upper bound --- .../pipeline/header_processor/processor.rs | 9 +- .../virtual_processor/utxo_validation.rs | 2 +- consensus/src/processes/mass.rs | 201 +++++++----------- consensus/src/processes/pruning_proof/mod.rs | 56 +++++ kaspad/src/args.rs | 16 +- notify/src/address/tracker.rs | 40 ++-- notify/src/listener.rs | 4 +- notify/src/notifier.rs | 2 +- 8 files changed, 180 insertions(+), 150 deletions(-) diff --git a/consensus/src/pipeline/header_processor/processor.rs b/consensus/src/pipeline/header_processor/processor.rs index 2836a9e97..d1b74aeb5 100644 --- a/consensus/src/pipeline/header_processor/processor.rs +++ b/consensus/src/pipeline/header_processor/processor.rs @@ -452,10 +452,16 @@ impl HeaderProcessor { let mut batch = WriteBatch::default(); for (level, datum) in ghostdag_data.iter().enumerate() { - // The data might have been already written when applying the pruning proof. + // This data might have been already written when applying the pruning proof. 
self.ghostdag_stores[level].insert_batch(&mut batch, ctx.hash, datum).unwrap_or_exists(); } + let mut relations_write = self.relations_stores.write(); + ctx.known_parents.into_iter().enumerate().for_each(|(level, parents_by_level)| { + // This data might have been already written when applying the pruning proof. + relations_write[level].insert_batch(&mut batch, ctx.hash, parents_by_level).unwrap_or_exists(); + }); + let statuses_write = self.statuses_store.set_batch(&mut batch, ctx.hash, StatusHeaderOnly).unwrap(); // Flush the batch to the DB @@ -463,6 +469,7 @@ impl HeaderProcessor { // Calling the drops explicitly after the batch is written in order to avoid possible errors. drop(statuses_write); + drop(relations_write); } pub fn process_genesis(&self) { diff --git a/consensus/src/pipeline/virtual_processor/utxo_validation.rs b/consensus/src/pipeline/virtual_processor/utxo_validation.rs index 3331d3f9e..112976294 100644 --- a/consensus/src/pipeline/virtual_processor/utxo_validation.rs +++ b/consensus/src/pipeline/virtual_processor/utxo_validation.rs @@ -294,7 +294,7 @@ impl VirtualStateProcessor { self.populate_mempool_transaction_in_utxo_context(mutable_tx, utxo_view)?; // For non-activated nets (mainnet, TN10) we can update mempool rules to KIP9 beta asap. 
For - // TN11 we need to hard-fork consensus first (since the new beta rules are more relaxed) + // TN11 we need to hard-fork consensus first (since the new beta rules are more permissive) let kip9_version = if self.storage_mass_activation_daa_score == u64::MAX { Kip9Version::Beta } else { Kip9Version::Alpha }; // Calc the full contextual mass including storage mass diff --git a/consensus/src/processes/mass.rs b/consensus/src/processes/mass.rs index 68b42a41d..8bb5f3339 100644 --- a/consensus/src/processes/mass.rs +++ b/consensus/src/processes/mass.rs @@ -91,10 +91,13 @@ impl MassCalculator { let ins_len = tx.tx().inputs.len() as u64; /* - KIP-0009 relaxed formula for the cases |O| = 1 OR |O| <= |I| <= 2: - max( 0 , C·( |O|/H(O) - |I|/H(I) ) ) + KIP-0009 relaxed formula for the cases |O| = 1 OR |O| <= |I| <= 2: + max( 0 , C·( |O|/H(O) - |I|/H(I) ) ) + + Note: in the case |I| = 1 both formulas are equal, yet the following code (harmonic_ins) is a bit more efficient. + Hence, we transform the condition to |O| = 1 OR |I| = 1 OR |O| = |I| = 2 which is equivalent (and faster). 
*/ - if version == Kip9Version::Beta && (outs_len == 1 || (outs_len <= ins_len && ins_len <= 2)) { + if version == Kip9Version::Beta && (outs_len == 1 || ins_len == 1 || (outs_len == 2 && ins_len == 2)) { let harmonic_ins = tx .populated_inputs() .map(|(_, entry)| self.storage_mass_parameter / entry.amount) @@ -144,63 +147,8 @@ mod tests { #[test] fn test_mass_storage() { - let script_pub_key = ScriptVec::from_slice(&[]); - let prev_tx_id = TransactionId::from_str("880eb9819a31821d9d2399e2f35e2433b72637e393d71ecc9b8d0250f49153c3").unwrap(); - // Tx with less outs than ins - let tx = Transaction::new( - 0, - vec![ - TransactionInput { - previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: 0 }, - signature_script: vec![], - sequence: 0, - sig_op_count: 0, - }, - TransactionInput { - previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: 1 }, - signature_script: vec![], - sequence: 1, - sig_op_count: 0, - }, - TransactionInput { - previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: 2 }, - signature_script: vec![], - sequence: 2, - sig_op_count: 0, - }, - ], - vec![ - TransactionOutput { value: 300, script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()) }, - TransactionOutput { value: 300, script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()) }, - ], - 1615462089000, - SubnetworkId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), - 0, - vec![], - ); - - let entries = vec![ - UtxoEntry { - amount: 100, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - block_daa_score: 0, - is_coinbase: false, - }, - UtxoEntry { - amount: 200, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - block_daa_score: 0, - is_coinbase: false, - }, - UtxoEntry { - amount: 300, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - block_daa_score: 0, - is_coinbase: false, - }, - ]; - let mut tx = 
MutableTransaction::with_entries(tx, entries); + let mut tx = generate_tx_from_amounts(&[100, 200, 300], &[300, 300]); let test_version = Kip9Version::Alpha; // Assert the formula: max( 0 , C·( |O|/H(O) - |I|/A(I) ) ) @@ -218,74 +166,8 @@ mod tests { assert_eq!(storage_mass, storage_mass_parameter / 50 + storage_mass_parameter / 550 - 3 * (storage_mass_parameter / 200)); // Create a tx with more outs than ins - let tx = Transaction::new( - 0, - vec![ - TransactionInput { - previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: 0 }, - signature_script: vec![], - sequence: 0, - sig_op_count: 0, - }, - TransactionInput { - previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: 1 }, - signature_script: vec![], - sequence: 1, - sig_op_count: 0, - }, - TransactionInput { - previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: 2 }, - signature_script: vec![], - sequence: 2, - sig_op_count: 0, - }, - ], - vec![ - TransactionOutput { - value: 10_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - }, - TransactionOutput { - value: 10_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - }, - TransactionOutput { - value: 10_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - }, - TransactionOutput { - value: 10_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - }, - ], - 1615462089000, - SubnetworkId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), - 0, - vec![], - ); - - let entries = vec![ - UtxoEntry { - amount: 10_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - block_daa_score: 0, - is_coinbase: false, - }, - UtxoEntry { - amount: 10_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - block_daa_score: 0, - is_coinbase: false, 
- }, - UtxoEntry { - amount: 20_000 * SOMPI_PER_KASPA, - script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), - block_daa_score: 0, - is_coinbase: false, - }, - ]; - let mut tx = MutableTransaction::with_entries(tx, entries); - + let base_value = 10_000 * SOMPI_PER_KASPA; + let mut tx = generate_tx_from_amounts(&[base_value, base_value, base_value * 2], &[base_value; 4]); let storage_mass_parameter = STORAGE_MASS_PARAMETER; let storage_mass = MassCalculator::new(0, 0, 0, storage_mass_parameter).calc_tx_storage_mass(&tx.as_verifiable(), test_version).unwrap(); @@ -305,7 +187,70 @@ mod tests { let storage_mass = MassCalculator::new(0, 0, 0, storage_mass_parameter).calc_tx_storage_mass(&tx.as_verifiable(), test_version).unwrap(); assert_eq!(storage_mass, 0); + } + + #[test] + fn test_mass_storage_beta() { + // 2:2 transaction + let mut tx = generate_tx_from_amounts(&[100, 200], &[50, 250]); + let storage_mass_parameter = 10u64.pow(12); + let test_version = Kip9Version::Beta; + // Assert the formula: max( 0 , C·( |O|/H(O) - |I|/H(I) ) ) + + let storage_mass = + MassCalculator::new(0, 0, 0, storage_mass_parameter).calc_tx_storage_mass(&tx.as_verifiable(), test_version).unwrap(); + assert_eq!(storage_mass, 9000000000); + + // Set outputs to be equal to inputs + tx.tx.outputs[0].value = 100; + tx.tx.outputs[1].value = 200; + let storage_mass = + MassCalculator::new(0, 0, 0, storage_mass_parameter).calc_tx_storage_mass(&tx.as_verifiable(), test_version).unwrap(); + assert_eq!(storage_mass, 0); - drop(script_pub_key); + // Remove an output and make sure the other is small enough to make storage mass greater than zero + tx.tx.outputs.pop(); + tx.tx.outputs[0].value = 50; + let storage_mass = + MassCalculator::new(0, 0, 0, storage_mass_parameter).calc_tx_storage_mass(&tx.as_verifiable(), test_version).unwrap(); + assert_eq!(storage_mass, 5000000000); + } + + fn generate_tx_from_amounts(ins: &[u64], outs: &[u64]) -> MutableTransaction { + let script_pub_key = 
ScriptVec::from_slice(&[]); + let prev_tx_id = TransactionId::from_str("880eb9819a31821d9d2399e2f35e2433b72637e393d71ecc9b8d0250f49153c3").unwrap(); + let tx = Transaction::new( + 0, + (0..ins.len()) + .map(|i| TransactionInput { + previous_outpoint: TransactionOutpoint { transaction_id: prev_tx_id, index: i as u32 }, + signature_script: vec![], + sequence: 0, + sig_op_count: 0, + }) + .collect(), + outs.iter() + .copied() + .map(|out_amount| TransactionOutput { + value: out_amount, + script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), + }) + .collect(), + 1615462089000, + SubnetworkId::from_bytes([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), + 0, + vec![], + ); + let entries = ins + .iter() + .copied() + .map(|in_amount| UtxoEntry { + amount: in_amount, + script_public_key: ScriptPublicKey::new(0, script_pub_key.clone()), + block_daa_score: 0, + is_coinbase: false, + }) + .collect(); + MutableTransaction::with_entries(tx, entries) } } diff --git a/consensus/src/processes/pruning_proof/mod.rs b/consensus/src/processes/pruning_proof/mod.rs index e3a1f1e3b..9895599db 100644 --- a/consensus/src/processes/pruning_proof/mod.rs +++ b/consensus/src/processes/pruning_proof/mod.rs @@ -691,11 +691,67 @@ impl PruningProofManager { } } + // Temp assertion for verifying a bug fix: assert that the full 2M chain is actually contained in the composed level proof + let set = BlockHashSet::from_iter(headers.iter().map(|h| h.hash)); + let chain_2m = self + .chain_up_to_depth(&*self.ghostdag_stores[level], selected_tip, 2 * self.pruning_proof_m) + .map_err(|err| { + dbg!(level, selected_tip, block_at_depth_2m, root); + format!("Assert 2M chain -- level: {}, err: {}", level, err) + }) + .unwrap(); + let chain_2m_len = chain_2m.len(); + for (i, chain_hash) in chain_2m.into_iter().enumerate() { + if !set.contains(&chain_hash) { + let next_level_tip = selected_tip_by_level[level + 1]; + let next_level_chain_m = + 
self.chain_up_to_depth(&*self.ghostdag_stores[level + 1], next_level_tip, self.pruning_proof_m).unwrap(); + let next_level_block_m = next_level_chain_m.last().copied().unwrap(); + dbg!(next_level_chain_m.len()); + dbg!(self.ghostdag_stores[level + 1].get_compact_data(next_level_tip).unwrap().blue_score); + dbg!(self.ghostdag_stores[level + 1].get_compact_data(next_level_block_m).unwrap().blue_score); + dbg!(self.ghostdag_stores[level].get_compact_data(selected_tip).unwrap().blue_score); + dbg!(self.ghostdag_stores[level].get_compact_data(block_at_depth_2m).unwrap().blue_score); + dbg!(level, selected_tip, block_at_depth_2m, root); + panic!("Assert 2M chain -- missing block {} at index {} out of {} chain blocks", chain_hash, i, chain_2m_len); + } + } + headers }) .collect_vec() } + /// Copy of `block_at_depth` which returns the full chain up to depth. Temporarily used for assertion purposes. + fn chain_up_to_depth( + &self, + ghostdag_store: &impl GhostdagStoreReader, + high: Hash, + depth: u64, + ) -> Result<Vec<Hash>, PruningProofManagerInternalError> { + let high_gd = ghostdag_store + .get_compact_data(high) + .map_err(|err| PruningProofManagerInternalError::BlockAtDepth(format!("high: {high}, depth: {depth}, {err}")))?; + let mut current_gd = high_gd; + let mut current = high; + let mut res = vec![current]; + while current_gd.blue_score + depth >= high_gd.blue_score { + if current_gd.selected_parent.is_origin() { + break; + } + let prev = current; + current = current_gd.selected_parent; + res.push(current); + current_gd = ghostdag_store.get_compact_data(current).map_err(|err| { + PruningProofManagerInternalError::BlockAtDepth(format!( + "high: {}, depth: {}, current: {}, high blue score: {}, current blue score: {}, {}", + high, depth, prev, high_gd.blue_score, current_gd.blue_score, err + )) + })?; + } + Ok(res) + } + fn block_at_depth( &self, ghostdag_store: &impl GhostdagStoreReader, diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs index d7254fb0d..7483d9091 100644 
--- a/kaspad/src/args.rs +++ b/kaspad/src/args.rs @@ -106,9 +106,9 @@ impl Default for Args { outbound_target: 8, inbound_limit: 128, rpc_max_clients: 128, - max_tracked_addresses: Tracker::DEFAULT_MAX_ADDRESSES, + max_tracked_addresses: 0, enable_unsynced_mining: false, - enable_mainnet_mining: false, + enable_mainnet_mining: true, testnet: false, testnet_suffix: 10, devnet: false, @@ -308,7 +308,7 @@ pub fn cli() -> Command { .long("enable-mainnet-mining") .action(ArgAction::SetTrue) .hide(true) - .help("Allow mainnet mining (do not use unless you know what you are doing)"), + .help("Allow mainnet mining (currently enabled by default while the flag is kept for backwards compatibility)"), ) .arg(arg!(--utxoindex "Enable the UTXO index")) .arg( @@ -316,8 +316,9 @@ pub fn cli() -> Command { .long("max-tracked-addresses") .require_equals(true) .value_parser(clap::value_parser!(usize)) - .help(format!("Max preallocated number of addresses tracking UTXO changed events (default: {}, maximum: {}). -Value 0 prevents the preallocation, leading to a 0 memory footprint as long as unused but then to a sub-optimal footprint when used.", Tracker::DEFAULT_MAX_ADDRESSES, Tracker::MAX_ADDRESS_UPPER_BOUND)), + .help(format!("Max (preallocated) number of addresses being tracked for UTXO changed events (default: {}, maximum: {}). 
+Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0 memory footprint as long as unused but to sub-optimal footprint if used.", +0, Tracker::MAX_ADDRESS_UPPER_BOUND, Tracker::DEFAULT_MAX_ADDRESSES)), ) .arg(arg!(--testnet "Use the test network")) .arg( @@ -455,6 +456,11 @@ impl Args { #[cfg(feature = "devnet-prealloc")] prealloc_amount: arg_match_unwrap_or::(&m, "prealloc-amount", defaults.prealloc_amount), }; + + if arg_match_unwrap_or::(&m, "enable-mainnet-mining", false) { + println!("\nNOTE: The flag --enable-mainnet-mining is deprecated and defaults to true also w/o explicit setting\n") + } + Ok(args) } } diff --git a/notify/src/address/tracker.rs b/notify/src/address/tracker.rs index 55100a706..a3dc4eb01 100644 --- a/notify/src/address/tracker.rs +++ b/notify/src/address/tracker.rs @@ -218,8 +218,11 @@ struct Inner { /// use `IndexMap` APIs which alter the index order of existing entries. script_pub_keys: IndexMap, - /// Maximum address count that can be registered - max_addresses: Option, + /// Maximum address count that can be registered. Note this must be `<= Index::MAX` since we cast the returned indexes to `Index` + max_addresses: usize, + + /// The preallocation used for the address index (`script_pub_keys`) + addresses_preallocation: Option, /// Set of entries [`Index`] in `script_pub_keys` having their [`RefCount`] at 0 hence considered /// empty. @@ -228,13 +231,22 @@ struct Inner { empty_entries: HashSet, } +/// Fails at compile time if `MAX_ADDRESS_UPPER_BOUND > Index::MAX`. +/// This is mandatory since we cast the returned indexes to `Index` +const _: usize = Index::MAX as usize - Inner::MAX_ADDRESS_UPPER_BOUND; + impl Inner { - /// The upper bound of the maximum address count + /// The upper bound of the maximum address count. Note that the upper bound must + /// never exceed `Index::MAX` since we cast the returned indexes to `Index`. 
See + /// compile-time assertion above const MAX_ADDRESS_UPPER_BOUND: usize = Self::expand_max_addresses(10_000_000); /// The lower bound of the maximum address count const MAX_ADDRESS_LOWER_BOUND: usize = 6; + /// Expanded count for a maximum of 1M addresses + const DEFAULT_MAX_ADDRESSES: usize = Self::expand_max_addresses(1_000_000); + /// Computes the optimal expanded max address count fitting in the actual allocated size of /// the internal storage structure const fn expand_max_addresses(max_addresses: usize) -> usize { @@ -255,6 +267,7 @@ impl Inner { // Saving one entry for the insert/swap_remove scheme during entry recycling prevents a reallocation // when reaching the maximum. let max_addresses = max_addresses.map(Self::expand_max_addresses); + let addresses_preallocation = max_addresses; let capacity = max_addresses.map(|x| x + 1).unwrap_or_default(); assert!( @@ -262,18 +275,18 @@ impl Inner { "Tracker maximum address count cannot exceed {}", Self::MAX_ADDRESS_UPPER_BOUND ); + let max_addresses = max_addresses.unwrap_or(Self::DEFAULT_MAX_ADDRESSES); + info!("Memory configuration: UTXO changed events wil be tracked for at most {} addresses", max_addresses); let script_pub_keys = IndexMap::with_capacity(capacity); debug!("Creating an address tracker with a capacity of {}", script_pub_keys.capacity()); - if let Some(max_addresses) = max_addresses { - info!("Tracking UTXO changed events for {} addresses at most", max_addresses); - } + let empty_entries = HashSet::with_capacity(capacity); - Self { script_pub_keys, max_addresses, empty_entries } + Self { script_pub_keys, max_addresses, addresses_preallocation, empty_entries } } fn is_full(&self) -> bool { - self.script_pub_keys.len() >= self.max_addresses.unwrap_or(Self::MAX_ADDRESS_UPPER_BOUND) && self.empty_entries.is_empty() + self.script_pub_keys.len() >= self.max_addresses && self.empty_entries.is_empty() } fn get(&self, spk: &ScriptPublicKey) -> Option<(Index, RefCount)> { @@ -396,7 +409,7 @@ impl Tracker 
{ pub const MAX_ADDRESS_UPPER_BOUND: usize = Inner::MAX_ADDRESS_UPPER_BOUND; /// Expanded count for a maximum of 1M addresses - pub const DEFAULT_MAX_ADDRESSES: usize = Self::expand_max_addresses(800); + pub const DEFAULT_MAX_ADDRESSES: usize = Inner::DEFAULT_MAX_ADDRESSES; const ADDRESS_CHUNK_SIZE: usize = 1024; @@ -406,6 +419,9 @@ impl Tracker { Inner::expand_max_addresses(max_addresses) } + /// Creates a new `Tracker` instance. If `max_addresses` is `Some`, uses it to prealloc + /// the internal index as well as for bounding the index size. Otherwise, performs no + /// prealloc while bounding the index size by `Tracker::DEFAULT_MAX_ADDRESSES` pub fn new(max_addresses: Option<usize>) -> Self { Self { inner: RwLock::new(Inner::new(max_addresses)) } } @@ -567,8 +583,8 @@ impl Tracker { self.inner.read().script_pub_keys.capacity() } - pub fn max_addresses(&self) -> Option<usize> { - self.inner.read().max_addresses + pub fn addresses_preallocation(&self) -> Option<usize> { + self.inner.read().addresses_preallocation } } @@ -611,7 +627,7 @@ mod tests { let tracker = Tracker::new(Some(MAX_ADDRESSES)); assert_eq!( - tracker.max_addresses().unwrap(), + tracker.addresses_preallocation().unwrap(), MAX_ADDRESSES, "tracker maximum address count should be expanded to the available allocated entries, minus 1 for a transient insert/swap_remove" ); diff --git a/notify/src/listener.rs b/notify/src/listener.rs index 46a688ad5..fc5254364 100644 --- a/notify/src/listener.rs +++ b/notify/src/listener.rs @@ -48,9 +48,9 @@ where debug!( "Creating a static listener {} with UtxosChanged capacity of {}", connection, - context.address_tracker.max_addresses().unwrap_or_default() + context.address_tracker.addresses_preallocation().unwrap_or_default() ); - context.address_tracker.max_addresses() + context.address_tracker.addresses_preallocation() } UtxosChangedMutationPolicy::Wildcard => None, }; diff --git a/notify/src/notifier.rs b/notify/src/notifier.rs index 41ae6626d..220fd261b 100644 --- 
a/notify/src/notifier.rs +++ b/notify/src/notifier.rs @@ -310,7 +310,7 @@ where subscriber }); let utxos_changed_capacity = match policies.utxo_changed { - UtxosChangedMutationPolicy::AddressSet => subscription_context.address_tracker.max_addresses(), + UtxosChangedMutationPolicy::AddressSet => subscription_context.address_tracker.addresses_preallocation(), UtxosChangedMutationPolicy::Wildcard => None, }; Self {