From 8fc105c898ed21759b573377296b66d3e005f1a9 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Tue, 4 Oct 2022 16:12:45 +0200 Subject: [PATCH 1/7] wip --- chain/indexer/src/streamer/mod.rs | 7 +- core/store/Cargo.toml | 1 + core/store/benches/store_bench.rs | 13 +- core/store/src/db.rs | 3 + core/store/src/db/colddb.rs | 20 +- core/store/src/db/rocksdb/snapshot.rs | 2 +- core/store/src/lib.rs | 72 ++++++- core/store/src/metadata.rs | 37 +++- core/store/src/opener.rs | 196 +++++++++++++++--- genesis-tools/genesis-populate/src/main.rs | 2 +- .../genesis-populate/src/state_dump.rs | 4 +- nearcore/Cargo.toml | 2 + nearcore/benches/store.rs | 2 +- nearcore/src/config.rs | 8 +- nearcore/src/lib.rs | 34 ++- neard/Cargo.toml | 1 + runtime/runtime-params-estimator/src/main.rs | 9 +- test-utils/store-validator/src/main.rs | 2 +- tools/mock-node/src/setup.rs | 2 +- tools/state-viewer/src/cli.rs | 3 +- 20 files changed, 345 insertions(+), 75 deletions(-) diff --git a/chain/indexer/src/streamer/mod.rs b/chain/indexer/src/streamer/mod.rs index 6f6f0986367..4af7dbe7bbd 100644 --- a/chain/indexer/src/streamer/mod.rs +++ b/chain/indexer/src/streamer/mod.rs @@ -290,9 +290,10 @@ pub(crate) async fn start( blocks_sink: mpsc::Sender, ) { info!(target: INDEXER, "Starting Streamer..."); - let indexer_db_path = near_store::NodeStorage::opener(&indexer_config.home_dir, &store_config) - .path() - .join("indexer"); + let indexer_db_path = + near_store::NodeStorage::opener(&indexer_config.home_dir, &store_config, None) + .path() + .join("indexer"); // TODO: implement proper error handling let db = DB::open_default(indexer_db_path).unwrap(); diff --git a/core/store/Cargo.toml b/core/store/Cargo.toml index d56f6cca717..2ac82d946db 100644 --- a/core/store/Cargo.toml +++ b/core/store/Cargo.toml @@ -57,6 +57,7 @@ no_cache = [] single_thread_rocksdb = [] # Deactivate RocksDB IO background threads test_features = [] protocol_feature_flat_state = [] +cold_store = [] nightly_protocol = [] 
nightly = [ diff --git a/core/store/benches/store_bench.rs b/core/store/benches/store_bench.rs index 4e5bc2284c6..0368b82c5e5 100644 --- a/core/store/benches/store_bench.rs +++ b/core/store/benches/store_bench.rs @@ -19,10 +19,15 @@ fn benchmark_write_then_read_successful( let tmp_dir = tempfile::tempdir().unwrap(); // Use default StoreConfig rather than NodeStorage::test_opener so we’re using the // same configuration as in production. - let store = NodeStorage::opener(tmp_dir.path(), &Default::default()) - .open() - .unwrap() - .get_store(Temperature::Hot); + let store = NodeStorage::opener( + tmp_dir.path(), + &Default::default(), + #[cfg(feature = "cold_store")] + None, + ) + .open() + .unwrap() + .get_store(Temperature::Hot); let keys = generate_keys(num_keys, key_size); write_to_db(&store, &keys, max_value_size, col); diff --git a/core/store/src/db.rs b/core/store/src/db.rs index eb91d0db29d..d7edbdf832a 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -2,12 +2,15 @@ use std::io; use crate::DBCol; +#[cfg(feature = "cold_store")] mod colddb; pub mod refcount; pub(crate) mod rocksdb; mod slice; mod testdb; +#[cfg(feature = "cold_store")] +pub use self::colddb::ColdDB; pub use self::rocksdb::RocksDB; pub use self::slice::DBSlice; pub use self::testdb::TestDB; diff --git a/core/store/src/db/colddb.rs b/core/store/src/db/colddb.rs index 6ba0f25e5cf..9ac0ff8842a 100644 --- a/core/store/src/db/colddb.rs +++ b/core/store/src/db/colddb.rs @@ -35,9 +35,15 @@ use crate::DBCol; /// Lastly, since no data is ever deleted from cold storage, trying to decrease /// reference of a value count or delete data is ignored and if debug assertions /// are enabled will cause a panic. -struct ColdDatabase(D); +pub struct ColdDB(D); -impl ColdDatabase { +impl std::convert::From for ColdDB { + fn from(db: D) -> Self { + Self(db) + } +} + +impl ColdDB { /// Returns raw bytes from the underlying storage. 
/// /// Adjusts the key if necessary (see [`get_cold_key`]) and retrieves data @@ -51,7 +57,7 @@ impl ColdDatabase { } } -impl super::Database for ColdDatabase { +impl super::Database for ColdDB { fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> std::io::Result>> { match self.get_impl(col, key) { Ok(Some(value)) if col.is_rc() => { @@ -262,8 +268,8 @@ mod test { const VALUE: &[u8] = "FooBar".as_bytes(); /// Constructs test in-memory database. - fn create_test_db() -> ColdDatabase { - ColdDatabase(crate::db::testdb::TestDB::default()) + fn create_test_db() -> ColdDB { + ColdDB(crate::db::testdb::TestDB::default()) } fn set(col: DBCol, key: &[u8]) -> DBOp { @@ -338,8 +344,8 @@ mod test { let name = if is_raw { "raw " } else { "cold" }; let value = db.get_raw_bytes(col, &key).unwrap(); - // When fetching reference counted column ColdDatabase adds - // reference count to it. + // When fetching reference counted column ColdDB adds reference + // count to it. let value = pretty_value(value.as_deref(), col.is_rc() && !is_raw); result.push(format!(" [{name}] get_raw_bytes → {value}")); diff --git a/core/store/src/db/rocksdb/snapshot.rs b/core/store/src/db/rocksdb/snapshot.rs index 88d2b32d230..6a2d255cb02 100644 --- a/core/store/src/db/rocksdb/snapshot.rs +++ b/core/store/src/db/rocksdb/snapshot.rs @@ -190,7 +190,7 @@ fn test_snapshot_recovery() { { let mut config = opener.config().clone(); config.path = Some(path); - let opener = crate::NodeStorage::opener(tmpdir.path(), &config); + let opener = crate::NodeStorage::opener(tmpdir.path(), &config, None); let store = opener.open().unwrap().get_store(crate::Temperature::Hot); assert_eq!(Some(&b"value"[..]), store.get(COL, KEY).unwrap().as_deref()); } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index ba90c1a08f7..f143c09c321 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -57,8 +57,11 @@ pub use crate::opener::{StoreMigrator, StoreOpener, StoreOpenerError}; /// In the future, certain parts of 
the code may need to access hot or cold /// storage. Specifically, querying an old block will require reading it from /// the cold storage. +#[derive(Clone, Copy)] pub enum Temperature { Hot, + #[cfg(feature = "cold_store")] + Cold, } /// Node’s storage holding chain and all other necessary data. @@ -68,7 +71,11 @@ pub enum Temperature { /// to [`Store`] which will abstract access to only one of the temperatures of /// the storage. pub struct NodeStorage { - storage: Arc, + hot_storage: Arc, + #[cfg(feature = "cold_store")] + cold_storage: Option>, + #[cfg(not(feature = "cold_store"))] + cold_storage: Option, } /// Node’s single storage source. @@ -84,10 +91,22 @@ pub struct Store { storage: Arc, } +// Those are temporary. While cold_store feature is stabilised, remove those +// type aliases and just use the type directly. +#[cfg(feature = "cold_store")] +pub type ColdConfig<'a> = Option<&'a StoreConfig>; +#[cfg(not(feature = "cold_store"))] +pub type ColdConfig<'a> = Option; + impl NodeStorage { - /// Initialises a new opener with given home directory and store config. - pub fn opener<'a>(home_dir: &std::path::Path, config: &'a StoreConfig) -> StoreOpener<'a> { - StoreOpener::new(home_dir, config) + /// Initialises a new opener with given home directory and hot and cold + /// store config. + pub fn opener<'a>( + home_dir: &std::path::Path, + config: &'a StoreConfig, + cold_config: ColdConfig<'a>, + ) -> StoreOpener<'a> { + StoreOpener::new(home_dir, config, cold_config) } /// Initialises an opener for a new temporary test store. 
@@ -101,7 +120,7 @@ impl NodeStorage { pub fn test_opener() -> (tempfile::TempDir, StoreOpener<'static>) { static CONFIG: Lazy = Lazy::new(StoreConfig::test_config); let dir = tempfile::tempdir().unwrap(); - let opener = StoreOpener::new(dir.path(), &CONFIG); + let opener = StoreOpener::new(dir.path(), &CONFIG, None); (dir, opener) } @@ -115,7 +134,22 @@ impl NodeStorage { /// possibly [`crate::test_utils::create_test_store`] (depending whether you /// need [`NodeStorage`] or [`Store`] object. pub fn new(storage: Arc) -> Self { - Self { storage } + Self { hot_storage: storage, cold_storage: None } + } + + /// Constructs new object backed by given database. + fn from_rocksdb( + hot_storage: crate::db::RocksDB, + #[cfg(feature = "cold_store")] cold_storage: Option, + #[cfg(not(feature = "cold_store"))] cold_storage: Option, + ) -> Self { + Self { + hot_storage: Arc::new(hot_storage), + #[cfg(feature = "cold_store")] + cold_storage: cold_storage.map(|db| Arc::new(db.into())), + #[cfg(not(feature = "cold_store"))] + cold_storage: cold_storage.map(|_| unreachable!()), + } } /// Returns storage for given temperature. @@ -132,7 +166,9 @@ impl NodeStorage { /// cold. pub fn get_store(&self, temp: Temperature) -> Store { match temp { - Temperature::Hot => Store { storage: self.storage.clone() }, + Temperature::Hot => Store { storage: self.hot_storage.clone() }, + #[cfg(feature = "cold_store")] + Temperature::Cold => Store { storage: self.cold_storage.as_ref().unwrap().clone() }, } } @@ -151,9 +187,11 @@ impl NodeStorage { /// well. For example, garbage collection only ever touches hot storage but /// it should go through [`Store`] interface since data it manipulates /// (e.g. blocks) are live in both databases. 
- pub fn get_inner(&self, temp: Temperature) -> &Arc { + pub fn _get_inner(&self, temp: Temperature) -> &Arc { match temp { - Temperature::Hot => &self.storage, + Temperature::Hot => &self.hot_storage, + #[cfg(feature = "cold_store")] + Temperature::Cold => todo!(), } } @@ -163,15 +201,27 @@ impl NodeStorage { /// `Arc::clone`. pub fn into_inner(self, temp: Temperature) -> Arc { match temp { - Temperature::Hot => self.storage, + Temperature::Hot => self.hot_storage, + #[cfg(feature = "cold_store")] + Temperature::Cold => self.cold_storage.unwrap(), } } + /// Returns whether the storage has a cold database. + pub fn has_cold(&self) -> bool { + self.cold_storage.is_some() + } + /// Reads database metadata and returns whether the storage is archival. pub fn is_archive(&self) -> io::Result { - Ok(match metadata::DbMetadata::read(self.storage.as_ref())?.kind.unwrap() { + if self.cold_storage.is_some() { + return Ok(true); + } + Ok(match metadata::DbMetadata::read(self.hot_storage.as_ref())?.kind.unwrap() { metadata::DbKind::RPC => false, metadata::DbKind::Archive => true, + #[cfg(feature = "cold_store")] + metadata::DbKind::Hot | metadata::DbKind::Cold => unreachable!(), }) } } diff --git a/core/store/src/metadata.rs b/core/store/src/metadata.rs index debb406e15b..6e7f9e53223 100644 --- a/core/store/src/metadata.rs +++ b/core/store/src/metadata.rs @@ -35,28 +35,53 @@ pub enum DbKind { /// The database is an archive database meaning that it is not garbage /// collected and stores all chain data. Archive, + #[cfg(feature = "cold_store")] + /// The database is Hot meaning that the node runs in archival mode with + /// a paired Cold database. + Hot, + #[cfg(feature = "cold_store")] + /// The database is Cold meaning that the node runs in archival mode with + /// a paired Hot database. 
+ Cold, } pub(super) fn set_store_version(storage: &NodeStorage, version: DbVersion) -> std::io::Result<()> { set_store_metadata(storage, DbMetadata { version, kind: None }) } -pub(super) fn set_store_metadata( +fn set_db_metadata( storage: &NodeStorage, + temp: Temperature, metadata: DbMetadata, ) -> std::io::Result<()> { - let version = metadata.version.to_string().into_bytes(); - let kind = metadata.kind.map(|kind| <&str>::from(kind).as_bytes()); - let mut store_update = storage.get_store(Temperature::Hot).store_update(); - store_update.set(DBCol::DbVersion, VERSION_KEY, &version); + let mut store_update = storage.get_store(temp).store_update(); + store_update.set(DBCol::DbVersion, VERSION_KEY, metadata.version.to_string().as_bytes()); if metadata.version >= DB_VERSION_WITH_KIND { - if let Some(kind) = kind { + #[allow(unused_mut)] + let mut kind = metadata.kind; + #[cfg(feature = "cold_store")] + if matches!(temp, Temperature::Cold) || storage.has_cold() { + kind = Some(if matches!(temp, Temperature::Hot) { DbKind::Hot } else { DbKind::Cold }); + } + if let Some(kind) = kind.map(|kind| <&str>::from(kind).as_bytes()) { store_update.set(DBCol::DbVersion, KIND_KEY, kind); } } store_update.commit() } +pub(super) fn set_store_metadata( + storage: &NodeStorage, + metadata: DbMetadata, +) -> std::io::Result<()> { + set_db_metadata(storage, Temperature::Hot, metadata)?; + #[cfg(feature = "cold_store")] + if storage.has_cold() { + set_db_metadata(storage, Temperature::Cold, metadata)?; + } + Ok(()) +} + /// Metadata about a database. 
#[derive(Clone, Copy)] pub(super) struct DbMetadata { diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index 458d59c0d5c..b0c37e26537 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -5,8 +5,6 @@ use crate::metadata::{ }; use crate::{Mode, NodeStorage, StoreConfig}; -const STORE_PATH: &str = "data"; - #[derive(Debug, thiserror::Error)] pub enum StoreOpenerError { /// I/O or RocksDB-level error while opening or accessing the database. @@ -25,6 +23,21 @@ pub enum StoreOpenerError { #[error("Database already exists")] DbAlreadyExists, + /// Hot database exists but cold doesn’t or the other way around. + #[error("Hot and cold databases must either both exist or not")] + HotColdExistenceMismatch, + + /// Hot database exists but cold doesn’t or the other way around. + #[error( + "Hot database version ({hot_version}) doesn’t match \ + cold databases version ({cold_version})" + )] + HotColdVersionMismatch { hot_version: DbVersion, cold_version: DbVersion }, + + /// Hot database exists but cold doesn’t or the other way around. + #[error("{which} database kind should be {want} but got {got:?}")] + DbKindMismatch { which: &'static str, got: Option, want: DbKind }, + /// Unable to create a migration snapshot because one already exists. #[error( "Migration snapshot already exists at {0}; \ @@ -110,10 +123,11 @@ impl From for StoreOpenerError { /// .open(); /// ``` pub struct StoreOpener<'a> { - /// Opener for a single RocksDB instance. - /// - /// pub(crate) for testing. - db: DBOpener<'a>, + /// Opener for an instance of RPC or Hot RocksDB store. + hot: DBOpener<'a>, + + /// Opener for an instance of Cold RocksDB store if one was configured. + cold: Option>, /// What kind of database we should expect; if `None`, the kind of the /// database is not checked. @@ -138,8 +152,17 @@ struct DBOpener<'a> { impl<'a> StoreOpener<'a> { /// Initialises a new opener with given home directory and store config. 
- pub(crate) fn new(home_dir: &std::path::Path, config: &'a StoreConfig) -> Self { - Self { db: DBOpener::new(home_dir, config), expected_kind: None, migrator: None } + pub(crate) fn new( + home_dir: &std::path::Path, + config: &'a StoreConfig, + cold_config: super::ColdConfig<'a>, + ) -> Self { + Self { + hot: DBOpener::new(home_dir, config, "data"), + cold: cold_config.map(|config| ColdDBOpener::new(home_dir, config, "cold-data")), + expected_kind: None, + migrator: None, + } } /// Configures whether archive or RPC storage is expected. @@ -167,12 +190,12 @@ impl<'a> StoreOpener<'a> { /// /// Does not check whether the database actually exists. pub fn path(&self) -> &std::path::Path { - &self.db.path + &self.hot.path } #[cfg(test)] pub(crate) fn config(&self) -> &StoreConfig { - self.db.config + self.hot.config } /// Opens the storage in read-write mode. @@ -190,10 +213,53 @@ impl<'a> StoreOpener<'a> { /// other hand, if mode is [`Mode::Create`], fails if the database already /// exists. pub fn open_in_mode(&self, mode: Mode) -> Result { - if let Some(metadata) = self.db.get_metadata()? { + let hot_meta = self.hot.get_metadata()?; + let cold_meta = self.cold.as_ref().map(|db| db.get_metadata()).transpose()?; + + if let Some(hot_meta) = hot_meta { + if let Some(Some(cold_meta)) = cold_meta { + assert!(cfg!(feature = "cold_store")); + // If cold database exists, hot and cold databases must have the + // same version and to be Hot and Cold kinds respectively. 
+ if hot_meta.version != cold_meta.version { + return Err(StoreOpenerError::HotColdVersionMismatch { + hot_version: hot_meta.version, + cold_version: cold_meta.version, + }); + } + #[cfg(feature = "cold_store")] + if hot_meta.kind != Some(DbKind::Hot) { + return Err(StoreOpenerError::DbKindMismatch { + which: "Hot", + got: hot_meta.kind, + want: DbKind::Hot, + }); + } + #[cfg(feature = "cold_store")] + if cold_meta.kind != Some(DbKind::Cold) { + return Err(StoreOpenerError::DbKindMismatch { + which: "Cold", + got: cold_meta.kind, + want: DbKind::Cold, + }); + } + } else if cold_meta.is_some() { + // If cold database is configured and hot database exists, + // cold database must exist as well. + assert!(cfg!(feature = "cold_store")); + return Err(StoreOpenerError::HotColdExistenceMismatch); + } else if !matches!(hot_meta.kind, None | Some(DbKind::RPC | DbKind::Archive)) { + // If cold database is not configured, hot database must be + // RPC or Archive kind. + return Err(StoreOpenerError::DbKindMismatch { + which: "Hot", + got: hot_meta.kind, + want: self.expected_kind.unwrap_or(DbKind::RPC), + }); + } self.open_existing( mode.but_cannot_create().ok_or(StoreOpenerError::DbAlreadyExists)?, - metadata, + hot_meta, ) } else if mode.can_create() { self.open_new() @@ -210,25 +276,34 @@ impl<'a> StoreOpener<'a> { let snapshot = self.apply_migrations(mode, metadata)?; tracing::info!(target: "near", path=%self.path().display(), "Opening an existing RocksDB database"); - let (storage, metadata) = self.open_storage(mode, DB_VERSION)?; - self.ensure_kind(&storage, metadata)?; + let (storage, hot_meta, cold_meta) = self.open_storage(mode, DB_VERSION)?; + if let Some(_cold_meta) = cold_meta { + assert!(cfg!(feature = "cold_store")); + // open_storage has verified this. 
+ #[cfg(feature = "cold_store")] + assert_eq!(Some(DbKind::Hot), hot_meta.kind); + #[cfg(feature = "cold_store")] + assert_eq!(Some(DbKind::Cold), _cold_meta.kind); + } else { + self.ensure_kind(&storage, hot_meta)?; + } snapshot.remove()?; Ok(storage) } - /// Makes sure that database’s kind + /// Makes sure that database’s kind is correct. fn ensure_kind( &self, storage: &NodeStorage, metadata: DbMetadata, - ) -> Result { + ) -> Result<(), StoreOpenerError> { let expected = match self.expected_kind { Some(kind) => kind, - None => return Ok(metadata.kind.unwrap()), + None => return Ok(()), }; if expected == metadata.kind.unwrap() { - return Ok(expected); + return Ok(()); } if expected == DbKind::RPC { @@ -243,14 +318,15 @@ impl<'a> StoreOpener<'a> { DbMetadata { version: metadata.version, kind: self.expected_kind }, )?; } - return Ok(DbKind::Archive); + Ok(()) } fn open_new(&self) -> Result { tracing::info!(target: "near", path=%self.path().display(), "Creating a new RocksDB database"); - let db = self.db.create()?; - let storage = NodeStorage::new(std::sync::Arc::new(db)); + let hot = self.hot.create()?; + let cold = self.cold.as_ref().map(|db| db.create()).transpose()?; + let storage = NodeStorage::from_rocksdb(hot, cold); set_store_metadata( &storage, DbMetadata { version: DB_VERSION, kind: self.expected_kind.or(Some(DbKind::RPC)) }, @@ -296,7 +372,7 @@ impl<'a> StoreOpener<'a> { }); } - let snapshot = Snapshot::new(&self.db.path, &self.db.config)?; + let snapshot = Snapshot::new(&self.hot.path, &self.hot.config)?; for version in metadata.version..DB_VERSION { tracing::info!(target: "near", path=%self.path().display(), @@ -321,9 +397,31 @@ impl<'a> StoreOpener<'a> { &self, mode: Mode, want_version: DbVersion, - ) -> std::io::Result<(NodeStorage, DbMetadata)> { - let (db, metadata) = self.db.open(mode, want_version)?; - Ok((NodeStorage::new(std::sync::Arc::new(db)), metadata)) + ) -> std::io::Result<(NodeStorage, DbMetadata, Option)> { + let (hot, hot_meta) = 
self.hot.open(mode, want_version)?; + let (cold, cold_meta) = + match self.cold.as_ref().map(|opener| opener.open(mode, want_version)).transpose()? { + None => (None, None), + Some((db, meta)) => (Some(db), Some(meta)), + }; + + // Those are mostly sanity checks. If any of those conditions fails + // than either there’s bug in code or someone does something weird on + // the file system and tries to switch databases under us. + match (hot_meta.kind, cold_meta.map(|meta| meta.kind)) { + (Some(DbKind::RPC | DbKind::Archive), None) => Ok(()), + (kind, None) => Err(format!("unexpected DbKind {kind:?}; expected RPC or Archive")), + #[cfg(feature = "cold_store")] + (Some(DbKind::Hot), Some(Some(DbKind::Cold))) => Ok(()), + #[cfg(feature = "cold_store")] + (Some(DbKind::Hot), Some(kind)) => { + Err(format!("unexpected DbKind {kind:?}; expected Cold")) + } + (kind, Some(_)) => Err(format!("unexpected DbKind {kind:?}; expected Hot")), + } + .map_err(|msg| std::io::Error::new(std::io::ErrorKind::Other, msg))?; + + Ok((NodeStorage::from_rocksdb(hot, cold), hot_meta, cold_meta)) } } @@ -332,9 +430,9 @@ impl<'a> DBOpener<'a> { /// /// The path to the database is resolved based on the path in config with /// given home_dir as base directory for resolving relative paths. - fn new(home_dir: &std::path::Path, config: &'a StoreConfig) -> Self { + fn new(home_dir: &std::path::Path, config: &'a StoreConfig, default_path: &str) -> Self { let path = - home_dir.join(config.path.as_deref().unwrap_or(std::path::Path::new(STORE_PATH))); + home_dir.join(config.path.as_deref().unwrap_or(std::path::Path::new(default_path))); Self { path, config } } @@ -406,3 +504,47 @@ pub trait StoreMigrator { /// equal to [`DB_VERSION`]. fn migrate(&self, storage: &NodeStorage, version: DbVersion) -> anyhow::Result<()>; } + +// This is only here to make conditional compilation simpler. Once cold_store +// feature is stabilised, get rid of it and use DBOpener directly. 
+use cold_db_opener::ColdDBOpener; + +#[cfg(feature = "cold_store")] +mod cold_db_opener { + pub(super) type ColdDBOpener<'a> = super::DBOpener<'a>; +} + +#[cfg(not(feature = "cold_store"))] +mod cold_db_opener { + use super::*; + + pub(super) enum OpenerImpl {} + + impl OpenerImpl { + pub(super) fn new( + _home_dir: &std::path::Path, + _config: std::convert::Infallible, + _default_path: &str, + ) -> Self { + unreachable!() + } + + pub(super) fn get_metadata(&self) -> std::io::Result> { + unreachable!() + } + + pub(super) fn open( + &self, + _mode: Mode, + _want_version: DbVersion, + ) -> std::io::Result<(std::convert::Infallible, DbMetadata)> { + unreachable!() + } + + pub(super) fn create(&self) -> std::io::Result { + unreachable!() + } + } + + pub(super) type ColdDBOpener<'a> = OpenerImpl; +} diff --git a/genesis-tools/genesis-populate/src/main.rs b/genesis-tools/genesis-populate/src/main.rs index 2260776d0b5..a69037aad1b 100644 --- a/genesis-tools/genesis-populate/src/main.rs +++ b/genesis-tools/genesis-populate/src/main.rs @@ -25,7 +25,7 @@ fn main() { let near_config = load_config(home_dir, GenesisValidationMode::Full) .unwrap_or_else(|e| panic!("Error loading config: {:#}", e)); - let store = near_store::NodeStorage::opener(home_dir, &near_config.config.store) + let store = near_store::NodeStorage::opener(home_dir, &near_config.config.store, None) .open() .unwrap() .get_store(near_store::Temperature::Hot); diff --git a/genesis-tools/genesis-populate/src/state_dump.rs b/genesis-tools/genesis-populate/src/state_dump.rs index a27deabc4d8..79a890fdaab 100644 --- a/genesis-tools/genesis-populate/src/state_dump.rs +++ b/genesis-tools/genesis-populate/src/state_dump.rs @@ -20,7 +20,9 @@ impl StateDump { let storage = TestDB::new(); near_store::NodeStorage::new(storage) } else { - near_store::NodeStorage::opener(store_home_dir, &Default::default()).open().unwrap() + near_store::NodeStorage::opener(store_home_dir, &Default::default(), None) + .open() + .unwrap() }; 
let store = node_storage.get_store(Temperature::Hot); let state_file = dir.join(STATE_DUMP_FILE); diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index e99207599f3..96f21f355f3 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -138,3 +138,5 @@ sandbox = [ io_trace = ["near-vm-runner/io_trace"] shardnet = ["near-network/shardnet"] + +cold_store = ["near-store/cold_store"] diff --git a/nearcore/benches/store.rs b/nearcore/benches/store.rs index 4ba25551775..81e4776f60f 100644 --- a/nearcore/benches/store.rs +++ b/nearcore/benches/store.rs @@ -28,7 +28,7 @@ fn read_trie_items(bench: &mut Bencher, shard_id: usize, mode: Mode) { bench.iter(move || { tracing::info!(target: "neard", "{:?}", home_dir); - let store = near_store::NodeStorage::opener(&home_dir, &near_config.config.store) + let store = near_store::NodeStorage::opener(&home_dir, &near_config.config.store, None) .open_in_mode(mode) .unwrap() .get_store(Temperature::Hot); diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 45adc4c62e2..cd304cc8484 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -321,8 +321,12 @@ pub struct Config { /// If set, overrides value in genesis configuration. #[serde(skip_serializing_if = "Option::is_none")] pub max_gas_burnt_view: Option, - /// Different parameters to configure/optimize underlying storage. + /// Different parameters to configure underlying storage. pub store: near_store::StoreConfig, + /// Different parameters to configure underlying cold storage. + #[cfg(feature = "cold_store")] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cold_store: Option, // TODO(mina86): Remove those two altogether at some point. 
We need to be // somewhat careful though and make sure that we don’t start silently @@ -368,6 +372,8 @@ impl Default for Config { db_migration_snapshot_path: None, use_db_migration_snapshot: None, store: near_store::StoreConfig::default(), + #[cfg(feature = "cold_store")] + cold_store: None, } } } diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 2327b53754a..793a3108202 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -54,9 +54,16 @@ pub fn get_default_home() -> PathBuf { /// being opened. fn open_storage(home_dir: &Path, near_config: &mut NearConfig) -> anyhow::Result { let migrator = migrations::Migrator::new(near_config); - let opener = NodeStorage::opener(home_dir, &near_config.config.store) - .with_migrator(&migrator) - .expect_archive(near_config.client_config.archive); + let opener = NodeStorage::opener( + home_dir, + &near_config.config.store, + #[cfg(feature = "cold_store")] + near_config.config.cold_store.as_ref(), + #[cfg(not(feature = "cold_store"))] + None, + ) + .with_migrator(&migrator) + .expect_archive(near_config.client_config.archive); let storage = match opener.open() { Ok(storage) => Ok(storage), Err(StoreOpenerError::IO(err)) => { @@ -66,6 +73,23 @@ fn open_storage(home_dir: &Path, near_config: &mut NearConfig) -> anyhow::Result Err(StoreOpenerError::DbDoesNotExist) => unreachable!(), // Cannot happen with Mode::ReadWrite Err(StoreOpenerError::DbAlreadyExists) => unreachable!(), + Err(StoreOpenerError::HotColdExistenceMismatch) => { + Err(anyhow::anyhow!( + "Hot and cold databases must but exist or both not exist.\n\ + Note that at this moment it’s not possible to convert RPC or legacy archive database into split hot+cold database.\n\ + To set up node in that configuration, start with neither of the databases existing.", + )) + }, + Err(err @ StoreOpenerError::HotColdVersionMismatch { .. 
}) => { + Err(anyhow::anyhow!("{err}")) + }, + Err(StoreOpenerError::DbKindMismatch { which, got, want }) => { + Err(if let Some(got) = got { + anyhow::anyhow!("{which} database kind should be {want} but got {got}") + } else { + anyhow::anyhow!("{which} database kind should be {want} but none was set") + }) + } Err(StoreOpenerError::SnapshotAlreadyExists(snap_path)) => { Err(anyhow::anyhow!( "Detected an existing database migration snapshot at ‘{}’.\n\ @@ -259,7 +283,7 @@ pub fn recompress_storage(home_dir: &Path, opts: RecompressOpts) -> anyhow::Resu skip_columns.push(DBCol::TrieChanges); } - let src_opener = NodeStorage::opener(home_dir, &config.store); + let src_opener = NodeStorage::opener(home_dir, &config.store, None); let src_path = src_opener.path(); let mut dst_config = config.store.clone(); @@ -267,7 +291,7 @@ pub fn recompress_storage(home_dir: &Path, opts: RecompressOpts) -> anyhow::Resu // Note: opts.dest_dir is resolved relative to current working directory // (since it’s a command line option) which is why we set home to cwd. 
let cwd = std::env::current_dir()?; - let dst_opener = NodeStorage::opener(&cwd, &dst_config); + let dst_opener = NodeStorage::opener(&cwd, &dst_config, None); let dst_path = dst_opener.path(); info!(target: "recompress", diff --git a/neard/Cargo.toml b/neard/Cargo.toml index 28ce6b54dae..917114571c4 100644 --- a/neard/Cargo.toml +++ b/neard/Cargo.toml @@ -55,6 +55,7 @@ rosetta_rpc = ["nearcore/rosetta_rpc"] json_rpc = ["nearcore/json_rpc"] protocol_feature_fix_staking_threshold = ["nearcore/protocol_feature_fix_staking_threshold"] protocol_feature_flat_state = ["nearcore/protocol_feature_flat_state"] +cold_store = ["nearcore/cold_store", "near-store/cold_store"] nightly = [ "nightly_protocol", diff --git a/runtime/runtime-params-estimator/src/main.rs b/runtime/runtime-params-estimator/src/main.rs index ef6e979652f..55f6d50b884 100644 --- a/runtime/runtime-params-estimator/src/main.rs +++ b/runtime/runtime-params-estimator/src/main.rs @@ -156,10 +156,11 @@ fn main() -> anyhow::Result<()> { let near_config = nearcore::load_config(&state_dump_path, GenesisValidationMode::Full) .context("Error loading config")?; - let store = near_store::NodeStorage::opener(&state_dump_path, &near_config.config.store) - .open() - .unwrap() - .get_store(near_store::Temperature::Hot); + let store = + near_store::NodeStorage::opener(&state_dump_path, &near_config.config.store, None) + .open() + .unwrap() + .get_store(near_store::Temperature::Hot); GenesisBuilder::from_config_and_store(&state_dump_path, near_config, store) .add_additional_accounts(cli_args.additional_accounts_num) .add_additional_accounts_contract(contract_code.to_vec()) diff --git a/test-utils/store-validator/src/main.rs b/test-utils/store-validator/src/main.rs index e58e6358a6a..9acfb61590c 100644 --- a/test-utils/store-validator/src/main.rs +++ b/test-utils/store-validator/src/main.rs @@ -30,7 +30,7 @@ fn main() { let near_config = load_config(home_dir, GenesisValidationMode::Full) .unwrap_or_else(|e| panic!("Error 
loading config: {:#}", e)); - let store = near_store::NodeStorage::opener(home_dir, &near_config.config.store) + let store = near_store::NodeStorage::opener(home_dir, &near_config.config.store, None) .open() .unwrap() .get_store(near_store::Temperature::Hot); diff --git a/tools/mock-node/src/setup.rs b/tools/mock-node/src/setup.rs index 89c0240687c..265bda9c0d1 100644 --- a/tools/mock-node/src/setup.rs +++ b/tools/mock-node/src/setup.rs @@ -33,7 +33,7 @@ fn setup_runtime( let store = if in_memory_storage { create_test_store() } else { - near_store::NodeStorage::opener(home_dir, &config.config.store) + near_store::NodeStorage::opener(home_dir, &config.config.store, None) .open() .unwrap() .get_store(near_store::Temperature::Hot) diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 472d6c65907..968a475ed0f 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -76,7 +76,8 @@ impl StateViewerSubCommand { pub fn run(self, home_dir: &Path, genesis_validation: GenesisValidationMode, mode: Mode) { let near_config = load_config(home_dir, genesis_validation) .unwrap_or_else(|e| panic!("Error loading config: {:#}", e)); - let store_opener = near_store::NodeStorage::opener(home_dir, &near_config.config.store); + let store_opener = + near_store::NodeStorage::opener(home_dir, &near_config.config.store, None); let store = store_opener.open_in_mode(mode).unwrap(); let hot = store.get_store(near_store::Temperature::Hot); match self { From f7026d0af14d3b552d0c8ff5a3a154803934062a Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Wed, 19 Oct 2022 14:34:57 +0200 Subject: [PATCH 2/7] wip --- core/store/src/opener.rs | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index b0c37e26537..4c5bf72b323 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -408,18 +408,28 @@ impl<'a> StoreOpener<'a> { // Those are 
mostly sanity checks. If any of those conditions fails // than either there’s bug in code or someone does something weird on // the file system and tries to switch databases under us. - match (hot_meta.kind, cold_meta.map(|meta| meta.kind)) { - (Some(DbKind::RPC | DbKind::Archive), None) => Ok(()), - (kind, None) => Err(format!("unexpected DbKind {kind:?}; expected RPC or Archive")), - #[cfg(feature = "cold_store")] - (Some(DbKind::Hot), Some(Some(DbKind::Cold))) => Ok(()), + if let Some(_cold_meta) = cold_meta { #[cfg(feature = "cold_store")] - (Some(DbKind::Hot), Some(kind)) => { - Err(format!("unexpected DbKind {kind:?}; expected Cold")) + if hot_meta.kind != Some(DbKind::Hot) { + Err((hot_meta.kind, "Hot")) + } else if _cold_meta.kind != Some(DbKind::Cold) { + Err((_cold_meta.kind, "Cold")) + } else { + Ok(()) } - (kind, Some(_)) => Err(format!("unexpected DbKind {kind:?}; expected Hot")), + #[cfg(not(feature = "cold_store"))] + Ok(()) + } else if matches!(hot_meta.kind, None | Some(DbKind::RPC | DbKind::Archive)) { + Ok(()) + } else { + Err((hot_meta.kind, "RPC or Archive")) } - .map_err(|msg| std::io::Error::new(std::io::ErrorKind::Other, msg))?; + .map_err(|(got, want)| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("unexpected DbKind {got:?}; expected {want}"), + ) + })?; Ok((NodeStorage::from_rocksdb(hot, cold), hot_meta, cold_meta)) } From edb1d1d0ae19159d35d5cfd37fa90ef51cdc1124 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Wed, 19 Oct 2022 14:53:04 +0200 Subject: [PATCH 3/7] wip --- core/store/src/opener.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index 4c5bf72b323..f3882495ffc 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -273,7 +273,7 @@ impl<'a> StoreOpener<'a> { mode: Mode, metadata: DbMetadata, ) -> Result { - let snapshot = self.apply_migrations(mode, metadata)?; + let snapshots = 
self.apply_migrations(mode, metadata)?; tracing::info!(target: "near", path=%self.path().display(), "Opening an existing RocksDB database"); let (storage, hot_meta, cold_meta) = self.open_storage(mode, DB_VERSION)?; @@ -287,7 +287,8 @@ impl<'a> StoreOpener<'a> { } else { self.ensure_kind(&storage, hot_meta)?; } - snapshot.remove()?; + snapshots.0.remove()?; + snapshots.1.remove()?; Ok(storage) } @@ -339,9 +340,9 @@ impl<'a> StoreOpener<'a> { &self, mode: Mode, metadata: DbMetadata, - ) -> Result { + ) -> Result<(Snapshot, Snapshot), StoreOpenerError> { if metadata.version == DB_VERSION { - return Ok(Snapshot::none()); + return Ok((Snapshot::none(), Snapshot::none())); } else if metadata.version > DB_VERSION { return Err(StoreOpenerError::DbVersionTooNew { got: metadata.version, @@ -372,7 +373,11 @@ impl<'a> StoreOpener<'a> { }); } - let snapshot = Snapshot::new(&self.hot.path, &self.hot.config)?; + let hot_snapshot = self.hot.snapshot()?; + let cold_snapshot = match self.cold { + None => Snapshot::none(), + Some(ref opener) => opener.snapshot()?, + }; for version in metadata.version..DB_VERSION { tracing::info!(target: "near", path=%self.path().display(), @@ -390,7 +395,7 @@ impl<'a> StoreOpener<'a> { set_store_version(&storage, 10000)?; } - Ok(snapshot) + Ok((hot_snapshot, cold_snapshot)) } fn open_storage( @@ -489,6 +494,11 @@ impl<'a> DBOpener<'a> { fn create(&self) -> std::io::Result { RocksDB::open(&self.path, &self.config, Mode::Create) } + + /// Creates a new snapshot for the database. 
+ fn snapshot(&self) -> Result { + Snapshot::new(&self.path, &self.config) + } } pub trait StoreMigrator { @@ -554,6 +564,10 @@ mod cold_db_opener { pub(super) fn create(&self) -> std::io::Result { unreachable!() } + + pub(super) fn snapshot(&self) -> Result { + Ok(Snapshot::none()) + } } pub(super) type ColdDBOpener<'a> = OpenerImpl; From 0aa78baf33a8eae0087c8c421b0c9b7032d13275 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Tue, 25 Oct 2022 00:09:44 +0200 Subject: [PATCH 4/7] review --- core/store/src/opener.rs | 8 ++++++-- nearcore/src/lib.rs | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/store/src/opener.rs b/core/store/src/opener.rs index f3882495ffc..5b81bcb7486 100644 --- a/core/store/src/opener.rs +++ b/core/store/src/opener.rs @@ -27,14 +27,18 @@ pub enum StoreOpenerError { #[error("Hot and cold databases must either both exist or not")] HotColdExistenceMismatch, - /// Hot database exists but cold doesn’t or the other way around. + /// Hot and cold databases have different versions. #[error( "Hot database version ({hot_version}) doesn’t match \ cold databases version ({cold_version})" )] HotColdVersionMismatch { hot_version: DbVersion, cold_version: DbVersion }, - /// Hot database exists but cold doesn’t or the other way around. + /// Database has incorrect kind. + /// + /// Specifically, this happens if node is running with a single database and + /// its kind is not RPC or Archive; or it’s running with two databases and + /// their types aren’t Hot and Cold respectively. 
#[error("{which} database kind should be {want} but got {got:?}")] DbKindMismatch { which: &'static str, got: Option, want: DbKind }, diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 329c1c9b0f9..518a5c72020 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -75,8 +75,8 @@ fn open_storage(home_dir: &Path, near_config: &mut NearConfig) -> anyhow::Result Err(StoreOpenerError::DbAlreadyExists) => unreachable!(), Err(StoreOpenerError::HotColdExistenceMismatch) => { Err(anyhow::anyhow!( - "Hot and cold databases must but exist or both not exist.\n\ - Note that at this moment it’s not possible to convert RPC or legacy archive database into split hot+cold database.\n\ + "Hot and cold databases must either both exist or both not exist.\n\ + Note that at this moment it’s not possible to convert an RPC or legacy archive database into split hot+cold database.\n\ To set up node in that configuration, start with neither of the databases existing.", )) }, From 70029af0dbc0af2fcc69336c22bf12e67fc30603 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Tue, 25 Oct 2022 01:21:22 +0200 Subject: [PATCH 5/7] wip --- tools/mirror/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 079ffe74472..4a1ab006034 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -135,7 +135,7 @@ struct TxMirror { fn open_db>(home: P, config: &NearConfig) -> anyhow::Result { let db_path = - near_store::NodeStorage::opener(home.as_ref(), &config.config.store).path().join("mirror"); + near_store::NodeStorage::opener(home.as_ref(), &config.config.store, None).path().join("mirror"); let mut options = rocksdb::Options::default(); options.create_missing_column_families(true); options.create_if_missing(true); From 80e3b7261c4b2369270c34c98166516cbc673e04 Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Mon, 24 Oct 2022 23:11:18 +0100 Subject: [PATCH 6/7] 
=?UTF-8?q?core:=20add=20chain=20Error=20=E2=86=92=20T?= =?UTF-8?q?xStatusError=20conversion=20(#7912)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Having conversion from near_chain_primitves::Error to TxStatusError eliminates a handful of trivial map_error calls. --- chain/client-primitives/src/types.rs | 6 ++++++ chain/client/src/view_client.rs | 17 ++++++----------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index ad2de642944..fedd8af30cd 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -598,6 +598,12 @@ pub enum TxStatusError { TimeoutError, } +impl From for TxStatusError { + fn from(error: near_chain_primitives::Error) -> Self { + Self::ChainError(error) + } +} + impl Message for TxStatus { type Result = Result, TxStatusError>; } diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs index 4557883ac9a..6458a211e76 100644 --- a/chain/client/src/view_client.rs +++ b/chain/client/src/view_client.rs @@ -371,7 +371,7 @@ impl ViewClientActor { } } - let head = self.chain.head().map_err(|e| TxStatusError::ChainError(e))?; + let head = self.chain.head()?; let target_shard_id = self .runtime_adapter .account_id_to_shard_id(&signer_account_id, &head.epoch_id) @@ -386,10 +386,8 @@ impl ViewClientActor { match self.chain.get_final_transaction_result(&tx_hash) { Ok(tx_result) => { let res = if fetch_receipt { - let final_result = self - .chain - .get_final_transaction_result_with_receipt(tx_result) - .map_err(|e| TxStatusError::ChainError(e))?; + let final_result = + self.chain.get_final_transaction_result_with_receipt(tx_result)?; FinalExecutionOutcomeViewEnum::FinalExecutionOutcomeWithReceipt( final_result, ) @@ -407,7 +405,7 @@ impl ViewClientActor { } Err(err) => { warn!(target: "client", ?err, "Error trying to get transaction result"); - 
Err(TxStatusError::ChainError(err)) + Err(err.into()) } } } else { @@ -417,10 +415,7 @@ impl ViewClientActor { .runtime_adapter .account_id_to_shard_id(&signer_account_id, &head.epoch_id) .map_err(|err| TxStatusError::InternalError(err.to_string()))?; - let validator = self - .chain - .find_validator_for_forwarding(target_shard_id) - .map_err(|e| TxStatusError::ChainError(e))?; + let validator = self.chain.find_validator_for_forwarding(target_shard_id)?; self.network_adapter.do_send( PeerManagerMessageRequest::NetworkRequests(NetworkRequests::TxStatus( @@ -907,7 +902,7 @@ impl Handler> for ViewClientActor { } Err(e) => match e { near_chain::Error::DBNotFoundErr(_) => { - let head = self.chain.head().map_err(|e| TxStatusError::ChainError(e))?; + let head = self.chain.head()?; let target_shard_id = self.runtime_adapter.account_id_to_shard_id(&account_id, &head.epoch_id)?; if self.runtime_adapter.cares_about_shard( From e83610968addd7c2c0480d2e8e707300ccbfe2fa Mon Sep 17 00:00:00 2001 From: Michal Nazarewicz Date: Tue, 25 Oct 2022 01:33:14 +0200 Subject: [PATCH 7/7] fmt --- tools/mirror/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tools/mirror/src/lib.rs b/tools/mirror/src/lib.rs index 4a1ab006034..78e2996b44d 100644 --- a/tools/mirror/src/lib.rs +++ b/tools/mirror/src/lib.rs @@ -134,8 +134,9 @@ struct TxMirror { } fn open_db>(home: P, config: &NearConfig) -> anyhow::Result { - let db_path = - near_store::NodeStorage::opener(home.as_ref(), &config.config.store, None).path().join("mirror"); + let db_path = near_store::NodeStorage::opener(home.as_ref(), &config.config.store, None) + .path() + .join("mirror"); let mut options = rocksdb::Options::default(); options.create_missing_column_families(true); options.create_if_missing(true);