Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve state cache eviction and reduce mem usage #4762

Merged
merged 2 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use serde_derive::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::io::Write;
use types::{EthSpec, Unsigned};
use zstd::Encoder;

pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 16;
// Only used in tests. Mainnet sets a higher default on the CLI.
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
Expand Down Expand Up @@ -64,6 +66,10 @@ pub enum StoreConfigError {
config: OnDiskStoreConfig,
on_disk: OnDiskStoreConfig,
},
InvalidEpochsPerStateDiff {
epochs_per_state_diff: u64,
max_supported: u64,
},
}

impl Default for StoreConfig {
Expand Down Expand Up @@ -107,8 +113,14 @@ impl StoreConfig {
Ok(())
}

/// Check that the configuration is valid.
pub fn verify<E: EthSpec>(&self) -> Result<(), StoreConfigError> {
    // Run each sub-check in sequence, short-circuiting on the first failure.
    self.verify_compression_level()
        .and_then(|()| self.verify_epochs_per_state_diff::<E>())
}

/// Check that the compression level is valid.
pub fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
if zstd::compression_level_range().contains(&self.compression_level) {
Ok(())
} else {
Expand All @@ -118,6 +130,21 @@ impl StoreConfig {
}
}

/// Check that `epochs_per_state_diff` does not exceed the maximum supported by the
/// state's `state_roots` history.
///
/// Returns `StoreConfigError::InvalidEpochsPerStateDiff` if the configured value is
/// too large.
pub fn verify_epochs_per_state_diff<E: EthSpec>(&self) -> Result<(), StoreConfigError> {
    // To build state diffs we need to be able to determine the previous state root from the
    // state itself, which requires reading back in the state_roots array.
    // The state_roots array covers SlotsPerHistoricalRoot slots, i.e. this many epochs:
    let max_supported = E::SlotsPerHistoricalRoot::to_u64() / E::slots_per_epoch();
    if self.epochs_per_state_diff <= max_supported {
        Ok(())
    } else {
        Err(StoreConfigError::InvalidEpochsPerStateDiff {
            epochs_per_state_diff: self.epochs_per_state_diff,
            max_supported,
        })
    }
}

/// Estimate the size of `len` bytes after compression at the current compression level.
pub fn estimate_compressed_size(&self, len: usize) -> usize {
if self.compression_level == 0 {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
config.verify_compression_level()?;
config.verify::<E>()?;

let hierarchy = config.hierarchy_config.to_moduli()?;

Expand Down Expand Up @@ -189,7 +189,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
spec: ChainSpec,
log: Logger,
) -> Result<Arc<Self>, Error> {
config.verify_compression_level()?;
config.verify::<E>()?;

let hierarchy = config.hierarchy_config.to_moduli()?;

Expand Down
76 changes: 75 additions & 1 deletion beacon_node/store/src/state_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@ use crate::Error;
use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use types::{BeaconState, EthSpec, Hash256, Slot};
use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};

/// Fraction of the LRU cache to leave intact during culling.
const CULL_EXEMPT_NUMERATOR: usize = 1;
const CULL_EXEMPT_DENOMINATOR: usize = 10;

/// States that are less than or equal to this many epochs old *could* become finalized and will not
/// be culled from the cache.
const EPOCH_FINALIZATION_LIMIT: u64 = 4;

#[derive(Debug)]
pub struct FinalizedState<E: EthSpec> {
Expand All @@ -27,6 +35,8 @@ pub struct StateCache<E: EthSpec> {
finalized_state: Option<FinalizedState<E>>,
states: LruCache<Hash256, BeaconState<E>>,
block_map: BlockMap,
capacity: NonZeroUsize,
max_epoch: Epoch,
}

#[derive(Debug)]
Expand All @@ -42,6 +52,8 @@ impl<E: EthSpec> StateCache<E> {
finalized_state: None,
states: LruCache::new(capacity),
block_map: BlockMap::default(),
capacity,
max_epoch: Epoch::new(0),
}
}

Expand Down Expand Up @@ -115,6 +127,14 @@ impl<E: EthSpec> StateCache<E> {
});
}

