store: refactor StoreOpener; add Store::opener and related changes (#7015)
Firstly, replace the StoreOpener constructors with static methods on
the Store struct.  Specifically, add Store::opener (which behaves just
like StoreOpener::new, which is now private) and Store::tmp_opener
(which replaces StoreOpener::with_default_config).

Secondly, incorporate creation of the temporary directory into
the Store::tmp_opener method.  Previously, test code would create
a temporary directory and pass it to StoreOpener::with_default_config;
now all of that is rolled into a single Store::tmp_opener call.

The motivation is to make it painfully obvious that Store::tmp_opener
is meant for tests only.  Anything that operates on non-temporary
storage has to use Store::opener and provide it with a StoreConfig.

Lastly, change StoreOpener to resolve the path to the storage during
construction.  This replaces the `home_dir` field the struct used to
hold with a `path: PathBuf`.  As a result, StoreOpener::get_path no
longer resolves the path on each call; it simply returns the
already-resolved `&Path`.

Issue: #6857
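
For illustration, the two entry points end up being used roughly like this
(a minimal sketch based on the code in this change; `home_dir`, `store_config`
and the local variable names are only illustrative):

    // Production code: the path is resolved once, from the home directory and
    // the user-provided StoreConfig, when the opener is constructed.
    let opener = near_store::Store::opener(&home_dir, &store_config);
    let db_path: &std::path::Path = opener.get_path();
    let store = opener.open();

    // Test code: Store::tmp_opener creates the temporary directory itself and
    // returns it alongside the opener; the TempDir must be kept alive for as
    // long as the store is in use.
    let (_tmp_dir, opener) = near_store::Store::tmp_opener();
    let store = opener.open();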
mina86 authored and nikurt committed Jun 13, 2022
1 parent e49ae5a commit a6cab5d
Showing 17 changed files with 116 additions and 139 deletions.
6 changes: 3 additions & 3 deletions chain/indexer/src/streamer/mod.rs
@@ -287,9 +287,9 @@ pub(crate) async fn start(
blocks_sink: mpsc::Sender<StreamerMessage>,
) {
info!(target: INDEXER, "Starting Streamer...");
let mut indexer_db_path =
near_store::StoreOpener::new(&indexer_config.home_dir, &store_config).get_path();
indexer_db_path.push("indexer");
let indexer_db_path = near_store::Store::opener(&indexer_config.home_dir, &store_config)
.get_path()
.join("indexer");

// TODO: implement proper error handling
let db = DB::open_default(indexer_db_path).unwrap();
37 changes: 18 additions & 19 deletions chain/network/src/peer_manager/peer_store_test.rs
@@ -1,7 +1,7 @@
use near_crypto::{KeyType, SecretKey};
use near_network_primitives::types::{Blacklist, BlacklistEntry};
use near_store::test_utils::create_test_store;
use near_store::StoreOpener;
use near_store::{Store, StoreOpener};
use std::collections::HashSet;
use std::net::{Ipv4Addr, SocketAddrV4};

@@ -29,32 +29,32 @@ fn gen_peer_info(port: u16) -> PeerInfo {

#[test]
fn ban_store() {
let tmp_dir = tempfile::Builder::new().prefix("_test_store_ban").tempdir().unwrap();
let (_tmp_dir, opener) = Store::tmp_opener();
let peer_info_a = gen_peer_info(0);
let peer_info_to_ban = gen_peer_info(1);
let boot_nodes = vec![peer_info_a, peer_info_to_ban.clone()];
{
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let store = opener.open();
let mut peer_store = PeerStore::new(store, &boot_nodes, Default::default()).unwrap();
assert_eq!(peer_store.healthy_peers(3).len(), 2);
peer_store.peer_ban(&peer_info_to_ban.id, ReasonForBan::Abusive).unwrap();
assert_eq!(peer_store.healthy_peers(3).len(), 1);
}
{
let store_new = StoreOpener::with_default_config(tmp_dir.path()).open();
let store_new = opener.open();
let peer_store_new = PeerStore::new(store_new, &boot_nodes, Default::default()).unwrap();
assert_eq!(peer_store_new.healthy_peers(3).len(), 1);
}
}

#[test]
fn test_unconnected_peer() {
let tmp_dir = tempfile::Builder::new().prefix("_test_store_ban").tempdir().unwrap();
let (_tmp_dir, opener) = Store::tmp_opener();
let peer_info_a = gen_peer_info(0);
let peer_info_to_ban = gen_peer_info(1);
let boot_nodes = vec![peer_info_a, peer_info_to_ban];
{
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let store = opener.open();
let peer_store = PeerStore::new(store, &boot_nodes, Default::default()).unwrap();
assert!(peer_store.unconnected_peer(|_| false).is_some());
assert!(peer_store.unconnected_peer(|_| true).is_none());
@@ -283,8 +283,7 @@ fn check_ignore_blacklisted_peers() {

#[test]
fn remove_blacklisted_peers_from_store() {
let tmp_dir =
tempfile::Builder::new().prefix("_remove_blacklisted_peers_from_store").tempdir().unwrap();
let (_tmp_dir, opener) = Store::tmp_opener();
let (peer_ids, peer_infos): (Vec<_>, Vec<_>) = (0..3)
.map(|i| {
let id = get_peer_id(format!("node{}", i));
@@ -295,24 +294,24 @@ fn remove_blacklisted_peers_from_store() {

// Add three peers.
{
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let store = opener.open();
let mut peer_store = PeerStore::new(store, &[], Default::default()).unwrap();
peer_store.add_indirect_peers(peer_infos.clone().into_iter()).unwrap();
}
assert_peers_in_store(tmp_dir.path(), &peer_ids);
assert_peers_in_store(&opener, &peer_ids);

// Blacklisted peers are removed from the store.
{
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let store = opener.open();
let blacklist: Blacklist =
[BlacklistEntry::from_addr(peer_infos[2].addr.unwrap())].into_iter().collect();
let _peer_store = PeerStore::new(store, &[], blacklist).unwrap();
}
assert_peers_in_store(tmp_dir.path(), &peer_ids[0..2]);
assert_peers_in_store(&opener, &peer_ids[0..2]);
}

fn assert_peers_in_store(store_path: &std::path::Path, expected: &[PeerId]) {
let store = StoreOpener::with_default_config(store_path).open();
fn assert_peers_in_store(opener: &StoreOpener, expected: &[PeerId]) {
let store = opener.open();
let stored_peers: HashSet<PeerId> = HashSet::from_iter(
store.iter(DBCol::Peers).map(|(key, _)| PeerId::try_from_slice(key.as_ref()).unwrap()),
);
@@ -336,7 +335,7 @@ fn assert_peers_in_cache(

#[test]
fn test_delete_peers() {
let tmp_dir = tempfile::Builder::new().prefix("_test_delete_peers").tempdir().unwrap();
let (_tmp_dir, opener) = Store::tmp_opener();
let (peer_ids, peer_infos): (Vec<_>, Vec<_>) = (0..3)
.map(|i| {
let id = get_peer_id(format!("node{}", i));
@@ -348,18 +347,18 @@ fn test_delete_peers() {
peer_infos.iter().map(|info| info.addr.unwrap().clone()).collect::<Vec<_>>();

{
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let store = opener.open();
let mut peer_store = PeerStore::new(store, &[], Default::default()).unwrap();
peer_store.add_indirect_peers(peer_infos.into_iter()).unwrap();
}
assert_peers_in_store(tmp_dir.path(), &peer_ids);
assert_peers_in_store(&opener, &peer_ids);

{
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let store = opener.open();
let mut peer_store = PeerStore::new(store, &[], Default::default()).unwrap();
assert_peers_in_cache(&peer_store, &peer_ids, &peer_addresses);
peer_store.delete_peers(&peer_ids).unwrap();
assert_peers_in_cache(&peer_store, &[], &[]);
}
assert_peers_in_store(tmp_dir.path(), &[]);
assert_peers_in_store(&opener, &[]);
}
2 changes: 1 addition & 1 deletion core/store/Cargo.toml
@@ -17,6 +17,7 @@ enum-map = "2.1.0"
rocksdb = { version = "0.18.0", default-features = false, features = ["snappy", "lz4", "zstd", "zlib"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tempfile = "3"
num_cpus = "1.11"
rand = "0.7"
strum = { version = "0.24", features = ["derive"] }
@@ -36,7 +37,6 @@ near-cache = { path = "../../utils/near-cache" }

[dev-dependencies]
assert_matches = "1.5.0"
tempfile = "3"
bencher = "0.1.5"
rand = "0.7"

6 changes: 3 additions & 3 deletions core/store/benches/store_bench.rs
@@ -3,7 +3,7 @@ extern crate bencher;

use bencher::{black_box, Bencher};
use near_primitives::errors::StorageError;
use near_store::{DBCol, Store, StoreOpener};
use near_store::{DBCol, Store};
use std::time::{Duration, Instant};

/// Run a benchmark to generate `num_keys` keys, each of size `key_size`, then write then
@@ -16,8 +16,8 @@ fn benchmark_write_then_read_successful(
max_value_size: usize,
col: DBCol,
) {
let tmp_dir = tempfile::Builder::new().tempdir().unwrap();
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let (_tmp_dir, opener) = Store::tmp_opener();
let store = opener.open();
let keys = generate_keys(num_keys, key_size);
write_to_db(&store, &keys, max_value_size, col);

73 changes: 32 additions & 41 deletions core/store/src/config.rs
@@ -46,19 +46,19 @@ pub struct StoreConfig {
}

fn default_enable_statistics_export() -> bool {
StoreConfig::const_default().enable_statistics_export
StoreConfig::DEFAULT.enable_statistics_export
}

fn default_max_open_files() -> u32 {
StoreConfig::const_default().max_open_files
StoreConfig::DEFAULT.max_open_files
}

fn default_col_state_cache_size() -> bytesize::ByteSize {
StoreConfig::const_default().col_state_cache_size
StoreConfig::DEFAULT.col_state_cache_size
}

fn default_block_size() -> bytesize::ByteSize {
StoreConfig::const_default().block_size
StoreConfig::DEFAULT.block_size
}

impl StoreConfig {
@@ -81,16 +81,14 @@ impl StoreConfig {
/// then.
const DEFAULT_BLOCK_SIZE: bytesize::ByteSize = bytesize::ByteSize::kib(16);

const fn const_default() -> Self {
Self {
path: None,
enable_statistics: false,
enable_statistics_export: true,
max_open_files: Self::DEFAULT_MAX_OPEN_FILES,
col_state_cache_size: Self::DEFAULT_COL_STATE_CACHE_SIZE,
block_size: Self::DEFAULT_BLOCK_SIZE,
}
}
pub const DEFAULT: Self = Self {
path: None,
enable_statistics: false,
enable_statistics_export: true,
max_open_files: Self::DEFAULT_MAX_OPEN_FILES,
col_state_cache_size: Self::DEFAULT_COL_STATE_CACHE_SIZE,
block_size: Self::DEFAULT_BLOCK_SIZE,
};

/// Returns cache size for given column.
pub const fn col_cache_size(&self, col: crate::DBCol) -> bytesize::ByteSize {
@@ -103,7 +101,7 @@ impl StoreConfig {

impl Default for StoreConfig {
fn default() -> Self {
Self::const_default()
Self::DEFAULT
}
}

@@ -118,14 +116,16 @@ pub fn get_store_path(base_path: &std::path::Path) -> std::path::PathBuf {
/// Typical usage:
///
/// ```ignore
/// let store = StoreOpener::new(&near_config.config.store)
/// let store = Store::opener(&near_config.config.store)
/// .home(neard_home_dir)
/// .open();
/// ```
pub struct StoreOpener<'a> {
/// Near home directory; path to the database is resolved relative to this
/// directory.
home_dir: &'a std::path::Path,
/// Path to the database.
///
/// This is resolved from nearcore home directory and store configuration
/// passed to [`Store::opener`].
path: std::path::PathBuf,

/// Configuration as provided by the user.
config: &'a StoreConfig,
@@ -136,17 +136,10 @@ pub struct StoreOpener<'a> {

impl<'a> StoreOpener<'a> {
/// Initialises a new opener with given home directory and store config.
pub fn new(home_dir: &'a std::path::Path, config: &'a StoreConfig) -> Self {
Self { home_dir, config, read_only: false }
}

/// Initialises a new opener with given home directory and default config.
///
/// This is meant for tests only. Production code should always read store
/// configuration from a config file and use [`Self::new`] instead.
pub fn with_default_config(home_dir: &'a std::path::Path) -> Self {
static CONFIG: StoreConfig = StoreConfig::const_default();
Self::new(home_dir, &CONFIG)
pub(crate) fn new(home_dir: &std::path::Path, config: &'a StoreConfig) -> Self {
let path =
home_dir.join(config.path.as_deref().unwrap_or(std::path::Path::new(STORE_PATH)));
Self { path, config, read_only: false }
}

/// Configure whether the database should be opened in read-only mode.
@@ -168,15 +161,14 @@ impl<'a> StoreOpener<'a> {

/// Returns path to the underlying RocksDB database.
///
/// Does not check whether the database actually exists. It merely
/// constructs the path where the database would be if it existed.
pub fn get_path(&self) -> std::path::PathBuf {
self.home_dir.join(self.config.path.as_deref().unwrap_or(std::path::Path::new(STORE_PATH)))
/// Does not check whether the database actually exists.
pub fn get_path(&self) -> &std::path::Path {
&self.path
}

/// Returns version of the database; or `None` if it does not exist.
pub fn get_version_if_exists(&self) -> Result<Option<DbVersion>, crate::db::DBError> {
std::fs::canonicalize(self.get_path())
std::fs::canonicalize(&self.path)
.ok()
.map(|path| crate::RocksDB::get_version(&path))
.transpose()
@@ -187,16 +179,15 @@ impl<'a> StoreOpener<'a> {
/// Panics on failure.
// TODO(mina86): Change it to return Result.
pub fn open(&self) -> crate::Store {
let path = self.get_path();
if std::fs::canonicalize(&path).is_ok() {
tracing::info!(target: "near", path=%path.display(), "Opening RocksDB database");
if std::fs::canonicalize(&self.path).is_ok() {
tracing::info!(target: "near", path=%self.path.display(), "Opening RocksDB database");
} else if self.read_only {
tracing::error!(target: "near", path=%path.display(), "Database does not exist");
tracing::error!(target: "near", path=%self.path.display(), "Database does not exist");
panic!("Failed to open non-existent the database");
} else {
tracing::info!(target: "near", path=%path.display(), "Creating new RocksDB database");
tracing::info!(target: "near", path=%self.path.display(), "Creating new RocksDB database");
}
let db = crate::RocksDB::open(&path, &self.config, self.read_only)
let db = crate::RocksDB::open(&self.path, &self.config, self.read_only)
.expect("Failed to open the database");
crate::Store::new(std::sync::Arc::new(db))
}
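
A side note on the StoreConfig::DEFAULT change above: the associated constant
can be used wherever a default configuration is needed without building one at
run time, either to initialise a static (which is how the removed
with_default_config helper used the old const_default()) or borrowed in place
as state_dump.rs now does.  A rough sketch; the DEFAULT_CONFIG name is made up
for illustration:

    // A static holding the default configuration...
    static DEFAULT_CONFIG: near_store::StoreConfig = near_store::StoreConfig::DEFAULT;

    // ...or a temporary borrow right at the call site:
    let store = near_store::Store::opener(home_dir, &near_store::StoreConfig::DEFAULT).open();
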
14 changes: 7 additions & 7 deletions core/store/src/db.rs
@@ -780,7 +780,7 @@ fn parse_statistics(statistics: &str) -> Result<StoreStatistics, Box<dyn std::er
mod tests {
use crate::db::StatsValue::{Count, Percentile, Sum};
use crate::db::{parse_statistics, rocksdb_read_options, DBError, Database, RocksDB};
use crate::{DBCol, StoreConfig, StoreOpener, StoreStatistics};
use crate::{DBCol, Store, StoreConfig, StoreStatistics};

impl RocksDB {
#[cfg(not(feature = "single_thread_rocksdb"))]
@@ -805,15 +805,15 @@ mod tests {

#[test]
fn test_prewrite_check() {
let tmp_dir = tempfile::Builder::new().prefix("_test_prewrite_check").tempdir().unwrap();
let store = RocksDB::open(tmp_dir.path(), &StoreConfig::default(), false).unwrap();
let tmp_dir = tempfile::Builder::new().prefix("prewrite_check").tempdir().unwrap();
let store = RocksDB::open(tmp_dir.path(), &StoreConfig::DEFAULT, false).unwrap();
store.pre_write_check().unwrap()
}

#[test]
fn test_clear_column() {
let tmp_dir = tempfile::Builder::new().prefix("_test_clear_column").tempdir().unwrap();
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let (_tmp_dir, opener) = Store::tmp_opener();
let store = opener.open();
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
{
let mut store_update = store.store_update();
@@ -833,8 +833,8 @@

#[test]
fn rocksdb_merge_sanity() {
let tmp_dir = tempfile::Builder::new().prefix("_test_snapshot_sanity").tempdir().unwrap();
let store = StoreOpener::with_default_config(tmp_dir.path()).open();
let (_tmp_dir, opener) = Store::tmp_opener();
let store = opener.open();
let ptr = (&*store.storage) as *const (dyn Database + 'static);
let rocksdb = unsafe { &*(ptr as *const RocksDB) };
assert_eq!(store.get(DBCol::State, &[1]).unwrap(), None);
18 changes: 18 additions & 0 deletions core/store/src/lib.rs
@@ -54,6 +54,24 @@ pub struct Store {
}

impl Store {
/// Initialises a new opener with given home directory and store config.
pub fn opener<'a>(home_dir: &std::path::Path, config: &'a StoreConfig) -> StoreOpener<'a> {
StoreOpener::new(home_dir, config)
}

/// Initialises a new opener for temporary store.
///
/// This is meant for tests only. It **panics** if a temporary directory
/// cannot be created.
///
/// Caller must hold the temporary directory returned as first element of
/// the tuple while the store is open.
pub fn tmp_opener() -> (tempfile::TempDir, StoreOpener<'static>) {
let dir = tempfile::tempdir().unwrap();
let opener = Self::opener(dir.path(), &StoreConfig::DEFAULT);
(dir, opener)
}

pub(crate) fn new(storage: Arc<dyn Database>) -> Store {
Store { storage }
}
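
One practical consequence of returning the TempDir as the first element of the
tuple, worth spelling out (an illustrative sketch, not code from this diff):
binding it to a named placeholder keeps the temporary directory alive, while a
bare `_` would drop, and therefore delete, the directory immediately.

    // Keeps the TempDir (and thus the directory on disk) alive until the end
    // of the enclosing scope; this is why the tests bind it to `_tmp_dir`.
    let (_tmp_dir, opener) = near_store::Store::tmp_opener();
    let store = opener.open();

    // By contrast, `let (_, opener) = near_store::Store::tmp_opener();` would
    // drop the TempDir right away and remove the directory before
    // `opener.open()` is ever called.
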
2 changes: 1 addition & 1 deletion genesis-tools/genesis-populate/src/main.rs
@@ -29,7 +29,7 @@ fn main() {
let near_config = load_config(home_dir, GenesisValidationMode::Full)
.unwrap_or_else(|e| panic!("Error loading config: {:#}", e));

let store = near_store::StoreOpener::new(home_dir, &near_config.config.store).open();
let store = near_store::Store::opener(home_dir, &near_config.config.store).open();
GenesisBuilder::from_config_and_store(home_dir, Arc::new(near_config.genesis), store)
.add_additional_accounts(additional_accounts_num)
.add_additional_accounts_contract(near_test_contracts::trivial_contract().to_vec())
3 changes: 2 additions & 1 deletion genesis-tools/genesis-populate/src/state_dump.rs
@@ -17,7 +17,8 @@ pub struct StateDump {

impl StateDump {
pub fn from_dir(dir: &Path, store_home_dir: &Path) -> Self {
let store = near_store::StoreOpener::with_default_config(store_home_dir).open();
let store =
near_store::Store::opener(store_home_dir, &near_store::StoreConfig::DEFAULT).open();
let state_file = dir.join(STATE_DUMP_FILE);
store
.load_from_file(DBCol::State, state_file.as_path())