Skip to content

Commit

Permalink
store: allow configuring RocksDB with support for subset of columns (#…
Browse files Browse the repository at this point in the history
…7544)

With cold storage approaching, it’s going to be necessary to be able to
open RocksDB instance with only a subset of defined columns.  Change how
RocksDB deals with column family handles.  Specifically, get_cf_handles
now accepts an iterator over DBCol values as argument and cf_handle will
return an error if trying to get a handle for a column that wasn’t set
up.
  • Loading branch information
mina86 authored Sep 5, 2022
1 parent ccd88d6 commit 36e3162
Showing 1 changed file with 130 additions and 40 deletions.
170 changes: 130 additions & 40 deletions core/store/src/db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct RocksDB {
/// Rather than accessing this field directly, use [`RocksDB::cf_handle`]
/// method instead. It returns `&ColumnFamily` which is what you usually
/// want.
cf_handles: enum_map::EnumMap<DBCol, std::ptr::NonNull<ColumnFamily>>,
cf_handles: enum_map::EnumMap<DBCol, Option<std::ptr::NonNull<ColumnFamily>>>,

check_free_space_counter: std::sync::atomic::AtomicU16,
check_free_space_interval: u16,
Expand All @@ -49,14 +49,50 @@ unsafe impl Send for RocksDB {}
unsafe impl Sync for RocksDB {}

impl RocksDB {
/// Opens the database either in read only or in read/write mode depending
/// on the `mode` parameter specified in the store_config.
pub fn open(path: &Path, store_config: &StoreConfig, mode: Mode) -> io::Result<RocksDB> {
/// Opens the database.
///
/// `path` specifies location of the database. It’s assumed that it has
/// been resolved based on configuration in `store_config` and thus path
/// configuration in `store_config` is ignored.
///
/// `store_config` specifies other storage configuration such open files
/// limit or whether to enable collection of statistics.
///
/// `mode` specifies whether to open the database in read/write or read-only
/// mode. In the latter case, the database will not be created if it
/// doesn’t exist, nor will any migrations be performed if the database
/// version differs from the expected one.
pub fn open(path: &Path, store_config: &StoreConfig, mode: Mode) -> io::Result<Self> {
let columns: Vec<DBCol> = DBCol::iter().collect();
Self::open_with_columns(path, store_config, mode, &columns)
}

/// Opens the database with given set of column families configured.
///
/// With cold storage, we will need to be able to configure the database
/// with only a subset of columns. The `columns` argument specifies which
/// columns to configure in the database.
///
/// Note that RocksDB is weird. It’s not possible to open database in
/// read/write mode without specifying all the column families existing in
/// the database. On the other hand, it’s not possible to open database in
/// read-only mode while specifying column families which don’t exist.
///
/// Furthermore, note that when opening in read/write mode, we configure
/// RocksDB to create missing columns.
///
/// With all that, it’s actually quite messy if at some point we’ll end up
/// opening cold storage as hot since it’ll create all the missing columns.
fn open_with_columns(
path: &Path,
store_config: &StoreConfig,
mode: Mode,
columns: &[DBCol],
) -> io::Result<Self> {
let counter = instance_tracker::InstanceTracker::try_new(store_config.max_open_files)
.map_err(other_error)?;
let (db, db_opt) = Self::open_db(path, store_config, mode)?;
let cf_handles = Self::get_cf_handles(&db);

let (db, db_opt) = Self::open_db(path, store_config, mode, columns)?;
let cf_handles = Self::get_cf_handles(&db, columns);
Ok(Self {
db,
db_opt,
Expand All @@ -68,10 +104,17 @@ impl RocksDB {
})
}

/// Opens the database with all column families configured.
fn open_db(path: &Path, store_config: &StoreConfig, mode: Mode) -> io::Result<(DB, Options)> {
/// Opens the database with given column families configured.
fn open_db(
path: &Path,
store_config: &StoreConfig,
mode: Mode,
columns: &[DBCol],
) -> io::Result<(DB, Options)> {
let options = rocksdb_options(store_config, mode);
let cf_descriptors = DBCol::iter()
let cf_descriptors = columns
.iter()
.copied()
.map(|col| {
rocksdb::ColumnFamilyDescriptor::new(
col_name(col),
Expand Down Expand Up @@ -103,34 +146,81 @@ impl RocksDB {
}

/// Returns mapping from [`DBCol`] to cf handle used with RocksDB calls.
fn get_cf_handles(db: &DB) -> enum_map::EnumMap<DBCol, std::ptr::NonNull<ColumnFamily>> {
///
/// The mapping is created for column families given in the `columns` list
/// only. All other columns will map to `None`.
///
/// ## Safety
///
/// This function is safe but using the returned mapping safely requires
/// that it does not outlive `db` and that `db` is not modified. The safety
/// relies on `db` returning stable mapping for column families.
fn get_cf_handles(
db: &DB,
columns: &[DBCol],
) -> enum_map::EnumMap<DBCol, Option<std::ptr::NonNull<ColumnFamily>>> {
let mut cf_handles = enum_map::EnumMap::default();
for col in DBCol::iter() {
for col in columns.iter().copied() {
let ptr = db
.cf_handle(&col_name(col))
.map_or(std::ptr::null(), |cf| cf as *const ColumnFamily);
cf_handles[col] = std::ptr::NonNull::new(ptr as *mut ColumnFamily);
.and_then(|cf| std::ptr::NonNull::new(cf as *const _ as *mut _))
.unwrap_or_else(|| panic!("Missing cf handle for {col}"));
cf_handles[col] = Some(ptr);
}
cf_handles.map(|col, ptr| {
ptr.unwrap_or_else(|| {
panic!("Missing cf handle for {col}");
})
})
cf_handles
}

/// Returns column family handle to use with RocksDB for given column.
fn cf_handle(&self, col: DBCol) -> &ColumnFamily {
let ptr = self.cf_handles[col];
// SAFETY: The pointers are valid so long as self.db is valid.
unsafe { ptr.as_ref() }
///
/// If the database has not been setup to access given column, panics if
/// debug assertions are enabled or returns an error otherwise.
///
/// ## Safety
///
/// This function is safe so long as `db` field has not been modified since
/// `cf_handles` mapping has been constructed. We technically should mark
/// this function unsafe but to improve ergonomics we didn’t. This is an
/// internal method so hopefully the implementation knows what it’s doing.
fn cf_handle(&self, col: DBCol) -> io::Result<&ColumnFamily> {
if let Some(ptr) = self.cf_handles[col] {
// SAFETY: The pointers are valid so long as self.db is valid.
Ok(unsafe { ptr.as_ref() })
} else if cfg!(debug_assertions) {
panic!("The database instance isn’t setup to access {col}");
} else {
Err(other_error(format!("{col}: no such column")))
}
}

/// Returns iterator over all column families and their handles.
///
/// This is kind of like iterating over all [`DBCol`] variants and calling
/// [`Self::cf_handle`] except this method takes care of properly filtering
/// out column families that the database instance isn’t setup to handle.
///
/// ## Safety
///
/// This function is safe so long as `db` field has not been modified since
/// `cf_handles` mapping has been constructed. We technically should mark
/// this function unsafe but to improve ergonomics we didn’t. This is an
/// internal method so hopefully the implementation knows what it’s doing.
fn cf_handles(&self) -> impl Iterator<Item = (DBCol, &ColumnFamily)> {
self.cf_handles.iter().filter_map(|(col, ptr)| {
if let Some(ptr) = *ptr {
// SAFETY: The pointers are valid so long as self.db is valid.
Some((col, unsafe { ptr.as_ref() }))
} else {
None
}
})
}

fn iter_raw_bytes_impl<'a>(
&'a self,
col: DBCol,
prefix: Option<&'a [u8]>,
) -> RocksDBIterator<'a> {
let cf_handle = self.cf_handle(col);
let cf_handle = self.cf_handle(col).unwrap();
let mut read_options = rocksdb_read_options();
let mode = if let Some(prefix) = prefix {
// prefix_same_as_start doesn’t do anything for us. It takes effect
Expand Down Expand Up @@ -185,7 +275,7 @@ impl Database for RocksDB {
let read_options = rocksdb_read_options();
let result = self
.db
.get_pinned_cf_opt(self.cf_handle(col), key, &read_options)
.get_pinned_cf_opt(self.cf_handle(col)?, key, &read_options)
.map_err(into_other)?
.map(DBSlice::from_rocksdb_slice);
timer.observe_duration();
Expand Down Expand Up @@ -218,24 +308,24 @@ impl Database for RocksDB {
for op in transaction.ops {
match op {
DBOp::Set { col, key, value } => {
batch.put_cf(self.cf_handle(col), key, value);
batch.put_cf(self.cf_handle(col)?, key, value);
}
DBOp::Insert { col, key, value } => {
if cfg!(debug_assertions) {
if let Ok(Some(old_value)) = self.get_raw_bytes(col, &key) {
super::assert_no_overwrite(col, &key, &value, &*old_value)
}
}
batch.put_cf(self.cf_handle(col), key, value);
batch.put_cf(self.cf_handle(col)?, key, value);
}
DBOp::UpdateRefcount { col, key, value } => {
batch.merge_cf(self.cf_handle(col), key, value);
batch.merge_cf(self.cf_handle(col)?, key, value);
}
DBOp::Delete { col, key } => {
batch.delete_cf(self.cf_handle(col), key);
batch.delete_cf(self.cf_handle(col)?, key);
}
DBOp::DeleteAll { col } => {
let cf_handle = self.cf_handle(col);
let cf_handle = self.cf_handle(col)?;
let opt_first = self.db.iterator_cf(cf_handle, IteratorMode::Start).next();
let opt_last = self.db.iterator_cf(cf_handle, IteratorMode::End).next();
assert_eq!(opt_first.is_some(), opt_last.is_some());
Expand All @@ -253,7 +343,7 @@ impl Database for RocksDB {
fn compact(&self) -> io::Result<()> {
let none = Option::<&[u8]>::None;
for col in DBCol::iter() {
self.db.compact_range_cf(self.cf_handle(col), none, none);
self.db.compact_range_cf(self.cf_handle(col)?, none, none);
}
Ok(())
}
Expand All @@ -262,7 +352,7 @@ impl Database for RocksDB {
// Need to iterator over all CFs because the normal `flush()` only
// flushes the default column family.
for col in DBCol::iter() {
self.db.flush_cf(self.cf_handle(col)).map_err(into_other)?;
self.db.flush_cf(self.cf_handle(col)?).map_err(into_other)?;
}
Ok(())
}
Expand Down Expand Up @@ -480,13 +570,13 @@ impl RocksDB {
/// Gets every int property in CF_STAT_NAMES for every column in DBCol.
fn get_cf_statistics(&self, result: &mut StoreStatistics) {
for stat_name in CF_STAT_NAMES {
let mut values = vec![];
for col in DBCol::iter() {
let size = self.db.property_int_value_cf(self.cf_handle(col), stat_name);
if let Ok(Some(value)) = size {
values.push(StatsValue::ColumnValue(col, value as i64));
}
}
let values = self
.cf_handles()
.filter_map(|(col, handle)| {
let property = self.db.property_int_value_cf(handle, stat_name);
Some(StatsValue::ColumnValue(col, property.ok()?? as i64))
})
.collect::<Vec<_>>();
if !values.is_empty() {
result.data.push((stat_name.to_string(), values));
}
Expand Down Expand Up @@ -696,7 +786,7 @@ mod tests {
// single_thread_rocksdb makes compact hang forever
if !cfg!(feature = "single_thread_rocksdb") {
let none = Option::<&[u8]>::None;
let cf = rocksdb.cf_handle(DBCol::State);
let cf = rocksdb.cf_handle(DBCol::State).unwrap();

// I’m not sure why but we need to run compaction twice. If we run
// it only once, we end up with an empty value for the key. This is
Expand Down

0 comments on commit 36e3162

Please sign in to comment.