diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 26b1d07a655..835d257a28d 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -4,9 +4,11 @@ use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::io::Write; +use types::{EthSpec, Unsigned}; use zstd::Encoder; -pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 16; +// Only used in tests. Mainnet sets a higher default on the CLI. +pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8; pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64; pub const DEFAULT_STATE_CACHE_SIZE: usize = 128; pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1; @@ -64,6 +66,10 @@ pub enum StoreConfigError { config: OnDiskStoreConfig, on_disk: OnDiskStoreConfig, }, + InvalidEpochsPerStateDiff { + epochs_per_state_diff: u64, + max_supported: u64, + }, } impl Default for StoreConfig { @@ -107,8 +113,14 @@ impl StoreConfig { Ok(()) } + /// Check that the compression level and `epochs_per_state_diff` are both valid. + pub fn verify(&self) -> Result<(), StoreConfigError> { + self.verify_compression_level()?; + self.verify_epochs_per_state_diff::() + } + /// Check that the compression level is valid. - pub fn verify_compression_level(&self) -> Result<(), StoreConfigError> { + fn verify_compression_level(&self) -> Result<(), StoreConfigError> { if zstd::compression_level_range().contains(&self.compression_level) { Ok(()) } else { @@ -118,6 +130,21 @@ impl StoreConfig { } } + /// Check that `epochs_per_state_diff` does not exceed the maximum supported value. + pub fn verify_epochs_per_state_diff(&self) -> Result<(), StoreConfigError> { + // To build state diffs we need to be able to determine the previous state root from the + // state itself, which requires reading back in the state_roots array. 
+ let max_supported = E::SlotsPerHistoricalRoot::to_u64() / E::slots_per_epoch(); + if self.epochs_per_state_diff <= max_supported { + Ok(()) + } else { + Err(StoreConfigError::InvalidEpochsPerStateDiff { + epochs_per_state_diff: self.epochs_per_state_diff, + max_supported, + }) + } + } + /// Estimate the size of `len` bytes after compression at the current compression level. pub fn estimate_compressed_size(&self, len: usize) -> usize { if self.compression_level == 0 { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index ed4bab8dc54..36c59023690 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -142,7 +142,7 @@ impl HotColdDB, MemoryStore> { spec: ChainSpec, log: Logger, ) -> Result, MemoryStore>, Error> { - config.verify_compression_level()?; + config.verify::()?; let hierarchy = config.hierarchy_config.to_moduli()?; @@ -189,7 +189,7 @@ impl HotColdDB, LevelDB> { spec: ChainSpec, log: Logger, ) -> Result, Error> { - config.verify_compression_level()?; + config.verify::()?; let hierarchy = config.hierarchy_config.to_moduli()?; diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 95ad09e261f..36783283341 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -2,7 +2,15 @@ use crate::Error; use lru::LruCache; use std::collections::{BTreeMap, HashMap, HashSet}; use std::num::NonZeroUsize; -use types::{BeaconState, EthSpec, Hash256, Slot}; +use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; + +/// Fraction of the LRU cache to leave intact during culling. +const CULL_EXEMPT_NUMERATOR: usize = 1; +const CULL_EXEMPT_DENOMINATOR: usize = 10; + +/// States that are less than or equal to this many epochs old *could* become finalized, so +/// epoch-boundary states within this limit are the last to be culled from the cache. 
+const EPOCH_FINALIZATION_LIMIT: u64 = 4; #[derive(Debug)] pub struct FinalizedState { @@ -27,6 +35,8 @@ pub struct StateCache { finalized_state: Option>, states: LruCache>, block_map: BlockMap, + capacity: NonZeroUsize, + max_epoch: Epoch, } #[derive(Debug)] @@ -42,6 +52,8 @@ impl StateCache { finalized_state: None, states: LruCache::new(capacity), block_map: BlockMap::default(), + capacity, + max_epoch: Epoch::new(0), } } @@ -115,6 +127,14 @@ impl StateCache { }); } + // Update the cache's idea of the max epoch. + self.max_epoch = std::cmp::max(state.current_epoch(), self.max_epoch); + + // If the cache is full, use the custom cull routine to make room. + if let Some(over_capacity) = self.len().checked_sub(self.capacity.get()) { + self.cull(over_capacity + 1); + } + // Insert the full state into the cache. self.states.put(state_root, state.clone()); @@ -166,6 +186,60 @@ impl StateCache { } } } + + /// Cull at most `count` states from the cache. + /// + /// States are culled LRU, with the following extra order imposed: + /// + /// - Advanced states. + /// - Mid-epoch unadvanced states. + /// - Epoch-boundary states that are too old to be finalized. + /// - Epoch-boundary states that could be finalized. + pub fn cull(&mut self, count: usize) { + let cull_exempt = std::cmp::max( + 1, + self.len() * CULL_EXEMPT_NUMERATOR / CULL_EXEMPT_DENOMINATOR, + ); + + // Stage 1: gather states to cull. 
+ let mut advanced_state_roots = vec![]; + let mut mid_epoch_state_roots = vec![]; + let mut old_boundary_state_roots = vec![]; + let mut good_boundary_state_roots = vec![]; + for (&state_root, state) in self.states.iter().skip(cull_exempt) { + let is_advanced = state.slot() > state.latest_block_header().slot; + let is_boundary = state.slot() % E::slots_per_epoch() == 0; + let could_finalize = + (self.max_epoch - state.current_epoch()) <= EPOCH_FINALIZATION_LIMIT; + + if is_advanced { + advanced_state_roots.push(state_root); + } else if !is_boundary { + mid_epoch_state_roots.push(state_root); + } else if !could_finalize { + old_boundary_state_roots.push(state_root); + } else { + good_boundary_state_roots.push(state_root); + } + + // Terminate early in the common case where we've already found enough junk to cull. + if advanced_state_roots.len() == count { + break; + } + } + + // Stage 2: delete in category order (advanced first), stopping after `count` states. + // This could probably be more efficient in how it interacts with the block map. + for state_root in advanced_state_roots + .iter() + .chain(mid_epoch_state_roots.iter()) + .chain(old_boundary_state_roots.iter()) + .chain(good_boundary_state_roots.iter()) + .take(count) + { + self.delete_state(state_root); + } + } } impl BlockMap { diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 5069ed696ca..d380b8ffe2c 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1795,12 +1795,7 @@ fn epochs_per_migration_override() { fn epochs_per_state_diff_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.store.epochs_per_state_diff, - beacon_node::beacon_chain::store::config::DEFAULT_EPOCHS_PER_STATE_DIFF - ) - }); + .with_config(|config| assert_eq!(config.store.epochs_per_state_diff, 16)); } #[test] fn epochs_per_state_diff_override() {