Skip to content

Commit

Permalink
store: adding State to cold columns (#7926)
Browse files Browse the repository at this point in the history
  • Loading branch information
posvyatokum authored and nikurt committed Nov 7, 2022
1 parent 1f075ab commit ec1db8b
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 11 deletions.
61 changes: 56 additions & 5 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::columns::DBKeyType;
use crate::refcount::add_positive_refcount;
use crate::{DBCol, DBTransaction, Database, Store};
use crate::trie::TrieRefcountChange;
use crate::{DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::BorshDeserialize;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::io;
Expand Down Expand Up @@ -38,13 +40,14 @@ struct StoreWithCache<'a> {
pub fn update_cold_db(
cold_db: &dyn Database,
hot_store: &Store,
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<()> {
let _span = tracing::debug_span!(target: "store", "update cold db", height = height);

let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };

let key_type_to_keys = get_keys_from_store(&mut store_with_cache, height)?;
let key_type_to_keys = get_keys_from_store(&mut store_with_cache, shard_layout, height)?;
for col in DBCol::iter() {
if col.is_cold() {
copy_from_store(
Expand Down Expand Up @@ -116,6 +119,10 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
Ok(())
}

/// Returns the number of hot-store reads recorded for `column` while copying data
/// to cold storage. Exposed so tests can assert a column was served from cache
/// rather than from the hot db.
pub fn test_get_store_reads(column: DBCol) -> u64 {
    let column_label = <&str>::from(column);
    crate::metrics::COLD_MIGRATION_READS.with_label_values(&[column_label]).get()
}

/// Returns HashMap from DBKeyType to possible keys of that type for provided height.
/// Only constructs keys for key types that are used in cold columns.
/// The goal is to capture all changes to db made during production of the block at provided height.
Expand All @@ -124,6 +131,7 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
/// But for TransactionHash, for example, it is all of the tx hashes in that block.
fn get_keys_from_store(
store: &mut StoreWithCache,
shard_layout: &ShardLayout,
height: &BlockHeight,
) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
let mut key_type_to_keys = HashMap::new();
Expand All @@ -136,6 +144,35 @@ fn get_keys_from_store(
key_type,
match key_type {
DBKeyType::BlockHash => vec![block_hash_key.clone()],
DBKeyType::ShardUId => shard_layout
.get_shard_uids()
.iter()
.map(|uid| uid.to_bytes().to_vec())
.collect(),
// TODO: don't write values of State column to cache. Write them directly to colddb.
DBKeyType::TrieNodeOrValueHash => {
let mut keys = vec![];
for shard_uid in shard_layout.get_shard_uids() {
let shard_uid_key = shard_uid.to_bytes();

debug_assert_eq!(
DBCol::TrieChanges.key_type(),
&[DBKeyType::BlockHash, DBKeyType::ShardUId]
);
let trie_changes_option: Option<TrieChanges> = store.get_ser(
DBCol::TrieChanges,
&join_two_keys(&block_hash_key, &shard_uid_key),
)?;

if let Some(trie_changes) = trie_changes_option {
for op in trie_changes.insertions() {
store.insert_state_to_cache_from_op(op, &shard_uid_key);
keys.push(op.hash().as_bytes().to_vec());
}
}
}
keys
}
_ => {
vec![]
}
Expand All @@ -146,6 +183,10 @@ fn get_keys_from_store(
Ok(key_type_to_keys)
}

/// Concatenates `prefix_key` and `suffix_key` into one storage key
/// (e.g. `ShardUId || hash` for `DBCol::State`).
pub fn join_two_keys(prefix_key: &[u8], suffix_key: &[u8]) -> StoreKey {
    // Reserve the exact size up front so the key is built with a single allocation.
    let mut key = Vec::with_capacity(prefix_key.len() + suffix_key.len());
    key.extend_from_slice(prefix_key);
    key.extend_from_slice(suffix_key);
    key
}

/// Returns all possible keys for a column with key represented by a specific sequence of key types.
/// `key_type_to_value` -- result of `get_keys_from_store`, mapping from KeyType to all possible keys of that type.
/// `key_types` -- description of a final key, what sequence of key types forms a key, result of `DBCol::key_type`.
Expand Down Expand Up @@ -179,9 +220,7 @@ fn combine_keys_with_stop(
let mut result_keys = vec![];
for prefix_key in &all_smaller_keys {
for suffix_key in &key_type_to_keys[last_kt] {
let mut new_key = prefix_key.clone();
new_key.extend(suffix_key);
result_keys.push(new_key);
result_keys.push(join_two_keys(prefix_key, suffix_key));
}
}
result_keys
Expand All @@ -202,6 +241,7 @@ where
impl StoreWithCache<'_> {
pub fn get(&mut self, column: DBCol, key: &[u8]) -> io::Result<StoreValue> {
if !self.cache.contains_key(&(column, key.to_vec())) {
crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(column)]).inc();
self.cache.insert(
(column.clone(), key.to_vec()),
self.store.get(column, key)?.map(|x| x.as_slice().to_vec()),
Expand Down Expand Up @@ -232,6 +272,17 @@ impl StoreWithCache<'_> {
) -> io::Result<T> {
option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
}

/// Caches the `DBCol::State` value carried by a trie insertion `op` under the key
/// `shard_uid_key || node-or-value hash`, so the subsequent copy of the State column
/// is served from this cache instead of reading the hot db.
pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) {
    // State keys are expected to be a ShardUId followed by the trie node/value hash.
    debug_assert_eq!(
        DBCol::State.key_type(),
        &[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
    );
    let state_key = join_two_keys(shard_uid_key, op.hash().as_bytes());
    self.cache.insert((DBCol::State, state_key), Some(op.payload().to_vec()));
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/columns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ impl DBCol {
/// Whether this column should be copied to the cold storage.
pub const fn is_cold(&self) -> bool {
    // Only the columns listed here are replicated into cold storage.
    matches!(self, DBCol::Block | DBCol::State)
}
Expand Down
9 changes: 9 additions & 0 deletions core/store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,12 @@ pub static PREFETCH_STAGED_SLOTS: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.unwrap()
});
/// Per-column counter of `get` calls issued against the hot store while copying
/// data to cold storage; lets tests verify a column was served from cache.
#[cfg(feature = "cold_store")]
pub static COLD_MIGRATION_READS: Lazy<IntCounterVec> = Lazy::new(|| {
    let counter = try_create_int_counter_vec(
        "near_cold_migration_reads",
        "Number of get calls to hot store made for every column during copying data to cold storage.",
        &["col"],
    );
    counter.unwrap()
});
14 changes: 14 additions & 0 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,16 @@ pub struct TrieRefcountChange {
rc: std::num::NonZeroU32,
}

impl TrieRefcountChange {
    /// Hash of the trie node or value this refcount change refers to.
    pub fn hash(&self) -> &CryptoHash {
        &self.trie_node_or_value_hash
    }

    /// Raw serialized bytes of the trie node or value.
    pub fn payload(&self) -> &[u8] {
        &self.trie_node_or_value
    }
}

///
/// TrieChanges stores delta for refcount.
/// Multiple versions of the state work the following way:
Expand Down Expand Up @@ -533,6 +543,10 @@ impl TrieChanges {
pub fn empty(old_root: StateRoot) -> Self {
TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] }
}

/// Refcount increments (inserted trie nodes/values) recorded by these changes.
pub fn insertions(&self) -> &[TrieRefcountChange] {
    &self.insertions
}
}

/// Result of applying state part to Trie.
Expand Down
33 changes: 28 additions & 5 deletions integration-tests/src/tests/client/cold_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use near_o11y::testonly::init_test_logger;
use near_primitives::transaction::{
Action, DeployContractAction, FunctionCallAction, SignedTransaction,
};
use near_store::cold_storage::{test_cold_genesis_update, update_cold_db};
use near_store::cold_storage::{test_cold_genesis_update, test_get_store_reads, update_cold_db};
use near_store::db::TestDB;
use near_store::{DBCol, NodeStorage, Store, Temperature};
use nearcore::config::GenesisExt;
Expand All @@ -22,10 +22,13 @@ fn check_key(first_store: &Store, second_store: &Store, col: DBCol, key: &[u8])
assert_eq!(first_res.unwrap(), second_res.unwrap());
}

fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) {
/// Checks that every key present in `col` of `first_store` has matching contents
/// in `second_store`, and returns how many keys were compared so callers can
/// assert the check was not vacuous.
fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) -> u64 {
    let mut compared: u64 = 0;
    for entry in first_store.iter(col) {
        let (key, _) = entry.unwrap();
        check_key(first_store, second_store, col, &key);
        compared += 1;
    }
    compared
}

/// Deploying test contract and calling write_random_value 5 times every block for 4 epochs.
Expand Down Expand Up @@ -56,6 +59,8 @@ fn test_storage_after_commit_of_cold_update() {

test_cold_genesis_update(&*cold_db, &env.clients[0].runtime_adapter.store()).unwrap();

let state_reads = test_get_store_reads(DBCol::State);

for h in 1..max_height {
let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
if h == 1 {
Expand Down Expand Up @@ -102,16 +107,34 @@ fn test_storage_after_commit_of_cold_update() {
let block = env.clients[0].produce_block(h).unwrap().unwrap();
env.process_block(0, block.clone(), Provenance::PRODUCED);

last_hash = block.hash().clone();
update_cold_db(
&*cold_db,
&env.clients[0].runtime_adapter.store(),
&env.clients[0]
.runtime_adapter
.get_shard_layout(
&env.clients[0]
.runtime_adapter
.get_epoch_id_from_prev_block(&last_hash)
.unwrap(),
)
.unwrap(),
&h,
)
.unwrap();

update_cold_db(&*cold_db, &env.clients[0].runtime_adapter.store(), &h).unwrap();
last_hash = block.hash().clone();
}

// assert that we don't read State from db, but from TrieChanges
assert_eq!(state_reads, test_get_store_reads(DBCol::State));

let cold_store = NodeStorage::new(cold_db).get_store(Temperature::Hot);

for col in DBCol::iter() {
if col.is_cold() {
check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col);
// assert that this test actually checks something
assert!(check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col) > 0);
}
}
}

0 comments on commit ec1db8b

Please sign in to comment.