Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle duplicate blocks in zebra-state #1198

Merged
merged 11 commits into from
Oct 26, 2020
47 changes: 25 additions & 22 deletions book/src/dev/rfcs/0005-state-updates.md
Original file line number Diff line number Diff line change
Expand Up @@ -526,32 +526,35 @@ The state service uses the following entry points:

New `non-finalized` blocks are commited as follows:

### `pub(super) fn queue_and_commit_non_finalized_blocks(&mut self, new: Arc<Block>) -> tokio::sync::broadcast::Receiver<block::Hash>`
### `pub(super) fn queue_and_commit_non_finalized_blocks(&mut self, new: Arc<Block>) -> tokio::sync::oneshot::Receiver<block::Hash>`

1. If a duplicate block exists in the queue:
yaahc marked this conversation as resolved.
Show resolved Hide resolved
- Find the `QueuedBlock` for that existing duplicate block
- Create an extra receiver for the existing block, using `block.rsp_tx.subscribe`,
- Drop the newly received duplicate block
- Return the extra receiver, so it can be used in the response future for the duplicate block request

2. Create a `QueuedBlock` for `block`:
- Create a `tokio::sync::broadcast` channel
- Use that channel to create a `QueuedBlock` for `block`.

3. If a duplicate block exists in a non-finalized chain, or the finalized chain,
1. If a duplicate block exists in a non-finalized chain, or the finalized chain,
yaahc marked this conversation as resolved.
Show resolved Hide resolved
it has already been successfully verified:
- Broadcast `Ok(block.hash())` via `block.rsp_tx`, and return the receiver for the block's channel
- create a new oneshot channel
- immediately send `Err(DuplicateBlock)` drop the sender
yaahc marked this conversation as resolved.
Show resolved Hide resolved
- return the reciever

4. Add `block` to `self.queued_blocks`

5. If `block.header.previous_block_hash` is not present in the finalized or
2. If a duplicate block exists in the queue:
yaahc marked this conversation as resolved.
Show resolved Hide resolved
- Find the `QueuedBlock` for that existing duplicate block
- create a new channel for the new request
- replace the old sender in `queued_block` with the new sender
- send `Err(DuplicateBlock)` through the old sender channel
yaahc marked this conversation as resolved.
Show resolved Hide resolved
- continue to use the new receiver

3. Else create a `QueuedBlock` for `block`:
- Create a `tokio::sync::oneshot` channel
- Use that channel to create a `QueuedBlock` for `block`
- Add `block` to `self.queued_blocks`
- continue to use the new receiver

4. If `block.header.previous_block_hash` is not present in the finalized or
non-finalized state:
- Return the receiver for the block's channel

6. Else iteratively attempt to process queued blocks by their parent hash
5. Else iteratively attempt to process queued blocks by their parent hash
starting with `block.header.previous_block_hash`

7. While there are recently commited parent hashes to process
6. While there are recently commited parent hashes to process
- Dequeue all blocks waiting on `parent` with `let queued_children =
self.queued_blocks.dequeue_children(parent);`
- for each queued `block`
Expand All @@ -569,17 +572,17 @@ New `non-finalized` blocks are commited as follows:
- Add `block.hash` to the set of recently commited parent hashes to
process

8. While the length of the non-finalized portion of the best chain is greater
7. While the length of the non-finalized portion of the best chain is greater
than the reorg limit
- Remove the lowest height block from the non-finalized state with
`self.mem.finalize();`
- Commit that block to the finalized state with
`self.sled.commit_finalized_direct(finalized);`

9. Prune orphaned blocks from `self.queued_blocks` with
8. Prune orphaned blocks from `self.queued_blocks` with
`self.queued_blocks.prune_by_height(finalized_height);`
10. Return the receiver for the block's channel

9. Return the receiver for the block's channel

