store: allow configuring RocksDB with support for subset of columns #7544

Merged · 4 commits · Sep 5, 2022
170 changes: 130 additions & 40 deletions core/store/src/db/rocksdb.rs
@@ -32,7 +32,7 @@ pub struct RocksDB {
/// Rather than accessing this field directly, use [`RocksDB::cf_handle`]
/// method instead. It returns `&ColumnFamily` which is what you usually
/// want.
cf_handles: enum_map::EnumMap<DBCol, std::ptr::NonNull<ColumnFamily>>,
cf_handles: enum_map::EnumMap<DBCol, Option<std::ptr::NonNull<ColumnFamily>>>,

check_free_space_counter: std::sync::atomic::AtomicU16,
check_free_space_interval: u16,
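
The field’s value type is now `Option<NonNull<ColumnFamily>>`, so columns the instance wasn’t configured with simply map to `None`. A minimal standalone sketch of that pattern (the enum and values are assumptions, not from this PR):

use enum_map::{Enum, EnumMap};

#[derive(Enum, Clone, Copy)]
enum Col {
    State,
    Headers,
}

fn demo() {
    // EnumMap::default() initialises every entry to None.
    let mut handles: EnumMap<Col, Option<u32>> = EnumMap::default();
    handles[Col::State] = Some(7); // only State is configured
    assert!(handles[Col::Headers].is_none()); // Headers was never set up
}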
@@ -49,14 +49,50 @@ unsafe impl Send for RocksDB {}
unsafe impl Sync for RocksDB {}

impl RocksDB {
/// Opens the database either in read only or in read/write mode depending
/// on the `mode` parameter specified in the store_config.
pub fn open(path: &Path, store_config: &StoreConfig, mode: Mode) -> io::Result<RocksDB> {
/// Opens the database.
///
/// `path` specifies the location of the database. It’s assumed to have
/// already been resolved based on the configuration in `store_config`, so
/// the path configuration in `store_config` is ignored.
///
/// `store_config` specifies other storage configuration such as the open
/// files limit or whether to enable collection of statistics.
///
/// `mode` specifies whether to open the database in read/write or read-only
/// mode. In the latter case, the database will not be created if it
/// doesn’t exist, nor will any migrations be performed if the database
/// version differs from the expected one.
pub fn open(path: &Path, store_config: &StoreConfig, mode: Mode) -> io::Result<Self> {
let columns: Vec<DBCol> = DBCol::iter().collect();
Self::open_with_columns(path, store_config, mode, &columns)
}
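
// Sketch (not part of this diff): a hypothetical call site for the public
// entry point above; the path and `StoreConfig::default()` are assumptions:
//
//     let config = StoreConfig::default();
//     let db = RocksDB::open(Path::new("/path/to/db"), &config, Mode::ReadWrite)?;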

/// Opens the database with the given set of column families configured.
///
/// With cold storage, we will need to be able to configure the database
/// with only a subset of columns. The `columns` argument specifies which
/// columns to configure in the database.
///
/// Note that RocksDB is weird: it’s not possible to open a database in
/// read/write mode without specifying all the column families existing in
/// the database. On the other hand, it’s not possible to open a database in
/// read-only mode while specifying column families which don’t exist.
///
/// Furthermore, note that when opening in read/write mode, we configure
/// RocksDB to create missing columns.
///
/// With all that, things get quite messy if at some point we end up
/// opening cold storage as hot, since it’ll create all the missing columns.
fn open_with_columns(
path: &Path,
store_config: &StoreConfig,
mode: Mode,
columns: &[DBCol],
) -> io::Result<Self> {
let counter = instance_tracker::InstanceTracker::try_new(store_config.max_open_files)
.map_err(other_error)?;
let (db, db_opt) = Self::open_db(path, store_config, mode)?;
let cf_handles = Self::get_cf_handles(&db);

let (db, db_opt) = Self::open_db(path, store_config, mode, columns)?;
let cf_handles = Self::get_cf_handles(&db, columns);
Ok(Self {
db,
db_opt,
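
The doc comment above calls out the read/write vs read-only asymmetry. A minimal sketch of it using the rocksdb crate directly (column names, path, and control flow are assumptions, not from this PR):

use rocksdb::{Options, DB};

fn demo(path: &std::path::Path) -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    // Read/write: every column family existing in the database must be
    // listed; with the option above, listed-but-missing ones are created.
    let rw = DB::open_cf(&opts, path, ["col0", "col1"])?;
    drop(rw); // release the database lock before reopening
    // Read-only: listing a column family that doesn’t exist is an error.
    let _ro = DB::open_cf_for_read_only(&Options::default(), path, ["col0"], false)?;
    Ok(())
}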
@@ -68,10 +104,17 @@ impl RocksDB {
})
}

/// Opens the database with all column families configured.
fn open_db(path: &Path, store_config: &StoreConfig, mode: Mode) -> io::Result<(DB, Options)> {
/// Opens the database with the given column families configured.
fn open_db(
path: &Path,
store_config: &StoreConfig,
mode: Mode,
columns: &[DBCol],
) -> io::Result<(DB, Options)> {
let options = rocksdb_options(store_config, mode);
let cf_descriptors = DBCol::iter()
let cf_descriptors = columns
.iter()
.copied()
.map(|col| {
rocksdb::ColumnFamilyDescriptor::new(
col_name(col),
@@ -103,34 +146,81 @@ impl RocksDB {
}

/// Returns mapping from [`DBCol`] to cf handle used with RocksDB calls.
fn get_cf_handles(db: &DB) -> enum_map::EnumMap<DBCol, std::ptr::NonNull<ColumnFamily>> {
///
/// The mapping is created for column families given in the `columns` list
/// only. All other columns will map to `None`.
///
/// ## Safety
///
/// This function is safe but using the returned mapping safely requires
/// that it does not outlive `db` and that `db` is not modified. The safety
/// relies on `db` returning a stable mapping for column families.
fn get_cf_handles(
db: &DB,
columns: &[DBCol],
) -> enum_map::EnumMap<DBCol, Option<std::ptr::NonNull<ColumnFamily>>> {
let mut cf_handles = enum_map::EnumMap::default();
for col in DBCol::iter() {
for col in columns.iter().copied() {
let ptr = db
.cf_handle(&col_name(col))
.map_or(std::ptr::null(), |cf| cf as *const ColumnFamily);
cf_handles[col] = std::ptr::NonNull::new(ptr as *mut ColumnFamily);
.and_then(|cf| std::ptr::NonNull::new(cf as *const _ as *mut _))
.unwrap_or_else(|| panic!("Missing cf handle for {col}"));
cf_handles[col] = Some(ptr);
}
cf_handles.map(|col, ptr| {
ptr.unwrap_or_else(|| {
panic!("Missing cf handle for {col}");
})
})
cf_handles
}
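
// Sketch (not part of this diff): the `cf as *const _ as *mut _` cast above
// turns a shared reference into a `NonNull` pointer. A standalone equivalent:
fn to_non_null<T>(r: &T) -> std::ptr::NonNull<T> {
    // A reference is never null, so `NonNull::new` always returns `Some`.
    std::ptr::NonNull::new(r as *const T as *mut T).expect("references are non-null")
}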

/// Returns the column family handle to use with RocksDB for the given column.
fn cf_handle(&self, col: DBCol) -> &ColumnFamily {
let ptr = self.cf_handles[col];
// SAFETY: The pointers are valid so long as self.db is valid.
unsafe { ptr.as_ref() }
///
/// If the database has not been set up to access the given column, this
/// panics if debug assertions are enabled and returns an error otherwise.
///
/// ## Safety
///
/// This function is safe so long as the `db` field has not been modified
/// since the `cf_handles` mapping was constructed. Technically we should
/// mark this function unsafe, but we didn’t in order to improve ergonomics.
/// This is an internal method, so hopefully the implementation knows what
/// it’s doing.
fn cf_handle(&self, col: DBCol) -> io::Result<&ColumnFamily> {
if let Some(ptr) = self.cf_handles[col] {
// SAFETY: The pointers are valid so long as self.db is valid.
Ok(unsafe { ptr.as_ref() })
} else if cfg!(debug_assertions) {
panic!("The database instance isn’t setup to access {col}");
} else {
Err(other_error(format!("{col}: no such column")))
}
}
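
// Sketch (not part of this diff): the debug-panic / release-error pattern
// used by `cf_handle` above, as a standalone function with an illustrative
// name and message; `other_error` is the module’s helper seen earlier.
fn require_configured(found: bool, what: &str) -> io::Result<()> {
    if found {
        Ok(())
    } else if cfg!(debug_assertions) {
        // Debug builds fail loudly so misuse is caught by tests.
        panic!("not configured to access {what}");
    } else {
        // Release builds degrade to a recoverable I/O error.
        Err(other_error(format!("{what}: no such column")))
    }
}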

/// Returns iterator over all column families and their handles.
///
/// This is akin to iterating over all [`DBCol`] variants and calling
/// [`Self::cf_handle`], except that this method takes care of properly
/// filtering out column families that the database instance isn’t set up
/// to handle.
///
/// ## Safety
///
/// This function is safe so long as the `db` field has not been modified
/// since the `cf_handles` mapping was constructed. Technically we should
/// mark this function unsafe, but we didn’t in order to improve ergonomics.
/// This is an internal method, so hopefully the implementation knows what
/// it’s doing.
fn cf_handles(&self) -> impl Iterator<Item = (DBCol, &ColumnFamily)> {
self.cf_handles.iter().filter_map(|(col, ptr)| {
if let Some(ptr) = *ptr {
// SAFETY: The pointers are valid so long as self.db is valid.
Some((col, unsafe { ptr.as_ref() }))
} else {
None
}
})
}
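
// Sketch (not part of this diff): a hypothetical helper showing how the
// iterator lets per-column maintenance skip unconfigured columns instead
// of erroring out on them; `into_other` is the module’s error converter.
fn flush_configured(&self) -> io::Result<()> {
    for (_col, cf) in self.cf_handles() {
        self.db.flush_cf(cf).map_err(into_other)?;
    }
    Ok(())
}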

fn iter_raw_bytes_impl<'a>(
&'a self,
col: DBCol,
prefix: Option<&'a [u8]>,
) -> RocksDBIterator<'a> {
let cf_handle = self.cf_handle(col);
let cf_handle = self.cf_handle(col).unwrap();
let mut read_options = rocksdb_read_options();
let mode = if let Some(prefix) = prefix {
// prefix_same_as_start doesn’t do anything for us. It takes effect
@@ -185,7 +275,7 @@ impl Database for RocksDB {
let read_options = rocksdb_read_options();
let result = self
.db
.get_pinned_cf_opt(self.cf_handle(col), key, &read_options)
.get_pinned_cf_opt(self.cf_handle(col)?, key, &read_options)
.map_err(into_other)?
.map(DBSlice::from_rocksdb_slice);
timer.observe_duration();
@@ -218,24 +308,24 @@ impl Database for RocksDB {
for op in transaction.ops {
match op {
DBOp::Set { col, key, value } => {
batch.put_cf(self.cf_handle(col), key, value);
batch.put_cf(self.cf_handle(col)?, key, value);
}
DBOp::Insert { col, key, value } => {
if cfg!(debug_assertions) {
if let Ok(Some(old_value)) = self.get_raw_bytes(col, &key) {
super::assert_no_overwrite(col, &key, &value, &*old_value)
}
}
batch.put_cf(self.cf_handle(col), key, value);
batch.put_cf(self.cf_handle(col)?, key, value);
}
DBOp::UpdateRefcount { col, key, value } => {
batch.merge_cf(self.cf_handle(col), key, value);
batch.merge_cf(self.cf_handle(col)?, key, value);
}
DBOp::Delete { col, key } => {
batch.delete_cf(self.cf_handle(col), key);
batch.delete_cf(self.cf_handle(col)?, key);
}
DBOp::DeleteAll { col } => {
let cf_handle = self.cf_handle(col);
let cf_handle = self.cf_handle(col)?;
let opt_first = self.db.iterator_cf(cf_handle, IteratorMode::Start).next();
let opt_last = self.db.iterator_cf(cf_handle, IteratorMode::End).next();
assert_eq!(opt_first.is_some(), opt_last.is_some());
@@ -253,7 +343,7 @@ impl Database for RocksDB {
fn compact(&self) -> io::Result<()> {
let none = Option::<&[u8]>::None;
for col in DBCol::iter() {
self.db.compact_range_cf(self.cf_handle(col), none, none);
self.db.compact_range_cf(self.cf_handle(col)?, none, none);
}
Ok(())
}
Expand All @@ -262,7 +352,7 @@ impl Database for RocksDB {
// Need to iterator over all CFs because the normal `flush()` only
// flushes the default column family.
for col in DBCol::iter() {
self.db.flush_cf(self.cf_handle(col)).map_err(into_other)?;
self.db.flush_cf(self.cf_handle(col)?).map_err(into_other)?;
}
Ok(())
}
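
Note that `compact` and `flush` still iterate over every `DBCol`, so on an instance configured with only a subset of columns the new `?` propagates a "no such column" error (or panics in debug builds) for the unconfigured ones. A hedged sketch of what a caller might then observe:

match rocksdb.flush() {
    Ok(()) => (),
    // Release builds: an io::Error such as "<col>: no such column".
    // Debug builds: cf_handle panics before this arm is reached.
    Err(err) => eprintln!("flush failed: {err}"),
}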
@@ -480,13 +570,13 @@ impl RocksDB {
/// Gets every int property in CF_STAT_NAMES for every column in DBCol.
fn get_cf_statistics(&self, result: &mut StoreStatistics) {
for stat_name in CF_STAT_NAMES {
let mut values = vec![];
for col in DBCol::iter() {
let size = self.db.property_int_value_cf(self.cf_handle(col), stat_name);
if let Ok(Some(value)) = size {
values.push(StatsValue::ColumnValue(col, value as i64));
}
}
let values = self
.cf_handles()
.filter_map(|(col, handle)| {
let property = self.db.property_int_value_cf(handle, stat_name);
Some(StatsValue::ColumnValue(col, property.ok()?? as i64))
})
.collect::<Vec<_>>();
if !values.is_empty() {
result.data.push((stat_name.to_string(), values));
}
@@ -696,7 +786,7 @@ mod tests {
// single_thread_rocksdb makes compact hang forever
if !cfg!(feature = "single_thread_rocksdb") {
let none = Option::<&[u8]>::None;
let cf = rocksdb.cf_handle(DBCol::State);
let cf = rocksdb.cf_handle(DBCol::State).unwrap();

// I’m not sure why but we need to run compaction twice. If we run
// it only once, we end up with an empty value for the key. This is