diff --git a/Cargo.lock b/Cargo.lock index a2014728e96..dc2f3f70f52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5129,6 +5129,7 @@ dependencies = [ "slasher", "slashing_protection", "slog", + "store", "task_executor", "tempfile", "types", @@ -8219,6 +8220,7 @@ dependencies = [ "metrics", "parking_lot 0.12.3", "rand", + "redb", "safe_arith", "serde", "slog", diff --git a/Makefile b/Makefile index fd7d45f26a0..0e1600a1959 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ BUILD_PATH_AARCH64 = "target/$(AARCH64_TAG)/release" PINNED_NIGHTLY ?= nightly # List of features to use when cross-compiling. Can be overridden via the environment. -CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,slasher-redb,jemalloc +CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,slasher-redb,jemalloc,beacon-node-leveldb,beacon-node-redb # Cargo profile for Cross builds. Default is for local builds, CI uses an override. CROSS_PROFILE ?= release diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a78ae266e5a..0ad308771da 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1396,6 +1396,7 @@ impl BeaconChain { sync_committee_period, count, &self.spec, + self.log.clone(), ) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 40361574aff..81996d3d978 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -304,7 +304,6 @@ impl PendingComponents { }; (Some(verified_blobs), None) }; - let executed_block = recover(diet_executed_block)?; let AvailabilityPendingExecutedBlock { @@ -676,7 +675,7 @@ mod test { use slog::{info, Logger}; use state_processing::ConsensusContext; use std::collections::VecDeque; - use store::{HotColdDB, ItemStore, LevelDB, StoreConfig}; + use store::{database::interface::BeaconNodeBackend, HotColdDB, ItemStore, StoreConfig}; use tempfile::{tempdir, TempDir}; use types::non_zero_usize::new_non_zero_usize; use types::{ExecPayload, MinimalEthSpec}; @@ -688,7 +687,7 @@ mod test { db_path: &TempDir, spec: Arc, log: Logger, - ) -> Arc, LevelDB>> { + ) -> Arc, BeaconNodeBackend>> { let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let blobs_path = db_path.path().join("blobs_db"); @@ -862,7 +861,11 @@ mod test { ) where E: EthSpec, - T: BeaconChainTypes, ColdStore = LevelDB, EthSpec = E>, + T: BeaconChainTypes< + HotStore = BeaconNodeBackend, + ColdStore = BeaconNodeBackend, + EthSpec = E, + >, { let log = test_logger(); let chain_db_path = tempdir().expect("should get temp dir"); diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index ddae54f464b..9c049ffb2e8 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -10,10 +10,7 @@ use std::borrow::Cow; use std::iter; use std::time::Duration; use store::metadata::DataColumnInfo; -use store::{ - get_key_for_col, AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, - KeyValueStoreOp, -}; +use store::{AnchorInfo, BlobInfo, DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp}; use strum::IntoStaticStr; use types::{FixedBytesExtended, Hash256, Slot}; @@ -151,9 +148,11 @@ impl BeaconChain { } // Store block roots, including at all skip slots in the freezer DB. + let column: &str = DBColumn::BeaconBlockRoots.into(); for slot in (block.slot().as_u64()..prev_block_slot.as_u64()).rev() { cold_batch.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()), + column.to_owned(), + slot.to_be_bytes().to_vec(), block_root.as_slice().to_vec(), )); } @@ -167,9 +166,11 @@ impl BeaconChain { // completion. if expected_block_root == self.genesis_block_root { let genesis_slot = self.spec.genesis_slot; + let column: &str = DBColumn::BeaconBlockRoots.into(); for slot in genesis_slot.as_u64()..prev_block_slot.as_u64() { cold_batch.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()), + column.to_owned(), + slot.to_be_bytes().to_vec(), self.genesis_block_root.as_slice().to_vec(), )); } diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs index 78442d8df08..f6a8021fb04 100644 --- a/beacon_node/beacon_chain/src/light_client_server_cache.rs +++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs @@ -2,7 +2,7 @@ use crate::errors::BeaconChainError; use crate::{metrics, BeaconChainTypes, BeaconStore}; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; -use slog::{debug, Logger}; +use slog::{debug, error, Logger}; use ssz::Decode; use std::num::NonZeroUsize; use std::sync::Arc; @@ -272,13 +272,33 @@ impl LightClientServerCache { start_period: u64, count: u64, chain_spec: &ChainSpec, + log: Logger, ) -> Result>, BeaconChainError> { let column = DBColumn::LightClientUpdate; let mut light_client_updates = vec![]; - for res in store - .hot_db - .iter_column_from::>(column, &start_period.to_le_bytes()) - { + + let results = store.hot_db.iter_column_from::>( + column, + &start_period.to_le_bytes(), + move |sync_committee_bytes, _| match u64::from_ssz_bytes(sync_committee_bytes) { + Ok(sync_committee_period) => { + if sync_committee_period >= start_period + count { + return false; + } + true + } + Err(e) => { + error!( + log, + "Error decoding sync committee bytes from the db"; + "error" => ?e + ); + false + } + }, + ); + + for res in results? { let (sync_committee_bytes, light_client_update_bytes) = res?; let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes) .map_err(store::errors::Error::SszDecodeError)?; @@ -298,6 +318,7 @@ impl LightClientServerCache { light_client_updates.push(light_client_update); } + Ok(light_client_updates) } diff --git a/beacon_node/beacon_chain/src/otb_verification_service.rs b/beacon_node/beacon_chain/src/otb_verification_service.rs index 31034a7d59b..fe3d7f2e18e 100644 --- a/beacon_node/beacon_chain/src/otb_verification_service.rs +++ b/beacon_node/beacon_chain/src/otb_verification_service.rs @@ -120,7 +120,7 @@ pub fn load_optimistic_transition_blocks( chain: &BeaconChain, ) -> Result, StoreError> { process_results( - chain.store.hot_db.iter_column::(OTBColumn), + chain.store.hot_db.iter_column::(OTBColumn)?, |iter| { iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) .collect() diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs index fcc8b9884ac..af052018ef3 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs @@ -3,9 +3,7 @@ use crate::validator_pubkey_cache::DatabasePubkey; use slog::{info, Logger}; use ssz::{Decode, Encode}; use std::sync::Arc; -use store::{ - get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, -}; +use store::{DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem}; use types::{Hash256, PublicKey}; const LOG_EVERY: usize = 200_000; @@ -21,7 +19,7 @@ pub fn upgrade_to_v21( // Iterate through all pubkeys and decompress them. for (i, res) in db .hot_db - .iter_column::(DBColumn::PubkeyCache) + .iter_column::(DBColumn::PubkeyCache)? .enumerate() { let (key, value) = res?; @@ -53,7 +51,7 @@ pub fn downgrade_from_v21( // Iterate through all pubkeys and recompress them. for (i, res) in db .hot_db - .iter_column::(DBColumn::PubkeyCache) + .iter_column::(DBColumn::PubkeyCache)? .enumerate() { let (key, value) = res?; @@ -62,9 +60,10 @@ pub fn downgrade_from_v21( message: format!("{e:?}"), })?; - let db_key = get_key_for_col(DBColumn::PubkeyCache.into(), key.as_slice()); + let column: &str = DBColumn::PubkeyCache.into(); ops.push(KeyValueStoreOp::PutKeyValue( - db_key, + column.to_owned(), + key.as_slice().to_vec(), pubkey_bytes.as_ssz_bytes(), )); diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs index fcb78ab8010..ed7abe53963 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use store::chunked_iter::ChunkedVectorIter; use store::{ chunked_vector::BlockRootsChunked, - get_key_for_col, metadata::{ SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN, }, @@ -132,12 +131,9 @@ pub fn delete_old_schema_freezer_data( ]; for column in columns { - for res in db.cold_db.iter_column_keys::>(column) { + for res in db.cold_db.iter_column_keys::>(column)? { let key = res?; - cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( - column.as_str(), - &key, - ))); + cold_ops.push(KeyValueStoreOp::DeleteKey(column.as_str().to_owned(), key)); } } let delete_ops = cold_ops.len(); @@ -172,8 +168,10 @@ pub fn write_new_schema_block_roots( // Store the genesis block root if it would otherwise not be stored. if oldest_block_slot != 0 { + let column: &str = DBColumn::BeaconBlockRoots.into(); cold_ops.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col(DBColumn::BeaconBlockRoots.into(), &0u64.to_be_bytes()), + column.to_owned(), + 0u64.to_be_bytes().to_vec(), genesis_block_root.as_slice().to_vec(), )); } @@ -189,11 +187,10 @@ pub fn write_new_schema_block_roots( // OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB). for (i, (slot, block_root)) in block_root_iter.enumerate() { + let column: &str = DBColumn::BeaconBlockRoots.into(); cold_ops.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col( - DBColumn::BeaconBlockRoots.into(), - &(slot as u64).to_be_bytes(), - ), + column.to_owned(), + slot.to_be_bytes().to_vec(), block_root.as_slice().to_vec(), )); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 093ee0c44b4..7039c4a612d 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -56,7 +56,8 @@ use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock}; use std::time::Duration; -use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; +use store::database::interface::BeaconNodeBackend; +use store::{config::StoreConfig, HotColdDB, ItemStore, MemoryStore}; use task_executor::TaskExecutor; use task_executor::{test_utils::TestRuntime, ShutdownReason}; use tree_hash::TreeHash; @@ -116,7 +117,7 @@ pub fn get_kzg(spec: &ChainSpec) -> Arc { pub type BaseHarnessType = Witness, E, THotStore, TColdStore>; -pub type DiskHarnessType = BaseHarnessType, LevelDB>; +pub type DiskHarnessType = BaseHarnessType, BeaconNodeBackend>; pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; pub type BoxedMutator = Box< @@ -299,7 +300,10 @@ impl Builder> { impl Builder> { /// Disk store, start from genesis. - pub fn fresh_disk_store(mut self, store: Arc, LevelDB>>) -> Self { + pub fn fresh_disk_store( + mut self, + store: Arc, BeaconNodeBackend>>, + ) -> Self { let validator_keypairs = self .validator_keypairs .clone() @@ -324,7 +328,10 @@ impl Builder> { } /// Disk store, resume. - pub fn resumed_disk_store(mut self, store: Arc, LevelDB>>) -> Self { + pub fn resumed_disk_store( + mut self, + store: Arc, BeaconNodeBackend>>, + ) -> Self { let mutator = move |builder: BeaconChainBuilder<_>| { builder .resume_from_db() diff --git a/beacon_node/beacon_chain/tests/op_verification.rs b/beacon_node/beacon_chain/tests/op_verification.rs index df0d561e1cd..44fb298d6c6 100644 --- a/beacon_node/beacon_chain/tests/op_verification.rs +++ b/beacon_node/beacon_chain/tests/op_verification.rs @@ -14,7 +14,8 @@ use state_processing::per_block_processing::errors::{ AttesterSlashingInvalid, BlockOperationError, ExitInvalid, ProposerSlashingInvalid, }; use std::sync::{Arc, LazyLock}; -use store::{LevelDB, StoreConfig}; +use store::database::interface::BeaconNodeBackend; +use store::StoreConfig; use tempfile::{tempdir, TempDir}; use types::*; @@ -26,7 +27,7 @@ static KEYPAIRS: LazyLock> = type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -type HotColdDB = store::HotColdDB, LevelDB>; +type HotColdDB = store::HotColdDB, BeaconNodeBackend>; fn get_store(db_path: &TempDir) -> Arc { let spec = Arc::new(test_spec::()); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 522020e476d..f33740ab4d5 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -25,10 +25,11 @@ use std::collections::HashSet; use std::convert::TryInto; use std::sync::{Arc, LazyLock}; use std::time::Duration; +use store::database::interface::BeaconNodeBackend; use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN}; use store::{ iter::{BlockRootsIterator, StateRootsIterator}, - BlobInfo, DBColumn, HotColdDB, LevelDB, StoreConfig, + BlobInfo, DBColumn, HotColdDB, StoreConfig, }; use tempfile::{tempdir, TempDir}; use tokio::time::sleep; @@ -46,7 +47,7 @@ static KEYPAIRS: LazyLock> = type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { +fn get_store(db_path: &TempDir) -> Arc, BeaconNodeBackend>> { get_store_generic(db_path, StoreConfig::default(), test_spec::()) } @@ -54,7 +55,7 @@ fn get_store_generic( db_path: &TempDir, config: StoreConfig, spec: ChainSpec, -) -> Arc, LevelDB>> { +) -> Arc, BeaconNodeBackend>> { let hot_path = db_path.path().join("chain_db"); let cold_path = db_path.path().join("freezer_db"); let blobs_path = db_path.path().join("blobs_db"); @@ -73,7 +74,7 @@ fn get_store_generic( } fn get_harness( - store: Arc, LevelDB>>, + store: Arc, BeaconNodeBackend>>, validator_count: usize, ) -> TestHarness { // Most tests expect to retain historic states, so we use this as the default. @@ -85,7 +86,7 @@ fn get_harness( } fn get_harness_generic( - store: Arc, LevelDB>>, + store: Arc, BeaconNodeBackend>>, validator_count: usize, chain_config: ChainConfig, ) -> TestHarness { @@ -243,7 +244,6 @@ async fn full_participation_no_skips() { AttestationStrategy::AllValidators, ) .await; - check_finalization(&harness, num_blocks_produced); check_split_slot(&harness, store); check_chain_dump(&harness, num_blocks_produced + 1); @@ -2146,7 +2146,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_startup() { .unwrap_err(); assert_eq!( - store.iter_temporary_state_roots().count(), + store.iter_temporary_state_roots().unwrap().count(), block_slot.as_usize() - 1 ); store @@ -2165,7 +2165,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_startup() { // On startup, the store should garbage collect all the temporary states. let store = get_store(&db_path); - assert_eq!(store.iter_temporary_state_roots().count(), 0); + assert_eq!(store.iter_temporary_state_roots().unwrap().count(), 0); } #[tokio::test] @@ -2201,7 +2201,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() { .unwrap_err(); assert_eq!( - store.iter_temporary_state_roots().count(), + store.iter_temporary_state_roots().unwrap().count(), block_slot.as_usize() - 1 ); @@ -2220,7 +2220,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() { assert_ne!(store.get_split_slot(), 0); // Check that temporary states have been pruned. - assert_eq!(store.iter_temporary_state_roots().count(), 0); + assert_eq!(store.iter_temporary_state_roots().unwrap().count(), 0); } #[tokio::test] @@ -3488,7 +3488,10 @@ fn check_finalization(harness: &TestHarness, expected_slot: u64) { } /// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch. -fn check_split_slot(harness: &TestHarness, store: Arc, LevelDB>>) { +fn check_split_slot( + harness: &TestHarness, + store: Arc, BeaconNodeBackend>>, +) { let split_slot = store.get_split_slot(); assert_eq!( harness diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 961f5140f92..6ec7a9452f8 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -15,7 +15,7 @@ use beacon_chain::{ eth1_chain::{CachingEth1Backend, Eth1Chain}, slot_clock::{SlotClock, SystemTimeSlotClock}, state_advance_timer::spawn_state_advance_timer, - store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, + store::{HotColdDB, ItemStore, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, MigratorConfig, ServerSentEventHandler, }; use beacon_chain::{Kzg, LightClientProducerEvent}; @@ -43,6 +43,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; +use store::database::interface::BeaconNodeBackend; use timer::spawn_timer; use tokio::sync::oneshot; use types::{ @@ -1032,7 +1033,7 @@ where } impl - ClientBuilder, LevelDB>> + ClientBuilder, BeaconNodeBackend>> where TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 940f3ae9c0c..f4c7a4fdb44 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1923,10 +1923,10 @@ impl ApiTester { current_sync_committee_period as u64, 1, &self.chain.spec, + test_logger(), ) .unwrap(); - assert_eq!(1, expected.len()); assert_eq!(result.clone().unwrap().len(), expected.len()); self } @@ -1952,7 +1952,6 @@ impl ApiTester { .get_light_client_bootstrap(&self.chain.store, &block_root, 1u64, &self.chain.spec); assert!(expected.is_ok()); - assert_eq!(result.unwrap().data, expected.unwrap().unwrap().0); self diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 87c6e84ba70..53b3ebefafe 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1591,5 +1591,14 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("beacon-node-backend") + .long("beacon-node-backend") + .value_name("DATABASE") + .value_parser(store::config::DatabaseBackend::VARIANTS.to_vec()) + .help("Set the database backend to be used by the beacon node backend.") + .action(ArgAction::Set) + .display_order(0) + ) .group(ArgGroup::new("enable_http").args(["http", "gui", "staking"]).multiple(true)) } diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 9413eb3924e..ae0fee24ec2 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -2,7 +2,6 @@ mod cli; mod config; pub use beacon_chain; -use beacon_chain::store::LevelDB; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, }; @@ -16,11 +15,19 @@ use slasher::{DatabaseBackendOverride, Slasher}; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use store::database::interface::BeaconNodeBackend; use types::{ChainSpec, Epoch, EthSpec, ForkName}; /// A type-alias to the tighten the definition of a production-intended `Client`. -pub type ProductionClient = - Client, E, LevelDB, LevelDB>>; +pub type ProductionClient = Client< + Witness< + SystemTimeSlotClock, + CachingEth1Backend, + E, + BeaconNodeBackend, + BeaconNodeBackend, + >, +>; /// The beacon node `Client` that will be used in production. /// diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 7cee16c3535..f836e91ee74 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -4,6 +4,11 @@ version = "0.2.0" authors = ["Paul Hauner "] edition = { workspace = true } +[features] +default = ["redb"] +leveldb = ["dep:leveldb"] +redb = ["dep:redb"] + [dev-dependencies] tempfile = { workspace = true } beacon_chain = { workspace = true } @@ -12,7 +17,8 @@ rand = { workspace = true, features = ["small_rng"] } [dependencies] db-key = "0.0.5" -leveldb = { version = "0.8" } +leveldb = { version = "0.8.6", optional = true } +redb = { version = "2.1.3", optional = true } parking_lot = { workspace = true } itertools = { workspace = true } ethereum_ssz = { workspace = true } diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs index 83b8da2a189..4711d958e3b 100644 --- a/beacon_node/store/src/chunked_vector.rs +++ b/beacon_node/store/src/chunked_vector.rs @@ -691,8 +691,12 @@ where key: &[u8], ops: &mut Vec, ) -> Result<(), Error> { - let db_key = get_key_for_col(column.into(), key); - ops.push(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?)); + let column_name: &str = column.into(); + ops.push(KeyValueStoreOp::PutKeyValue( + column_name.to_owned(), + key.to_vec(), + self.encode()?, + )); Ok(()) } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 4f675305706..64765fd66a0 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -1,16 +1,23 @@ use crate::hdiff::HierarchyConfig; +use crate::superstruct; use crate::{AnchorInfo, DBColumn, Error, Split, StoreItem}; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use std::io::Write; use std::num::NonZeroUsize; -use superstruct::superstruct; +use strum::{Display, EnumString, EnumVariantNames}; use types::non_zero_usize::new_non_zero_usize; use types::EthSpec; use zstd::Encoder; -// Only used in tests. Mainnet sets a higher default on the CLI. +#[cfg(all(feature = "redb", not(feature = "leveldb")))] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb; +#[cfg(feature = "leveldb")] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::LevelDb; + +pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; +pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192; pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8; pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64); pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128); @@ -40,6 +47,8 @@ pub struct StoreConfig { pub compact_on_prune: bool, /// Whether to prune payloads on initialization and finalization. pub prune_payloads: bool, + /// Database backend to use. + pub backend: DatabaseBackend, /// State diff hierarchy. pub hierarchy_config: HierarchyConfig, /// Whether to prune blobs older than the blob data availability boundary. @@ -104,6 +113,7 @@ impl Default for StoreConfig { compact_on_init: false, compact_on_prune: true, prune_payloads: true, + backend: DEFAULT_BACKEND, hierarchy_config: HierarchyConfig::default(), prune_blobs: true, epochs_per_blob_prune: DEFAULT_EPOCHS_PER_BLOB_PRUNE, @@ -340,3 +350,14 @@ mod test { assert_eq!(config_out, config); } } + +#[derive( + Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, Display, EnumString, EnumVariantNames, +)] +#[strum(serialize_all = "lowercase")] +pub enum DatabaseBackend { + #[cfg(feature = "leveldb")] + LevelDb, + #[cfg(feature = "redb")] + Redb, +} diff --git a/beacon_node/store/src/database.rs b/beacon_node/store/src/database.rs new file mode 100644 index 00000000000..2232f73c5cc --- /dev/null +++ b/beacon_node/store/src/database.rs @@ -0,0 +1,5 @@ +pub mod interface; +#[cfg(feature = "leveldb")] +pub mod leveldb_impl; +#[cfg(feature = "redb")] +pub mod redb_impl; diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs new file mode 100644 index 00000000000..7827856757a --- /dev/null +++ b/beacon_node/store/src/database/interface.rs @@ -0,0 +1,235 @@ +#[cfg(feature = "leveldb")] +use crate::database::leveldb_impl; +#[cfg(feature = "redb")] +use crate::database::redb_impl; +use crate::{config::DatabaseBackend, KeyValueStoreOp, StoreConfig}; +use crate::{metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore}; +use std::collections::HashSet; +use std::path::Path; +use types::EthSpec; + +pub enum BeaconNodeBackend { + #[cfg(feature = "leveldb")] + LevelDb(leveldb_impl::LevelDB), + #[cfg(feature = "redb")] + Redb(redb_impl::Redb), +} + +impl ItemStore for BeaconNodeBackend {} + +impl KeyValueStore for BeaconNodeBackend { + fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::get_bytes(txn, column, key), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), + } + } + + fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::put_bytes_with_options( + txn, + column, + key, + value, + txn.write_options(), + ), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::put_bytes_with_options( + txn, + column, + key, + value, + txn.write_options(), + ), + } + } + + fn put_bytes_sync(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::put_bytes_with_options( + txn, + column, + key, + value, + txn.write_options_sync(), + ), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::put_bytes_with_options( + txn, + column, + key, + value, + txn.write_options_sync(), + ), + } + } + + fn sync(&self) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::put_bytes_with_options( + txn, + "sync", + b"sync", + b"sync", + txn.write_options_sync(), + ), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::sync(txn), + } + } + + fn key_exists(&self, column: &str, key: &[u8]) -> Result { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_exists(txn, column, key), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_exists(txn, column, key), + } + } + + fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_delete(txn, column, key), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_delete(txn, column, key), + } + } + + fn do_atomically(&self, batch: Vec) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::do_atomically(txn, batch), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::do_atomically(txn, batch), + } + } + + fn begin_rw_transaction(&self) -> parking_lot::MutexGuard<()> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::begin_rw_transaction(txn), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::begin_rw_transaction(txn), + } + } + + fn compact(&self) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact(txn), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + } + } + + fn iter_column_keys_from(&self, _column: DBColumn, from: &[u8]) -> ColumnKeyIter { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => { + leveldb_impl::LevelDB::iter_column_keys_from(txn, _column, from) + } + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => { + redb_impl::Redb::iter_column_keys_from(txn, _column, from) + } + } + } + + fn iter_column_keys(&self, _column: DBColumn) -> ColumnKeyIter { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => { + leveldb_impl::LevelDB::iter_column_keys(txn, _column) + } + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_keys(txn, _column), + } + } + + fn iter_column_from( + &self, + column: DBColumn, + from: &[u8], + predicate: impl Fn(&[u8], &[u8]) -> bool + 'static, + ) -> ColumnIter { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => { + leveldb_impl::LevelDB::iter_column_from(txn, column, from, predicate) + } + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => { + redb_impl::Redb::iter_column_from(txn, column, from, predicate) + } + } + } + + fn compact_column(&self, _column: DBColumn) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact_column(txn, _column), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + } + } + + fn delete_batch(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_batch(txn, col, ops), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_batch(txn, col, ops), + } + } + + fn delete_while( + &self, + column: DBColumn, + f: impl FnMut(&[u8]) -> Result, + ) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(_txn) => todo!(), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_while(txn, column, f), + } + } +} + +impl BeaconNodeBackend { + pub fn open(config: &StoreConfig, path: &Path) -> Result { + metrics::inc_counter_vec(&metrics::DISK_DB_TYPE, &[&config.backend.to_string()]); + match config.backend { + #[cfg(feature = "leveldb")] + DatabaseBackend::LevelDb => { + leveldb_impl::LevelDB::open(path).map(BeaconNodeBackend::LevelDb) + } + #[cfg(feature = "redb")] + DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), + } + } +} + +pub struct WriteOptions { + /// fsync before acknowledging a write operation. + pub sync: bool, +} + +impl WriteOptions { + pub fn new() -> Self { + WriteOptions { sync: false } + } +} + +impl Default for WriteOptions { + fn default() -> Self { + Self::new() + } +} diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs new file mode 100644 index 00000000000..75ff83eb824 --- /dev/null +++ b/beacon_node/store/src/database/leveldb_impl.rs @@ -0,0 +1,297 @@ +use crate::hot_cold_store::{BytesKey, HotColdDBError}; +use crate::Key; +use crate::{ + get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, KeyValueStoreOp, +}; +use leveldb::{ + compaction::Compaction, + database::{ + batch::{Batch, Writebatch}, + kv::KV, + Database, + }, + iterator::{Iterable, LevelDBIterator}, + options::{Options, ReadOptions}, +}; +use parking_lot::{Mutex, MutexGuard}; +use std::collections::HashSet; +use std::marker::PhantomData; +use std::path::Path; +use types::{EthSpec, FixedBytesExtended, Hash256}; + +use super::interface::WriteOptions; + +pub struct LevelDB { + db: Database, + /// A mutex to synchronise sensitive read-write transactions. + transaction_mutex: Mutex<()>, + _phantom: PhantomData, +} + +impl From for leveldb::options::WriteOptions { + fn from(options: WriteOptions) -> Self { + // Assuming LevelDBWriteOptions has a new method that accepts a bool parameter for sync. + let mut opts = leveldb::options::WriteOptions::new(); + opts.sync = options.sync; + opts + } +} + +impl LevelDB { + pub fn open(path: &Path) -> Result { + let mut options = Options::new(); + + options.create_if_missing = true; + + let db = Database::open(path, options)?; + let transaction_mutex = Mutex::new(()); + + Ok(Self { + db, + transaction_mutex, + _phantom: PhantomData, + }) + } + + pub fn read_options(&self) -> ReadOptions { + ReadOptions::new() + } + + pub fn write_options(&self) -> WriteOptions { + WriteOptions::new() + } + + pub fn write_options_sync(&self) -> WriteOptions { + let mut opts = WriteOptions::new(); + opts.sync = true; + opts + } + + pub fn put_bytes_with_options( + &self, + col: &str, + key: &[u8], + val: &[u8], + opts: WriteOptions, + ) -> Result<(), Error> { + let column_key = get_key_for_col(col, key); + + metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[col]); + metrics::inc_counter_vec_by(&metrics::DISK_DB_WRITE_BYTES, &[col], val.len() as u64); + let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); + + self.db + .put(opts.into(), BytesKey::from_vec(column_key), val) + .map_err(Into::into) + .map(|()| { + metrics::stop_timer(timer); + }) + } + + /// Store some `value` in `column`, indexed with `key`. + pub fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { + self.put_bytes_with_options(col, key, val, self.write_options()) + } + + pub fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { + self.put_bytes_with_options(col, key, val, self.write_options_sync()) + } + + pub fn sync(&self) -> Result<(), Error> { + self.put_bytes_sync("sync", b"sync", b"sync") + } + + // Retrieve some bytes in `column` with `key`. + pub fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { + let column_key = get_key_for_col(col, key); + + metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[col]); + let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES); + + self.db + .get(self.read_options(), BytesKey::from_vec(column_key)) + .map_err(Into::into) + .map(|opt| { + opt.inspect(|bytes| { + metrics::inc_counter_vec_by( + &metrics::DISK_DB_READ_BYTES, + &[col], + bytes.len() as u64, + ); + metrics::stop_timer(timer); + }) + }) + } + + /// Return `true` if `key` exists in `column`. + pub fn key_exists(&self, col: &str, key: &[u8]) -> Result { + let column_key = get_key_for_col(col, key); + + metrics::inc_counter_vec(&metrics::DISK_DB_EXISTS_COUNT, &[col]); + + self.db + .get(self.read_options(), BytesKey::from_vec(column_key)) + .map_err(Into::into) + .map(|val| val.is_some()) + } + + /// Removes `key` from `column`. + pub fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { + let column_key = get_key_for_col(col, key); + + metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[col]); + + self.db + .delete(self.write_options().into(), BytesKey::from_vec(column_key)) + .map_err(Into::into) + } + + pub fn do_atomically(&self, ops_batch: Vec) -> Result<(), Error> { + let mut leveldb_batch = Writebatch::new(); + for op in ops_batch { + match op { + KeyValueStoreOp::PutKeyValue(col, key, value) => { + let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_WRITE_BYTES, + &[&col], + value.len() as u64, + ); + metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[&col]); + let column_key = get_key_for_col(&col, &key); + leveldb_batch.put(BytesKey::from_vec(column_key), &value); + } + + KeyValueStoreOp::DeleteKey(col, key) => { + let _timer = metrics::start_timer(&metrics::DISK_DB_DELETE_TIMES); + metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[&col]); + let column_key = get_key_for_col(&col, &key); + leveldb_batch.delete(BytesKey::from_vec(column_key)); + } + } + } + self.db.write(self.write_options().into(), &leveldb_batch)?; + Ok(()) + } + + pub fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } + + /// Compact all values in the states and states flag columns. + pub fn compact(&self) -> Result<(), Error> { + let _timer = metrics::start_timer(&metrics::DISK_DB_COMPACT_TIMES); + let endpoints = |column: DBColumn| { + ( + BytesKey::from_vec(get_key_for_col(column.as_str(), Hash256::zero().as_slice())), + BytesKey::from_vec(get_key_for_col( + column.as_str(), + Hash256::repeat_byte(0xff).as_slice(), + )), + ) + }; + + for (start_key, end_key) in [ + endpoints(DBColumn::BeaconStateTemporary), + endpoints(DBColumn::BeaconState), + endpoints(DBColumn::BeaconStateSummary), + ] { + self.db.compact(&start_key, &end_key); + } + + Ok(()) + } + + pub fn compact_column(&self, column: DBColumn) -> Result<(), Error> { + // Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for + // columns that may change size between sub-databases or schema versions. + let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[])); + let end_key = BytesKey::from_vec(get_key_for_col( + column.as_str(), + &vec![0xff; std::cmp::max(column.key_size(), 32)], + )); + self.db.compact(&start_key, &end_key); + Ok(()) + } + + pub fn iter_column_from( + &self, + column: DBColumn, + from: &[u8], + predicate: impl Fn(&[u8], &[u8]) -> bool + 'static, + ) -> ColumnIter { + let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from)); + + let iter = self.db.iter(self.read_options()); + iter.seek(&start_key); + + Ok(Box::new( + iter.take_while(move |(key, _)| { + let Some(trimmed_key) = key.remove_column_variable(column) else { + return false; + }; + let Some(trimmed_start_key) = start_key.remove_column_variable(column) else { + return false; + }; + key.matches_column(column) && predicate(trimmed_key, trimmed_start_key) + }) + .map(move |(bytes_key, value)| { + metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[column.into()]); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_READ_BYTES, + &[column.into()], + value.len() as u64, + ); + let key = bytes_key.remove_column_variable(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key.clone(), + } + })?; + Ok((K::from_bytes(key)?, value)) + }), + )) + } + + pub fn iter_column_keys_from(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter { + let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from)); + + let iter = self.db.keys_iter(self.read_options()); + iter.seek(&start_key); + + Ok(Box::new( + iter.take_while(move |key| key.matches_column(column)) + .map(move |bytes_key| { + metrics::inc_counter_vec(&metrics::DISK_DB_KEY_READ_COUNT, &[column.into()]); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_KEY_READ_BYTES, + &[column.into()], + bytes_key.key.len() as u64, + ); + let key = &bytes_key.key[column.as_bytes().len()..]; + K::from_bytes(key) + }), + )) + } + + /// Iterate through all keys and values in a particular column. + pub fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + self.iter_column_keys_from(column, &vec![0; column.key_size()]) + } + + pub fn iter_column(&self, column: DBColumn) -> ColumnIter { + self.iter_column_from(column, &vec![0; column.key_size()], move |key, _| { + metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[column.into()]); + BytesKey::from_vec(key.to_vec()).matches_column(column) + }) + } + + pub fn delete_batch(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error> { + let mut leveldb_batch = Writebatch::new(); + for op in ops { + let column_key = get_key_for_col(col, op); + leveldb_batch.delete(BytesKey::from_vec(column_key)); + } + self.db.write(self.write_options().into(), &leveldb_batch)?; + Ok(()) + } +} diff --git a/beacon_node/store/src/database/redb_impl.rs b/beacon_node/store/src/database/redb_impl.rs new file mode 100644 index 00000000000..8a92888c298 --- /dev/null +++ b/beacon_node/store/src/database/redb_impl.rs @@ -0,0 +1,322 @@ +use crate::{metrics, ColumnIter, ColumnKeyIter, Key}; +use crate::{DBColumn, Error, KeyValueStoreOp}; +use parking_lot::{Mutex, MutexGuard, RwLock}; +use redb::TableDefinition; +use std::collections::HashSet; +use std::{borrow::BorrowMut, marker::PhantomData, path::Path}; +use strum::IntoEnumIterator; +use types::EthSpec; + +use super::interface::WriteOptions; + +pub const DB_FILE_NAME: &str = "database.redb"; + +pub struct Redb { + db: RwLock, + transaction_mutex: Mutex<()>, + _phantom: PhantomData, +} + +impl From for redb::Durability { + fn from(options: WriteOptions) -> Self { + if options.sync { + redb::Durability::Immediate + } else { + redb::Durability::Eventual + } + } +} + +impl Redb { + pub fn open(path: &Path) -> Result { + let path = if path.is_dir() { + path.join(DB_FILE_NAME) + } else { + path.to_path_buf() + }; + let db = redb::Database::create(path)?; + let transaction_mutex = Mutex::new(()); + + for column in DBColumn::iter() { + Redb::::create_table(&db, column.into())?; + } + + Ok(Self { + db: db.into(), + transaction_mutex, + _phantom: PhantomData, + }) + } + + fn create_table(db: &redb::Database, table_name: &str) -> Result<(), Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(table_name); + let tx = db.begin_write()?; + tx.open_table(table_definition)?; + tx.commit().map_err(Into::into) + } + + pub fn write_options(&self) -> WriteOptions { + WriteOptions::new() + } + + pub fn write_options_sync(&self) -> WriteOptions { + let mut opts = WriteOptions::new(); + opts.sync = true; + opts + } + + pub fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } + + pub fn put_bytes_with_options( + &self, + col: &str, + key: &[u8], + val: &[u8], + opts: WriteOptions, + ) -> Result<(), Error> { + metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[col]); + metrics::inc_counter_vec_by(&metrics::DISK_DB_WRITE_BYTES, &[col], val.len() as u64); + let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); + + let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(col); + let open_db = self.db.read(); + let mut tx = open_db.begin_write()?; + tx.set_durability(opts.into()); + let mut table = tx.open_table(table_definition)?; + + table.insert(key, val).map(|_| { + metrics::stop_timer(timer); + })?; + drop(table); + tx.commit().map_err(Into::into) + } + + /// Store some `value` in `column`, indexed with `key`. + pub fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { + self.put_bytes_with_options(col, key, val, self.write_options()) + } + + pub fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { + self.put_bytes_with_options(col, key, val, self.write_options_sync()) + } + + pub fn sync(&self) -> Result<(), Error> { + self.put_bytes_sync("sync", b"sync", b"sync") + } + + // Retrieve some bytes in `column` with `key`. + pub fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { + metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[col]); + let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES); + + let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(col); + let open_db = self.db.read(); + let tx = open_db.begin_read()?; + let table = tx.open_table(table_definition)?; + + let result = table.get(key)?; + + match result { + Some(access_guard) => { + let value = access_guard.value().to_vec(); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_READ_BYTES, + &[col], + value.len() as u64, + ); + metrics::stop_timer(timer); + Ok(Some(value)) + } + None => { + metrics::stop_timer(timer); + Ok(None) + } + } + } + + /// Return `true` if `key` exists in `column`. + pub fn key_exists(&self, col: &str, key: &[u8]) -> Result { + metrics::inc_counter_vec(&metrics::DISK_DB_EXISTS_COUNT, &[col]); + + let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(col); + let open_db = self.db.read(); + let tx = open_db.begin_read()?; + let table = tx.open_table(table_definition)?; + + table + .get(key) + .map_err(Into::into) + .map(|access_guard| access_guard.is_some()) + } + + /// Removes `key` from `column`. + pub fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(col); + let open_db = self.db.read(); + let tx = open_db.begin_write()?; + let mut table = tx.open_table(table_definition)?; + metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[col]); + + table.remove(key).map(|_| ())?; + drop(table); + tx.commit().map_err(Into::into) + } + + pub fn do_atomically(&self, ops_batch: Vec) -> Result<(), Error> { + let open_db = self.db.read(); + let mut tx = open_db.begin_write()?; + tx.set_durability(self.write_options().into()); + for op in ops_batch { + match op { + KeyValueStoreOp::PutKeyValue(column, key, value) => { + let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_WRITE_BYTES, + &[&column], + value.len() as u64, + ); + metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[&column]); + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(&column); + + let mut table = tx.open_table(table_definition)?; + table.insert(key.as_slice(), value.as_slice())?; + drop(table); + } + + KeyValueStoreOp::DeleteKey(column, key) => { + metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[&column]); + let _timer = metrics::start_timer(&metrics::DISK_DB_DELETE_TIMES); + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(&column); + + let mut table = tx.open_table(table_definition)?; + table.remove(key.as_slice())?; + drop(table); + } + } + } + + tx.commit()?; + Ok(()) + } + + /// Compact all values in the states and states flag columns. + pub fn compact(&self) -> Result<(), Error> { + let _timer = metrics::start_timer(&metrics::DISK_DB_COMPACT_TIMES); + let mut open_db = self.db.write(); + let mut_db = open_db.borrow_mut(); + mut_db.compact().map_err(Into::into).map(|_| ()) + } + + pub fn iter_column_keys_from(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(column.into()); + + let iter = { + let open_db = self.db.read(); + let read_txn = open_db.begin_read()?; + let table = read_txn.open_table(table_definition)?; + table.range(from..)?.map(move |res| { + let (key, _) = res?; + metrics::inc_counter_vec(&metrics::DISK_DB_KEY_READ_COUNT, &[column.into()]); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_KEY_READ_BYTES, + &[column.into()], + key.value().len() as u64, + ); + K::from_bytes(key.value()) + }) + }; + + Ok(Box::new(iter)) + } + + /// Iterate through all keys and values in a particular column. + pub fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + self.iter_column_keys_from(column, &vec![0; column.key_size()]) + } + + pub fn iter_column_from( + &self, + column: DBColumn, + from: &[u8], + predicate: impl Fn(&[u8], &[u8]) -> bool + 'static, + ) -> ColumnIter { + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(column.into()); + + let prefix = from.to_vec(); + + let iter = { + let open_db = self.db.read(); + let read_txn = open_db.begin_read()?; + let table = read_txn.open_table(table_definition)?; + + table + .range(from..)? + .take_while(move |res| match res.as_ref() { + Ok((key, _)) => predicate(key.value(), prefix.as_slice()), + Err(_) => false, + }) + .map(move |res| { + let (key, value) = res?; + metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[column.into()]); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_READ_BYTES, + &[column.into()], + value.value().len() as u64, + ); + Ok((K::from_bytes(key.value())?, value.value().to_vec())) + }) + }; + + Ok(Box::new(iter)) + } + + pub fn iter_column(&self, column: DBColumn) -> ColumnIter { + self.iter_column_from(column, &vec![0; column.key_size()], |_, _| true) + } + + pub fn delete_batch(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error> { + let open_db = self.db.read(); + let mut tx = open_db.begin_write()?; + + tx.set_durability(redb::Durability::None); + + let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(col); + + let mut table = tx.open_table(table_definition)?; + table.retain(|key, _| !ops.contains(key))?; + + drop(table); + tx.commit()?; + Ok(()) + } + + pub fn delete_while( + &self, + column: DBColumn, + mut f: impl FnMut(&[u8]) -> Result, + ) -> Result<(), Error> { + let open_db = self.db.read(); + let mut tx = open_db.begin_write()?; + + tx.set_durability(redb::Durability::None); + + let table_definition: TableDefinition<'_, &[u8], &[u8]> = + TableDefinition::new(column.into()); + + let mut table = tx.open_table(table_definition)?; + table.retain(|_, value| !f(value).unwrap_or(false))?; + + // extract_iter.for_each(|_| { + // metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[col]); + // }); + drop(table); + tx.commit()?; + Ok(()) + } +} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 6bb4edee6b2..67d38803101 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -2,6 +2,8 @@ use crate::chunked_vector::ChunkError; use crate::config::StoreConfigError; use crate::hot_cold_store::HotColdDBError; use crate::{hdiff, DBColumn}; +#[cfg(feature = "leveldb")] +use leveldb::error::Error as LevelDBError; use ssz::DecodeError; use state_processing::BlockReplayError; use types::{milhouse, BeaconStateError, EpochCacheError, Hash256, InconsistentFork, Slot}; @@ -48,6 +50,18 @@ pub enum Error { MissingGenesisState, MissingSnapshot(Slot), BlockReplayError(BlockReplayError), + AddPayloadLogicError, + SlotClockUnavailableForMigration, + InvalidKey, + InvalidBytes, + UnableToDowngrade, + InconsistentFork(InconsistentFork), + #[cfg(feature = "leveldb")] + LevelDbError(LevelDBError), + #[cfg(feature = "redb")] + RedbError(redb::Error), + CacheBuildError(EpochCacheError), + RandaoMixOutOfBounds, MilhouseError(milhouse::Error), Compression(std::io::Error), FinalizedStateDecreasingSlot, @@ -56,17 +70,11 @@ pub enum Error { state_root: Hash256, slot: Slot, }, - AddPayloadLogicError, - InvalidKey, - InvalidBytes, - InconsistentFork(InconsistentFork), Hdiff(hdiff::Error), - CacheBuildError(EpochCacheError), ForwardsIterInvalidColumn(DBColumn), ForwardsIterGap(DBColumn, Slot, Slot), StateShouldNotBeRequired(Slot), MissingBlock(Hash256), - RandaoMixOutOfBounds, GenesisStateUnknown, ArithError(safe_arith::ArithError), } @@ -145,6 +153,62 @@ impl From for Error { } } +#[cfg(feature = "leveldb")] +impl From for Error { + fn from(e: LevelDBError) -> Error { + Error::LevelDbError(e) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::Error) -> Self { + Error::RedbError(e) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::TableError) -> Self { + Error::RedbError(e.into()) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::TransactionError) -> Self { + Error::RedbError(e.into()) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::DatabaseError) -> Self { + Error::RedbError(e.into()) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::StorageError) -> Self { + Error::RedbError(e.into()) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::CommitError) -> Self { + Error::RedbError(e.into()) + } +} + +#[cfg(feature = "redb")] +impl From for Error { + fn from(e: redb::CompactionError) -> Self { + Error::RedbError(e.into()) + } +} + impl From for Error { fn from(e: EpochCacheError) -> Error { Error::CacheBuildError(e) diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index e0f44f3affb..2749620aa51 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -4,7 +4,6 @@ use crate::{ColumnIter, DBColumn, HotColdDB, ItemStore}; use itertools::process_results; use std::marker::PhantomData; use types::{BeaconState, EthSpec, Hash256, Slot}; - pub type HybridForwardsBlockRootsIterator<'a, E, Hot, Cold> = HybridForwardsIterator<'a, E, Hot, Cold>; pub type HybridForwardsStateRootsIterator<'a, E, Hot, Cold> = @@ -140,7 +139,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> } let start = start_slot.as_u64().to_be_bytes(); Ok(Self { - inner: store.cold_db.iter_column_from(column, &start), + inner: store.cold_db.iter_column_from(column, &start, |_, _| true), column, next_slot: start_slot, end_slot, @@ -158,7 +157,11 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator if self.next_slot == self.end_slot { return None; } - self.inner + let Ok(inner) = self.inner.as_mut() else { + return None; + }; + + inner .next()? .and_then(|(slot_bytes, root_bytes)| { let slot = slot_bytes diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 5f8ed8f5e73..1e3609d03fb 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -1,10 +1,11 @@ //! Garbage collection process that runs at start-up to clean up the database. +use crate::database::interface::BeaconNodeBackend; use crate::hot_cold_store::HotColdDB; -use crate::{Error, LevelDB, StoreOp}; +use crate::{DBColumn, Error}; use slog::debug; use types::EthSpec; -impl HotColdDB, LevelDB> +impl HotColdDB, BeaconNodeBackend> where E: EthSpec, { @@ -16,21 +17,25 @@ where /// Delete the temporary states that were leftover by failed block imports. pub fn delete_temp_states(&self) -> Result<(), Error> { - let delete_ops = - self.iter_temporary_state_roots() - .try_fold(vec![], |mut ops, state_root| { - let state_root = state_root?; - ops.push(StoreOp::DeleteState(state_root, None)); - Result::<_, Error>::Ok(ops) - })?; - - if !delete_ops.is_empty() { + let mut ops = vec![]; + self.iter_temporary_state_roots()?.for_each(|state_root| { + if let Ok(state_root) = state_root { + ops.push(state_root); + } + }); + if !ops.is_empty() { debug!( self.log, "Garbage collecting {} temporary states", - delete_ops.len() + ops.len() ); - self.do_atomically_with_block_and_blobs_cache(delete_ops)?; + let state_col: &str = DBColumn::BeaconState.into(); + let summary_col: &str = DBColumn::BeaconStateSummary.into(); + let temp_state_col: &str = DBColumn::BeaconStateTemporary.into(); + + self.delete_batch(state_col, ops.clone())?; + self.delete_batch(summary_col, ops.clone())?; + self.delete_batch(temp_state_col, ops)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4942b148810..7a958a5fcae 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,10 +1,10 @@ use crate::config::{OnDiskStoreConfig, StoreConfig}; +use crate::database::interface::BeaconNodeBackend; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; use crate::hdiff::{HDiff, HDiffBuffer, HierarchyModuli, StorageStrategy}; use crate::historic_state_cache::HistoricStateCache; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; -use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; use crate::metadata::{ AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion, @@ -13,13 +13,11 @@ use crate::metadata::{ PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, }; use crate::state_cache::{PutStateOutcome, StateCache}; -use crate::{ - get_data_column_key, get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, - KeyValueStoreOp, StoreItem, StoreOp, -}; -use crate::{metrics, parse_data_column_key}; -use itertools::{process_results, Itertools}; -use leveldb::iterator::LevelDBIterator; +use crate::{get_data_column_key, metrics, parse_data_column_key, KeyValueStore}; +use crate::{DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, StoreItem, StoreOp}; +use db_key::Key; +use itertools::process_results; +use itertools::Itertools; use lru::LruCache; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; @@ -231,7 +229,7 @@ impl HotColdDB, MemoryStore> { } } -impl HotColdDB, LevelDB> { +impl HotColdDB, BeaconNodeBackend> { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `migrate_schema` function is passed in so that the parent `BeaconChain` can provide @@ -249,7 +247,7 @@ impl HotColdDB, LevelDB> { let hierarchy = config.hierarchy_config.to_moduli()?; - let hot_db = LevelDB::open(hot_path)?; + let hot_db = BeaconNodeBackend::open(&config, hot_path)?; let anchor_info = RwLock::new(Self::load_anchor_info(&hot_db)?); let db = HotColdDB { @@ -257,8 +255,8 @@ impl HotColdDB, LevelDB> { anchor_info, blob_info: RwLock::new(BlobInfo::default()), data_column_info: RwLock::new(DataColumnInfo::default()), - cold_db: LevelDB::open(cold_path)?, - blobs_db: LevelDB::open(blobs_db_path)?, + blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?, + cold_db: BeaconNodeBackend::open(&config, cold_path)?, hot_db, block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), state_cache: Mutex::new(StateCache::new(config.state_cache_size)), @@ -407,24 +405,11 @@ impl HotColdDB, LevelDB> { } /// Return an iterator over the state roots of all temporary states. - pub fn iter_temporary_state_roots(&self) -> impl Iterator> + '_ { - let column = DBColumn::BeaconStateTemporary; - let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_slice())); - - let keys_iter = self.hot_db.keys_iter(); - keys_iter.seek(&start_key); - - keys_iter - .take_while(move |key| key.matches_column(column)) - .map(move |bytes_key| { - bytes_key.remove_column(column).ok_or_else(|| { - HotColdDBError::IterationError { - unexpected_key: bytes_key, - } - .into() - }) - }) + pub fn iter_temporary_state_roots( + &self, + ) -> Result> + '_, Error> { + self.hot_db + .iter_column_keys::(DBColumn::BeaconStateTemporary) } } @@ -536,9 +521,10 @@ impl, Cold: ItemStore> HotColdDB blinded_block: &SignedBeaconBlock>, ops: &mut Vec, ) { - let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_slice()); + let column_name: &str = DBColumn::BeaconBlock.into(); ops.push(KeyValueStoreOp::PutKeyValue( - db_key, + column_name.to_owned(), + key.as_slice().into(), blinded_block.as_ssz_bytes(), )); } @@ -794,26 +780,54 @@ impl, Cold: ItemStore> HotColdDB ) -> Result>, Error> { let column = DBColumn::LightClientUpdate; let mut light_client_updates = vec![]; - for res in self - .hot_db - .iter_column_from::>(column, &start_period.to_le_bytes()) - { - let (sync_committee_bytes, light_client_update_bytes) = res?; - let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)?; - let epoch = sync_committee_period - .safe_mul(self.spec.epochs_per_sync_committee_period.into())?; + // TODO(modularize-backend) we are calculating sync committee period from ssz twice + self.hot_db + .iter_column_from::>( + column, + &start_period.to_le_bytes(), + move |sync_committee_bytes, _| { + let Ok(sync_committee_period) = u64::from_ssz_bytes(sync_committee_bytes) + else { + // TODO(modularize-backend) logging + return false; + }; - let fork_name = self.spec.fork_name_at_epoch(epoch.into()); + if sync_committee_period >= start_period + count { + // TODO(modularize-backend) logging + return false; + } - let light_client_update = - LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name)?; + true + }, + )? + .for_each(|res| { + let Ok((sync_committee_bytes, light_client_update_bytes)) = res else { + // TODO(modularize-backend) logging + return; + }; - light_client_updates.push(light_client_update); + let Ok(sync_committee_period) = u64::from_ssz_bytes(&sync_committee_bytes) else { + // TODO(modularize-backend) logging + return; + }; - if sync_committee_period >= start_period + count { - break; - } - } + let Ok(epoch) = sync_committee_period + .safe_mul(self.spec.epochs_per_sync_committee_period.into()) + else { + // TODO(modularize-backend) logging + return; + }; + + let fork_name = self.spec.fork_name_at_epoch(epoch.into()); + + let Ok(light_client_update) = + LightClientUpdate::from_ssz_bytes(&light_client_update_bytes, &fork_name) + else { + // TODO(modularize-backend) logging + return; + }; + light_client_updates.push(light_client_update); + }); Ok(light_client_updates) } @@ -872,8 +886,12 @@ impl, Cold: ItemStore> HotColdDB blobs: BlobSidecarList, ops: &mut Vec, ) { - let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_slice()); - ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes())); + let column_key: &str = DBColumn::BeaconBlob.into(); + ops.push(KeyValueStoreOp::PutKeyValue( + column_key.to_owned(), + key.as_slice().to_vec(), + blobs.as_ssz_bytes(), + )); } pub fn data_columns_as_kv_store_ops( @@ -883,12 +901,10 @@ impl, Cold: ItemStore> HotColdDB ops: &mut Vec, ) { for data_column in data_columns { - let db_key = get_key_for_col( - DBColumn::BeaconDataColumn.into(), - &get_data_column_key(block_root, &data_column.index), - ); + let column: &str = DBColumn::BeaconDataColumn.into(); ops.push(KeyValueStoreOp::PutKeyValue( - db_key, + column.to_owned(), + get_data_column_key(block_root, &data_column.index), data_column.as_ssz_bytes(), )); } @@ -1202,63 +1218,77 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::DeleteStateTemporaryFlag(state_root) => { - let db_key = - get_key_for_col(TemporaryFlag::db_column().into(), state_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(db_key)); + let column_name: &str = TemporaryFlag::db_column().into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + column_name.to_owned(), + state_root.as_slice().to_vec(), + )); } StoreOp::DeleteBlock(block_root) => { - let key = get_key_for_col(DBColumn::BeaconBlock.into(), block_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + let column_name: &str = DBColumn::BeaconBlock.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + column_name.to_owned(), + block_root.as_slice().to_vec(), + )); } StoreOp::DeleteBlobs(block_root) => { - let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + let column_name: &str = DBColumn::BeaconBlob.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + column_name.to_owned(), + block_root.as_slice().to_vec(), + )); } StoreOp::DeleteDataColumns(block_root, column_indices) => { for index in column_indices { - let key = get_key_for_col( - DBColumn::BeaconDataColumn.into(), - &get_data_column_key(&block_root, &index), - ); - key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + let key = get_data_column_key(&block_root, &index); + let column_name: &str = DBColumn::BeaconDataColumn.into(); + key_value_batch + .push(KeyValueStoreOp::DeleteKey(column_name.to_owned(), key)); } } StoreOp::DeleteState(state_root, slot) => { // Delete the hot state summary. - let state_summary_key = - get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); + let summary_column_name: &str = DBColumn::BeaconStateSummary.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + summary_column_name.to_owned(), + state_root.as_slice().to_vec(), + )); // Delete the state temporary flag (if any). Temporary flags are commonly // created by the state advance routine. - let state_temp_key = get_key_for_col( - DBColumn::BeaconStateTemporary.into(), - state_root.as_slice(), - ); - key_value_batch.push(KeyValueStoreOp::DeleteKey(state_temp_key)); + let temporary_column_name: &str = DBColumn::BeaconStateTemporary.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + temporary_column_name.into(), + state_root.as_slice().to_vec(), + )); if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) { - let state_key = - get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key)); + let column_name: &str = DBColumn::BeaconState.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + column_name.to_owned(), + state_root.as_slice().to_vec(), + )); } } StoreOp::DeleteExecutionPayload(block_root) => { - let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_slice()); - key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + let column_name: &str = DBColumn::ExecPayload.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + column_name.to_owned(), + block_root.as_slice().to_vec(), + )); } StoreOp::DeleteSyncCommitteeBranch(block_root) => { - let key = get_key_for_col( - DBColumn::SyncCommitteeBranch.into(), - block_root.as_slice(), - ); - key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + let column_name: &str = DBColumn::SyncCommitteeBranch.into(); + key_value_batch.push(KeyValueStoreOp::DeleteKey( + column_name.to_owned(), + block_root.as_slice().to_vec(), + )); } StoreOp::KeyValueOp(kv_op) => { @@ -1269,6 +1299,19 @@ impl, Cold: ItemStore> HotColdDB Ok(key_value_batch) } + pub fn delete_batch(&self, col: &str, ops: Vec) -> Result<(), Error> { + let new_ops: HashSet<&[u8]> = ops.iter().map(|v| v.as_slice()).collect(); + self.hot_db.delete_batch(col, new_ops) + } + + pub fn delete_while( + &self, + column: DBColumn, + f: impl Fn(&[u8]) -> Result, + ) -> Result<(), Error> { + self.hot_db.delete_while(column, f) + } + pub fn do_atomically_with_block_and_blobs_cache( &self, batch: Vec>, @@ -1607,11 +1650,11 @@ impl, Cold: ItemStore> HotColdDB ops: &mut Vec, ) -> Result<(), Error> { ops.push(ColdStateSummary { slot }.as_kv_store_op(*state_root)); + + let column: &str = DBColumn::BeaconStateRoots.into(); ops.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col( - DBColumn::BeaconStateRoots.into(), - &slot.as_u64().to_be_bytes(), - ), + column.to_owned(), + slot.as_u64().to_be_bytes().to_vec(), state_root.as_slice().to_vec(), )); Ok(()) @@ -1678,11 +1721,12 @@ impl, Cold: ItemStore> HotColdDB out }; - let key = get_key_for_col( - DBColumn::BeaconStateSnapshot.into(), - &state.slot().as_u64().to_be_bytes(), - ); - ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value)); + let column: &str = DBColumn::BeaconStateSnapshot.into(); + ops.push(KeyValueStoreOp::PutKeyValue( + column.to_owned(), + state.slot().as_u64().to_be_bytes().to_vec(), + compressed_value, + )); Ok(()) } @@ -1731,11 +1775,12 @@ impl, Cold: ItemStore> HotColdDB }; let diff_bytes = diff.as_ssz_bytes(); - let key = get_key_for_col( - DBColumn::BeaconStateDiff.into(), - &state.slot().as_u64().to_be_bytes(), - ); - ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes)); + let column: &str = DBColumn::BeaconStateDiff.into(); + ops.push(KeyValueStoreOp::PutKeyValue( + column.to_owned(), + state.slot().as_u64().to_be_bytes().to_vec(), + diff_bytes, + )); Ok(()) } @@ -2070,8 +2115,12 @@ impl, Cold: ItemStore> HotColdDB /// Fetch all keys in the data_column column with prefix `block_root` pub fn get_data_column_keys(&self, block_root: Hash256) -> Result, Error> { self.blobs_db - .iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_slice()) - .map(|key| key.and_then(|key| parse_data_column_key(key).map(|key| key.1))) + .iter_column_from::>( + DBColumn::BeaconDataColumn, + block_root.as_slice(), + move |key, _| key.starts_with(block_root.as_slice()), + )? + .map(|result| result.and_then(|(key, _)| parse_data_column_key(key).map(|key| key.1))) .collect() } @@ -2150,10 +2199,13 @@ impl, Cold: ItemStore> HotColdDB schema_version: SchemaVersion, mut ops: Vec, ) -> Result<(), Error> { - let column = SchemaVersion::db_column().into(); + let column: &str = SchemaVersion::db_column().into(); let key = SCHEMA_VERSION_KEY.as_slice(); - let db_key = get_key_for_col(column, key); - let op = KeyValueStoreOp::PutKeyValue(db_key, schema_version.as_store_bytes()); + let op = KeyValueStoreOp::PutKeyValue( + column.to_owned(), + key.to_vec(), + schema_version.as_store_bytes(), + ); ops.push(op); self.hot_db.do_atomically(ops) @@ -2534,9 +2586,11 @@ impl, Cold: ItemStore> HotColdDB block_root: Hash256, ) -> Result, Error> { let mut ops = vec![]; + let column: &str = DBColumn::BeaconBlockRoots.into(); for slot in start_slot.as_u64()..end_slot.as_u64() { ops.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()), + column.to_owned(), + slot.to_be_bytes().to_vec(), block_root.as_slice().to_vec(), )); } @@ -2758,77 +2812,55 @@ impl, Cold: ItemStore> HotColdDB "data_availability_boundary" => data_availability_boundary, ); - let mut ops = vec![]; - let mut last_pruned_block_root = None; + let mut guard = self.block_cache.lock(); - for res in self.forwards_block_roots_iterator_until(oldest_blob_slot, end_slot, || { - let (_, split_state) = self - .get_advanced_hot_state(split.block_root, split.slot, split.state_root)? - .ok_or(HotColdDBError::MissingSplitState( - split.state_root, - split.slot, - ))?; + let remove_blob_if = |blobs_bytes: &[u8]| { + let blobs = BlobSidecarList::from_ssz_bytes(blobs_bytes)?; + let Some(blob): Option<&Arc>> = blobs.first() else { + return Ok(false); + }; - Ok((split_state, split.block_root)) - })? { - let (block_root, slot) = match res { - Ok(tuple) => tuple, - Err(e) => { - warn!( - self.log, - "Stopping blob pruning early"; - "error" => ?e, - ); - break; - } + if blob.slot() <= end_slot { + // Delete from the cache + guard.delete_blobs(&blob.block_root()); + // Delete from the on-disk db + return Ok(true); }; + Ok(false) + }; - if Some(block_root) != last_pruned_block_root { - if self - .spec - .is_peer_das_enabled_for_epoch(slot.epoch(E::slots_per_epoch())) - { - // data columns - let indices = self.get_data_column_keys(block_root)?; - if !indices.is_empty() { - trace!( - self.log, - "Pruning data columns of block"; - "slot" => slot, - "block_root" => ?block_root, - ); - last_pruned_block_root = Some(block_root); - ops.push(StoreOp::DeleteDataColumns(block_root, indices)); - } - } else if self.blobs_exist(&block_root)? { - trace!( - self.log, - "Pruning blobs of block"; - "slot" => slot, - "block_root" => ?block_root, - ); - last_pruned_block_root = Some(block_root); - ops.push(StoreOp::DeleteBlobs(block_root)); - } - } + self.blobs_db + .delete_while(DBColumn::BeaconBlob, remove_blob_if)?; - if slot >= end_slot { - break; - } + drop(guard); + + if self.spec.is_peer_das_enabled_for_epoch(start_epoch) { + let remove_data_column_if = |blobs_bytes: &[u8]| { + let data_column: DataColumnSidecar = + DataColumnSidecar::from_ssz_bytes(blobs_bytes)?; + + if data_column.slot() <= end_slot { + return Ok(true); + }; + + Ok(false) + }; + + self.blobs_db + .delete_while(DBColumn::BeaconDataColumn, remove_data_column_if)?; } - let blob_lists_pruned = ops.len(); + let new_blob_info = BlobInfo { oldest_blob_slot: Some(end_slot + 1), blobs_db: blob_info.blobs_db, }; - let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?; - ops.push(StoreOp::KeyValueOp(update_blob_info)); - self.do_atomically_with_block_and_blobs_cache(ops)?; + let op = self.compare_and_set_blob_info(blob_info, new_blob_info)?; + self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::KeyValueOp(op)])?; + debug!( self.log, "Blob pruning complete"; - "blob_lists_pruned" => blob_lists_pruned, ); Ok(()) @@ -2888,12 +2920,9 @@ impl, Cold: ItemStore> HotColdDB columns.extend(previous_schema_columns); for column in columns { - for res in self.cold_db.iter_column_keys::>(column) { + for res in self.cold_db.iter_column_keys::>(column)? { let key = res?; - cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col( - column.as_str(), - &key, - ))); + cold_ops.push(KeyValueStoreOp::DeleteKey(column.as_str().to_owned(), key)); } } let delete_ops = cold_ops.len(); @@ -2935,7 +2964,7 @@ impl, Cold: ItemStore> HotColdDB let mut state_delete_batch = vec![]; for res in self .hot_db - .iter_column::(DBColumn::BeaconStateSummary) + .iter_column::(DBColumn::BeaconStateSummary)? { let (state_root, summary_bytes) = res?; let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?; @@ -3019,6 +3048,7 @@ pub fn migrate_database, Cold: ItemStore>( }) .collect::, _>>()?; + let column: &str = DBColumn::BeaconBlockRoots.into(); // Then, iterate states in slot ascending order, as they are stored wrt previous states. for (block_root, state_root, slot) in state_roots.into_iter().rev() { // Delete the execution payload if payload pruning is enabled. At a skipped slot we may @@ -3031,10 +3061,8 @@ pub fn migrate_database, Cold: ItemStore>( // Store the slot to block root mapping. cold_db_block_ops.push(KeyValueStoreOp::PutKeyValue( - get_key_for_col( - DBColumn::BeaconBlockRoots.into(), - &slot.as_u64().to_be_bytes(), - ), + column.to_owned(), + slot.as_u64().to_be_bytes().to_vec(), block_root.as_slice().to_vec(), )); @@ -3285,3 +3313,57 @@ impl StoreItem for TemporaryFlag { Ok(TemporaryFlag) } } + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct BytesKey { + pub key: Vec, +} + +impl Key for BytesKey { + fn from_u8(key: &[u8]) -> Self { + Self { key: key.to_vec() } + } + + fn as_slice T>(&self, f: F) -> T { + f(self.key.as_slice()) + } +} + +impl BytesKey { + pub fn starts_with(&self, prefix: &Self) -> bool { + self.key.starts_with(&prefix.key) + } + + /// Return `true` iff this `BytesKey` was created with the given `column`. + pub fn matches_column(&self, column: DBColumn) -> bool { + self.key.starts_with(column.as_bytes()) + } + + /// Remove the column from a key, returning its `Hash256` portion. + pub fn remove_column(&self, column: DBColumn) -> Option { + if self.matches_column(column) { + let subkey = &self.key[column.as_bytes().len()..]; + if subkey.len() == 32 { + return Some(Hash256::from_slice(subkey)); + } + } + None + } + + /// Remove the column from a key. + /// + /// Will return `None` if the value doesn't match the column or has the wrong length. + pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> { + if self.matches_column(column) { + let subkey = &self.key[column.as_bytes().len()..]; + if subkey.len() == column.key_size() { + return Some(subkey); + } + } + None + } + + pub fn from_vec(key: Vec) -> Self { + Self { key } + } +} diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 48c289f2b2d..2c8510a5bf7 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -13,8 +13,12 @@ pub fn store_full_state( }; metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as u64); metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); - let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); - ops.push(KeyValueStoreOp::PutKeyValue(key, bytes)); + let column_name: &str = DBColumn::BeaconState.into(); + ops.push(KeyValueStoreOp::PutKeyValue( + column_name.to_owned(), + state_root.as_slice().to_vec(), + bytes, + )); Ok(()) } diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs deleted file mode 100644 index 720afd0f3f6..00000000000 --- a/beacon_node/store/src/leveldb_store.rs +++ /dev/null @@ -1,310 +0,0 @@ -use super::*; -use crate::hot_cold_store::HotColdDBError; -use leveldb::compaction::Compaction; -use leveldb::database::batch::{Batch, Writebatch}; -use leveldb::database::kv::KV; -use leveldb::database::Database; -use leveldb::error::Error as LevelDBError; -use leveldb::iterator::{Iterable, KeyIterator, LevelDBIterator}; -use leveldb::options::{Options, ReadOptions, WriteOptions}; -use parking_lot::Mutex; -use std::marker::PhantomData; -use std::path::Path; - -/// A wrapped leveldb database. -pub struct LevelDB { - db: Database, - /// A mutex to synchronise sensitive read-write transactions. - transaction_mutex: Mutex<()>, - _phantom: PhantomData, -} - -impl LevelDB { - /// Open a database at `path`, creating a new database if one does not already exist. - pub fn open(path: &Path) -> Result { - let mut options = Options::new(); - - options.create_if_missing = true; - - let db = Database::open(path, options)?; - let transaction_mutex = Mutex::new(()); - - Ok(Self { - db, - transaction_mutex, - _phantom: PhantomData, - }) - } - - fn read_options(&self) -> ReadOptions { - ReadOptions::new() - } - - fn write_options(&self) -> WriteOptions { - WriteOptions::new() - } - - fn write_options_sync(&self) -> WriteOptions { - let mut opts = WriteOptions::new(); - opts.sync = true; - opts - } - - fn put_bytes_with_options( - &self, - col: &str, - key: &[u8], - val: &[u8], - opts: WriteOptions, - ) -> Result<(), Error> { - let column_key = get_key_for_col(col, key); - - metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[col]); - metrics::inc_counter_vec_by(&metrics::DISK_DB_WRITE_BYTES, &[col], val.len() as u64); - let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); - - self.db - .put(opts, BytesKey::from_vec(column_key), val) - .map_err(Into::into) - } - - pub fn keys_iter(&self) -> KeyIterator { - self.db.keys_iter(self.read_options()) - } -} - -impl KeyValueStore for LevelDB { - /// Store some `value` in `column`, indexed with `key`. - fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { - self.put_bytes_with_options(col, key, val, self.write_options()) - } - - fn put_bytes_sync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { - self.put_bytes_with_options(col, key, val, self.write_options_sync()) - } - - fn sync(&self) -> Result<(), Error> { - self.put_bytes_sync("sync", b"sync", b"sync") - } - - /// Retrieve some bytes in `column` with `key`. - fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { - let column_key = get_key_for_col(col, key); - - metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[col]); - let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES); - - self.db - .get(self.read_options(), BytesKey::from_vec(column_key)) - .map_err(Into::into) - .map(|opt| { - opt.inspect(|bytes| { - metrics::inc_counter_vec_by( - &metrics::DISK_DB_READ_BYTES, - &[col], - bytes.len() as u64, - ); - metrics::stop_timer(timer); - }) - }) - } - - /// Return `true` if `key` exists in `column`. - fn key_exists(&self, col: &str, key: &[u8]) -> Result { - let column_key = get_key_for_col(col, key); - - metrics::inc_counter_vec(&metrics::DISK_DB_EXISTS_COUNT, &[col]); - - self.db - .get(self.read_options(), BytesKey::from_vec(column_key)) - .map_err(Into::into) - .map(|val| val.is_some()) - } - - /// Removes `key` from `column`. - fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { - let column_key = get_key_for_col(col, key); - - metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[col]); - - self.db - .delete(self.write_options(), BytesKey::from_vec(column_key)) - .map_err(Into::into) - } - - fn do_atomically(&self, ops_batch: Vec) -> Result<(), Error> { - let mut leveldb_batch = Writebatch::new(); - for op in ops_batch { - match op { - KeyValueStoreOp::PutKeyValue(key, value) => { - let col = get_col_from_key(&key).unwrap_or("unknown".to_owned()); - metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[&col]); - metrics::inc_counter_vec_by( - &metrics::DISK_DB_WRITE_BYTES, - &[&col], - value.len() as u64, - ); - - leveldb_batch.put(BytesKey::from_vec(key), &value); - } - - KeyValueStoreOp::DeleteKey(key) => { - let col = get_col_from_key(&key).unwrap_or("unknown".to_owned()); - metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[&col]); - - leveldb_batch.delete(BytesKey::from_vec(key)); - } - } - } - - let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); - - self.db.write(self.write_options(), &leveldb_batch)?; - Ok(()) - } - - fn begin_rw_transaction(&self) -> MutexGuard<()> { - self.transaction_mutex.lock() - } - - fn compact_column(&self, column: DBColumn) -> Result<(), Error> { - // Use key-size-agnostic keys [] and 0xff..ff with a minimum of 32 bytes to account for - // columns that may change size between sub-databases or schema versions. - let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), &[])); - let end_key = BytesKey::from_vec(get_key_for_col( - column.as_str(), - &vec![0xff; std::cmp::max(column.key_size(), 32)], - )); - self.db.compact(&start_key, &end_key); - Ok(()) - } - - fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter { - let start_key = BytesKey::from_vec(get_key_for_col(column.into(), from)); - let iter = self.db.iter(self.read_options()); - iter.seek(&start_key); - - Box::new( - iter.take_while(move |(key, _)| key.matches_column(column)) - .map(move |(bytes_key, value)| { - let key = bytes_key.remove_column_variable(column).ok_or_else(|| { - HotColdDBError::IterationError { - unexpected_key: bytes_key.clone(), - } - })?; - Ok((K::from_bytes(key)?, value)) - }), - ) - } - - fn iter_raw_entries(&self, column: DBColumn, prefix: &[u8]) -> RawEntryIter { - let start_key = BytesKey::from_vec(get_key_for_col(column.into(), prefix)); - - let iter = self.db.iter(self.read_options()); - iter.seek(&start_key); - - Box::new( - iter.take_while(move |(key, _)| key.key.starts_with(start_key.key.as_slice())) - .map(move |(bytes_key, value)| { - let subkey = &bytes_key.key[column.as_bytes().len()..]; - Ok((Vec::from(subkey), value)) - }), - ) - } - - fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter { - let start_key = BytesKey::from_vec(get_key_for_col(column.into(), prefix)); - - let iter = self.db.keys_iter(self.read_options()); - iter.seek(&start_key); - - Box::new( - iter.take_while(move |key| key.key.starts_with(start_key.key.as_slice())) - .map(move |bytes_key| { - let subkey = &bytes_key.key[column.as_bytes().len()..]; - Ok(Vec::from(subkey)) - }), - ) - } - - /// Iterate through all keys and values in a particular column. - fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { - let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()])); - - let iter = self.db.keys_iter(self.read_options()); - iter.seek(&start_key); - - Box::new( - iter.take_while(move |key| key.matches_column(column)) - .map(move |bytes_key| { - let key = bytes_key.remove_column_variable(column).ok_or_else(|| { - HotColdDBError::IterationError { - unexpected_key: bytes_key.clone(), - } - })?; - K::from_bytes(key) - }), - ) - } -} - -impl ItemStore for LevelDB {} - -/// Used for keying leveldb. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct BytesKey { - key: Vec, -} - -impl db_key::Key for BytesKey { - fn from_u8(key: &[u8]) -> Self { - Self { key: key.to_vec() } - } - - fn as_slice T>(&self, f: F) -> T { - f(self.key.as_slice()) - } -} - -impl BytesKey { - pub fn starts_with(&self, prefix: &Self) -> bool { - self.key.starts_with(&prefix.key) - } - - /// Return `true` iff this `BytesKey` was created with the given `column`. - pub fn matches_column(&self, column: DBColumn) -> bool { - self.key.starts_with(column.as_bytes()) - } - - /// Remove the column from a 32 byte key, yielding the `Hash256` key. - pub fn remove_column(&self, column: DBColumn) -> Option { - let key = self.remove_column_variable(column)?; - (column.key_size() == 32).then(|| Hash256::from_slice(key)) - } - - /// Remove the column from a key. - /// - /// Will return `None` if the value doesn't match the column or has the wrong length. - pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> { - if self.matches_column(column) { - let subkey = &self.key[column.as_bytes().len()..]; - if subkey.len() == column.key_size() { - return Some(subkey); - } - } - None - } - - pub fn from_vec(key: Vec) -> Self { - Self { key } - } -} - -impl From for Error { - fn from(e: LevelDBError) -> Error { - Error::DBError { - message: format!("{:?}", e), - } - } -} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 0498c7c1e2c..94b3cd6e7d8 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -18,7 +18,6 @@ pub mod hdiff; pub mod historic_state_cache; pub mod hot_cold_store; mod impls; -mod leveldb_store; mod memory_store; pub mod metadata; pub mod metrics; @@ -26,12 +25,11 @@ pub mod partial_beacon_state; pub mod reconstruct; pub mod state_cache; +pub mod database; pub mod iter; - pub use self::config::StoreConfig; pub use self::consensus_context::OnDiskConsensusContext; pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split}; -pub use self::leveldb_store::LevelDB; pub use self::memory_store::MemoryStore; pub use crate::metadata::BlobInfo; pub use errors::Error; @@ -39,17 +37,19 @@ pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metadata::AnchorInfo; pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; +use std::collections::HashSet; use std::sync::Arc; -use strum::{EnumString, IntoStaticStr}; +use strum::{EnumIter, EnumString, IntoStaticStr}; pub use types::*; const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8; -pub type ColumnIter<'a, K> = Box), Error>> + 'a>; -pub type ColumnKeyIter<'a, K> = Box> + 'a>; +pub type ColumnIter<'a, K> = + Result), Error>> + 'a>, Error>; +pub type ColumnKeyIter<'a, K> = Result> + 'a>, Error>; -pub type RawEntryIter<'a> = Box, Vec), Error>> + 'a>; -pub type RawKeyIter<'a> = Box, Error>> + 'a>; +pub type RawEntryIter<'a> = + Result, Vec), Error>> + 'a>, Error>; pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Retrieve some bytes in `column` with `key`. @@ -100,20 +100,33 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Iterate through all keys and values in a particular column. fn iter_column(&self, column: DBColumn) -> ColumnIter { - self.iter_column_from(column, &vec![0; column.key_size()]) + self.iter_column_from(column, &vec![0; column.key_size()], |_, _| true) } - /// Iterate through all keys and values in a column from a given starting point. - fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter; + /// Iterate through all keys and values in a column from a given starting point that fulfill the given predicate. + fn iter_column_from( + &self, + column: DBColumn, + from: &[u8], + predicate: impl Fn(&[u8], &[u8]) -> bool + 'static, + ) -> ColumnIter; fn iter_raw_entries(&self, _column: DBColumn, _prefix: &[u8]) -> RawEntryIter { - Box::new(std::iter::empty()) + Ok(Box::new(std::iter::empty())) } - fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter; + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter; /// Iterate through all keys in a particular column. - fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter; + fn iter_column_keys_from(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter; + + fn delete_batch(&self, column: &str, ops: HashSet<&[u8]>) -> Result<(), Error>; + + fn delete_while( + &self, + column: DBColumn, + f: impl FnMut(&[u8]) -> Result, + ) -> Result<(), Error>; } pub trait Key: Sized + 'static { @@ -174,8 +187,12 @@ pub fn parse_data_column_key(data: Vec) -> Result<(Hash256, ColumnIndex), Er #[must_use] #[derive(Clone)] pub enum KeyValueStoreOp { - PutKeyValue(Vec, Vec), - DeleteKey(Vec), + // Indicate that a PUT operation should be made + // to the db store for a (Column, Key, Value) + PutKeyValue(String, Vec, Vec), + // Indicate that a DELETE operation should be made + // to the db store for a (Column, Key) + DeleteKey(String, Vec), } pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'static { @@ -245,7 +262,7 @@ pub enum StoreOp<'a, E: EthSpec> { } /// A unique column identifier. -#[derive(Debug, Clone, Copy, PartialEq, IntoStaticStr, EnumString)] +#[derive(Debug, Clone, Copy, PartialEq, IntoStaticStr, EnumString, EnumIter)] pub enum DBColumn { /// For data related to the database itself. #[strum(serialize = "bma")] @@ -419,13 +436,19 @@ pub trait StoreItem: Sized { fn from_store_bytes(bytes: &[u8]) -> Result; fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp { - let db_key = get_key_for_col(Self::db_column().into(), key.as_slice()); - KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes()) + let column_name: &str = Self::db_column().into(); + KeyValueStoreOp::PutKeyValue( + column_name.to_owned(), + key.as_slice().to_vec(), + self.as_store_bytes(), + ) } } #[cfg(test)] mod tests { + use crate::database::interface::BeaconNodeBackend; + use super::*; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -475,7 +498,7 @@ mod tests { fn simplediskdb() { let dir = tempdir().unwrap(); let path = dir.path(); - let store = LevelDB::open(path).unwrap(); + let store = BeaconNodeBackend::open(&StoreConfig::default(), path).unwrap(); test_impl(store); } diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 4c7bfdf10ff..a5d797299bb 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,9 +1,9 @@ use crate::{ - get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error, - ItemStore, Key, KeyValueStore, KeyValueStoreOp, RawKeyIter, + errors::Error as DBError, get_key_for_col, hot_cold_store::BytesKey, ColumnIter, ColumnKeyIter, + DBColumn, Error, ItemStore, Key, KeyValueStore, KeyValueStoreOp, }; use parking_lot::{Mutex, MutexGuard, RwLock}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::marker::PhantomData; use types::*; @@ -66,19 +66,28 @@ impl KeyValueStore for MemoryStore { fn do_atomically(&self, batch: Vec) -> Result<(), Error> { for op in batch { match op { - KeyValueStoreOp::PutKeyValue(key, value) => { - self.db.write().insert(BytesKey::from_vec(key), value); + KeyValueStoreOp::PutKeyValue(col, key, value) => { + let column_key = get_key_for_col(&col, &key); + self.db + .write() + .insert(BytesKey::from_vec(column_key), value); } - KeyValueStoreOp::DeleteKey(key) => { - self.db.write().remove(&BytesKey::from_vec(key)); + KeyValueStoreOp::DeleteKey(col, key) => { + let column_key = get_key_for_col(&col, &key); + self.db.write().remove(&BytesKey::from_vec(column_key)); } } } Ok(()) } - fn iter_column_from(&self, column: DBColumn, from: &[u8]) -> ColumnIter { + fn iter_column_from( + &self, + column: DBColumn, + from: &[u8], + predicate: impl Fn(&[u8], &[u8]) -> bool + 'static, + ) -> ColumnIter { // We use this awkward pattern because we can't lock the `self.db` field *and* maintain a // reference to the lock guard across calls to `.next()`. This would be require a // struct with a field (the iterator) which references another field (the lock guard). @@ -89,38 +98,69 @@ impl KeyValueStore for MemoryStore { .read() .range(start_key..) .take_while(|(k, _)| k.remove_column_variable(column).is_some()) + .take_while(|(k, _)| predicate(k.key.as_slice(), vec![0].as_slice())) .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec())) .collect::>(); - Box::new(keys.into_iter().filter_map(move |key| { + Ok(Box::new(keys.into_iter().filter_map(move |key| { self.get_bytes(col, &key).transpose().map(|res| { let k = K::from_bytes(&key)?; let v = res?; Ok((k, v)) }) - })) + }))) } - fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter { - let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix)); + fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { + Ok(Box::new( + self.iter_column(column)?.map(|res| res.map(|(k, _)| k)), + )) + } + + fn begin_rw_transaction(&self) -> MutexGuard<()> { + self.transaction_mutex.lock() + } + + fn compact_column(&self, _column: DBColumn) -> Result<(), Error> { + Ok(()) + } + + fn iter_column_keys_from(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter { + // We use this awkward pattern because we can't lock the `self.db` field *and* maintain a + // reference to the lock guard across calls to `.next()`. This would be require a + // struct with a field (the iterator) which references another field (the lock guard). + let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), from)); let keys = self .db .read() - .range(start_key.clone()..) - .take_while(|(k, _)| k.starts_with(&start_key)) + .range(start_key..) + .take_while(|(k, _)| k.remove_column_variable(column).is_some()) .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec())) .collect::>(); - Box::new(keys.into_iter().map(Ok)) - } - - fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter { - Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k))) + Ok(Box::new( + keys.into_iter().map(move |key| K::from_bytes(&key)), + )) } - fn begin_rw_transaction(&self) -> MutexGuard<()> { - self.transaction_mutex.lock() + fn delete_batch(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), DBError> { + for op in ops { + let column_key = get_key_for_col(col, op); + self.db.write().remove(&BytesKey::from_vec(column_key)); + } + Ok(()) } - fn compact_column(&self, _column: DBColumn) -> Result<(), Error> { + fn delete_while( + &self, + column: DBColumn, + mut f: impl FnMut(&[u8]) -> Result, + ) -> Result<(), Error> { + self.db.write().retain(|key, value| { + if key.remove_column_variable(column).is_some() { + !f(value).unwrap_or(false) + } else { + true + } + }); Ok(()) } } diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index f0dd0617905..6f9f667917f 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -33,6 +33,13 @@ pub static DISK_DB_READ_BYTES: LazyLock> = LazyLock::new(| &["col"], ) }); +pub static DISK_DB_KEY_READ_BYTES: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "store_disk_db_key_read_bytes_total", + "Number of key bytes read from the hot on-disk DB", + &["col"], + ) +}); pub static DISK_DB_READ_COUNT: LazyLock> = LazyLock::new(|| { try_create_int_counter_vec( "store_disk_db_read_count_total", @@ -40,6 +47,13 @@ pub static DISK_DB_READ_COUNT: LazyLock> = LazyLock::new(| &["col"], ) }); +pub static DISK_DB_KEY_READ_COUNT: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "store_disk_db_read_count_total", + "Total number of key reads to the hot on-disk DB", + &["col"], + ) +}); pub static DISK_DB_WRITE_COUNT: LazyLock> = LazyLock::new(|| { try_create_int_counter_vec( "store_disk_db_write_count_total", @@ -66,6 +80,12 @@ pub static DISK_DB_EXISTS_COUNT: LazyLock> = LazyLock::new &["col"], ) }); +pub static DISK_DB_DELETE_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "store_disk_db_delete_seconds", + "Time taken to delete bytes from the store.", + ) +}); pub static DISK_DB_DELETE_COUNT: LazyLock> = LazyLock::new(|| { try_create_int_counter_vec( "store_disk_db_delete_count_total", @@ -73,6 +93,19 @@ pub static DISK_DB_DELETE_COUNT: LazyLock> = LazyLock::new &["col"], ) }); +pub static DISK_DB_COMPACT_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "store_disk_db_compact_seconds", + "Time taken to run compaction on the DB.", + ) +}); +pub static DISK_DB_TYPE: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "store_disk_db_type", + "The on-disk database type being used", + &["db_type"], + ) +}); /* * Anchor Info */ diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs index 2eb40f47b18..b4d86501c34 100644 --- a/beacon_node/store/src/partial_beacon_state.rs +++ b/beacon_node/store/src/partial_beacon_state.rs @@ -2,8 +2,8 @@ use crate::chunked_vector::{ load_variable_list_from_db, load_vector_from_db, BlockRootsChunked, HistoricalRoots, HistoricalSummaries, RandaoMixes, StateRootsChunked, }; -use crate::{Error, KeyValueStore}; -use ssz::{Decode, DecodeError}; +use crate::{DBColumn, Error, KeyValueStore, KeyValueStoreOp}; +use ssz::{Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; use std::sync::Arc; use types::historical_summary::HistoricalSummary; @@ -167,6 +167,16 @@ impl PartialBeaconState { )) } + /// Prepare the partial state for storage in the KV database. + pub fn as_kv_store_op(&self, state_root: Hash256) -> KeyValueStoreOp { + let column_name: &str = DBColumn::BeaconState.into(); + KeyValueStoreOp::PutKeyValue( + column_name.to_owned(), + state_root.as_slice().to_vec(), + self.as_ssz_bytes(), + ) + } + pub fn load_block_roots>( &mut self, store: &S, diff --git a/book/src/help_bn.md b/book/src/help_bn.md index 55815fbdfe0..8fce4c9301e 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -11,6 +11,9 @@ Options: --auto-compact-db Enable or disable automatic compaction of the database on finalization. [default: true] + --beacon-node-backend + Set the database backend to be used by the beacon node backend. + [possible values: leveldb, redb] --blob-prune-margin-epochs The margin for blob pruning in epochs. The oldest blobs are pruned up until data_availability_boundary - blob_prune_margin_epochs. [default: diff --git a/book/src/installation-source.md b/book/src/installation-source.md index 3c9f27d236a..19098a5bc8d 100644 --- a/book/src/installation-source.md +++ b/book/src/installation-source.md @@ -154,7 +154,7 @@ You can customise the features that Lighthouse is built with using the `FEATURES variable. E.g. ``` -FEATURES=gnosis,slasher-lmdb make +FEATURES=gnosis,slasher-lmdb,beacon-node-leveldb make ``` Commonly used features include: @@ -163,11 +163,12 @@ Commonly used features include: - `portable`: the default feature as Lighthouse now uses runtime detection of hardware CPU features. - `slasher-lmdb`: support for the LMDB slasher backend. Enabled by default. - `slasher-mdbx`: support for the MDBX slasher backend. +- `beacon-node-leveldb`: support for the leveldb backend. Enabled by default. - `jemalloc`: use [`jemalloc`][jemalloc] to allocate memory. Enabled by default on Linux and macOS. Not supported on Windows. - `spec-minimal`: support for the minimal preset (useful for testing). -Default features (e.g. `slasher-lmdb`) may be opted out of using the `--no-default-features` +Default features (e.g. `slasher-lmdb`, `beacon-node-leveldb`) may be opted out of using the `--no-default-features` argument for `cargo`, which can be plumbed in via the `CARGO_INSTALL_EXTRA_FLAGS` environment variable. E.g. diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index fc15e98616b..c23d9e5dd42 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -16,10 +16,12 @@ use slog::{info, warn, Logger}; use std::fs; use std::io::Write; use std::path::PathBuf; +use store::KeyValueStore; use store::{ + database::interface::BeaconNodeBackend, errors::Error, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, - DBColumn, HotColdDB, KeyValueStore, LevelDB, + DBColumn, HotColdDB, }; use strum::{EnumString, EnumVariantNames}; use types::{BeaconState, EthSpec, Slot}; @@ -55,7 +57,7 @@ pub fn display_db_version( let blobs_path = client_config.get_blobs_db_path(); let mut version = CURRENT_SCHEMA_VERSION; - HotColdDB::, LevelDB>::open( + HotColdDB::, BeaconNodeBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -145,11 +147,14 @@ pub fn inspect_db( let mut num_keys = 0; let sub_db = if inspect_config.freezer { - LevelDB::::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))? + BeaconNodeBackend::::open(&client_config.store, &cold_path) + .map_err(|e| format!("Unable to open freezer DB: {e:?}"))? } else if inspect_config.blobs_db { - LevelDB::::open(&blobs_path).map_err(|e| format!("Unable to open blobs DB: {e:?}"))? + BeaconNodeBackend::::open(&client_config.store, &blobs_path) + .map_err(|e| format!("Unable to open blobs DB: {e:?}"))? } else { - LevelDB::::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))? + BeaconNodeBackend::::open(&client_config.store, &hot_path) + .map_err(|e| format!("Unable to open hot DB: {e:?}"))? }; let skip = inspect_config.skip.unwrap_or(0); @@ -167,6 +172,7 @@ pub fn inspect_db( for res in sub_db .iter_column::>(inspect_config.column) + .map_err(|e| format!("Unable to iterate columns: {:?}", e))? .skip(skip) .take(limit) { @@ -263,11 +269,20 @@ pub fn compact_db( let column = compact_config.column; let (sub_db, db_name) = if compact_config.freezer { - (LevelDB::::open(&cold_path)?, "freezer_db") + ( + BeaconNodeBackend::::open(&client_config.store, &cold_path)?, + "freezer_db", + ) } else if compact_config.blobs_db { - (LevelDB::::open(&blobs_path)?, "blobs_db") + ( + BeaconNodeBackend::::open(&client_config.store, &blobs_path)?, + "blobs_db", + ) } else { - (LevelDB::::open(&hot_path)?, "hot_db") + ( + BeaconNodeBackend::::open(&client_config.store, &hot_path)?, + "hot_db", + ) }; info!( log, @@ -303,7 +318,7 @@ pub fn migrate_db( let mut from = CURRENT_SCHEMA_VERSION; let to = migrate_config.to; - let db = HotColdDB::, LevelDB>::open( + let db = HotColdDB::, BeaconNodeBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -343,7 +358,7 @@ pub fn prune_payloads( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, LevelDB>::open( + let db = HotColdDB::, BeaconNodeBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -369,7 +384,7 @@ pub fn prune_blobs( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, LevelDB>::open( + let db = HotColdDB::, BeaconNodeBackend>::open( &hot_path, &cold_path, &blobs_path, @@ -406,7 +421,7 @@ pub fn prune_states( let cold_path = client_config.get_freezer_db_path(); let blobs_path = client_config.get_blobs_db_path(); - let db = HotColdDB::, LevelDB>::open( + let db = HotColdDB::, BeaconNodeBackend>::open( &hot_path, &cold_path, &blobs_path, diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index dd1cb68f066..4165397a2f5 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -7,7 +7,7 @@ autotests = false rust-version = "1.80.0" [features] -default = ["slasher-lmdb"] +default = ["slasher-lmdb", "beacon-node-leveldb"] # Writes debugging .ssz files to /tmp during block processing. write_ssz_files = ["beacon_node/write_ssz_files"] # Compiles the BLS crypto code so that the binary is portable across machines. @@ -24,6 +24,11 @@ slasher-mdbx = ["slasher/mdbx"] slasher-lmdb = ["slasher/lmdb"] # Support slasher redb backend. slasher-redb = ["slasher/redb"] +# Supports beacon node leveldb backend. +beacon-node-leveldb = ["store/leveldb"] +# Supports beacon node redb backend. +beacon-node-redb = ["store/redb"] + # Deprecated. This is now enabled by default on non windows targets. jemalloc = [] @@ -61,6 +66,7 @@ database_manager = { path = "../database_manager" } slasher = { workspace = true } validator_manager = { path = "../validator_manager" } logging = { workspace = true } +store = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 6e730c007f1..61f80bca796 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1,11 +1,12 @@ -use beacon_node::ClientConfig as Config; - use crate::exec::{CommandLineTestExec, CompletedTest}; use beacon_node::beacon_chain::chain_config::{ DisallowedReOrgOffsets, DEFAULT_RE_ORG_CUTOFF_DENOMINATOR, DEFAULT_RE_ORG_HEAD_THRESHOLD, DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, }; -use beacon_node::beacon_chain::graffiti_calculator::GraffitiOrigin; +use beacon_node::{ + beacon_chain::graffiti_calculator::GraffitiOrigin, + beacon_chain::store::config::DatabaseBackend as BeaconNodeBackend, ClientConfig as Config, +}; use beacon_processor::BeaconProcessorConfig; use eth1::Eth1Endpoint; use lighthouse_network::PeerId; @@ -2676,3 +2677,13 @@ fn genesis_state_url_value() { assert_eq!(config.genesis_state_url_timeout, Duration::from_secs(42)); }); } + +#[test] +fn beacon_node_backend_override() { + CommandLineTest::new() + .flag("beacon-node-backend", Some("leveldb")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.store.backend, BeaconNodeBackend::LevelDb); + }); +}