Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Use the forwards iterator more often #2376

Closed
wants to merge 14 commits into from
182 changes: 142 additions & 40 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
use itertools::Itertools;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::{Mutex, RwLock};
use slasher::Slasher;
Expand Down Expand Up @@ -85,6 +86,18 @@ pub const OP_POOL_DB_KEY: Hash256 = Hash256::zero();
pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::zero();
pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::zero();

/// Defines the behaviour when a block/block-root for a skipped slot is requested.
pub enum WhenSlotSkipped {
/// If the slot is a skip slot, return `None`.
///
/// This is how the HTTP API behaves.
None,
/// If the slot it a skip slot, return the previous non-skipped block.
///
/// This is generally how the specification behaves.
Prev,
}

/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
/// Processing this chain segment finished successfully.
Expand Down Expand Up @@ -442,18 +455,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|result| result.map_err(|e| e.into())))
}

/// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`.
pub fn get_ancestor_block_root(
&self,
block_root: Hash256,
slot: Slot,
) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| {
iter.find(|(_, ancestor_slot)| *ancestor_slot == slot)
.map(|(ancestor_block_root, _)| ancestor_block_root)
})
}

/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
///
Expand Down Expand Up @@ -489,17 +490,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
///
/// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
///
/// ## Errors
///
/// May return a database error.
pub fn block_at_slot(
&self,
slot: Slot,
request_slot: Slot,
skips: WhenSlotSkipped,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
let root = process_results(self.rev_iter_block_roots()?, |mut iter| {
iter.find(|(_, this_slot)| *this_slot == slot)
.map(|(root, _)| root)
})?;
let root = self.block_root_at_slot(request_slot, skips)?;

if let Some(block_root) = root {
Ok(self.store.get_item(&block_root)?)
Expand All @@ -521,21 +522,132 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
/// Returns `Ok(None)` if the given `Slot` was skipped.
///
/// ## Notes
///
/// - Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
pub fn block_root_at_slot(
&self,
request_slot: Slot,
skips: WhenSlotSkipped,
) -> Result<Option<Hash256>, Error> {
match skips {
WhenSlotSkipped::None => self.block_root_at_slot_skips_none(request_slot),
WhenSlotSkipped::Prev => self.block_root_at_slot_skips_prev(request_slot),
}
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
///
/// ## Notes
///
/// - Returns `Ok(None)` if the given `Slot` was skipped.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
///
/// ## Errors
///
/// May return a database error.
pub fn block_root_at_slot(&self, slot: Slot) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_block_roots()?, |mut iter| {
let root_opt = iter
.find(|(_, this_slot)| *this_slot == slot)
.map(|(root, _)| root);
if let (Some(root), Some((prev_root, _))) = (root_opt, iter.next()) {
return (prev_root != root).then(|| root);
fn block_root_at_slot_skips_none(&self, request_slot: Slot) -> Result<Option<Hash256>, Error> {
if request_slot > self.slot()? {
return Ok(None);
} else if request_slot == self.spec.genesis_slot {
return Ok(Some(self.genesis_block_root));
}

let prev_slot = request_slot.saturating_sub(1_u64);

// Try an optimized path of reading the root directly from the head state.
let fast_lookup: Option<Option<Hash256>> = self.with_head(|head| {
let state = &head.beacon_state;

// Try find the root for the `request_slot`.
let request_root_opt = match state.slot.cmp(&request_slot) {
// It's always a skip slot if the head is less than the request slot, return early.
Ordering::Less => return Ok(Some(None)),
// The request slot is the head slot.
Ordering::Equal => Some(head.beacon_block_root),
// Try find the request slot in the state.
Ordering::Greater => state.get_block_root(request_slot).ok().copied(),
};

if let Some(request_root) = request_root_opt {
if let Ok(prev_root) = state.get_block_root(prev_slot) {
return Ok(Some((*prev_root != request_root).then(|| request_root)));
}
}
root_opt
})

// Fast lookup is not possible.
Ok::<_, Error>(None)
})?;
if let Some(root_opt) = fast_lookup {
return Ok(root_opt);
}

if let Some(((prev_root, _), (curr_root, curr_slot))) =
process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| {
iter.tuple_windows().next()
})?
{
// Sanity check.
if curr_slot != request_slot {
return Err(Error::InconsistentForwardsIter {
request_slot,
slot: curr_slot,
});
}
Ok((curr_root != prev_root).then(|| curr_root))
} else {
Ok(None)
}
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
///
/// ## Notes
///
/// - Returns the root at the previous non-skipped slot if the given `Slot` was skipped.
/// - Returns `Ok(None)` for any slot higher than the current wall-clock slot.
///
/// ## Errors
///
/// May return a database error.
fn block_root_at_slot_skips_prev(&self, request_slot: Slot) -> Result<Option<Hash256>, Error> {
if request_slot > self.slot()? {
return Ok(None);
} else if request_slot == self.spec.genesis_slot {
return Ok(Some(self.genesis_block_root));
}

// Try an optimized path of reading the root directly from the head state.
let fast_lookup: Option<Hash256> = self.with_head(|head| {
if head.beacon_block.slot() <= request_slot {
// Return the head root if all slots between the request and the head are skipped.
Ok(Some(head.beacon_block_root))
} else if let Ok(root) = head.beacon_state.get_block_root(request_slot) {
// Return the root if it's easily accessible from the head state.
Ok(Some(*root))
} else {
// Fast lookup is not possible.
Ok::<_, Error>(None)
}
})?;
if let Some(root) = fast_lookup {
return Ok(Some(root));
}

process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
Ok(None)
}
})?
}

