Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: adding State to cold columns #7926

Merged
merged 8 commits into from
Oct 26, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::columns::DBKeyType;
use crate::refcount::add_positive_refcount;
use crate::{DBCol, DBTransaction, Database, Store};
use crate::trie::TrieRefcountChange;
use crate::{DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::BorshDeserialize;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::io;
@@ -38,13 +40,14 @@ struct StoreWithCache<'a> {
pub fn update_cold_db(
cold_db: &dyn Database,
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<()> {
let _span = tracing::debug_span!(target: "store", "update cold db", height = height);

let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };

let key_type_to_keys = get_keys_from_store(&mut store_with_cache, height)?;
let key_type_to_keys = get_keys_from_store(&mut store_with_cache, shard_layout, height)?;
for col in DBCol::iter() {
if col.is_cold() {
copy_from_store(
@@ -116,6 +119,10 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
Ok(())
}

/// Test-only helper: returns the current value of the cold-migration read
/// counter for `column`, i.e. how many `get` calls were issued to the hot
/// store for that column while copying data to cold storage.
pub fn test_get_store_reads(column: DBCol) -> u64 {
    let column_label: &str = <&str>::from(column);
    crate::metrics::COLD_MIGRATION_READS.with_label_values(&[column_label]).get()
}

/// Returns HashMap from DBKeyType to possible keys of that type for provided height.
/// Only constructs keys for key types that are used in cold columns.
/// The goal is to capture all changes to db made during production of the block at provided height.
@@ -124,6 +131,7 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
/// But for TransactionHash, for example, it is all of the tx hashes in that block.
fn get_keys_from_store(
store: &mut StoreWithCache,
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
let mut key_type_to_keys = HashMap::new();
@@ -136,6 +144,36 @@ fn get_keys_from_store(
key_type,
match key_type {
DBKeyType::BlockHash => vec![block_hash_key.clone()],
DBKeyType::ShardUId => shard_layout
.get_shard_uids()
.iter()
.map(|uid| uid.to_bytes().to_vec())
.collect(),
DBKeyType::TrieNodeOrValueHash => {
let mut keys = vec![];
for shard_uid in shard_layout.get_shard_uids() {
let shard_uid_key = shard_uid.to_bytes();

debug_assert_eq!(
DBCol::TrieChanges.key_type(),
&[DBKeyType::BlockHash, DBKeyType::ShardUId]
);
let trie_changes_option: Option<TrieChanges> =
store
.get_ser(
DBCol::TrieChanges,
&join_two_keys(&block_hash_key, &shard_uid_key),
)?;

if let Some(trie_changes) = trie_changes_option {
for op in trie_changes.insertions() {
store.insert_state_to_cache_from_op(op, &shard_uid_key);
keys.push(op.hash().as_bytes().to_vec());
}
}
}
keys
}
_ => {
vec![]
}
@@ -146,6 +184,10 @@ fn get_keys_from_store(
Ok(key_type_to_keys)
}

/// Concatenates `prefix_key` and `suffix_key` into a single composite store key.
pub fn join_two_keys(prefix_key: &[u8], suffix_key: &[u8]) -> StoreKey {
    let mut key = Vec::with_capacity(prefix_key.len() + suffix_key.len());
    key.extend_from_slice(prefix_key);
    key.extend_from_slice(suffix_key);
    key
}

/// Returns all possible keys for a column with key represented by a specific sequence of key types.
/// `key_type_to_value` -- result of `get_keys_from_store`, mapping from KeyType to all possible keys of that type.
/// `key_types` -- description of a final key, what sequence of key types forms a key, result of `DBCol::key_type`.
@@ -179,9 +221,7 @@ fn combine_keys_with_stop(
let mut result_keys = vec![];
for prefix_key in &all_smaller_keys {
for suffix_key in &key_type_to_keys[last_kt] {
let mut new_key = prefix_key.clone();
new_key.extend(suffix_key);
result_keys.push(new_key);
result_keys.push(join_two_keys(prefix_key, suffix_key));
}
}
result_keys
@@ -202,6 +242,7 @@ where
impl StoreWithCache<'_> {
pub fn get(&mut self, column: DBCol, key: &[u8]) -> io::Result<StoreValue> {
if !self.cache.contains_key(&(column, key.to_vec())) {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(column)]).inc();
self.cache.insert(
(column.clone(), key.to_vec()),
self.store.get(column, key)?.map(|x| x.as_slice().to_vec()),
@@ -232,6 +273,21 @@ impl StoreWithCache<'_> {
) -> io::Result<T> {
option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
}

/// Caches the trie insertion `op` under its `DBCol::State` key
/// (`shard_uid_key` ++ node/value hash), so that the later copy of the
/// `State` column is served from this cache instead of reading the hot store.
pub fn insert_state_to_cache_from_op(
    &mut self,
    op: &TrieRefcountChange,
    shard_uid_key: &[u8],
) {
    // Sanity-check the assumed State key layout: ShardUId prefix
    // followed by the trie node or value hash.
    debug_assert_eq!(
        DBCol::State.key_type(),
        &[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
    );
    let state_key = join_two_keys(shard_uid_key, op.hash().as_bytes());
    let state_value = Some(op.payload().to_vec());
    self.cache.insert((DBCol::State, state_key), state_value);
}
}

#[cfg(test)]
2 changes: 1 addition & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
@@ -368,7 +368,7 @@ impl DBCol {
/// Whether this column should be copied to the cold storage.
pub const fn is_cold(&self) -> bool {
    // Note: the pre-change arm `DBCol::Block => true,` was removed when
    // `State` was added to the cold columns; keeping both arms would make
    // the second one an unreachable pattern.
    match self {
        DBCol::Block | DBCol::State => true,
        _ => false,
    }
}
9 changes: 9 additions & 0 deletions core/store/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -205,3 +205,12 @@ pub static PREFETCH_STAGED_SLOTS: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.unwrap()
});
/// Per-column counter of `get` calls issued to the hot store while copying
/// data to cold storage. Incremented on every cache miss in
/// `StoreWithCache::get`; tests read it via `test_get_store_reads` to verify
/// that `State` entries are served from cached `TrieChanges` instead of the
/// hot database.
#[cfg(feature = "cold_store")]
pub static COLD_MIGRATION_READS: Lazy<IntCounterVec> = Lazy::new(|| {
    try_create_int_counter_vec(
        "near_cold_migration_reads",
        "Number of get calls to hot store made for every column during copying data to cold storage.",
        &["col"],
    )
    .unwrap()
});
14 changes: 14 additions & 0 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
@@ -498,6 +498,16 @@ pub struct TrieRefcountChange {
rc: std::num::NonZeroU32,
}

impl TrieRefcountChange {
    /// Hash of the trie node or value this refcount change refers to.
    pub fn hash(&self) -> &CryptoHash {
        &self.trie_node_or_value_hash
    }

    /// Raw bytes of the trie node or value this refcount change carries.
    pub fn payload(&self) -> &[u8] {
        self.trie_node_or_value.as_slice()
    }
}

///
/// TrieChanges stores delta for refcount.
/// Multiple versions of the state work the following way:
@@ -533,6 +543,10 @@ impl TrieChanges {
/// Creates an empty change set: the state root stays `old_root` and there
/// are no insertions or deletions.
pub fn empty(old_root: StateRoot) -> Self {
    TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] }
}

/// Trie nodes and values inserted by this change set.
pub fn insertions(&self) -> &[TrieRefcountChange] {
    &self.insertions
}
}

/// Result of applying state part to Trie.
33 changes: 28 additions & 5 deletions integration-tests/src/tests/client/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use near_o11y::testonly::init_test_logger;
use near_primitives::transaction::{
Action, DeployContractAction, FunctionCallAction, SignedTransaction,
};
use near_store::cold_storage::{test_cold_genesis_update, update_cold_db};
use near_store::cold_storage::{test_cold_genesis_update, test_get_store_reads, update_cold_db};
use near_store::db::TestDB;
use near_store::{DBCol, NodeStorage, Store, Temperature};
use nearcore::config::GenesisExt;
@@ -22,10 +22,13 @@ fn check_key(first_store: &Store, second_store: &Store, col: DBCol, key: &[u8])
assert_eq!(first_res.unwrap(), second_res.unwrap());
}

fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) {
fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) -> u64 {
let mut num_checks = 0;
for (key, _) in first_store.iter(col).map(Result::unwrap) {
check_key(first_store, second_store, col, &key);
num_checks += 1;
}
num_checks
}

/// Deploying test contract and calling write_random_value 5 times every block for 4 epochs.
@@ -56,6 +59,8 @@ fn test_storage_after_commit_of_cold_update() {

test_cold_genesis_update(&*cold_db, &env.clients[0].runtime_adapter.store()).unwrap();

let state_reads = test_get_store_reads(DBCol::State);

for h in 1..max_height {
let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
if h == 1 {
@@ -102,16 +107,34 @@ fn test_storage_after_commit_of_cold_update() {
let block = env.clients[0].produce_block(h).unwrap().unwrap();
env.process_block(0, block.clone(), Provenance::PRODUCED);

last_hash = block.hash().clone();
update_cold_db(
&*cold_db,
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.runtime_adapter
.get_shard_layout(
&env.clients[0]
.runtime_adapter
.get_epoch_id_from_prev_block(&last_hash)
.unwrap(),
)
.unwrap(),
&h,
)
.unwrap();

update_cold_db(&*cold_db, &env.clients[0].runtime_adapter.store(), &h).unwrap();
last_hash = block.hash().clone();
}

// assert that we don't read State from db, but from TrieChanges
assert_eq!(state_reads, test_get_store_reads(DBCol::State));

let cold_store = NodeStorage::new(cold_db).get_store(Temperature::Hot);

for col in DBCol::iter() {
if col.is_cold() {
check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col);
// assert that this test actually checks something
assert!(check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col) > 0);
}
}
}