Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partly revert "Fix poll_ready usage in ChainVerifier" #1735

Merged
merged 4 commits into from
Feb 20, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 30 additions & 54 deletions zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,29 @@ use zebra_state as zs;

use crate::{
block::BlockVerifier,
checkpoint::{CheckpointList, CheckpointVerifier},
block::VerifyBlockError,
checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
BoxError, Config,
};

teor2345 marked this conversation as resolved.
Show resolved Hide resolved
/// The bound for each verifier's buffer.
///
/// We choose the verifier buffer bound based on the maximum number of
/// concurrent verifier users, to avoid contention:
/// - the `ChainSync` component
/// - the `Inbound` service
/// - a miner component, which we might add in future, and
/// - 1 extra slot to avoid contention.
///
/// We deliberately add extra slots, because they only cost a small amount of
/// memory, but missing slots can significantly slow down Zebra.
const VERIFIER_BUFFER_BOUND: usize = 4;

/// The chain verifier routes requests to either the checkpoint verifier or the
/// block verifier, depending on the maximum checkpoint height.
struct ChainVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
// Normally, we erase the types on buffer-wrapped services.
// But if we did that here, the block and checkpoint services would be
// type-indistinguishable, risking future substitution errors.
block_verifier: Buffer<BlockVerifier<S>, Arc<block::Block>>,
checkpoint_verifier: Buffer<CheckpointVerifier<S>, Arc<block::Block>>,
block: BlockVerifier<S>,
checkpoint: CheckpointVerifier<S>,
max_checkpoint_height: block::Height,
}

#[derive(Debug, Display, Error)]
pub enum VerifyChainError {
/// block could not be checkpointed
Checkpoint(#[source] BoxError),
Checkpoint(#[source] VerifyCheckpointError),
/// block could not be verified
Block(#[source] BoxError),
Block(#[source] VerifyBlockError),
}

impl<S> Service<Arc<Block>> for ChainVerifier<S>
Expand All @@ -72,34 +57,30 @@ where
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Correctness:
//
// We can't call `poll_ready` on the block and checkpoint verifiers here,
// because each `poll_ready` must be followed by a `call`, and we don't
// know which verifier we're going to choose yet.
// See #1593 for details.
Poll::Ready(Ok(()))
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match (self.checkpoint.poll_ready(cx), self.block.poll_ready(cx)) {
// First, fail if either service fails.
(Poll::Ready(Err(e)), _) => Poll::Ready(Err(VerifyChainError::Checkpoint(e))),
(_, Poll::Ready(Err(e))) => Poll::Ready(Err(VerifyChainError::Block(e))),
// Second, we're unready if either service is unready.
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
// Finally, we're ready if both services are ready and OK.
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

fn call(&mut self, block: Arc<Block>) -> Self::Future {
match block.coinbase_height() {
// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has
// a matching `call`. See #1593 for details.
Some(height) if height <= self.max_checkpoint_height => self
.checkpoint_verifier
.clone()
.oneshot(block)
.checkpoint
.call(block)
.map_err(VerifyChainError::Checkpoint)
.boxed(),
// This also covers blocks with no height, which the block verifier
// will reject immediately.
_ => self
.block_verifier
.clone()
.oneshot(block)
.block
.call(block)
.map_err(VerifyChainError::Block)
.boxed(),
}
Expand All @@ -122,7 +103,7 @@ where
pub async fn init<S>(
config: Config,
network: Network,
state_service: S,
mut state_service: S,
) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
Expand All @@ -137,13 +118,11 @@ where
.expect("hardcoded checkpoint list extends past sapling activation")
};

// Correctness:
//
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has a
// matching `call`. See #1593 for details.
let tip = match state_service
.clone()
.oneshot(zs::Request::Tip)
.ready_and()
.await
.unwrap()
.call(zs::Request::Tip)
.await
.unwrap()
{
Expand All @@ -152,18 +131,15 @@ where
};
tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");

let block_verifier = BlockVerifier::new(network, state_service.clone());
let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);

let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND);
let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND);
let block = BlockVerifier::new(network, state_service.clone());
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);

Buffer::new(
BoxService::new(ChainVerifier {
block_verifier,
checkpoint_verifier,
block,
checkpoint,
max_checkpoint_height,
}),
VERIFIER_BUFFER_BOUND,
3,
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
)
}