/// Returns the block at the given root, if any.
Expand Down Expand Up @@ -825,16 +937,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(map)
}

/// Returns the block canonical root of the current canonical chain at a given slot.
///
/// Returns `None` if the given slot doesn't exist in the chain.
pub fn root_at_slot(&self, target_slot: Slot) -> Result<Option<Hash256>, Error> {
process_results(self.rev_iter_block_roots()?, |mut iter| {
iter.find(|(_, slot)| *slot == target_slot)
.map(|(root, _)| root)
})
}

/// Returns the block canonical root of the current canonical chain at a given slot, starting from the given state.
///
/// Returns `None` if the given slot doesn't exist in the chain.
Expand Down Expand Up @@ -2324,10 +2426,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_head_subscribers() {
if let Ok(Some(current_duty_dependent_root)) =
self.root_at_slot(target_epoch_start_slot - 1)
self.block_root_at_slot(target_epoch_start_slot - 1, WhenSlotSkipped::Prev)
{
if let Ok(Some(previous_duty_dependent_root)) =
self.root_at_slot(prev_target_epoch_start_slot - 1)
if let Ok(Some(previous_duty_dependent_root)) = self
.block_root_at_slot(prev_target_epoch_start_slot - 1, WhenSlotSkipped::Prev)
{
event_handler.register(EventKind::Head(SseHead {
slot: head_slot,
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ pub enum BeaconChainError {
state_epoch: Epoch,
shuffling_epoch: Epoch,
},
InconsistentForwardsIter {
request_slot: Slot,
slot: Slot,
},
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod validator_pubkey_cache;

pub use self::beacon_chain::{
AttestationProcessingOutcome, BeaconChain, BeaconChainTypes, BeaconStore, ChainSegmentResult,
ForkChoiceError, StateSkipConfig, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
ForkChoiceError, StateSkipConfig, WhenSlotSkipped, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
};
pub use self::beacon_snapshot::BeaconSnapshot;
pub use self::chain_config::ChainConfig;
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/attestation_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ extern crate lazy_static;

use beacon_chain::{
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy},
StateSkipConfig,
StateSkipConfig, WhenSlotSkipped,
};
use store::config::StoreConfig;
use tree_hash::TreeHash;
Expand Down Expand Up @@ -60,7 +60,7 @@ fn produces_attestations() {
};

let block = chain
.block_at_slot(block_slot)
.block_at_slot(block_slot, WhenSlotSkipped::Prev)
.expect("should get block")
.expect("block should not be skipped");
let block_root = block.message.tree_hash_root();
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ extern crate lazy_static;
use beacon_chain::{
attestation_verification::Error as AttnError,
test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType},
BeaconChain, BeaconChainTypes,
BeaconChain, BeaconChainTypes, WhenSlotSkipped,
};
use int_to_bytes::int_to_bytes32;
use state_processing::{
Expand Down Expand Up @@ -912,7 +912,7 @@ fn attestation_that_skips_epochs() {
let earlier_slot = (current_epoch - 2).start_slot(MainnetEthSpec::slots_per_epoch());
let earlier_block = harness
.chain
.block_at_slot(earlier_slot)
.block_at_slot(earlier_slot, WhenSlotSkipped::Prev)
.expect("should not error getting block at slot")
.expect("should find block at slot");

Expand Down
Loading