## Sled data structures
[sled]: #sled
Expand Down
59 changes: 43 additions & 16 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use futures::future::{FutureExt, TryFutureExt};
use memory_state::{NonFinalizedState, QueuedBlocks};
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tower::{buffer::Buffer, util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{
Expand All @@ -18,8 +18,7 @@ use zebra_chain::{
};

use crate::{
BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response,
ValidateContextError,
BoxError, CommitBlockError, Config, FinalizedState, Request, Response, ValidateContextError,
};

mod memory_state;
Expand All @@ -32,7 +31,7 @@ pub struct QueuedBlock {
// TODO: add these parameters when we can compute anchors.
// sprout_anchor: sprout::tree::Root,
// sapling_anchor: sapling::tree::Root,
pub rsp_tx: broadcast::Sender<Result<block::Hash, CloneError>>,
pub rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
}

struct StateService {
Expand Down Expand Up @@ -73,14 +72,33 @@ impl StateService {
/// in RFC0005.
///
/// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
#[instrument(skip(self, new))]
fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock) {
let parent_hash = new.block.header.previous_block_hash;
#[instrument(skip(self, block))]
fn queue_and_commit_non_finalized_blocks(
&mut self,
block: Arc<Block>,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
let hash = block.hash();
let parent_hash = block.header.previous_block_hash;

self.queued_blocks.queue(new);
if self.contains_committed_block(&block) {
yaahc marked this conversation as resolved.
Show resolved Hide resolved
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err("duplicate block".into()));
return rsp_rx;
}

yaahc marked this conversation as resolved.
Show resolved Hide resolved
let rsp_rx = if let Some(queued_block) = self.queued_blocks.get_mut(&hash) {
let (mut rsp_tx, rsp_rx) = oneshot::channel();
std::mem::swap(&mut queued_block.rsp_tx, &mut rsp_tx);
let _ = rsp_tx.send(Err("duplicate block".into()));
rsp_rx
} else {
let (rsp_tx, rsp_rx) = oneshot::channel();
self.queued_blocks.queue(QueuedBlock { block, rsp_tx });
rsp_rx
};

if !self.can_fork_chain_at(&parent_hash) {
return;
return rsp_rx;
}

self.process_queued(parent_hash);
Expand All @@ -96,6 +114,8 @@ impl StateService {
.prune_by_height(self.sled.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
));

rsp_rx
}

/// Run contextual validation on `block` and add it to the non-finalized
Expand All @@ -118,6 +138,17 @@ impl StateService {
self.mem.any_chain_contains(hash) || &self.sled.finalized_tip_hash() == hash
}

/// Returns true if the given hash has been committed to either the finalized
/// or non-finalized state.
fn contains_committed_block(&self, block: &Block) -> bool {
let hash = block.hash();
let height = block
.coinbase_height()
.expect("coinbase heights should be valid");

self.mem.any_chain_contains(&hash) || self.sled.get_hash(height) == Some(hash)
}

/// Attempt to validate and commit all queued blocks whose parents have
/// recently arrived starting from `new_parent`, in breadth-first ordering.
#[instrument(skip(self))]
Expand All @@ -132,7 +163,7 @@ impl StateService {
let result = self
.validate_and_commit(block)
.map(|()| hash)
.map_err(CloneError::from);
.map_err(BoxError::from);
let _ = rsp_tx.send(result);
new_parents.push(hash);
}
Expand Down Expand Up @@ -179,14 +210,11 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CommitBlock { block } => {
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);

self.pending_utxos.check_block(&block);
self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });
let rsp_rx = self.queue_and_commit_non_finalized_blocks(block);

async move {
rsp_rx
.recv()
.await
.expect("sender is not dropped")
.map(Response::Committed)
Expand All @@ -195,15 +223,14 @@ impl Service<Request> for StateService {
.boxed()
}
Request::CommitFinalizedBlock { block } => {
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
let (rsp_tx, rsp_rx) = oneshot::channel();

self.pending_utxos.check_block(&block);
self.sled
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });

async move {
rsp_rx
.recv()
.await
.expect("sender is not dropped")
.map(Response::Committed)
Expand Down
5 changes: 5 additions & 0 deletions zebra-state/src/service/memory_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,11 @@ impl QueuedBlocks {
.remove(&hash);
}
}

/// Return the queued block if it has already been registered
pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedBlock> {
self.blocks.get_mut(&hash)
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions zebra-state/src/sled_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ impl FinalizedState {
}
}

/// Returns the finalized hash for a given `block::Height` if it is present.
pub fn get_hash(&self, height: block::Height) -> Option<block::Hash> {
self.hash_by_height
.get(&height.0.to_be_bytes())
.expect("sled errors aren't handled")
yaahc marked this conversation as resolved.
Show resolved Hide resolved
.map(|bytes| block::Hash(bytes.as_ref().try_into().unwrap()))
}

pub fn block(
&self,
hash_or_height: HashOrHeight,
Expand Down