Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: adding State to cold columns #7926

Merged
merged 8 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 65 additions & 5 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::columns::DBKeyType;
use crate::refcount::add_positive_refcount;
use crate::{DBCol, DBTransaction, Database, Store};
use crate::trie::TrieRefcountChange;
use crate::{DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::BorshDeserialize;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::io;
Expand Down Expand Up @@ -38,13 +40,14 @@ struct StoreWithCache<'a> {
pub fn update_cold_db(
cold_db: &dyn Database,
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<()> {
let _span = tracing::debug_span!(target: "store", "update cold db", height = height);

let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };

let key_type_to_keys = get_keys_from_store(&mut store_with_cache, height)?;
let key_type_to_keys = get_keys_from_store(&mut store_with_cache, shard_layout, height)?;
for col in DBCol::iter() {
if col.is_cold() {
copy_from_store(
Expand Down Expand Up @@ -116,6 +119,10 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
Ok(())
}

pub fn test_get_store_reads(column: DBCol) -> u64 {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[&column.to_string()]).get()
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
}

/// Returns HashMap from DBKeyType to possible keys of that type for provided height.
/// Only constructs keys for key types that are used in cold columns.
/// The goal is to capture all changes to db made during production of the block at provided height.
Expand All @@ -124,6 +131,7 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
/// But for TransactionHash, for example, it is all of the tx hashes in that block.
fn get_keys_from_store(
store: &mut StoreWithCache,
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
let mut key_type_to_keys = HashMap::new();
Expand All @@ -136,6 +144,38 @@ fn get_keys_from_store(
key_type,
match key_type {
DBKeyType::BlockHash => vec![block_hash_key.clone()],
DBKeyType::ShardUId => shard_layout
.get_shard_uids()
.iter()
.map(|uid| uid.to_bytes().to_vec())
.collect(),
DBKeyType::TrieNodeOrValueHash => {
let mut keys = vec![];
for shard_uid in shard_layout.get_shard_uids() {
let shard_uid_key = shard_uid.to_bytes().to_vec();
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved

debug_assert_eq!(
DBCol::TrieChanges.key_type(),
&[DBKeyType::BlockHash, DBKeyType::ShardUId]
);
let trie_changes_option: Option<TrieChanges> = {
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
store
.get_ser(
DBCol::TrieChanges,
&join_two_keys(&block_hash_key, &shard_uid_key),
)
.unwrap()
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
};

if let Some(trie_changes) = trie_changes_option {
for op in trie_changes.insertions() {
store.insert_state_to_cache_from_op(op, &shard_uid_key);
keys.push(op.hash().as_ref().to_vec());
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
keys
}
_ => {
vec![]
}
Expand All @@ -146,6 +186,12 @@ fn get_keys_from_store(
Ok(key_type_to_keys)
}

pub fn join_two_keys(prefix_key: &StoreKey, suffix_key: &StoreKey) -> StoreKey {
let mut new_key = prefix_key.clone();
new_key.extend(suffix_key);
new_key
}
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved

/// Returns all possible keys for a column with key represented by a specific sequence of key types.
/// `key_type_to_value` -- result of `get_keys_from_store`, mapping from KeyType to all possible keys of that type.
/// `key_types` -- description of a final key, what sequence of key types forms a key, result of `DBCol::key_type`.
Expand Down Expand Up @@ -179,9 +225,7 @@ fn combine_keys_with_stop(
let mut result_keys = vec![];
for prefix_key in &all_smaller_keys {
for suffix_key in &key_type_to_keys[last_kt] {
let mut new_key = prefix_key.clone();
new_key.extend(suffix_key);
result_keys.push(new_key);
result_keys.push(join_two_keys(prefix_key, suffix_key));
}
}
result_keys
Expand All @@ -202,6 +246,7 @@ where
impl StoreWithCache<'_> {
pub fn get(&mut self, column: DBCol, key: &[u8]) -> io::Result<StoreValue> {
if !self.cache.contains_key(&(column, key.to_vec())) {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[&column.to_string()]).inc();
self.cache.insert(
(column.clone(), key.to_vec()),
self.store.get(column, key)?.map(|x| x.as_slice().to_vec()),
Expand Down Expand Up @@ -232,6 +277,21 @@ impl StoreWithCache<'_> {
) -> io::Result<T> {
option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
}

pub fn insert_state_to_cache_from_op(
&mut self,
op: &TrieRefcountChange,
shard_uid_key: &StoreKey,
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
) {
debug_assert_eq!(
DBCol::State.key_type(),
&[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
);
self.cache.insert(
(DBCol::State, join_two_keys(shard_uid_key, &op.hash().as_ref().to_vec())),
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
Some(op.value()),
);
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl DBCol {
/// Whether this column should be copied to the cold storage.
pub const fn is_cold(&self) -> bool {
match self {
DBCol::Block => true,
DBCol::Block | DBCol::State => true,
_ => false,
}
}
Expand Down
9 changes: 9 additions & 0 deletions core/store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,12 @@ pub static PREFETCH_STAGED_SLOTS: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.unwrap()
});
#[cfg(feature = "cold_store")]
pub static COLD_MIGRATION_READS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_cold_migration_reads",
"Number of get calls to hot store made for every column during copying data to cold storage.",
&["column"],
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
)
.unwrap()
});
14 changes: 14 additions & 0 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ pub struct TrieRefcountChange {
rc: std::num::NonZeroU32,
}

impl TrieRefcountChange {
pub fn hash(&self) -> CryptoHash {
self.trie_node_or_value_hash
}

pub fn value(&self) -> Vec<u8> {
self.trie_node_or_value.clone()
}
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
}

///
/// TrieChanges stores delta for refcount.
/// Multiple versions of the state work the following way:
Expand Down Expand Up @@ -533,6 +543,10 @@ impl TrieChanges {
pub fn empty(old_root: StateRoot) -> Self {
TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] }
}

pub fn insertions(&self) -> &Vec<TrieRefcountChange> {
&self.insertions
}
posvyatokum marked this conversation as resolved.
Show resolved Hide resolved
}

/// Result of applying state part to Trie.
Expand Down
33 changes: 28 additions & 5 deletions integration-tests/src/tests/client/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use near_o11y::testonly::init_test_logger;
use near_primitives::transaction::{
Action, DeployContractAction, FunctionCallAction, SignedTransaction,
};
use near_store::cold_storage::{test_cold_genesis_update, update_cold_db};
use near_store::cold_storage::{test_cold_genesis_update, test_get_store_reads, update_cold_db};
use near_store::db::TestDB;
use near_store::{DBCol, NodeStorage, Store, Temperature};
use nearcore::config::GenesisExt;
Expand All @@ -22,10 +22,13 @@ fn check_key(first_store: &Store, second_store: &Store, col: DBCol, key: &[u8])
assert_eq!(first_res.unwrap(), second_res.unwrap());
}

fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) {
fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) -> u64 {
let mut num_checks = 0;
for (key, _) in first_store.iter(col).map(Result::unwrap) {
check_key(first_store, second_store, col, &key);
num_checks += 1;
}
num_checks
}

/// Deploying test contract and calling write_random_value 5 times every block for 4 epochs.
Expand Down Expand Up @@ -56,6 +59,8 @@ fn test_storage_after_commit_of_cold_update() {

test_cold_genesis_update(&*cold_db, &env.clients[0].runtime_adapter.store()).unwrap();

let state_reads = test_get_store_reads(DBCol::State);

for h in 1..max_height {
let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
if h == 1 {
Expand Down Expand Up @@ -102,16 +107,34 @@ fn test_storage_after_commit_of_cold_update() {
let block = env.clients[0].produce_block(h).unwrap().unwrap();
env.process_block(0, block.clone(), Provenance::PRODUCED);

last_hash = block.hash().clone();
update_cold_db(
&*cold_db,
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.runtime_adapter
.get_shard_layout(
&env.clients[0]
.runtime_adapter
.get_epoch_id_from_prev_block(&last_hash)
.unwrap(),
)
.unwrap(),
&h,
)
.unwrap();

update_cold_db(&*cold_db, &env.clients[0].runtime_adapter.store(), &h).unwrap();
last_hash = block.hash().clone();
}

// assert that we don't read State from db, but from TrieChanges
assert_eq!(state_reads, test_get_store_reads(DBCol::State));

let cold_store = NodeStorage::new(cold_db).get_store(Temperature::Hot);

for col in DBCol::iter() {
if col.is_cold() {
check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col);
// assert that this test actually checks something
assert!(check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col) > 0);
}
}
}