Skip to content

Commit

Permalink
check the da cache and the attester cache in responding to RPC reques…
Browse files Browse the repository at this point in the history
…ts (#5138)

Squashed commit of the following:

commit a6144f6
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Thu Feb 15 15:48:07 2024 -0500

    Revert "make rustup update run on the runners"

    This reverts commit d097e9b.

commit d097e9b
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Thu Feb 15 15:16:08 2024 -0500

    make rustup update run on the runners

commit 1be438e
Merge: 597e05f 256d904
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Wed Feb 14 21:55:45 2024 -0500

    Merge branch 'unstable' of https://github.com/sigp/lighthouse into check-da-cache-in-rpc-response

commit 597e05f
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Wed Feb 14 21:54:44 2024 -0500

    rename early attester cache method

commit 0cf408f
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Wed Feb 14 21:53:47 2024 -0500

    add da cache metrics

commit 2f6cf41
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Mon Jan 29 18:31:32 2024 -0500

    update comment

commit 0512420
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Mon Jan 29 18:25:15 2024 -0500

    use the processing cache instead

commit 66b911f
Author: realbigsean <seananderson33@GMAIL.com>
Date:   Mon Jan 29 15:52:42 2024 -0500

    check the da cache and the attester cache in responding to RPC requests
  • Loading branch information
paulhauner committed Feb 18, 2024
1 parent dea9146 commit 3d809d8
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 74 deletions.
33 changes: 15 additions & 18 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use types::{
};

#[derive(PartialEq)]
pub enum CheckEarlyAttesterCache {
pub enum CheckCaches {
Yes,
No,
}
Expand Down Expand Up @@ -385,14 +385,14 @@ impl<E: EthSpec> EngineRequest<E> {

pub struct BeaconBlockStreamer<T: BeaconChainTypes> {
execution_layer: ExecutionLayer<T::EthSpec>,
check_early_attester_cache: CheckEarlyAttesterCache,
check_caches: CheckCaches,
beacon_chain: Arc<BeaconChain<T>>,
}

impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new(
beacon_chain: &Arc<BeaconChain<T>>,
check_early_attester_cache: CheckEarlyAttesterCache,
check_caches: CheckCaches,
) -> Result<Self, BeaconChainError> {
let execution_layer = beacon_chain
.execution_layer
Expand All @@ -402,17 +402,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {

Ok(Self {
execution_layer,
check_early_attester_cache,
check_caches,
beacon_chain: beacon_chain.clone(),
})
}

fn check_early_attester_cache(
&self,
root: Hash256,
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_early_attester_cache == CheckEarlyAttesterCache::Yes {
self.beacon_chain.early_attester_cache.get_block(root)
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_caches == CheckCaches::Yes {
self.beacon_chain
.data_availability_checker
.get_block(&root)
.or(self.beacon_chain.early_attester_cache.get_block(root))
} else {
None
}
Expand All @@ -422,10 +422,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
let mut db_blocks = Vec::new();

for root in block_roots {
if let Some(cached_block) = self
.check_early_attester_cache(root)
.map(LoadedBeaconBlock::Full)
{
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}
Expand Down Expand Up @@ -554,7 +551,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
"Using slower fallback method of eth_getBlockByHash()"
);
for root in block_roots {
let cached_block = self.check_early_attester_cache(root);
let cached_block = self.check_caches(root);
let block_result = if cached_block.is_some() {
Ok(cached_block)
} else {
Expand Down Expand Up @@ -682,7 +679,7 @@ impl From<Error> for BeaconChainError {

#[cfg(test)]
mod tests {
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::test_utils::{test_spec, BeaconChainHarness, EphemeralHarnessType};
use execution_layer::test_utils::{Block, DEFAULT_ENGINE_CAPABILITIES};
use execution_layer::EngineCapabilities;
Expand Down Expand Up @@ -804,7 +801,7 @@ mod tests {
let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
.expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await;
Expand Down Expand Up @@ -945,7 +942,7 @@ mod tests {
let start = epoch * slots_per_epoch;
let mut epoch_roots = vec![Hash256::zero(); slots_per_epoch];
epoch_roots[..].clone_from_slice(&block_roots[start..(start + slots_per_epoch)]);
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckEarlyAttesterCache::No)
let streamer = BeaconBlockStreamer::new(&harness.chain, CheckCaches::No)
.expect("should create streamer");
let (block_tx, mut block_rx) = mpsc::unbounded_channel();
streamer.stream(epoch_roots.clone(), block_tx).await;
Expand Down
30 changes: 8 additions & 22 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::attestation_verification::{
VerifiedUnaggregatedAttestation,
};
use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
Expand Down Expand Up @@ -1131,7 +1131,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// ## Errors
///
/// May return a database error.
pub fn get_blocks_checking_early_attester_cache(
pub fn get_blocks_checking_caches(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
Expand All @@ -1144,10 +1144,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::Yes)?
.launch_stream(block_roots, executor),
)
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
.launch_stream(block_roots, executor))
}

pub fn get_blocks(
Expand All @@ -1163,10 +1161,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(
BeaconBlockStreamer::<T>::new(self, CheckEarlyAttesterCache::No)?
.launch_stream(block_roots, executor),
)
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
.launch_stream(block_roots, executor))
}

pub fn get_blobs_checking_early_attester_cache(
Expand Down Expand Up @@ -2960,18 +2956,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Ok(commitments) = unverified_block
.block()
.message()
.body()
.blob_kzg_commitments()
{
self.data_availability_checker.notify_block_commitments(
unverified_block.block().slot(),
block_root,
commitments.clone(),
);
};
self.data_availability_checker
.notify_block(block_root, unverified_block.block_cloned());
let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
Expand Down
17 changes: 17 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ pub trait IntoExecutionPendingBlock<T: BeaconChainTypes>: Sized {
) -> Result<ExecutionPendingBlock<T>, BlockSlashInfo<BlockError<T::EthSpec>>>;

fn block(&self) -> &SignedBeaconBlock<T::EthSpec>;
fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>>;
}

impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
Expand Down Expand Up @@ -1017,6 +1018,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for GossipVerifiedBlock<T
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.block.as_block()
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block.clone()
}
}

impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
Expand Down Expand Up @@ -1168,6 +1173,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for SignatureVerifiedBloc
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.block.as_block()
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block.block_cloned()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock<T::EthSpec>> {
Expand Down Expand Up @@ -1198,6 +1207,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for Arc<SignedBeaconBlock
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.clone()
}
}

impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec> {
Expand Down Expand Up @@ -1228,6 +1241,10 @@ impl<T: BeaconChainTypes> IntoExecutionPendingBlock<T> for RpcBlock<T::EthSpec>
fn block(&self) -> &SignedBeaconBlock<T::EthSpec> {
self.as_block()
}

fn block_cloned(&self) -> Arc<SignedBeaconBlock<T::EthSpec>> {
self.block_cloned()
}
}

impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ impl<E: EthSpec> RpcBlock<E> {
}
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
Expand Down
51 changes: 41 additions & 10 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
use types::beacon_block_body::KzgCommitmentOpts;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot};

Expand Down Expand Up @@ -192,6 +192,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}

/// Get a block from the availability cache. Includes any blocks we are currently processing.
pub fn get_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.processing_cache
.read()
.get(block_root)
.and_then(|cached| cached.block.clone())
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down Expand Up @@ -344,20 +352,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch())
}

/// Adds block commitments to the processing cache. These commitments are unverified but caching
/// Adds a block to the processing cache. This block's commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
/// our blob download requirements.
pub fn notify_block_commitments(
&self,
slot: Slot,
block_root: Hash256,
commitments: KzgCommitments<T::EthSpec>,
) {
/// our blob download requirements. We will also serve this over RPC.
pub fn notify_block(&self, block_root: Hash256, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
let slot = block.slot();
self.processing_cache
.write()
.entry(block_root)
.or_insert_with(|| ProcessingComponents::new(slot))
.merge_block(commitments);
.merge_block(block);
}

/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
Expand Down Expand Up @@ -450,6 +454,24 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn persist_all(&self) -> Result<(), AvailabilityCheckError> {
self.availability_cache.write_all_to_disk()
}

/// Collects metrics from the data availability checker.
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
DataAvailabilityCheckerMetrics {
processing_cache_size: self.processing_cache.read().len(),
num_store_entries: self.availability_cache.num_store_entries(),
state_cache_size: self.availability_cache.state_cache_size(),
block_cache_size: self.availability_cache.block_cache_size(),
}
}
}