// Update the cache's idea of the max epoch.
self.max_epoch = std::cmp::max(state.current_epoch(), self.max_epoch);

// If the cache is full, use the custom cull routine to make room.
if let Some(over_capacity) = self.len().checked_sub(self.capacity.get()) {
self.cull(over_capacity + 1);
}

// Insert the full state into the cache.
self.states.put(state_root, state.clone());

Expand Down Expand Up @@ -166,6 +186,60 @@ impl<E: EthSpec> StateCache<E> {
}
}
}

/// Cull approximately `count` states from the cache.
///
/// States are culled LRU, with the following extra order imposed:
///
/// - Advanced states.
/// - Mid-epoch unadvanced states.
/// - Epoch-boundary states that are too old to be finalized.
/// - Epoch-boundary states that could be finalized.
pub fn cull(&mut self, count: usize) {
    // Never consider the most-recently-used fraction of the cache for culling, and always
    // exempt at least one state.
    let cull_exempt = std::cmp::max(
        1,
        self.len() * CULL_EXEMPT_NUMERATOR / CULL_EXEMPT_DENOMINATOR,
    );

    // Stage 1: gather states to cull.
    // Iteration is in the LRU cache's order, so within each bucket candidates are ordered
    // least-desirable-to-keep first.
    let mut advanced_state_roots = vec![];
    let mut mid_epoch_state_roots = vec![];
    let mut old_boundary_state_roots = vec![];
    let mut good_boundary_state_roots = vec![];
    for (&state_root, state) in self.states.iter().skip(cull_exempt) {
        // A state whose slot is ahead of its latest block's slot has been advanced past the
        // block, making it the cheapest category to evict first.
        let is_advanced = state.slot() > state.latest_block_header().slot;
        let is_boundary = state.slot() % E::slots_per_epoch() == 0;
        // Boundary states within EPOCH_FINALIZATION_LIMIT epochs of the newest epoch seen by
        // the cache could still become finalized, so they are evicted last.
        // NOTE(review): this subtracts `state.current_epoch()` from `self.max_epoch`; confirm
        // the `Epoch` subtraction cannot underflow/panic if a state is newer than `max_epoch`.
        let could_finalize =
            (self.max_epoch - state.current_epoch()) <= EPOCH_FINALIZATION_LIMIT;

        if is_advanced {
            advanced_state_roots.push(state_root);
        } else if !is_boundary {
            mid_epoch_state_roots.push(state_root);
        } else if !could_finalize {
            old_boundary_state_roots.push(state_root);
        } else {
            good_boundary_state_roots.push(state_root);
        }

        // Terminate early in the common case where we've already found enough junk to cull.
        // NOTE(review): the early exit only fires when `count` *advanced* states have been
        // found; other buckets keep accumulating until iteration ends — presumably intended,
        // since `take(count)` below caps total deletions anyway, but worth confirming.
        if advanced_state_roots.len() == count {
            break;
        }
    }

    // Stage 2: delete.
    // Drain buckets in priority order (advanced → mid-epoch → old boundary → good boundary),
    // capped at `count` total deletions by `take`.
    // This could probably be more efficient in how it interacts with the block map.
    for state_root in advanced_state_roots
        .iter()
        .chain(mid_epoch_state_roots.iter())
        .chain(old_boundary_state_roots.iter())
        .chain(good_boundary_state_roots.iter())
        .take(count)
    {
        self.delete_state(state_root);
    }
}
}

impl BlockMap {
Expand Down
7 changes: 1 addition & 6 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1795,12 +1795,7 @@ fn epochs_per_migration_override() {
fn epochs_per_state_diff_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.store.epochs_per_state_diff,
beacon_node::beacon_chain::store::config::DEFAULT_EPOCHS_PER_STATE_DIFF
)
});
.with_config(|config| assert_eq!(config.store.epochs_per_state_diff, 16));
}
#[test]
fn epochs_per_state_diff_override() {
Expand Down