Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

check the da cache and the attester cache in responding to RPC requests #5138

Merged
merged 8 commits into from
Feb 19, 2024
33 changes: 15 additions & 18 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use types::{
};

#[derive(PartialEq)]
pub enum CheckEarlyAttesterCache {
pub enum CheckCaches {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not use bool?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually try to use enums over bools when we need to pass something into a function. If you end up with two bools as params it can be ambiguous

Yes,
No,
}
Expand Down Expand Up @@ -385,14 +385,14 @@ impl<E: EthSpec> EngineRequest<E> {

pub struct BeaconBlockStreamer<T: BeaconChainTypes> {
execution_layer: ExecutionLayer<T::EthSpec>,
check_early_attester_cache: CheckEarlyAttesterCache,
check_caches: CheckCaches,
beacon_chain: Arc<BeaconChain<T>>,
}

impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new(
beacon_chain: &Arc<BeaconChain<T>>,
check_early_attester_cache: CheckEarlyAttesterCache,
check_caches: CheckCaches,
) -> Result<Self, BeaconChainError> {
let execution_layer = beacon_chain
.execution_layer
Expand All @@ -402,17 +402,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {

Ok(Self {
execution_layer,
check_early_attester_cache,
check_caches,
beacon_chain: beacon_chain.clone(),
})
}

fn check_early_attester_cache(
&self,
root: Hash256,
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes {
self.beacon_chain.early_attester_cache.get_block(root)
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_caches == CheckCaches::Yes {
self.beacon_chain
.data_availability_checker
.get_block(&root)
.or(self.beacon_chain.early_attester_cache.get_block(root))
} else {
None
}
Expand All @@ -422,10 +422,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
let mut db_blocks = Vec::new();

for root in block_roots {
if let Some(cached_block) = self
.check_early_attester_cache(root)
.map(LoadedBeaconBlock::Full)
{
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}
Expand Down Expand Up @@ -554,7 +551,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
"Using slower fallback method of eth_getBlockByHash()"
);
for root in block_roots {
let cached_block = self.check_early_attester_cache(root);
let cached_block = self.check_caches(root);
let block_result = if cached_block.is_some() {
Ok(cached_block)
} else {
Expand Down Expand Up @@ -682,7 +679,7 @@ impl From<Error> for BeaconChainError {

#[cfg(test)]
mod tests {
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType};
use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES};
use execution_layer::EngineCapabilities;
Expand Down Expand Up @@ -804,7 +801,7 @@ mod tests {
let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
.expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await;
Expand Down Expand Up @@ -945,7 +942,7 @@ mod tests {
let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
.expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await;
Expand Down
30 changes: 8 additions & 22 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::attestation_verification::{
VerifiedUnaggregatedAttestation,
};
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
Expand Down Expand Up @@ -1131,7 +1131,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ## Errors
///
/// May return a database error.
pub fn get_blocks_checking_early_attester_cache(
pub fn get_blocks_checking_caches(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
Expand All @@ -1144,10 +1144,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should probably rename this function get_blocks_checking_caches or something along these lines.. perhaps we should come up with a unified way of referring to these two caches.. idk if we'd just call them "early caches" or something..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error,
> {
Ok(
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::Yes)?
.launch_stream(block_roots, executor),
)
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
.launch_stream(block_roots, executor))
}

pub fn get_blocks(
Expand All @@ -1163,10 +1161,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::No)?
.launch_stream(block_roots, executor),
)
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
.launch_stream(block_roots, executor))
}

pub fn get_blobs_checking_early_attester_cache(
Expand Down Expand Up @@ -2960,18 +2956,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Ok(commitments) = unverified_block
.block()
.message()
.body()
.blob_kzg_commitments()
{
self.data_availability_checker.notify_block_commitments(
unverified_block.block().slot(),
block_root,
commitments.clone(),
);
};
self.data_availability_checker
.notify_block(block_root, unverified_block.block_cloned());
let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
Expand Down
17 changes: 17 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ pub trait IntoExecutionPendingBlock<T: BeaconChainTypes>: Sized {
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>>;

fn block(&self) -> &SignedBeaconBlock<T::EthSpec>;
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>>;
}

impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
Expand Down Expand Up @@ -1017,6 +1018,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for GossipVerifiedBlock<T
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.block.as_block()
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block.clone()
}
}

impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
Expand Down Expand Up @@ -1168,6 +1173,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBloc
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.block.as_block()
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block.block_cloned()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock<T::EthSpec>> {
Expand Down Expand Up @@ -1198,6 +1207,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.clone()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec> {
Expand Down Expand Up @@ -1228,6 +1241,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec>
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.as_block()
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block_cloned()
}
}

impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ impl<E: EthSpec> RpcBlock<E> {
}
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
Expand Down
51 changes: 41 additions & 10 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
use types::beacon_block_body::KzgCommitmentOpts;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};

Expand Down Expand Up @@ -192,6 +192,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}

/// Get a block from the availability cache. Includes any blocks we are currently processing.
pub fn get_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.processing_cache
.read()
.get(block_root)
.and_then(|cached| cached.block.clone())
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down Expand Up @@ -344,20 +352,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch())
}

/// Adds block commitments to the processing cache. These commitments are unverified but caching
/// Adds a block to the processing cache. This block's commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
/// our blob download requirements.
pub fn notify_block_commitments(
&self,
slot: Slot,
block_root: Hash256,
commitments: KzgCommitments<T::EthSpec>,
) {
/// our blob download requirements. We will also serve this over RPC.
pub fn notify_block(&self, block_root: Hash256, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
let slot = block.slot();
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.merge_block(commitments);
.merge_block(block);
}

/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
Expand Down Expand Up @@ -450,6 +454,24 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> {
self.availability_cache.write_all_to_disk()
}

/// Collects metrics from the data availability checker.
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
DataAvailabilityCheckerMetrics {
processing_cache_size: self.processing_cache.read().len(),
num_store_entries: self.availability_cache.num_store_entries(),
state_cache_size: self.availability_cache.state_cache_size(),
block_cache_size: self.availability_cache.block_cache_size(),
}
}
}