/// Helper struct to group data availability checker metrics.
pub struct DataAvailabilityCheckerMetrics {
pub processing_cache_size: usize,
pub num_store_entries: usize,
pub state_cache_size: usize,
pub block_cache_size: usize,
}

pub fn start_availability_cache_maintenance_service<T: BeaconChainTypes>(
Expand Down Expand Up @@ -597,6 +619,15 @@ pub enum MaybeAvailableBlock<E: EthSpec> {
},
}

impl<E: EthSpec> MaybeAvailableBlock<E> {
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match self {
Self::Available(block) => block.block_cloned(),
Self::AvailabilityPending { block, .. } => block.clone(),
}
}
}

#[derive(Debug, Clone)]
pub enum MissingBlobs {
/// We know for certain these blobs are missing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ macro_rules! impl_availability_view {

impl_availability_view!(
ProcessingComponents,
KzgCommitments<E>,
Arc<SignedBeaconBlock<E>>,
KzgCommitment,
block_commitments,
block,
blob_commitments
);

Expand Down Expand Up @@ -212,12 +212,6 @@ pub trait GetCommitment<E: EthSpec> {
fn get_commitment(&self) -> &KzgCommitment;
}

// These implementations are required to implement `AvailabilityView` for `ProcessingView`.
impl<E: EthSpec> GetCommitments<E> for KzgCommitments<E> {
fn get_commitments(&self) -> KzgCommitments<E> {
self.clone()
}
}
impl<E: EthSpec> GetCommitment<E> for KzgCommitment {
fn get_commitment(&self) -> &KzgCommitment {
self
Expand Down Expand Up @@ -310,7 +304,7 @@ pub mod tests {
}

type ProcessingViewSetup<E> = (
KzgCommitments<E>,
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
);
Expand All @@ -320,12 +314,6 @@ pub mod tests {
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ProcessingViewSetup<E> {
let commitments = block
.message()
.body()
.blob_kzg_commitments()
.unwrap()
.clone();
let blobs = FixedVector::from(
valid_blobs
.iter()
Expand All @@ -338,7 +326,7 @@ pub mod tests {
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
(commitments, blobs, invalid_blobs)
(Arc::new(block), blobs, invalid_blobs)
}

type PendingComponentsSetup<E> = (
Expand Down
Loading

0 comments on commit 3d809d8

Please sign in to comment.