diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3bf75284779..dcfc8e8412a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3661,16 +3661,15 @@ impl BeaconChain { } } - if let Some(_data_columns) = data_columns { - // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073 - // if !data_columns.is_empty() { - // debug!( - // self.log, "Writing data_columns to store"; - // "block_root" => %block_root, - // "count" => data_columns.len(), - // ); - // ops.push(StoreOp::PutDataColumns(block_root, data_columns)); - // } + if let Some(data_columns) = data_columns { + if !data_columns.is_empty() { + debug!( + self.log, "Writing data_columns to store"; + "block_root" => %block_root, + "count" => data_columns.len(), + ); + ops.push(StoreOp::PutDataColumns(block_root, data_columns)); + } } let txn_lock = self.store.hot_db.begin_rw_transaction(); diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index b4336a054e2..2178798bb60 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -108,14 +108,14 @@ impl DataAvailabilityChecker { let custody_column_count = custody_subnet_count.saturating_mul(spec.data_columns_per_subnet()); - let overflow_cache = DataAvailabilityCheckerInner::new( + let inner = DataAvailabilityCheckerInner::new( OVERFLOW_LRU_CAPACITY, store, custody_column_count, spec.clone(), )?; Ok(Self { - availability_cache: Arc::new(overflow_cache), + availability_cache: Arc::new(inner), slot_clock, kzg, log: log.clone(), @@ -123,6 +123,12 @@ impl DataAvailabilityChecker { }) } + pub fn get_custody_columns_count(&self) -> usize { + self.availability_cache + .custody_subnet_count() + .saturating_mul(self.spec.data_columns_per_subnet()) + } + /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. pub fn get_execution_valid_block( diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6c9964bdf86..34a40282737 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -9,7 +9,6 @@ use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::RwLock; -use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::sync::Arc; @@ -20,7 +19,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; /// /// The blobs are all gossip and kzg verified. /// The block has completed all verifications except the availability check. -#[derive(Encode, Decode, Clone)] +#[derive(Clone)] pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, @@ -303,6 +302,15 @@ impl PendingComponents { }); } } + + if let Some(kzg_verified_data_column) = self.verified_data_columns.first() { + let epoch = kzg_verified_data_column + .as_data_column() + .slot() + .epoch(E::slots_per_epoch()); + return Some(epoch); + } + None }) } @@ -336,6 +344,10 @@ impl DataAvailabilityCheckerInner { }) } + pub fn custody_subnet_count(&self) -> usize { + self.custody_column_count + } + /// Returns true if the block root is known, without altering the LRU ordering pub fn get_execution_valid_block( &self, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index da639e3695e..af3fbab6aed 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -250,6 +250,10 @@ impl KzgVerifiedCustodyDataColumn { pub fn into_inner(self) -> Arc> { self.data } + + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data + } } /// Complete kzg verification for a `DataColumnSidecar`. diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index aa2fac2afc8..4b63ac6915a 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -9,6 +9,7 @@ use state_processing::{ use std::borrow::Cow; use std::iter; use std::time::Duration; +use store::metadata::DataColumnInfo; use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore}; use types::{Hash256, Slot}; @@ -66,6 +67,7 @@ impl BeaconChain { .get_anchor_info() .ok_or(HistoricalBlockError::NoAnchorInfo)?; let blob_info = self.store.get_blob_info(); + let data_column_info = self.store.get_data_column_info(); // Take all blocks with slots less than the oldest block slot. let num_relevant = blocks.partition_point(|available_block| { @@ -90,18 +92,27 @@ impl BeaconChain { return Ok(0); } - let n_blobs_lists_to_import = blocks_to_import + // Blobs are stored per block, and data columns are each stored individually + let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() { + self.data_availability_checker.get_custody_columns_count() + } else { + 1 + }; + + let blob_batch_size = blocks_to_import .iter() .filter(|available_block| available_block.blobs().is_some()) - .count(); + .count() + .saturating_mul(n_blob_ops_per_block); let mut expected_block_root = anchor_info.oldest_block_parent; let mut prev_block_slot = anchor_info.oldest_block_slot; let mut chunk_writer = ChunkWriter::::new(&self.store.cold_db, prev_block_slot.as_usize())?; let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; + let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; - let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import); + let mut blob_batch = Vec::with_capacity(blob_batch_size); let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); @@ -129,11 +140,10 @@ impl BeaconChain { .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch); } // Store the data columns too - if let Some(_data_columns) = maybe_data_columns { - // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073 - // new_oldest_data_column_slot = Some(block.slot()); - // self.store - // .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch); + if let Some(data_columns) = maybe_data_columns { + new_oldest_data_column_slot = Some(block.slot()); + self.store + .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch); } // Store block roots, including at all skip slots in the freezer DB. @@ -212,7 +222,7 @@ impl BeaconChain { self.store.hot_db.do_atomically(hot_batch)?; self.store.cold_db.do_atomically(cold_batch)?; - let mut anchor_and_blob_batch = Vec::with_capacity(2); + let mut anchor_and_blob_batch = Vec::with_capacity(3); // Update the blob info. if new_oldest_blob_slot != blob_info.oldest_blob_slot { @@ -228,6 +238,19 @@ impl BeaconChain { } } + // Update the data column info. + if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot { + if let Some(oldest_data_column_slot) = new_oldest_data_column_slot { + let new_data_column_info = DataColumnInfo { + oldest_data_column_slot: Some(oldest_data_column_slot), + }; + anchor_and_blob_batch.push( + self.store + .compare_and_set_data_column_info(data_column_info, new_data_column_info)?, + ); + } + } + // Update the anchor. let new_anchor = AnchorInfo { oldest_block_slot: prev_block_slot,