Skip to content

Commit

Permalink
Make the state service use broadcast channels (#1137)
Browse files Browse the repository at this point in the history
And refactor error handling
  • Loading branch information
yaahc authored Oct 9, 2020
1 parent 76e7e3d commit b3634fa
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 24 deletions.
41 changes: 41 additions & 0 deletions zebra-state/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::sync::Arc;
use thiserror::Error;

/// A wrapper for type erased errors that is itself clonable and implements the
/// Error trait
#[derive(Debug, Error, Clone)]
#[error(transparent)]
pub struct CloneError {
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
}

impl From<CommitBlockError> for CloneError {
fn from(source: CommitBlockError) -> Self {
let source = Arc::new(source);
Self { source }
}
}

impl From<BoxError> for CloneError {
fn from(source: BoxError) -> Self {
let source = Arc::from(source);
Self { source }
}
}

/// A boxed [`std::error::Error`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// An error describing the reason a block could not be committed to the state.
#[derive(Debug, Error)]
#[error("block is not contextually valid")]
pub struct CommitBlockError(#[from] ValidateContextError);

/// An error describing why a block failed contextual validation.
#[derive(displaydoc::Display, Debug, Error)]
#[non_exhaustive]
pub enum ValidateContextError {
/// block.height is lower than the current finalized height
#[non_exhaustive]
OrphanedBlock,
}
5 changes: 2 additions & 3 deletions zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

mod config;
mod constants;
mod error;
mod request;
mod response;
mod service;
Expand All @@ -22,9 +23,7 @@ use service::QueuedBlock;
use sled_state::FinalizedState;

pub use config::Config;
pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError};
pub use request::{HashOrHeight, Request};
pub use response::Response;
pub use service::init;

/// A boxed [`std::error::Error`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
36 changes: 16 additions & 20 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ use std::{

use futures::future::{FutureExt, TryFutureExt};
use memory_state::{NonFinalizedState, QueuedBlocks};
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::sync::broadcast;
use tower::{buffer::Buffer, util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{
block::{self, Block},
parameters::Network,
};

use crate::{BoxError, Config, FinalizedState, Request, Response};
use crate::{
BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response,
ValidateContextError,
};

mod memory_state;

Expand All @@ -27,7 +29,7 @@ pub struct QueuedBlock {
// TODO: add these parameters when we can compute anchors.
// sprout_anchor: sprout::tree::Root,
// sapling_anchor: sapling::tree::Root,
pub rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
pub rsp_tx: broadcast::Sender<Result<block::Hash, CloneError>>,
}

struct StateService {
Expand All @@ -39,16 +41,6 @@ struct StateService {
queued_blocks: QueuedBlocks,
}

#[derive(Debug, Error)]
#[error("block is not contextually valid")]
struct CommitError(#[from] ValidateContextError);

#[derive(displaydoc::Display, Debug, Error)]
enum ValidateContextError {
/// block.height is lower than the current finalized height
OrphanedBlock,
}

impl StateService {
pub fn new(config: Config, network: Network) -> Self {
let sled = FinalizedState::new(&config, network);
Expand Down Expand Up @@ -96,7 +88,7 @@ impl StateService {

/// Run contextual validation on `block` and add it to the non-finalized
/// state if it is contextually valid.
fn validate_and_commit(&mut self, block: Arc<Block>) -> Result<(), CommitError> {
fn validate_and_commit(&mut self, block: Arc<Block>) -> Result<(), CommitBlockError> {
self.check_contextual_validity(&block)?;
let parent_hash = block.header.previous_block_hash;

Expand Down Expand Up @@ -128,7 +120,7 @@ impl StateService {
let result = self
.validate_and_commit(block)
.map(|()| hash)
.map_err(Into::into);
.map_err(CloneError::from);
let _ = rsp_tx.send(result);
new_parents.push(hash);
}
Expand Down Expand Up @@ -168,29 +160,33 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CommitBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel();
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);

self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });

async move {
rsp_rx
.recv()
.await
.expect("sender oneshot is not dropped")
.expect("sender is not dropped")
.map(Response::Committed)
.map_err(Into::into)
}
.boxed()
}
Request::CommitFinalizedBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel();
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);

self.sled
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });

async move {
rsp_rx
.recv()
.await
.expect("sender oneshot is not dropped")
.expect("sender is not dropped")
.map(Response::Committed)
.map_err(Into::into)
}
.boxed()
}
Expand Down
2 changes: 1 addition & 1 deletion zebra-state/src/sled_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl FinalizedState {
fn commit_finalized(&mut self, queued_block: QueuedBlock) {
let QueuedBlock { block, rsp_tx } = queued_block;
let result = self.commit_finalized_direct(block);
let _ = rsp_tx.send(result);
let _ = rsp_tx.send(result.map_err(Into::into));
}

// TODO: this impl works only during checkpointing, it needs to be rewritten
Expand Down

0 comments on commit b3634fa

Please sign in to comment.