diff --git a/storage/aptosdb/src/ledger_db/mod.rs b/storage/aptosdb/src/ledger_db/mod.rs index e0d3119ecfc0d..da5d8377fbb26 100644 --- a/storage/aptosdb/src/ledger_db/mod.rs +++ b/storage/aptosdb/src/ledger_db/mod.rs @@ -364,7 +364,7 @@ impl LedgerDb { &gen_rocksdb_options(db_config, true), path.clone(), name, - Self::get_column_families_by_name(name), + Self::gen_cfds_by_name(db_config, name), )? } else { DB::open_cf( diff --git a/storage/aptosdb/src/state_kv_db.rs b/storage/aptosdb/src/state_kv_db.rs index efd99adacec86..73bb6594b026c 100644 --- a/storage/aptosdb/src/state_kv_db.rs +++ b/storage/aptosdb/src/state_kv_db.rs @@ -5,9 +5,7 @@ use crate::{ common::NUM_STATE_SHARDS, - db_options::{ - gen_state_kv_cfds, state_kv_db_column_families, state_kv_db_new_key_column_families, - }, + db_options::gen_state_kv_cfds, metrics::OTHER_TIMERS_SECONDS, schema::{ db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, @@ -268,11 +266,7 @@ impl StateKvDb { &gen_rocksdb_options(state_kv_db_config, true), path, name, - if enable_sharding { - state_kv_db_new_key_column_families() - } else { - state_kv_db_column_families() - }, + gen_state_kv_cfds(state_kv_db_config, enable_sharding), )? } else { DB::open_cf( diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index 4c957969728d0..4c42a61042712 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -3,7 +3,7 @@ use crate::{ common::NUM_STATE_SHARDS, - db_options::{gen_state_merkle_cfds, state_merkle_db_column_families}, + db_options::gen_state_merkle_cfds, lru_node_cache::LruNodeCache, metrics::{NODE_CACHE_SECONDS, OTHER_TIMERS_SECONDS}, schema::{ @@ -638,7 +638,7 @@ impl StateMerkleDb { &gen_rocksdb_options(state_merkle_db_config, true), path, name, - state_merkle_db_column_families(), + gen_state_merkle_cfds(state_merkle_db_config), )? } else { DB::open_cf( diff --git a/storage/schemadb/src/lib.rs b/storage/schemadb/src/lib.rs index c9f2ee176d172..a1ef74b9ce751 100644 --- a/storage/schemadb/src/lib.rs +++ b/storage/schemadb/src/lib.rs @@ -106,6 +106,13 @@ impl SchemaBatch { } } +#[derive(Debug)] +enum OpenMode<'a> { + ReadWrite, + ReadOnly, + Secondary(&'a Path), +} + /// This DB is a schematized RocksDB wrapper where all data passed in and out are typed according to /// [`Schema`]s. #[derive(Debug)] @@ -143,27 +150,7 @@ impl DB { name: &str, cfds: Vec, ) -> DbResult { - // ignore error, since it'll fail to list cfs on the first open - let existing_cfs = rocksdb::DB::list_cf(db_opts, path.de_unc()).unwrap_or_default(); - - let unrecognized_cfds = existing_cfs - .iter() - .map(AsRef::as_ref) - .collect::>() - .difference(&cfds.iter().map(|cfd| cfd.name()).collect()) - .map(|cf| { - warn!("Unrecognized CF: {}", cf); - - let mut cf_opts = Options::default(); - cf_opts.set_compression_type(DBCompressionType::Lz4); - ColumnFamilyDescriptor::new(cf.to_string(), cf_opts) - }) - .collect::>(); - let all_cfds = cfds.into_iter().chain(unrecognized_cfds); - - let inner = - rocksdb::DB::open_cf_descriptors(db_opts, path.de_unc(), all_cfds).into_db_res()?; - Ok(Self::log_construct(name, inner)) + Self::open_cf_impl(db_opts, path, name, cfds, OpenMode::ReadWrite) } /// Open db in readonly mode @@ -173,14 +160,9 @@ impl DB { opts: &Options, path: impl AsRef, name: &str, - cfs: Vec, + cfds: Vec, ) -> DbResult { - let error_if_log_file_exists = false; - let inner = - rocksdb::DB::open_cf_for_read_only(opts, path.de_unc(), cfs, error_if_log_file_exists) - .into_db_res()?; - - Ok(Self::log_construct(name, inner)) + Self::open_cf_impl(opts, path, name, cfds, OpenMode::ReadOnly) } pub fn open_cf_as_secondary>( @@ -188,20 +170,75 @@ impl DB { primary_path: P, secondary_path: P, name: &str, - cfs: Vec, + cfds: Vec, ) -> DbResult { - let inner = rocksdb::DB::open_cf_as_secondary( + Self::open_cf_impl( opts, - primary_path.de_unc(), - secondary_path.de_unc(), - cfs, + primary_path, + name, + cfds, + OpenMode::Secondary(secondary_path.as_ref()), ) + } + + fn open_cf_impl( + db_opts: &Options, + path: impl AsRef, + name: &str, + cfds: Vec, + open_mode: OpenMode, + ) -> DbResult { + // ignore error, since it'll fail to list cfs on the first open + let existing_cfs = rocksdb::DB::list_cf(db_opts, path.de_unc()).unwrap_or_default(); + + let unrecognized_cfds = existing_cfs + .iter() + .map(AsRef::as_ref) + .collect::>() + .difference(&cfds.iter().map(|cfd| cfd.name()).collect()) + .map(|cf| { + warn!("Unrecognized CF: {}", cf); + + let mut cf_opts = Options::default(); + cf_opts.set_compression_type(DBCompressionType::Lz4); + ColumnFamilyDescriptor::new(cf.to_string(), cf_opts) + }) + .collect::>(); + let all_cfds = cfds.into_iter().chain(unrecognized_cfds); + + let inner = { + use rocksdb::DB; + use OpenMode::*; + + match open_mode { + ReadWrite => DB::open_cf_descriptors(db_opts, path.de_unc(), all_cfds), + ReadOnly => { + DB::open_cf_descriptors_read_only( + db_opts, + path.de_unc(), + all_cfds, + false, /* error_if_log_file_exist */ + ) + }, + Secondary(secondary_path) => DB::open_cf_descriptors_as_secondary( + db_opts, + path.de_unc(), + secondary_path, + all_cfds, + ), + } + } .into_db_res()?; - Ok(Self::log_construct(name, inner)) + + Ok(Self::log_construct(name, open_mode, inner)) } - fn log_construct(name: &str, inner: rocksdb::DB) -> DB { - info!(rocksdb_name = name, "Opened RocksDB."); + fn log_construct(name: &str, open_mode: OpenMode, inner: rocksdb::DB) -> DB { + info!( + rocksdb_name = name, + open_mode = ?open_mode, + "Opened RocksDB." + ); DB { name: name.to_string(), inner, diff --git a/storage/schemadb/tests/db.rs b/storage/schemadb/tests/db.rs index b162ac4d0244e..9853332ff710f 100644 --- a/storage/schemadb/tests/db.rs +++ b/storage/schemadb/tests/db.rs @@ -10,7 +10,7 @@ use aptos_schemadb::{ }; use aptos_storage_interface::AptosDbError; use byteorder::{LittleEndian, ReadBytesExt}; -use rocksdb::DEFAULT_COLUMN_FAMILY_NAME; +use rocksdb::{ColumnFamilyDescriptor, DEFAULT_COLUMN_FAMILY_NAME}; // Creating two schemas that share exactly the same structure but are stored in different column // families. Also note that the key and value are of the same type `TestField`. By implementing @@ -81,6 +81,13 @@ fn get_column_families() -> Vec { ] } +fn get_cfds() -> Vec { + get_column_families() + .iter() + .map(|cf_name| ColumnFamilyDescriptor::new(*cf_name, rocksdb::Options::default())) + .collect() +} + fn open_db(dir: &aptos_temppath::TempPath) -> DB { let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); @@ -89,13 +96,8 @@ fn open_db(dir: &aptos_temppath::TempPath) -> DB { } fn open_db_read_only(dir: &aptos_temppath::TempPath) -> DB { - DB::open_cf_readonly( - &rocksdb::Options::default(), - dir.path(), - "test", - get_column_families(), - ) - .expect("Failed to open DB.") + DB::open_cf_readonly(&rocksdb::Options::default(), dir.path(), "test", get_cfds()) + .expect("Failed to open DB.") } fn open_db_as_secondary(dir: &aptos_temppath::TempPath, dir_sec: &aptos_temppath::TempPath) -> DB { @@ -104,7 +106,7 @@ fn open_db_as_secondary(dir: &aptos_temppath::TempPath, dir_sec: &aptos_temppath dir.path(), dir_sec.path(), "test", - get_column_families(), + get_cfds(), ) .expect("Failed to open DB.") }