Skip to content

Commit

Permalink
Merge of #3870
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Mar 16, 2022
2 parents 413f7fb + 3240332 commit 2f1923f
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 71 deletions.
55 changes: 34 additions & 21 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ use crate::{
finalized_state::{FinalizedState, ZebraDb},
non_finalized_state::{Chain, NonFinalizedState, QueuedBlocks},
pending_utxos::PendingUtxos,
watch_receiver::WatchReceiver,
},
BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request,
Response, ValidateContextError,
};

pub mod block_iter;
pub mod chain_tip;
pub mod watch_receiver;

pub(crate) mod check;

Expand Down Expand Up @@ -144,7 +146,7 @@ pub struct ReadStateService {
///
/// This chain is only updated between requests,
/// so it might include some block data that is also on `disk`.
best_chain_receiver: watch::Receiver<Option<Arc<Chain>>>,
best_chain_receiver: WatchReceiver<Option<Arc<Chain>>>,

/// The configured Zcash network.
network: Network,
Expand Down Expand Up @@ -290,36 +292,48 @@ impl StateService {
);
self.queued_blocks.prune_by_height(finalized_tip_height);

let best_chain = self.mem.best_chain();
let tip_block = best_chain
.and_then(|chain| chain.tip_block())
.cloned()
.map(ChainTipBlock::from);
let tip_block_height = self.update_latest_chain_channels();

// update metrics using the best non-finalized tip
if let Some(tip_block) = tip_block.as_ref() {
if let Some(tip_block_height) = tip_block_height {
metrics::gauge!(
"state.full_verifier.committed.block.height",
tip_block.height.0 as _
tip_block_height.0 as _
);

// This height gauge is updated for both fully verified and checkpoint blocks.
// These updates can't conflict, because the state makes sure that blocks
// are committed in order.
metrics::gauge!("zcash.chain.verified.block.height", tip_block.height.0 as _);
metrics::gauge!("zcash.chain.verified.block.height", tip_block_height.0 as _);
}

// update the chain watch channels
tracing::trace!("finished processing queued block");
rsp_rx
}

/// Update the [`LatestChainTip`], [`ChainTipChange`], and [`LatestChain`] channels
/// with the latest non-finalized [`ChainTipBlock`] and [`Chain`].
///
/// Returns the latest non-finalized chain tip height,
/// or `None` if the non-finalized state is empty.
#[instrument(level = "debug", skip(self))]
fn update_latest_chain_channels(&mut self) -> Option<block::Height> {
let best_chain = self.mem.best_chain();
let tip_block = best_chain
.and_then(|chain| chain.tip_block())
.cloned()
.map(ChainTipBlock::from);
let tip_block_height = tip_block.as_ref().map(|block| block.height);

// The RPC service uses the ReadStateService, but it is not turned on by default.
if self.best_chain_sender.receiver_count() > 0 {
// If the final receiver was just dropped, ignore the error.
let _ = self.best_chain_sender.send(best_chain.cloned());
}

self.chain_tip_sender.set_best_non_finalized_tip(tip_block);

tracing::trace!("finished processing queued block");
rsp_rx
tip_block_height
}

/// Run contextual validation on the prepared block and add it to the
Expand Down Expand Up @@ -444,8 +458,8 @@ impl StateService {
Some(tip.0 - height.0)
}

/// Return the block identified by either its `height` or `hash`,
/// if it exists in the current best chain.
/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), if it exists in the current best chain.
pub fn best_block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
read::block(self.mem.best_chain(), self.disk.db(), hash_or_height)
}
Expand Down Expand Up @@ -675,7 +689,7 @@ impl ReadStateService {

let read_only_service = Self {
db: disk.db().clone(),
best_chain_receiver,
best_chain_receiver: WatchReceiver::new(best_chain_receiver),
network: disk.network(),
};

Expand Down Expand Up @@ -849,12 +863,11 @@ impl Service<Request> for ReadStateService {
let state = self.clone();

async move {
Ok(read::block(
state.best_chain_receiver.borrow().clone().as_ref(),
&state.db,
hash_or_height,
))
.map(Response::Block)
let block = state.best_chain_receiver.with_watch_data(|best_chain| {
read::block(best_chain, &state.db, hash_or_height)
});

Ok(Response::Block(block))
}
.boxed()
}
Expand Down
94 changes: 52 additions & 42 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use zebra_chain::{
transaction,
};

use crate::{request::ContextuallyValidBlock, FinalizedBlock};
use crate::{
request::ContextuallyValidBlock, service::watch_receiver::WatchReceiver, FinalizedBlock,
};

use TipAction::*;

Expand Down Expand Up @@ -154,6 +156,9 @@ impl ChainTipSender {
/// Update the latest finalized tip.
///
/// May trigger an update to the best tip.
//
// TODO: when we replace active_value with `watch::Sender::borrow`,
// refactor instrument to avoid multiple borrows, to prevent deadlocks
#[instrument(
skip(self, new_tip),
fields(
Expand All @@ -175,6 +180,9 @@ impl ChainTipSender {
/// Update the latest non-finalized tip.
///
/// May trigger an update to the best tip.
//
// TODO: when we replace active_value with `watch::Sender::borrow`,
// refactor instrument to avoid multiple borrows, to prevent deadlocks
#[instrument(
skip(self, new_tip),
fields(
Expand Down Expand Up @@ -250,65 +258,67 @@ impl ChainTipSender {
#[derive(Clone, Debug)]
pub struct LatestChainTip {
/// The receiver for the current chain tip's data.
receiver: watch::Receiver<ChainTipData>,
receiver: WatchReceiver<ChainTipData>,
}

impl LatestChainTip {
/// Create a new [`LatestChainTip`] from a watch channel receiver.
fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
Self { receiver }
Self {
receiver: WatchReceiver::new(receiver),
}
}

/// Retrieve a result `R` from the current [`ChainTipBlock`], if it's available.
/// Maps the current data `ChainTipData` to `Option<U>`
/// by applying a function to the watched value,
/// while holding the receiver lock as briefly as possible.
///
/// This helper method is a shorter way to borrow the value from the [`watch::Receiver`] and
/// extract some information from it, while also adding the current chain tip block's fields as
/// records to the current span.
///
/// A single read lock is kept during the execution of the method, and it is dropped at the end
/// of it.
/// A single read lock is acquired to clone `T`, and then released after the clone.
/// See the performance note on [`WatchReceiver::with_watch_data`].
///
/// # Correctness
///
/// To prevent deadlocks:
///
/// - `receiver.borrow()` should not be called before this method while in the same scope.
/// - `receiver.borrow()` should not be called inside the `action` closure.
///
/// It is important to avoid calling `borrow` more than once in the same scope, which
/// effectively tries to acquire two read locks to the shared data in the watch channel. If
/// that is done, there's a chance that the [`watch::Sender`] tries to send a value, which
/// starts acquiring a write-lock, and prevents further read-locks from being acquired until
/// the update is finished.
///
/// What can happen in that scenario is:
///
/// 1. The receiver manages to acquire a read-lock for the first `borrow`
/// 2. The sender starts acquiring the write-lock
/// 3. The receiver fails to acquire a read-lock for the second `borrow`
///
/// Now both the sender and the receivers hang, because the sender won't release the lock until
/// it can update the value, and the receiver won't release its first read-lock until it
/// acquires the second read-lock and finishes what it's doing.
fn with_chain_tip_block<R>(&self, action: impl FnOnce(&ChainTipBlock) -> R) -> Option<R> {
/// To avoid deadlocks, see the correctness note on [`WatchReceiver::with_watch_data`].
fn with_chain_tip_block<U, F>(&self, f: F) -> Option<U>
where
F: FnOnce(&ChainTipBlock) -> U,
{
let span = tracing::Span::current();
let borrow_guard = self.receiver.borrow();
let chain_tip_block = borrow_guard.as_ref();

span.record(
"height",
&tracing::field::debug(chain_tip_block.map(|block| block.height)),
);
span.record(
"hash",
&tracing::field::debug(chain_tip_block.map(|block| block.hash)),
);
span.record(
"transaction_count",
&tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
);
let register_span_fields = |chain_tip_block: Option<&ChainTipBlock>| {
span.record(
"height",
&tracing::field::debug(chain_tip_block.map(|block| block.height)),
);
span.record(
"hash",
&tracing::field::debug(chain_tip_block.map(|block| block.hash)),
);
span.record(
"time",
&tracing::field::debug(chain_tip_block.map(|block| block.time)),
);
span.record(
"previous_hash",
&tracing::field::debug(chain_tip_block.map(|block| block.previous_block_hash)),
);
span.record(
"transaction_count",
&tracing::field::debug(chain_tip_block.map(|block| block.transaction_hashes.len())),
);
};

self.receiver.with_watch_data(|chain_tip_block| {
// TODO: replace with Option::inspect when it stabilises
// https://github.com/rust-lang/rust/issues/91345
register_span_fields(chain_tip_block.as_ref());

chain_tip_block.map(action)
chain_tip_block.as_ref().map(f)
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion zebra-state/src/service/finalized_state/zebra_db/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ impl ZebraDb {
self.db.zs_get(height_by_hash, &hash)
}

/// Returns the given block if it exists.
/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), if it exists in the finalized chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<Arc<Block>> {
let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();
let block_by_height = self.db.cf_handle("block_by_height").unwrap();
Expand Down
3 changes: 2 additions & 1 deletion zebra-state/src/service/non_finalized_state/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ impl Chain {
Ok(Some(forked))
}

/// Returns the [`ContextuallyValidBlock`] at a given height or hash in this chain.
/// Returns the [`ContextuallyValidBlock`] with [`Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height), if it exists in this chain.
pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyValidBlock> {
let height =
hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;
Expand Down
25 changes: 19 additions & 6 deletions zebra-state/src/service/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,28 @@ use crate::{
HashOrHeight,
};

/// Return the block identified by either its `height` or `hash` if it exists
/// in the non-finalized `chain` or finalized `db`.
pub(crate) fn block(
chain: Option<&Arc<Chain>>,
/// Returns the [`Block`] with [`Hash`](zebra_chain::block::Hash) or
/// [`Height`](zebra_chain::block::Height),
/// if it exists in the non-finalized `chain` or finalized `db`.
pub(crate) fn block<C>(
chain: Option<C>,
db: &ZebraDb,
hash_or_height: HashOrHeight,
) -> Option<Arc<Block>> {
) -> Option<Arc<Block>>
where
C: AsRef<Chain>,
{
// # Correctness
//
// The StateService commits blocks to the finalized state before updating the latest chain,
// and it can commit additional blocks after we've cloned this `chain` variable.
//
// Since blocks are the same in the finalized and non-finalized state,
// we check the most efficient alternative first.
// (`chain` is always in memory, but `db` stores blocks on disk, with a memory cache.)
chain
.and_then(|chain| chain.block(hash_or_height))
.as_ref()
.and_then(|chain| chain.as_ref().block(hash_or_height))
.map(|contextual| contextual.block.clone())
.or_else(|| db.block(hash_or_height))
}
Loading

0 comments on commit 2f1923f

Please sign in to comment.