
Commit aaaf1d0

authored Oct 26, 2022
store: adding State to cold columns (#7926)
1 parent 079ddfa commit aaaf1d0

5 files changed: +108 -11

 

core/store/src/cold_storage.rs (+56 -5)
@@ -1,8 +1,10 @@
 use crate::columns::DBKeyType;
 use crate::refcount::add_positive_refcount;
-use crate::{DBCol, DBTransaction, Database, Store};
+use crate::trie::TrieRefcountChange;
+use crate::{DBCol, DBTransaction, Database, Store, TrieChanges};
 
 use borsh::BorshDeserialize;
+use near_primitives::shard_layout::ShardLayout;
 use near_primitives::types::BlockHeight;
 use std::collections::HashMap;
 use std::io;
@@ -38,13 +40,14 @@ struct StoreWithCache<'a> {
 pub fn update_cold_db(
     cold_db: &dyn Database,
     hot_store: &Store,
+    shard_layout: &ShardLayout,
     height: &BlockHeight,
 ) -> io::Result<()> {
     let _span = tracing::debug_span!(target: "store", "update cold db", height = height);
 
     let mut store_with_cache = StoreWithCache { store: hot_store, cache: StoreCache::new() };
 
-    let key_type_to_keys = get_keys_from_store(&mut store_with_cache, height)?;
+    let key_type_to_keys = get_keys_from_store(&mut store_with_cache, shard_layout, height)?;
     for col in DBCol::iter() {
         if col.is_cold() {
             copy_from_store(
@@ -116,6 +119,10 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
     Ok(())
 }
 
+pub fn test_get_store_reads(column: DBCol) -> u64 {
+    crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(column)]).get()
+}
+
 /// Returns HashMap from DBKeyType to possible keys of that type for provided height.
 /// Only constructs keys for key types that are used in cold columns.
 /// The goal is to capture all changes to db made during production of the block at provided height.
@@ -124,6 +131,7 @@ pub fn test_cold_genesis_update(cold_db: &dyn Database, hot_store: &Store) -> io
 /// But for TransactionHash, for example, it is all of the tx hashes in that block.
 fn get_keys_from_store(
     store: &mut StoreWithCache,
+    shard_layout: &ShardLayout,
     height: &BlockHeight,
 ) -> io::Result<HashMap<DBKeyType, Vec<StoreKey>>> {
     let mut key_type_to_keys = HashMap::new();
@@ -136,6 +144,35 @@ fn get_keys_from_store(
             key_type,
             match key_type {
                 DBKeyType::BlockHash => vec![block_hash_key.clone()],
+                DBKeyType::ShardUId => shard_layout
+                    .get_shard_uids()
+                    .iter()
+                    .map(|uid| uid.to_bytes().to_vec())
+                    .collect(),
+                // TODO: don't write values of State column to cache. Write them directly to colddb.
+                DBKeyType::TrieNodeOrValueHash => {
+                    let mut keys = vec![];
+                    for shard_uid in shard_layout.get_shard_uids() {
+                        let shard_uid_key = shard_uid.to_bytes();
+
+                        debug_assert_eq!(
+                            DBCol::TrieChanges.key_type(),
+                            &[DBKeyType::BlockHash, DBKeyType::ShardUId]
+                        );
+                        let trie_changes_option: Option<TrieChanges> = store.get_ser(
+                            DBCol::TrieChanges,
+                            &join_two_keys(&block_hash_key, &shard_uid_key),
+                        )?;
+
+                        if let Some(trie_changes) = trie_changes_option {
+                            for op in trie_changes.insertions() {
+                                store.insert_state_to_cache_from_op(op, &shard_uid_key);
+                                keys.push(op.hash().as_bytes().to_vec());
+                            }
+                        }
+                    }
+                    keys
+                }
                 _ => {
                     vec![]
                 }
@@ -146,6 +183,10 @@ fn get_keys_from_store(
     Ok(key_type_to_keys)
 }
 
+pub fn join_two_keys(prefix_key: &[u8], suffix_key: &[u8]) -> StoreKey {
+    [prefix_key, suffix_key].concat()
+}
+
 /// Returns all possible keys for a column with key represented by a specific sequence of key types.
 /// `key_type_to_value` -- result of `get_keys_from_store`, mapping from KeyType to all possible keys of that type.
 /// `key_types` -- description of a final key, what sequence of key types forms a key, result of `DBCol::key_type`.
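
For orientation, `join_two_keys` is a plain byte-level concatenation; the same layout is used below by `insert_state_to_cache_from_op`, where a State key is the ShardUId bytes followed by the trie node or value hash. A minimal, self-contained sketch with placeholder byte values (not real ShardUId or CryptoHash encodings):

    // Sketch: a State key is the ShardUId bytes followed by the node/value hash bytes.
    fn join_two_keys(prefix_key: &[u8], suffix_key: &[u8]) -> Vec<u8> {
        [prefix_key, suffix_key].concat()
    }

    fn main() {
        let shard_uid_key = [0u8; 8];   // placeholder for ShardUId::to_bytes()
        let node_hash = [0xab_u8; 32];  // placeholder for a CryptoHash
        let state_key = join_two_keys(&shard_uid_key, &node_hash);
        assert_eq!(state_key.len(), 8 + 32); // plain concatenation, no separator
    }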
@@ -179,9 +220,7 @@ fn combine_keys_with_stop(
     let mut result_keys = vec![];
     for prefix_key in &all_smaller_keys {
         for suffix_key in &key_type_to_keys[last_kt] {
-            let mut new_key = prefix_key.clone();
-            new_key.extend(suffix_key);
-            result_keys.push(new_key);
+            result_keys.push(join_two_keys(prefix_key, suffix_key));
         }
     }
     result_keys
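
The `combine_keys` helpers build every candidate key for a column as a cross product of the per-key-type key lists described in the doc comment above. A rough standalone sketch of that idea, using stand-in byte vectors instead of real ShardUId and hash keys:

    // Sketch: append every key of the last key type to every already-combined prefix.
    fn combine(prefixes: &[Vec<u8>], suffixes: &[Vec<u8>]) -> Vec<Vec<u8>> {
        let mut result = vec![];
        for prefix in prefixes {
            for suffix in suffixes {
                result.push([prefix.as_slice(), suffix.as_slice()].concat());
            }
        }
        result
    }

    fn main() {
        let shard_uid_keys = vec![vec![0u8], vec![1u8]];          // stand-ins for ShardUId keys
        let hash_keys = vec![vec![0xaa], vec![0xbb], vec![0xcc]]; // stand-ins for node hashes
        // For State (ShardUId ++ TrieNodeOrValueHash) this yields 2 * 3 = 6 candidate keys.
        assert_eq!(combine(&shard_uid_keys, &hash_keys).len(), 6);
    }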
@@ -202,6 +241,7 @@ where
 impl StoreWithCache<'_> {
     pub fn get(&mut self, column: DBCol, key: &[u8]) -> io::Result<StoreValue> {
         if !self.cache.contains_key(&(column, key.to_vec())) {
+            crate::metrics::COLD_MIGRATION_READS.with_label_values(&[<&str>::from(column)]).inc();
             self.cache.insert(
                 (column.clone(), key.to_vec()),
                 self.store.get(column, key)?.map(|x| x.as_slice().to_vec()),
@@ -232,6 +272,17 @@ impl StoreWithCache<'_> {
     ) -> io::Result<T> {
         option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
     }
+
+    pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) {
+        debug_assert_eq!(
+            DBCol::State.key_type(),
+            &[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
+        );
+        self.cache.insert(
+            (DBCol::State, join_two_keys(shard_uid_key, op.hash().as_bytes())),
+            Some(op.payload().to_vec()),
+        );
+    }
 }
 
 #[cfg(test)]

core/store/src/columns.rs (+1 -1)
@@ -368,7 +368,7 @@ impl DBCol {
     /// Whether this column should be copied to the cold storage.
     pub const fn is_cold(&self) -> bool {
         match self {
-            DBCol::Block => true,
+            DBCol::Block | DBCol::State => true,
             _ => false,
         }
     }

core/store/src/metrics.rs (+9)
@@ -205,3 +205,12 @@ pub static PREFETCH_STAGED_SLOTS: Lazy<IntGaugeVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+#[cfg(feature = "cold_store")]
+pub static COLD_MIGRATION_READS: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "near_cold_migration_reads",
+        "Number of get calls to hot store made for every column during copying data to cold storage.",
+        &["col"],
+    )
+    .unwrap()
+});
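
The new counter follows the usual Prometheus counter-vec pattern: `StoreWithCache::get` increments the per-column counter on each cache miss, and `test_get_store_reads` reads it back. A hedged sketch of that pattern using the plain `prometheus` crate directly instead of nearcore's `try_create_int_counter_vec` wrapper:

    use prometheus::{IntCounterVec, Opts};

    fn main() {
        // Per-column read counter, analogous to near_cold_migration_reads.
        let reads = IntCounterVec::new(
            Opts::new("cold_migration_reads", "hot-store reads per column"),
            &["col"],
        )
        .unwrap();

        reads.with_label_values(&["State"]).inc(); // what StoreWithCache::get does on a cache miss
        assert_eq!(reads.with_label_values(&["State"]).get(), 1); // what test_get_store_reads returns
    }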

core/store/src/trie/mod.rs (+14)
@@ -498,6 +498,16 @@ pub struct TrieRefcountChange {
     rc: std::num::NonZeroU32,
 }
 
+impl TrieRefcountChange {
+    pub fn hash(&self) -> &CryptoHash {
+        &self.trie_node_or_value_hash
+    }
+
+    pub fn payload(&self) -> &[u8] {
+        self.trie_node_or_value.as_slice()
+    }
+}
+
 ///
 /// TrieChanges stores delta for refcount.
 /// Multiple versions of the state work the following way:
@@ -533,6 +543,10 @@ impl TrieChanges {
     pub fn empty(old_root: StateRoot) -> Self {
         TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] }
     }
+
+    pub fn insertions(&self) -> &[TrieRefcountChange] {
+        self.insertions.as_slice()
+    }
 }
 
 /// Result of applying state part to Trie.
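
A minimal sketch of how these accessors are consumed by the cold-storage path, written with simplified stand-in structs (the real fields are private, hence the accessors): each insertion contributes its hash as the State key suffix and its payload as the value.

    // Simplified stand-ins for the real nearcore types.
    struct TrieRefcountChange { hash: [u8; 32], value: Vec<u8> }
    impl TrieRefcountChange {
        fn hash(&self) -> &[u8; 32] { &self.hash }
        fn payload(&self) -> &[u8] { &self.value }
    }
    struct TrieChanges { insertions: Vec<TrieRefcountChange> }
    impl TrieChanges {
        fn insertions(&self) -> &[TrieRefcountChange] { &self.insertions }
    }

    fn main() {
        let changes = TrieChanges {
            insertions: vec![TrieRefcountChange { hash: [7u8; 32], value: b"node bytes".to_vec() }],
        };
        for op in changes.insertions() {
            // op.hash() becomes the key suffix, op.payload() the value written to State.
            let (_key_suffix, _value) = (op.hash(), op.payload());
        }
    }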

integration-tests/src/tests/client/cold_storage.rs (+28 -5)
@@ -7,7 +7,7 @@ use near_o11y::testonly::init_test_logger;
 use near_primitives::transaction::{
     Action, DeployContractAction, FunctionCallAction, SignedTransaction,
 };
-use near_store::cold_storage::{test_cold_genesis_update, update_cold_db};
+use near_store::cold_storage::{test_cold_genesis_update, test_get_store_reads, update_cold_db};
 use near_store::db::TestDB;
 use near_store::{DBCol, NodeStorage, Store, Temperature};
 use nearcore::config::GenesisExt;
@@ -22,10 +22,13 @@ fn check_key(first_store: &Store, second_store: &Store, col: DBCol, key: &[u8])
     assert_eq!(first_res.unwrap(), second_res.unwrap());
 }
 
-fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) {
+fn check_iter(first_store: &Store, second_store: &Store, col: DBCol) -> u64 {
+    let mut num_checks = 0;
     for (key, _) in first_store.iter(col).map(Result::unwrap) {
         check_key(first_store, second_store, col, &key);
+        num_checks += 1;
     }
+    num_checks
 }
 
 /// Deploying test contract and calling write_random_value 5 times every block for 4 epochs.
@@ -56,6 +59,8 @@ fn test_storage_after_commit_of_cold_update() {
 
     test_cold_genesis_update(&*cold_db, &env.clients[0].runtime_adapter.store()).unwrap();
 
+    let state_reads = test_get_store_reads(DBCol::State);
+
     for h in 1..max_height {
         let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0");
         if h == 1 {
@@ -102,16 +107,34 @@ fn test_storage_after_commit_of_cold_update() {
         let block = env.clients[0].produce_block(h).unwrap().unwrap();
         env.process_block(0, block.clone(), Provenance::PRODUCED);
 
-        last_hash = block.hash().clone();
+        update_cold_db(
+            &*cold_db,
+            &env.clients[0].runtime_adapter.store(),
+            &env.clients[0]
+                .runtime_adapter
+                .get_shard_layout(
+                    &env.clients[0]
+                        .runtime_adapter
+                        .get_epoch_id_from_prev_block(&last_hash)
+                        .unwrap(),
+                )
+                .unwrap(),
+            &h,
+        )
+        .unwrap();
 
-        update_cold_db(&*cold_db, &env.clients[0].runtime_adapter.store(), &h).unwrap();
+        last_hash = block.hash().clone();
     }
 
+    // assert that we don't read State from db, but from TrieChanges
+    assert_eq!(state_reads, test_get_store_reads(DBCol::State));
+
     let cold_store = NodeStorage::new(cold_db).get_store(Temperature::Hot);
 
     for col in DBCol::iter() {
         if col.is_cold() {
-            check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col);
+            // assert that this test actually checks something
+            assert!(check_iter(&env.clients[0].runtime_adapter.store(), &cold_store, col) > 0);
         }
     }
 }
