Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop panicking when using sync and async methods on the same ChainTipChange #2800

Merged
merged 12 commits into from
Sep 28, 2021
122 changes: 111 additions & 11 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::sync::Arc;

use tokio::sync::watch;
use tracing::instrument;

use zebra_chain::{
block,
Expand Down Expand Up @@ -97,7 +98,7 @@ pub struct ChainTipSender {
///
/// Once this flag is set, we ignore the finalized state.
/// `None` tips don't set this flag.
non_finalized_tip: bool,
use_non_finalized_tip: bool,

/// The sender channel for chain tip data.
sender: watch::Sender<ChainTipData>,
Expand All @@ -110,14 +111,18 @@ pub struct ChainTipSender {
impl ChainTipSender {
/// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`],
/// using an `initial_tip` and a [`Network`].
#[instrument(skip(initial_tip), fields(new_height, new_hash))]
pub fn new(
initial_tip: impl Into<Option<ChainTipBlock>>,
network: Network,
) -> (Self, LatestChainTip, ChainTipChange) {
let initial_tip = initial_tip.into();
ChainTipSender::record_new_tip(&initial_tip);

let (sender, receiver) = watch::channel(None);

let mut sender = ChainTipSender {
non_finalized_tip: false,
use_non_finalized_tip: false,
sender,
active_value: None,
};
Expand All @@ -133,22 +138,47 @@ impl ChainTipSender {
/// Update the latest finalized tip.
///
/// May trigger an update to the best tip.
pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>>) {
if !self.non_finalized_tip {
#[instrument(
skip(self, new_tip),
fields(
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
old_height = ?self.active_value.as_ref().map(|block| block.height),
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
new_height,
new_hash,
))]
pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
let new_tip = new_tip.into();
ChainTipSender::record_new_tip(&new_tip);

if !self.use_non_finalized_tip {
self.update(new_tip);
}
}

/// Update the latest non-finalized tip.
///
/// May trigger an update to the best tip.
pub fn set_best_non_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>>) {
#[instrument(
skip(self, new_tip),
fields(
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
old_height = ?self.active_value.as_ref().map(|block| block.height),
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
new_height,
new_hash,
))]
pub fn set_best_non_finalized_tip(
&mut self,
new_tip: impl Into<Option<ChainTipBlock>> + Clone,
) {
let new_tip = new_tip.into();
ChainTipSender::record_new_tip(&new_tip);

// once the non-finalized state becomes active, it is always populated
// but ignoring `None`s makes the tests easier
if new_tip.is_some() {
self.non_finalized_tip = true;
self.use_non_finalized_tip = true;
self.update(new_tip)
}
}
Expand All @@ -157,9 +187,7 @@ impl ChainTipSender {
///
/// An update is only sent if the current best tip is different from the last best tip
/// that was sent.
fn update(&mut self, new_tip: impl Into<Option<ChainTipBlock>>) {
let new_tip = new_tip.into();

fn update(&mut self, new_tip: Option<ChainTipBlock>) {
let needs_update = match (new_tip.as_ref(), self.active_value.as_ref()) {
// since the blocks have been contextually validated,
// we know their hashes cover all the block data
Expand All @@ -173,6 +201,19 @@ impl ChainTipSender {
self.active_value = new_tip;
}
}

/// Record `new_tip` in the current span.
///
/// Callers should create a new span with empty `new_height` and `new_hash` fields.
fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
let span = tracing::Span::current();

let new_height = new_tip.as_ref().map(|block| block.height);
let new_hash = new_tip.as_ref().map(|block| block.hash);

span.record("new_height", &tracing::field::debug(new_height));
span.record("new_hash", &tracing::field::debug(new_hash));
}
}

/// Efficient access to the state's current best chain tip.
Expand Down Expand Up @@ -205,11 +246,23 @@ impl LatestChainTip {

impl ChainTip for LatestChainTip {
/// Return the height of the best chain tip.
#[instrument(
skip(self),
fields(
height = ?self.receiver.borrow().as_ref().map(|block| block.height),
hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
))]
fn best_tip_height(&self) -> Option<block::Height> {
self.receiver.borrow().as_ref().map(|block| block.height)
}

/// Return the block hash of the best chain tip.
#[instrument(
skip(self),
fields(
height = ?self.receiver.borrow().as_ref().map(|block| block.height),
hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
))]
fn best_tip_hash(&self) -> Option<block::Hash> {
self.receiver.borrow().as_ref().map(|block| block.hash)
}
Expand All @@ -218,6 +271,13 @@ impl ChainTip for LatestChainTip {
///
/// All transactions with these mined IDs should be rejected from the mempool,
/// even if their authorizing data is different.
#[instrument(
skip(self),
fields(
height = ?self.receiver.borrow().as_ref().map(|block| block.height),
hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
transaction_count = ?self.receiver.borrow().as_ref().map(|block| block.transaction_hashes.len()),
))]
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
self.receiver
.borrow()
Expand Down Expand Up @@ -310,6 +370,14 @@ impl ChainTipChange {
///
/// If a lot of blocks are committed at the same time,
/// the change will skip some blocks, and return a [`Reset`].
#[instrument(
skip(self),
fields(
current_height = ?self.receiver.borrow().as_ref().map(|block| block.height),
current_hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
last_change_hash = ?self.last_change_hash,
network = ?self.network,
))]
pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
let block = self.tip_block_change().await?;

Expand All @@ -325,6 +393,14 @@ impl ChainTipChange {
/// - `None` if there has been no change.
///
/// See [`wait_for_tip_change`] for details.
#[instrument(
skip(self),
fields(
current_height = ?self.receiver.borrow().as_ref().map(|block| block.height),
current_hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
last_change_hash = ?self.last_change_hash,
network = ?self.network,
))]
pub fn last_tip_change(&mut self) -> Option<TipAction> {
// Obtain the tip block.
let block = self.best_tip_block()?;
Expand All @@ -346,7 +422,7 @@ impl ChainTipChange {
// check for an edge case that's dealt with by other code
assert!(
Some(block.hash) != self.last_change_hash,
"ChainTipSender ignores unchanged tips"
"ChainTipSender and ChainTipChange ignore unchanged tips"
);

// If the previous block hash doesn't match, reset.
Expand Down Expand Up @@ -410,7 +486,17 @@ impl ChainTipChange {
// Wait until there is actually Some block,
// so we don't have `Option`s inside `TipAction`s.
if let Some(block) = self.best_tip_block() {
return Ok(block);
// Wait until we have a new block
//
// last_tip_change() updates last_change_hash, but it doesn't call receiver.changed().
// So code that uses both sync and async methods can have spurious pending changes.
//
// TODO: use `receiver.borrow_and_update()` in `best_tip_block()`,
// once we upgrade to tokio 1.0 (#2200)
// and remove this extra check
if Some(block.hash) != self.last_change_hash {
return Ok(block);
}
}
}
}
Expand Down Expand Up @@ -462,4 +548,18 @@ impl TipAction {
hash: block.hash,
}
}

/// Converts this [`TipAction`] into a [`Reset`].
///
/// Designed for use in tests.
#[cfg(test)]
pub(crate) fn into_reset(self) -> Self {
match self {
Grow { block } => Reset {
height: block.height,
hash: block.hash,
},
reset @ Reset { .. } => reset,
}
}
}
Loading