Skip to content

Commit

Permalink
store: introduce cold storage (#7871)
Browse files Browse the repository at this point in the history
Add a `cold_store` Cargo feature which enables the option to configure
the node with cold storage.  At the moment, all this does is open the
cold database and doesn’t enable any other features.  The idea is that
this can now allow experimenting with code that needs access to the
cold storage.
  • Loading branch information
mina86 authored and nikurt committed Nov 7, 2022
1 parent b8fefd3 commit 01d2fde
Show file tree
Hide file tree
Showing 21 changed files with 381 additions and 82 deletions.
7 changes: 4 additions & 3 deletions chain/indexer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,10 @@ pub(crate) async fn start(
blocks_sink: mpsc::Sender<StreamerMessage>,
) {
info!(target: INDEXER, "Starting Streamer...");
let indexer_db_path = near_store::NodeStorage::opener(&indexer_config.home_dir, &store_config)
.path()
.join("indexer");
let indexer_db_path =
near_store::NodeStorage::opener(&indexer_config.home_dir, &store_config, None)
.path()
.join("indexer");

// TODO: implement proper error handling
let db = DB::open_default(indexer_db_path).unwrap();
Expand Down
1 change: 1 addition & 0 deletions core/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ no_cache = []
single_thread_rocksdb = [] # Deactivate RocksDB IO background threads
test_features = []
protocol_feature_flat_state = []
cold_store = []

nightly_protocol = []
nightly = [
Expand Down
13 changes: 9 additions & 4 deletions core/store/benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ fn benchmark_write_then_read_successful(
let tmp_dir = tempfile::tempdir().unwrap();
// Use default StoreConfig rather than NodeStorage::test_opener so we’re using the
// same configuration as in production.
let store = NodeStorage::opener(tmp_dir.path(), &Default::default())
.open()
.unwrap()
.get_store(Temperature::Hot);
let store = NodeStorage::opener(
tmp_dir.path(),
&Default::default(),
#[cfg(feature = "cold_store")]
None,
)
.open()
.unwrap()
.get_store(Temperature::Hot);
let keys = generate_keys(num_keys, key_size);
write_to_db(&store, &keys, max_value_size, col);

Expand Down
3 changes: 3 additions & 0 deletions core/store/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use std::io;

use crate::DBCol;

#[cfg(feature = "cold_store")]
mod colddb;
pub mod refcount;
pub(crate) mod rocksdb;
mod slice;
mod testdb;

#[cfg(feature = "cold_store")]
pub use self::colddb::ColdDB;
pub use self::rocksdb::RocksDB;
pub use self::slice::DBSlice;
pub use self::testdb::TestDB;
Expand Down
20 changes: 13 additions & 7 deletions core/store/src/db/colddb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ use crate::DBCol;
/// Lastly, since no data is ever deleted from cold storage, trying to decrease
/// the reference count of a value or delete data is ignored and, if debug
/// assertions are enabled, will cause a panic.
struct ColdDatabase<D: Database = crate::db::RocksDB>(D);
pub struct ColdDB<D: Database = crate::db::RocksDB>(D);

impl<D: Database> ColdDatabase<D> {
impl<D: Database> std::convert::From<D> for ColdDB<D> {
fn from(db: D) -> Self {
Self(db)
}
}

impl<D: Database> ColdDB<D> {
/// Returns raw bytes from the underlying storage.
///
/// Adjusts the key if necessary (see [`get_cold_key`]) and retrieves data
Expand All @@ -51,7 +57,7 @@ impl<D: Database> ColdDatabase<D> {
}
}

impl<D: Database> super::Database for ColdDatabase<D> {
impl<D: Database> super::Database for ColdDB<D> {
fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> std::io::Result<Option<DBSlice<'_>>> {
match self.get_impl(col, key) {
Ok(Some(value)) if col.is_rc() => {
Expand Down Expand Up @@ -262,8 +268,8 @@ mod test {
const VALUE: &[u8] = "FooBar".as_bytes();

/// Constructs test in-memory database.
fn create_test_db() -> ColdDatabase<crate::db::TestDB> {
ColdDatabase(crate::db::testdb::TestDB::default())
fn create_test_db() -> ColdDB<crate::db::TestDB> {
ColdDB(crate::db::testdb::TestDB::default())
}

fn set(col: DBCol, key: &[u8]) -> DBOp {
Expand Down Expand Up @@ -338,8 +344,8 @@ mod test {

let name = if is_raw { "raw " } else { "cold" };
let value = db.get_raw_bytes(col, &key).unwrap();
// When fetching reference counted column ColdDatabase adds
// reference count to it.
// When fetching reference counted column ColdDB adds reference
// count to it.
let value = pretty_value(value.as_deref(), col.is_rc() && !is_raw);
result.push(format!(" [{name}] get_raw_bytes → {value}"));

Expand Down
2 changes: 1 addition & 1 deletion core/store/src/db/rocksdb/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ fn test_snapshot_recovery() {
{
let mut config = opener.config().clone();
config.path = Some(path);
let opener = crate::NodeStorage::opener(tmpdir.path(), &config);
let opener = crate::NodeStorage::opener(tmpdir.path(), &config, None);
let store = opener.open().unwrap().get_store(crate::Temperature::Hot);
assert_eq!(Some(&b"value"[..]), store.get(COL, KEY).unwrap().as_deref());
}
Expand Down
72 changes: 61 additions & 11 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ pub use crate::opener::{StoreMigrator, StoreOpener, StoreOpenerError};
/// In the future, certain parts of the code may need to access hot or cold
/// storage. Specifically, querying an old block will require reading it from
/// the cold storage.
#[derive(Clone, Copy)]
pub enum Temperature {
/// The hot storage.  This is the only variant which exists when the crate
/// is built without the `cold_store` feature.
Hot,
/// The cold storage.  Only available when the crate is built with the
/// `cold_store` feature.
#[cfg(feature = "cold_store")]
Cold,
}

/// Node’s storage holding chain and all other necessary data.
Expand All @@ -68,7 +71,11 @@ pub enum Temperature {
/// to [`Store`] which will abstract access to only one of the temperatures of
/// the storage.
pub struct NodeStorage {
storage: Arc<dyn Database>,
hot_storage: Arc<dyn Database>,
#[cfg(feature = "cold_store")]
cold_storage: Option<Arc<crate::db::ColdDB>>,
#[cfg(not(feature = "cold_store"))]
cold_storage: Option<std::convert::Infallible>,
}

/// Node’s single storage source.
Expand All @@ -84,10 +91,22 @@ pub struct Store {
storage: Arc<dyn Database>,
}

// Those are temporary.  Once the cold_store feature is stabilised, remove those
// type aliases and just use the type directly.
#[cfg(feature = "cold_store")]
pub type ColdConfig<'a> = Option<&'a StoreConfig>;
#[cfg(not(feature = "cold_store"))]
pub type ColdConfig<'a> = Option<std::convert::Infallible>;

impl NodeStorage {
/// Initialises a new opener with given home directory and store config.
pub fn opener<'a>(home_dir: &std::path::Path, config: &'a StoreConfig) -> StoreOpener<'a> {
StoreOpener::new(home_dir, config)
/// Initialises a new opener with given home directory and hot and cold
/// store config.
pub fn opener<'a>(
home_dir: &std::path::Path,
config: &'a StoreConfig,
cold_config: ColdConfig<'a>,
) -> StoreOpener<'a> {
StoreOpener::new(home_dir, config, cold_config)
}

/// Initialises an opener for a new temporary test store.
Expand All @@ -101,7 +120,7 @@ impl NodeStorage {
pub fn test_opener() -> (tempfile::TempDir, StoreOpener<'static>) {
static CONFIG: Lazy<StoreConfig> = Lazy::new(StoreConfig::test_config);
let dir = tempfile::tempdir().unwrap();
let opener = StoreOpener::new(dir.path(), &CONFIG);
let opener = StoreOpener::new(dir.path(), &CONFIG, None);
(dir, opener)
}

Expand All @@ -115,7 +134,22 @@ impl NodeStorage {
/// possibly [`crate::test_utils::create_test_store`] (depending on whether you
/// need a [`NodeStorage`] or a [`Store`] object).
pub fn new(storage: Arc<dyn Database>) -> Self {
Self { storage }
Self { hot_storage: storage, cold_storage: None }
}

/// Constructs new object backed by given database.
fn from_rocksdb(
hot_storage: crate::db::RocksDB,
#[cfg(feature = "cold_store")] cold_storage: Option<crate::db::RocksDB>,
#[cfg(not(feature = "cold_store"))] cold_storage: Option<std::convert::Infallible>,
) -> Self {
Self {
hot_storage: Arc::new(hot_storage),
#[cfg(feature = "cold_store")]
cold_storage: cold_storage.map(|db| Arc::new(db.into())),
#[cfg(not(feature = "cold_store"))]
cold_storage: cold_storage.map(|_| unreachable!()),
}
}

/// Returns storage for given temperature.
Expand All @@ -132,7 +166,9 @@ impl NodeStorage {
/// cold.
pub fn get_store(&self, temp: Temperature) -> Store {
match temp {
Temperature::Hot => Store { storage: self.storage.clone() },
Temperature::Hot => Store { storage: self.hot_storage.clone() },
#[cfg(feature = "cold_store")]
Temperature::Cold => Store { storage: self.cold_storage.as_ref().unwrap().clone() },
}
}

Expand All @@ -151,9 +187,11 @@ impl NodeStorage {
/// well. For example, garbage collection only ever touches hot storage but
/// it should go through [`Store`] interface since data it manipulates
/// (e.g. blocks) are live in both databases.
pub fn get_inner(&self, temp: Temperature) -> &Arc<dyn Database> {
pub fn _get_inner(&self, temp: Temperature) -> &Arc<dyn Database> {
match temp {
Temperature::Hot => &self.storage,
Temperature::Hot => &self.hot_storage,
#[cfg(feature = "cold_store")]
Temperature::Cold => todo!(),
}
}

Expand All @@ -163,15 +201,27 @@ impl NodeStorage {
/// `Arc::clone`.
pub fn into_inner(self, temp: Temperature) -> Arc<dyn Database> {
match temp {
Temperature::Hot => self.storage,
Temperature::Hot => self.hot_storage,
#[cfg(feature = "cold_store")]
Temperature::Cold => self.cold_storage.unwrap(),
}
}

/// Returns whether the storage has a cold database.
///
/// This is `true` exactly when the `cold_storage` field is `Some`.  When the
/// crate is built without the `cold_store` feature that field is an `Option`
/// of an uninhabited type, so the method always returns `false`.
pub fn has_cold(&self) -> bool {
self.cold_storage.is_some()
}

/// Reads database metadata and returns whether the storage is archival.
pub fn is_archive(&self) -> io::Result<bool> {
Ok(match metadata::DbMetadata::read(self.storage.as_ref())?.kind.unwrap() {
if self.cold_storage.is_some() {
return Ok(true);
}
Ok(match metadata::DbMetadata::read(self.hot_storage.as_ref())?.kind.unwrap() {
metadata::DbKind::RPC => false,
metadata::DbKind::Archive => true,
#[cfg(feature = "cold_store")]
metadata::DbKind::Hot | metadata::DbKind::Cold => unreachable!(),
})
}
}
Expand Down
37 changes: 31 additions & 6 deletions core/store/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,53 @@ pub enum DbKind {
/// The database is an archive database meaning that it is not garbage
/// collected and stores all chain data.
Archive,
#[cfg(feature = "cold_store")]
/// The database is Hot meaning that the node runs in archival mode with
/// a paired Cold database.
Hot,
#[cfg(feature = "cold_store")]
/// The database is Cold meaning that the node runs in archival mode with
/// a paired Hot database.
Cold,
}

/// Stores `version` as the database version of `storage`.
///
/// This is a thin wrapper around [`set_store_metadata`] which passes
/// metadata carrying the given version and no explicit database kind.
pub(super) fn set_store_version(storage: &NodeStorage, version: DbVersion) -> std::io::Result<()> {
    let metadata = DbMetadata { version, kind: None };
    set_store_metadata(storage, metadata)
}

pub(super) fn set_store_metadata(
fn set_db_metadata(
storage: &NodeStorage,
temp: Temperature,
metadata: DbMetadata,
) -> std::io::Result<()> {
let version = metadata.version.to_string().into_bytes();
let kind = metadata.kind.map(|kind| <&str>::from(kind).as_bytes());
let mut store_update = storage.get_store(Temperature::Hot).store_update();
store_update.set(DBCol::DbVersion, VERSION_KEY, &version);
let mut store_update = storage.get_store(temp).store_update();
store_update.set(DBCol::DbVersion, VERSION_KEY, metadata.version.to_string().as_bytes());
if metadata.version >= DB_VERSION_WITH_KIND {
if let Some(kind) = kind {
#[allow(unused_mut)]
let mut kind = metadata.kind;
#[cfg(feature = "cold_store")]
if matches!(temp, Temperature::Cold) || storage.has_cold() {
kind = Some(if matches!(temp, Temperature::Hot) { DbKind::Hot } else { DbKind::Cold });
}
if let Some(kind) = kind.map(|kind| <&str>::from(kind).as_bytes()) {
store_update.set(DBCol::DbVersion, KIND_KEY, kind);
}
}
store_update.commit()
}

/// Writes `metadata` to every database held by `storage`.
///
/// The hot database is always written first.  When the crate is built with
/// the `cold_store` feature and `storage` has a cold database, the same
/// metadata is then written to the cold database as well.  The first error
/// encountered is returned and stops any further writes.
pub(super) fn set_store_metadata(
    storage: &NodeStorage,
    metadata: DbMetadata,
) -> std::io::Result<()> {
    set_db_metadata(storage, Temperature::Hot, metadata)?;

    #[cfg(feature = "cold_store")]
    if storage.has_cold() {
        return set_db_metadata(storage, Temperature::Cold, metadata);
    }

    Ok(())
}

/// Metadata about a database.
#[derive(Clone, Copy)]
pub(super) struct DbMetadata {
Expand Down
Loading

0 comments on commit 01d2fde

Please sign in to comment.