/// Helper struct to group data availability checker metrics.
pub struct DataAvailabilityCheckerMetrics {
pub processing_cache_size: usize,
pub num_store_entries: usize,
pub state_cache_size: usize,
pub block_cache_size: usize,
}

pub fn start_availability_cache_maintenance_service<T: BeaconChainTypes>(
Expand Down Expand Up @@ -597,6 +619,15 @@ pub enum MaybeAvailableBlock<E: EthSpec> {
},
}

impl<E: EthSpec> MaybeAvailableBlock<E> {
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match self {
Self::Available(block) => block.block_cloned(),
Self::AvailabilityPending { block, .. } => block.clone(),
}
}
}

#[derive(Debug, Clone)]
pub enum MissingBlobs {
/// We know for certain these blobs are missing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ macro_rules! impl_availability_view {

impl_availability_view!(
ProcessingComponents,
KzgCommitments<E>,
Arc<SignedBeaconBlock<E>>,
KzgCommitment,
block_commitments,
block,
blob_commitments
);

Expand Down Expand Up @@ -212,12 +212,6 @@ pub trait GetCommitment<E: EthSpec> {
fn get_commitment(&self) -> &KzgCommitment;
}

// These implementations are required to implement `AvailabilityView` for `ProcessingView`.
impl<E: EthSpec> GetCommitments<E> for KzgCommitments<E> {
fn get_commitments(&self) -> KzgCommitments<E> {
self.clone()
}
}
impl<E: EthSpec> GetCommitment<E> for KzgCommitment {
fn get_commitment(&self) -> &KzgCommitment {
self
Expand Down Expand Up @@ -310,7 +304,7 @@ pub mod tests {
}

type ProcessingViewSetup<E> = (
KzgCommitments<E>,
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
);
Expand All @@ -320,12 +314,6 @@ pub mod tests {
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ProcessingViewSetup<E> {
let commitments = block
.message()
.body()
.blob_kzg_commitments()
.unwrap()
.clone();
let blobs = FixedVector::from(
valid_blobs
.iter()
Expand All @@ -338,7 +326,7 @@ pub mod tests {
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
(commitments, blobs, invalid_blobs)
(Arc::new(block), blobs, invalid_blobs)
}

type PendingComponentsSetup<E> = (
Expand Down
Loading
Loading