diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 88aac9f74ed..167da08d192 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use tokio::sync::watch; +use tracing::instrument; use zebra_chain::{ block, @@ -97,7 +98,7 @@ pub struct ChainTipSender { /// /// Once this flag is set, we ignore the finalized state. /// `None` tips don't set this flag. - non_finalized_tip: bool, + use_non_finalized_tip: bool, /// The sender channel for chain tip data. sender: watch::Sender, @@ -110,14 +111,18 @@ pub struct ChainTipSender { impl ChainTipSender { /// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`], /// using an `initial_tip` and a [`Network`]. + #[instrument(skip(initial_tip), fields(new_height, new_hash))] pub fn new( initial_tip: impl Into>, network: Network, ) -> (Self, LatestChainTip, ChainTipChange) { + let initial_tip = initial_tip.into(); + ChainTipSender::record_new_tip(&initial_tip); + let (sender, receiver) = watch::channel(None); let mut sender = ChainTipSender { - non_finalized_tip: false, + use_non_finalized_tip: false, sender, active_value: None, }; @@ -133,8 +138,20 @@ impl ChainTipSender { /// Update the latest finalized tip. /// /// May trigger an update to the best tip. - pub fn set_finalized_tip(&mut self, new_tip: impl Into>) { - if !self.non_finalized_tip { + #[instrument( + skip(self, new_tip), + fields( + old_use_non_finalized_tip = ?self.use_non_finalized_tip, + old_height = ?self.active_value.as_ref().map(|block| block.height), + old_hash = ?self.active_value.as_ref().map(|block| block.hash), + new_height, + new_hash, + ))] + pub fn set_finalized_tip(&mut self, new_tip: impl Into> + Clone) { + let new_tip = new_tip.into(); + ChainTipSender::record_new_tip(&new_tip); + + if !self.use_non_finalized_tip { self.update(new_tip); } } @@ -142,13 +159,26 @@ impl ChainTipSender { /// Update the latest non-finalized tip. /// /// May trigger an update to the best tip. - pub fn set_best_non_finalized_tip(&mut self, new_tip: impl Into>) { + #[instrument( + skip(self, new_tip), + fields( + old_use_non_finalized_tip = ?self.use_non_finalized_tip, + old_height = ?self.active_value.as_ref().map(|block| block.height), + old_hash = ?self.active_value.as_ref().map(|block| block.hash), + new_height, + new_hash, + ))] + pub fn set_best_non_finalized_tip( + &mut self, + new_tip: impl Into> + Clone, + ) { let new_tip = new_tip.into(); + ChainTipSender::record_new_tip(&new_tip); // once the non-finalized state becomes active, it is always populated // but ignoring `None`s makes the tests easier if new_tip.is_some() { - self.non_finalized_tip = true; + self.use_non_finalized_tip = true; self.update(new_tip) } } @@ -157,9 +187,7 @@ impl ChainTipSender { /// /// An update is only sent if the current best tip is different from the last best tip /// that was sent. - fn update(&mut self, new_tip: impl Into>) { - let new_tip = new_tip.into(); - + fn update(&mut self, new_tip: Option) { let needs_update = match (new_tip.as_ref(), self.active_value.as_ref()) { // since the blocks have been contextually validated, // we know their hashes cover all the block data @@ -173,6 +201,19 @@ impl ChainTipSender { self.active_value = new_tip; } } + + /// Record `new_tip` in the current span. + /// + /// Callers should create a new span with empty `new_height` and `new_hash` fields. + fn record_new_tip(new_tip: &Option) { + let span = tracing::Span::current(); + + let new_height = new_tip.as_ref().map(|block| block.height); + let new_hash = new_tip.as_ref().map(|block| block.hash); + + span.record("new_height", &tracing::field::debug(new_height)); + span.record("new_hash", &tracing::field::debug(new_hash)); + } } /// Efficient access to the state's current best chain tip. @@ -205,11 +246,23 @@ impl LatestChainTip { impl ChainTip for LatestChainTip { /// Return the height of the best chain tip. + #[instrument( + skip(self), + fields( + height = ?self.receiver.borrow().as_ref().map(|block| block.height), + hash = ?self.receiver.borrow().as_ref().map(|block| block.hash), + ))] fn best_tip_height(&self) -> Option { self.receiver.borrow().as_ref().map(|block| block.height) } /// Return the block hash of the best chain tip. + #[instrument( + skip(self), + fields( + height = ?self.receiver.borrow().as_ref().map(|block| block.height), + hash = ?self.receiver.borrow().as_ref().map(|block| block.hash), + ))] fn best_tip_hash(&self) -> Option { self.receiver.borrow().as_ref().map(|block| block.hash) } @@ -218,6 +271,13 @@ impl ChainTip for LatestChainTip { /// /// All transactions with these mined IDs should be rejected from the mempool, /// even if their authorizing data is different. + #[instrument( + skip(self), + fields( + height = ?self.receiver.borrow().as_ref().map(|block| block.height), + hash = ?self.receiver.borrow().as_ref().map(|block| block.hash), + transaction_count = ?self.receiver.borrow().as_ref().map(|block| block.transaction_hashes.len()), + ))] fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> { self.receiver .borrow() @@ -310,6 +370,14 @@ impl ChainTipChange { /// /// If a lot of blocks are committed at the same time, /// the change will skip some blocks, and return a [`Reset`]. + #[instrument( + skip(self), + fields( + current_height = ?self.receiver.borrow().as_ref().map(|block| block.height), + current_hash = ?self.receiver.borrow().as_ref().map(|block| block.hash), + last_change_hash = ?self.last_change_hash, + network = ?self.network, + ))] pub async fn wait_for_tip_change(&mut self) -> Result { let block = self.tip_block_change().await?; @@ -325,6 +393,14 @@ impl ChainTipChange { /// - `None` if there has been no change. /// /// See [`wait_for_tip_change`] for details. + #[instrument( + skip(self), + fields( + current_height = ?self.receiver.borrow().as_ref().map(|block| block.height), + current_hash = ?self.receiver.borrow().as_ref().map(|block| block.hash), + last_change_hash = ?self.last_change_hash, + network = ?self.network, + ))] pub fn last_tip_change(&mut self) -> Option { // Obtain the tip block. let block = self.best_tip_block()?; @@ -346,7 +422,7 @@ impl ChainTipChange { // check for an edge case that's dealt with by other code assert!( Some(block.hash) != self.last_change_hash, - "ChainTipSender ignores unchanged tips" + "ChainTipSender and ChainTipChange ignore unchanged tips" ); // If the previous block hash doesn't match, reset. @@ -410,7 +486,17 @@ impl ChainTipChange { // Wait until there is actually Some block, // so we don't have `Option`s inside `TipAction`s. if let Some(block) = self.best_tip_block() { - return Ok(block); + // Wait until we have a new block + // + // last_tip_change() updates last_change_hash, but it doesn't call receiver.changed(). + // So code that uses both sync and async methods can have spurious pending changes. + // + // TODO: use `receiver.borrow_and_update()` in `best_tip_block()`, + // once we upgrade to tokio 1.0 (#2200) + // and remove this extra check + if Some(block.hash) != self.last_change_hash { + return Ok(block); + } } } } @@ -462,4 +548,18 @@ impl TipAction { hash: block.hash, } } + + /// Converts this [`TipAction`] into a [`Reset`]. + /// + /// Designed for use in tests. + #[cfg(test)] + pub(crate) fn into_reset(self) -> Self { + match self { + Grow { block } => Reset { + height: block.height, + hash: block.hash, + }, + reset @ Reset { .. } => reset, + } + } } diff --git a/zebra-state/src/service/chain_tip/tests/prop.rs b/zebra-state/src/service/chain_tip/tests/prop.rs index 3a7a3cf0f04..7c903e10563 100644 --- a/zebra-state/src/service/chain_tip/tests/prop.rs +++ b/zebra-state/src/service/chain_tip/tests/prop.rs @@ -1,4 +1,4 @@ -use std::{env, sync::Arc}; +use std::{collections::HashSet, env, sync::Arc}; use futures::FutureExt; use proptest::prelude::*; @@ -8,12 +8,18 @@ use zebra_chain::{ block::Block, chain_tip::ChainTip, fmt::{DisplayToDebug, SummaryDebug}, - parameters::Network, + parameters::{Network, NetworkUpgrade}, }; use crate::service::chain_tip::{ChainTipBlock, ChainTipSender, TipAction}; -const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 4; +use TipChangeCheck::*; + +/// The default number of proptest cases for these tests. +/// +/// Currently, there are 24 different test case combinations, +/// and each test `Vec` has an average of 50 blocks. +const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 8; proptest! { #![proptest_config( @@ -27,7 +33,7 @@ proptest! { /// or otherwise the finalized tip. #[test] fn best_tip_is_latest_non_finalized_then_latest_finalized( - tip_updates in any::>>(), + tip_updates in any::>>(), network in any::(), ) { let (mut chain_tip_sender, latest_chain_tip, mut chain_tip_change) = ChainTipSender::new(None, network); @@ -36,81 +42,215 @@ proptest! { let mut latest_non_finalized_tip = None; let mut seen_non_finalized_tip = false; - for update in tip_updates { - match update { - BlockUpdate::Finalized(block) => { - let chain_tip = block.clone().map(|block| ChainTipBlock::from(block.0)); - chain_tip_sender.set_finalized_tip(chain_tip.clone()); - if let Some(block) = block { - latest_finalized_tip = Some((chain_tip, block)); - } + let mut pending_action = None; + let mut last_block_hash = None; + let mut chain_hashes = HashSet::new(); + + for (mut update, connection, tip_change_check) in tip_updates { + // prepare the update + if connection.is_grow() { + if let (Some(mut block), Some(last_block_hash)) = (update.block(), last_block_hash) { + Arc::make_mut(&mut block).header.previous_block_hash = last_block_hash; + *update.block_mut() = Some(block); } - BlockUpdate::NonFinalized(block) => { - let chain_tip = block.clone().map(|block| ChainTipBlock::from(block.0)); - chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone()); - if let Some(block) = block { - latest_non_finalized_tip = Some((chain_tip, block)); - seen_non_finalized_tip = true; - } + } + + let block = update.block(); + let chain_tip = block.clone().map(|block| ChainTipBlock::from(block.0)); + + if let Some(chain_tip) = chain_tip.clone() { + if chain_hashes.contains(&chain_tip.hash) { + // skip duplicate blocks - they are rejected by zebra-state + continue; } + last_block_hash = Some(chain_tip.hash); + chain_hashes.insert(chain_tip.hash); } - } - let expected_tip = if seen_non_finalized_tip { - latest_non_finalized_tip - } else { - latest_finalized_tip - }; - - let chain_tip_height = expected_tip - .as_ref() - .and_then(|(chain_tip, _block)| chain_tip.as_ref()) - .map(|chain_tip| chain_tip.height); - let expected_height = expected_tip.as_ref().and_then(|(_chain_tip, block)| block.coinbase_height()); - prop_assert_eq!(latest_chain_tip.best_tip_height(), chain_tip_height); - prop_assert_eq!(latest_chain_tip.best_tip_height(), expected_height); - - let chain_tip_hash = expected_tip - .as_ref() - .and_then(|(chain_tip, _block)| chain_tip.as_ref()) - .map(|chain_tip| chain_tip.hash); - let expected_hash = expected_tip.as_ref().map(|(_chain_tip, block)| block.hash()); - prop_assert_eq!(latest_chain_tip.best_tip_hash(), chain_tip_hash); - prop_assert_eq!(latest_chain_tip.best_tip_hash(), expected_hash); - - let chain_tip_transaction_ids = expected_tip - .as_ref() - .and_then(|(chain_tip, _block)| chain_tip.as_ref()) - .map(|chain_tip| chain_tip.transaction_hashes.clone()) - .unwrap_or_else(|| Arc::new([])); - let expected_transaction_ids = expected_tip - .as_ref() - .iter() - .flat_map(|(_chain_tip, block)| block.transactions.clone()) - .map(|transaction| transaction.hash()) - .collect(); - prop_assert_eq!( - latest_chain_tip.best_tip_mined_transaction_ids(), - chain_tip_transaction_ids - ); - prop_assert_eq!( - latest_chain_tip.best_tip_mined_transaction_ids(), - expected_transaction_ids - ); - - prop_assert_eq!( - chain_tip_change - .wait_for_tip_change() - .now_or_never() - .transpose() - .expect("watch sender is not dropped"), - expected_tip.map(|(_chain_tip, block)| TipAction::reset_with(block.0.into())) - ); + // do the update + if update.is_finalized() { + chain_tip_sender.set_finalized_tip(chain_tip.clone()); + if let Some(block) = block { + latest_finalized_tip = Some((chain_tip.unwrap(), block)); + } + } else { + chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone()); + if let Some(block) = block { + latest_non_finalized_tip = Some((chain_tip.unwrap(), block)); + seen_non_finalized_tip = true; + } + } + + // check the results + let expected_tip = if seen_non_finalized_tip { + latest_non_finalized_tip.clone() + } else { + latest_finalized_tip.clone() + }; + + let chain_tip_height = expected_tip + .as_ref() + .map(|(chain_tip, _block)| chain_tip.height); + let expected_height = expected_tip.as_ref().and_then(|(_chain_tip, block)| block.coinbase_height()); + prop_assert_eq!(latest_chain_tip.best_tip_height(), chain_tip_height); + prop_assert_eq!(latest_chain_tip.best_tip_height(), expected_height); + + let chain_tip_hash = expected_tip + .as_ref() + .map(|(chain_tip, _block)| chain_tip.hash); + let expected_hash = expected_tip.as_ref().map(|(_chain_tip, block)| block.hash()); + prop_assert_eq!(latest_chain_tip.best_tip_hash(), chain_tip_hash); + prop_assert_eq!(latest_chain_tip.best_tip_hash(), expected_hash); + + let chain_tip_transaction_ids = expected_tip + .as_ref() + .map(|(chain_tip, _block)| chain_tip.transaction_hashes.clone()) + .unwrap_or_else(|| Arc::new([])); + let expected_transaction_ids = expected_tip + .as_ref() + .iter() + .flat_map(|(_chain_tip, block)| block.transactions.clone()) + .map(|transaction| transaction.hash()) + .collect(); + prop_assert_eq!( + latest_chain_tip.best_tip_mined_transaction_ids(), + chain_tip_transaction_ids + ); + prop_assert_eq!( + latest_chain_tip.best_tip_mined_transaction_ids(), + expected_transaction_ids + ); + + let old_last_change_hash = chain_tip_change.last_change_hash; + + let new_action = expected_tip.and_then(|(chain_tip, block)| { + if Some(chain_tip.hash) == old_last_change_hash { + // some updates don't do anything, so there's no new action + None + } else if Some(chain_tip.previous_block_hash) != old_last_change_hash + || NetworkUpgrade::is_activation_height(network, chain_tip.height) + { + Some(TipAction::reset_with(block.0.into())) + } else { + Some(TipAction::grow_with(block.0.into())) + } + }); + + let expected_action = match (pending_action.clone(), new_action.clone()) { + (Some(pending_action), Some(new_action)) if pending_action == new_action => Some(new_action), + (Some(_pending_action), Some(new_action)) => Some(new_action.into_reset()), + (None, new_action) => new_action, + (pending_action, None) => pending_action, + }; + + match tip_change_check { + WaitFor => { + // TODO: use `unconstrained` to avoid spurious cooperative multitasking waits + // (needs a recent tokio version) + // See: + // https://github.com/ZcashFoundation/zebra/pull/2777#discussion_r712488817 + // https://docs.rs/tokio/1.11.0/tokio/task/index.html#cooperative-scheduling + // https://tokio.rs/blog/2020-04-preemption + prop_assert_eq!( + chain_tip_change + .wait_for_tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"), + expected_action, + "\n\ + unexpected wait_for_tip_change TipAction\n\ + new_action: {:?}\n\ + pending_action: {:?}\n\ + old last_change_hash: {:?}\n\ + new last_change_hash: {:?}", + new_action, + pending_action, + old_last_change_hash, + chain_tip_change.last_change_hash + ); + pending_action = None; + } + + Last => { + prop_assert_eq!( + chain_tip_change.last_tip_change(), + expected_action, + "\n\ + unexpected last_tip_change TipAction\n\ + new_action: {:?}\n\ + pending_action: {:?}\n\ + old last_change_hash: {:?}\n\ + new last_change_hash: {:?}", + new_action, + pending_action, + old_last_change_hash, + chain_tip_change.last_change_hash + ); + pending_action = None; + } + + Skip => { + pending_action = expected_action; + } + } + } } } +/// Block update test cases for [`ChainTipSender`] #[derive(Arbitrary, Clone, Debug)] enum BlockUpdate { Finalized(Option>>), NonFinalized(Option>>), } + +impl BlockUpdate { + /// Returns the inner block, regardless of variant. + pub fn block(&self) -> Option>> { + match self { + BlockUpdate::Finalized(block) => block.clone(), + BlockUpdate::NonFinalized(block) => block.clone(), + } + } + + /// Returns a mutable reference to the inner block, regardless of variant. + pub fn block_mut(&mut self) -> &mut Option>> { + match self { + BlockUpdate::Finalized(block) => block, + BlockUpdate::NonFinalized(block) => block, + } + } + + /// Is it finalized? + pub fn is_finalized(&self) -> bool { + matches!(self, BlockUpdate::Finalized(_)) + } +} + +/// Block update test case variants for [`ChainTipChange`] +#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq)] +enum BlockConnection { + Reset, + Grow, +} + +impl BlockConnection { + /// Is this a grow? + pub fn is_grow(&self) -> bool { + *self == BlockConnection::Grow + } +} + +/// Block update checks for [`ChainTipChange`] +#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq)] +enum TipChangeCheck { + /// Check that `wait_for_tip_change` returns the correct result + WaitFor, + + /// Check that `last_tip_change` returns the correct result + Last, + + /// Don't check this case (causes a `TipAction::Reset` in the next check) + Skip, +} diff --git a/zebra-state/src/service/chain_tip/tests/vectors.rs b/zebra-state/src/service/chain_tip/tests/vectors.rs index 9b80dfd6c84..29bb8cc2d05 100644 --- a/zebra-state/src/service/chain_tip/tests/vectors.rs +++ b/zebra-state/src/service/chain_tip/tests/vectors.rs @@ -39,6 +39,13 @@ fn chain_tip_change_is_initially_not_ready() { let (_chain_tip_sender, _latest_chain_tip, mut chain_tip_change) = ChainTipSender::new(None, Mainnet); + // TODO: use `tokio::task::unconstrained` to avoid spurious waits from tokio's cooperative multitasking + // (needs a recent tokio version) + // See: + // https://github.com/ZcashFoundation/zebra/pull/2777#discussion_r712488817 + // https://docs.rs/tokio/1.11.0/tokio/task/index.html#cooperative-scheduling + // https://tokio.rs/blog/2020-04-preemption + let first = chain_tip_change .wait_for_tip_change() .now_or_never() @@ -47,6 +54,8 @@ fn chain_tip_change_is_initially_not_ready() { assert_eq!(first, None); + assert_eq!(chain_tip_change.last_tip_change(), None); + // try again, just to be sure let first = chain_tip_change .wait_for_tip_change() @@ -56,6 +65,8 @@ fn chain_tip_change_is_initially_not_ready() { assert_eq!(first, None); + assert_eq!(chain_tip_change.last_tip_change(), None); + // also test our manual `Clone` impl #[allow(clippy::redundant_clone)] let first_clone = chain_tip_change @@ -66,4 +77,6 @@ fn chain_tip_change_is_initially_not_ready() { .expect("watch sender is not dropped"); assert_eq!(first_clone, None); + + assert_eq!(chain_tip_change.last_tip_change(), None); }