store: introduce cold storage #7871

Merged · 10 commits · Oct 25, 2022
Changes from all commits
7 changes: 4 additions & 3 deletions chain/indexer/src/streamer/mod.rs
@@ -290,9 +290,10 @@ pub(crate) async fn start(
     blocks_sink: mpsc::Sender<StreamerMessage>,
 ) {
     info!(target: INDEXER, "Starting Streamer...");
-    let indexer_db_path = near_store::NodeStorage::opener(&indexer_config.home_dir, &store_config)
-        .path()
-        .join("indexer");
+    let indexer_db_path =
+        near_store::NodeStorage::opener(&indexer_config.home_dir, &store_config, None)
+            .path()
+            .join("indexer");
 
     // TODO: implement proper error handling
     let db = DB::open_default(indexer_db_path).unwrap();
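Note for readers following the API change: a minimal sketch, not taken from this PR, of how a call site reads after the migration. It assumes only the public near_store items visible in this diff (NodeStorage::opener, Temperature, the opener's open and get_store methods); passing None for the new third argument keeps the old single-database behaviour, exactly as the hunk above does.

// Sketch only: the third argument is the new cold-store config.
fn open_hot(home_dir: &std::path::Path, config: &near_store::StoreConfig) -> near_store::Store {
    near_store::NodeStorage::opener(home_dir, config, None)
        .open()
        .unwrap()
        .get_store(near_store::Temperature::Hot)
}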
1 change: 1 addition & 0 deletions core/store/Cargo.toml
@@ -57,6 +57,7 @@ no_cache = []
 single_thread_rocksdb = [] # Deactivate RocksDB IO background threads
 test_features = []
 protocol_feature_flat_state = []
+cold_store = []
 
 nightly_protocol = []
 nightly = [
13 changes: 9 additions & 4 deletions core/store/benches/store_bench.rs
@@ -19,10 +19,15 @@ fn benchmark_write_then_read_successful(
     let tmp_dir = tempfile::tempdir().unwrap();
     // Use default StoreConfig rather than NodeStorage::test_opener so we’re using the
     // same configuration as in production.
-    let store = NodeStorage::opener(tmp_dir.path(), &Default::default())
-        .open()
-        .unwrap()
-        .get_store(Temperature::Hot);
+    let store = NodeStorage::opener(
+        tmp_dir.path(),
+        &Default::default(),
+        #[cfg(feature = "cold_store")]
+        None,
+    )
+    .open()
+    .unwrap()
+    .get_store(Temperature::Hot);
     let keys = generate_keys(num_keys, key_size);
     write_to_db(&store, &keys, max_value_size, col);
 
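The hunk above leans on cfg attributes being legal on both function parameters and call-site arguments, which is how the extra argument exists only in cold_store builds. Below is a self-contained sketch of that pattern; the names (open_storage, the unit StoreConfig stand-in) are illustrative, not from the PR.

pub struct StoreConfig;

// The parameter exists only when the feature is enabled...
fn open_storage(
    _path: &std::path::Path,
    #[cfg(feature = "cold_store")] _cold_config: Option<&StoreConfig>,
) {
    // open hot (and, if configured, cold) databases here
}

fn main() {
    // ...and so does the matching argument, so both builds compile.
    open_storage(
        std::path::Path::new("/tmp/db"),
        #[cfg(feature = "cold_store")]
        None,
    );
}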
3 changes: 3 additions & 0 deletions core/store/src/db.rs
@@ -2,12 +2,15 @@ use std::io;
 
 use crate::DBCol;
 
+#[cfg(feature = "cold_store")]
+mod colddb;
 pub mod refcount;
 pub(crate) mod rocksdb;
 mod slice;
 mod testdb;
 
+#[cfg(feature = "cold_store")]
+pub use self::colddb::ColdDB;
 pub use self::rocksdb::RocksDB;
 pub use self::slice::DBSlice;
 pub use self::testdb::TestDB;
20 changes: 13 additions & 7 deletions core/store/src/db/colddb.rs
@@ -35,9 +35,15 @@ use crate::DBCol;
 /// Lastly, since no data is ever deleted from cold storage, trying to decrease
 /// a value's reference count or delete data is ignored and, if debug assertions
 /// are enabled, will cause a panic.
-struct ColdDatabase<D: Database = crate::db::RocksDB>(D);
+pub struct ColdDB<D: Database = crate::db::RocksDB>(D);
 
-impl<D: Database> ColdDatabase<D> {
+impl<D: Database> std::convert::From<D> for ColdDB<D> {
+    fn from(db: D) -> Self {
+        Self(db)
+    }
+}
+
+impl<D: Database> ColdDB<D> {
     /// Returns raw bytes from the underlying storage.
     ///
     /// Adjusts the key if necessary (see [`get_cold_key`]) and retrieves data
@@ -51,7 +57,7 @@ impl<D: Database> ColdDatabase<D> {
     }
 }
 
-impl<D: Database> super::Database for ColdDatabase<D> {
+impl<D: Database> super::Database for ColdDB<D> {
     fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> std::io::Result<Option<DBSlice<'_>>> {
         match self.get_impl(col, key) {
             Ok(Some(value)) if col.is_rc() => {
@@ -262,8 +268,8 @@ mod test {
     const VALUE: &[u8] = "FooBar".as_bytes();
 
     /// Constructs test in-memory database.
-    fn create_test_db() -> ColdDatabase<crate::db::TestDB> {
-        ColdDatabase(crate::db::testdb::TestDB::default())
+    fn create_test_db() -> ColdDB<crate::db::TestDB> {
+        ColdDB(crate::db::testdb::TestDB::default())
     }
 
     fn set(col: DBCol, key: &[u8]) -> DBOp {
Expand Down Expand Up @@ -338,8 +344,8 @@ mod test {

let name = if is_raw { "raw " } else { "cold" };
let value = db.get_raw_bytes(col, &key).unwrap();
// When fetching reference counted column ColdDatabase adds
// reference count to it.
// When fetching reference counted column ColdDB adds reference
// count to it.
let value = pretty_value(value.as_deref(), col.is_rc() && !is_raw);
result.push(format!(" [{name}] get_raw_bytes → {value}"));

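The module doc above promises that destructive operations are dropped and only panic when debug assertions are on. Here is a hedged sketch of that policy, using minimal stand-ins for the crate's DBCol and DBOp types rather than the PR's actual write path:

// Stand-ins for illustration only; the real types live in core/store/src/db.rs.
#[derive(Clone, Copy, Debug)]
enum DBCol {
    Block,
}

#[allow(dead_code)]
enum DBOp {
    Set { col: DBCol, key: Vec<u8>, value: Vec<u8> },
    Delete { col: DBCol, key: Vec<u8> },
}

// Deletes never reach the underlying database: release builds drop them
// silently, debug builds panic so tests surface the offending write.
fn filter_cold_op(op: DBOp) -> Option<DBOp> {
    match op {
        DBOp::Delete { col, .. } => {
            debug_assert!(false, "delete on cold storage: {col:?}");
            None
        }
        op => Some(op),
    }
}

fn main() {
    let keep = filter_cold_op(DBOp::Set {
        col: DBCol::Block,
        key: b"k".to_vec(),
        value: b"v".to_vec(),
    });
    assert!(keep.is_some());
    // Feeding a DBOp::Delete through would return None in release builds and
    // panic in debug builds, matching the doc comment above.
}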
2 changes: 1 addition & 1 deletion core/store/src/db/rocksdb/snapshot.rs
@@ -190,7 +190,7 @@ fn test_snapshot_recovery()
     {
         let mut config = opener.config().clone();
         config.path = Some(path);
-        let opener = crate::NodeStorage::opener(tmpdir.path(), &config);
+        let opener = crate::NodeStorage::opener(tmpdir.path(), &config, None);
         let store = opener.open().unwrap().get_store(crate::Temperature::Hot);
         assert_eq!(Some(&b"value"[..]), store.get(COL, KEY).unwrap().as_deref());
     }
72 changes: 61 additions & 11 deletions core/store/src/lib.rs
@@ -57,8 +57,11 @@ pub use crate::opener::{StoreMigrator, StoreOpener, StoreOpenerError};
 /// In the future, certain parts of the code may need to access hot or cold
 /// storage. Specifically, querying an old block will require reading it from
 /// the cold storage.
+#[derive(Clone, Copy)]
 pub enum Temperature {
     Hot,
+    #[cfg(feature = "cold_store")]
+    Cold,
 }
 
 /// Node’s storage holding chain and all other necessary data.
@@ -68,7 +71,11 @@ pub enum Temperature {
 /// to [`Store`] which will abstract access to only one of the temperatures of
 /// the storage.
 pub struct NodeStorage {
-    storage: Arc<dyn Database>,
+    hot_storage: Arc<dyn Database>,
+    #[cfg(feature = "cold_store")]
+    cold_storage: Option<Arc<crate::db::ColdDB>>,
+    #[cfg(not(feature = "cold_store"))]
+    cold_storage: Option<std::convert::Infallible>,
 }
 
 /// Node’s single storage source.
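A hedged usage sketch of the two-temperature API this hunk introduces, assuming the NodeStorage, Store and Temperature items from this file plus the has_cold accessor added further down; this is not code from the PR:

// Fetch both stores from one NodeStorage. The Cold variant and the cold
// database exist only in cold_store builds, so the access is gated too.
fn stores(storage: &NodeStorage) -> (Store, Option<Store>) {
    let hot = storage.get_store(Temperature::Hot);
    #[cfg(feature = "cold_store")]
    let cold = storage.has_cold().then(|| storage.get_store(Temperature::Cold));
    #[cfg(not(feature = "cold_store"))]
    let cold = None;
    (hot, cold)
}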
@@ -84,10 +91,22 @@ pub struct Store {
     storage: Arc<dyn Database>,
 }
 
+// These are temporary. Once the cold_store feature is stabilised, remove
+// these type aliases and just use the type directly.
+#[cfg(feature = "cold_store")]
+pub type ColdConfig<'a> = Option<&'a StoreConfig>;
+#[cfg(not(feature = "cold_store"))]
+pub type ColdConfig<'a> = Option<std::convert::Infallible>;
+
 impl NodeStorage {
-    /// Initialises a new opener with given home directory and store config.
-    pub fn opener<'a>(home_dir: &std::path::Path, config: &'a StoreConfig) -> StoreOpener<'a> {
-        StoreOpener::new(home_dir, config)
+    /// Initialises a new opener with given home directory and hot and cold
+    /// store config.
+    pub fn opener<'a>(
+        home_dir: &std::path::Path,
+        config: &'a StoreConfig,
+        cold_config: ColdConfig<'a>,
+    ) -> StoreOpener<'a> {
+        StoreOpener::new(home_dir, config, cold_config)
     }
 
     /// Initialises an opener for a new temporary test store.
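The Option<std::convert::Infallible> alias is worth spelling out: Infallible has no values, so with the feature disabled None is the only possible ColdConfig. That is what lets feature-agnostic call sites, like the snapshot test above, pass a bare None under either build. A self-contained sketch of the trick, with a unit StoreConfig as a stand-in:

pub struct StoreConfig;

#[cfg(feature = "cold_store")]
pub type ColdConfig<'a> = Option<&'a StoreConfig>;
#[cfg(not(feature = "cold_store"))]
pub type ColdConfig<'a> = Option<std::convert::Infallible>;

fn opener(cold_config: ColdConfig<'_>) {
    // With the feature off this branch is provably dead: no value of type
    // Infallible can exist, so cold_config is always None.
    if cold_config.is_some() {
        // set up the cold database here
    }
}

fn main() {
    opener(None); // compiles whether or not cold_store is enabled
}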
@@ -101,7 +120,7 @@ impl NodeStorage {
     pub fn test_opener() -> (tempfile::TempDir, StoreOpener<'static>) {
         static CONFIG: Lazy<StoreConfig> = Lazy::new(StoreConfig::test_config);
         let dir = tempfile::tempdir().unwrap();
-        let opener = StoreOpener::new(dir.path(), &CONFIG);
+        let opener = StoreOpener::new(dir.path(), &CONFIG, None);
         (dir, opener)
     }
 
@@ -115,7 +134,22 @@
     /// possibly [`crate::test_utils::create_test_store`] (depending whether you
     /// need [`NodeStorage`] or [`Store`] object).
     pub fn new(storage: Arc<dyn Database>) -> Self {
-        Self { storage }
+        Self { hot_storage: storage, cold_storage: None }
     }
 
+    /// Constructs new object backed by given database.
+    fn from_rocksdb(
+        hot_storage: crate::db::RocksDB,
+        #[cfg(feature = "cold_store")] cold_storage: Option<crate::db::RocksDB>,
+        #[cfg(not(feature = "cold_store"))] cold_storage: Option<std::convert::Infallible>,
+    ) -> Self {
+        Self {
+            hot_storage: Arc::new(hot_storage),
+            #[cfg(feature = "cold_store")]
+            cold_storage: cold_storage.map(|db| Arc::new(db.into())),
+            #[cfg(not(feature = "cold_store"))]
+            cold_storage: cold_storage.map(|_| unreachable!()),
+        }
+    }
+
     /// Returns storage for given temperature.
@@ -132,7 +166,9 @@
     /// cold.
     pub fn get_store(&self, temp: Temperature) -> Store {
         match temp {
-            Temperature::Hot => Store { storage: self.storage.clone() },
+            Temperature::Hot => Store { storage: self.hot_storage.clone() },
+            #[cfg(feature = "cold_store")]
+            Temperature::Cold => Store { storage: self.cold_storage.as_ref().unwrap().clone() },
         }
     }
 
@@ -151,9 +187,11 @@
     /// well. For example, garbage collection only ever touches hot storage but
     /// it should go through [`Store`] interface since data it manipulates
     /// (e.g. blocks) are live in both databases.
-    pub fn get_inner(&self, temp: Temperature) -> &Arc<dyn Database> {
+    pub fn _get_inner(&self, temp: Temperature) -> &Arc<dyn Database> {
         match temp {
-            Temperature::Hot => &self.storage,
+            Temperature::Hot => &self.hot_storage,
+            #[cfg(feature = "cold_store")]
+            Temperature::Cold => todo!(),
         }
     }
 
@@ -163,15 +201,27 @@
     /// `Arc::clone`.
     pub fn into_inner(self, temp: Temperature) -> Arc<dyn Database> {
         match temp {
-            Temperature::Hot => self.storage,
+            Temperature::Hot => self.hot_storage,
+            #[cfg(feature = "cold_store")]
+            Temperature::Cold => self.cold_storage.unwrap(),
         }
     }
 
+    /// Returns whether the storage has a cold database.
+    pub fn has_cold(&self) -> bool {
+        self.cold_storage.is_some()
+    }
+
     /// Reads database metadata and returns whether the storage is archival.
     pub fn is_archive(&self) -> io::Result<bool> {
-        Ok(match metadata::DbMetadata::read(self.storage.as_ref())?.kind.unwrap() {
+        if self.cold_storage.is_some() {
+            return Ok(true);
+        }
+        Ok(match metadata::DbMetadata::read(self.hot_storage.as_ref())?.kind.unwrap() {
             metadata::DbKind::RPC => false,
             metadata::DbKind::Archive => true,
+            #[cfg(feature = "cold_store")]
+            metadata::DbKind::Hot | metadata::DbKind::Cold => unreachable!(),
         })
     }
 }
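Note the precedence in is_archive: a paired cold database marks the node archival before any metadata is consulted. A sketch, not in the PR, of how a caller could tell the two archival flavours apart by checking has_cold first:

// Sketch: classify an opened NodeStorage using the two accessors above.
fn describe(storage: &NodeStorage) -> std::io::Result<&'static str> {
    Ok(if storage.has_cold() {
        "archival node with split hot and cold databases"
    } else if storage.is_archive()? {
        "archival node with a single legacy database"
    } else {
        "RPC node with garbage-collected storage"
    })
}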
37 changes: 31 additions & 6 deletions core/store/src/metadata.rs
@@ -35,28 +35,53 @@ pub enum DbKind {
     /// The database is an archive database meaning that it is not garbage
     /// collected and stores all chain data.
     Archive,
+    #[cfg(feature = "cold_store")]
+    /// The database is Hot meaning that the node runs in archival mode with
+    /// a paired Cold database.
+    Hot,
+    #[cfg(feature = "cold_store")]
+    /// The database is Cold meaning that the node runs in archival mode with
+    /// a paired Hot database.
+    Cold,
 }
 
 pub(super) fn set_store_version(storage: &NodeStorage, version: DbVersion) -> std::io::Result<()> {
     set_store_metadata(storage, DbMetadata { version, kind: None })
 }
 
-pub(super) fn set_store_metadata(
+fn set_db_metadata(
     storage: &NodeStorage,
+    temp: Temperature,
     metadata: DbMetadata,
 ) -> std::io::Result<()> {
-    let version = metadata.version.to_string().into_bytes();
-    let kind = metadata.kind.map(|kind| <&str>::from(kind).as_bytes());
-    let mut store_update = storage.get_store(Temperature::Hot).store_update();
-    store_update.set(DBCol::DbVersion, VERSION_KEY, &version);
+    let mut store_update = storage.get_store(temp).store_update();
+    store_update.set(DBCol::DbVersion, VERSION_KEY, metadata.version.to_string().as_bytes());
     if metadata.version >= DB_VERSION_WITH_KIND {
-        if let Some(kind) = kind {
+        #[allow(unused_mut)]
+        let mut kind = metadata.kind;
+        #[cfg(feature = "cold_store")]
+        if matches!(temp, Temperature::Cold) || storage.has_cold() {
+            kind = Some(if matches!(temp, Temperature::Hot) { DbKind::Hot } else { DbKind::Cold });
+        }
+        if let Some(kind) = kind.map(|kind| <&str>::from(kind).as_bytes()) {
             store_update.set(DBCol::DbVersion, KIND_KEY, kind);
         }
    }
     store_update.commit()
 }
 
+pub(super) fn set_store_metadata(
+    storage: &NodeStorage,
+    metadata: DbMetadata,
+) -> std::io::Result<()> {
+    set_db_metadata(storage, Temperature::Hot, metadata)?;
+    #[cfg(feature = "cold_store")]
+    if storage.has_cold() {
+        set_db_metadata(storage, Temperature::Cold, metadata)?;
+    }
+    Ok(())
+}
+
 /// Metadata about a database.
 #[derive(Clone, Copy)]
 pub(super) struct DbMetadata {
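For orientation, the layout this file maintains: each database stores its version as decimal ASCII under VERSION_KEY and, from DB_VERSION_WITH_KIND onwards, its kind ("RPC", "Archive", or with this PR "Hot"/"Cold") under KIND_KEY, both in DBCol::DbVersion. A hedged sketch of reading the kind back, assuming this crate's Store::get accessor as seen in the snapshot test; this helper is not part of the PR:

// Sketch (crate-internal context assumed): returns the kind string that
// set_db_metadata wrote for one database, or None on pre-kind versions.
fn read_kind(store: &Store) -> std::io::Result<Option<String>> {
    Ok(store
        .get(DBCol::DbVersion, KIND_KEY)?
        .map(|bytes| String::from_utf8_lossy(&bytes).into_owned()))
}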