Skip to content

Commit

Permalink
finish code for flat storage creation
Browse files Browse the repository at this point in the history
  • Loading branch information
Looogarithm committed Nov 15, 2022
1 parent 7db3eac commit e5a4638
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 54 deletions.
278 changes: 255 additions & 23 deletions chain/chain/src/flat_storage_creator.rs

Large diffs are not rendered by default.

76 changes: 56 additions & 20 deletions core/store/src/flat_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,14 +443,24 @@ struct FlatStorageStateInner {
deltas: HashMap<CryptoHash, Arc<FlatStateDelta>>,
}

/// Number of parts to which we divide shard state for parallel traversal.
// TODO: consider changing it for different shards, ensure that shard memory usage / `NUM_PARTS` < X MiB.
/// Number of traversed parts during a single step of fetching state.
#[allow(unused)]
const NUM_FETCHED_STATE_PARTS: u64 = 4_000;
pub const NUM_PARTS_IN_ONE_STEP: u64 = 20;

/// Number of traversed parts during a single step of fetching state.
/// Memory limit for state part being fetched.
#[allow(unused)]
pub const NUM_PARTS_IN_ONE_STEP: u64 = 50;
pub const STATE_PART_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(10 * bytesize::MIB);

/// Current step of fetching state to fill flat storage.
///
/// Fetching is performed in several steps; this value identifies the step in progress.
/// It derives Borsh (de)serialization so that it can be written to and read back from the store.
// NOTE(review): presumably one step covers parts `part_id..part_id + num_parts_in_step`,
// bounded by `num_parts` — confirm against the flat storage creator code.
#[derive(BorshSerialize, BorshDeserialize, Clone, Debug, PartialEq, Eq)]
pub struct FetchingStateStatus {
/// Index of the first state part to be fetched in this step.
pub part_id: u64,
/// Number of state parts fetched in one step.
pub num_parts_in_step: u64,
/// Total number of state parts into which the shard state is divided.
pub num_parts: u64,
}

/// If a node has flat storage enabled but has no flat storage data on disk, its creation should be initiated.
/// Because this is heavy work requiring ~5h for a testnet RPC node and ~10h for a testnet archival node, we do it on
Expand All @@ -463,13 +473,14 @@ pub enum FlatStorageStateStatus {
/// final chain head moves after saved chain head.
SavingDeltas,
/// Flat storage state misses key-value pairs. We need to fetch Trie state to fill flat storage for some final chain
/// head. It is the heaviest step, so it is done in `NUM_FETCHED_STATE_PARTS` / `NUM_PARTS_IN_ONE_STEP` steps.
/// head. It is the heaviest work, so it is done in multiple steps, see comment for `FetchingStateStatus` for more
/// details.
/// During each step we spawn background threads to fill some contiguous range of state keys.
/// Status describes the current step of fetching (see `FetchingStateStatus`). Progress of each step
/// is saved to disk, so if creation is interrupted during some step, we don't repeat previous steps and instead
/// start again from the saved step.
#[allow(unused)]
FetchingState((CryptoHash, u64)),
FetchingState(FetchingStateStatus),
/// Flat storage data exists on disk but its head is too far away from chain final head. We apply deltas from disk
/// until the head reaches final head.
#[allow(unused)]
Expand All @@ -482,7 +493,9 @@ pub enum FlatStorageStateStatus {

#[cfg(feature = "protocol_feature_flat_state")]
pub mod store_helper {
use crate::flat_state::{FlatStorageError, FlatStorageStateStatus, KeyForFlatStateDelta};
use crate::flat_state::{
FetchingStateStatus, FlatStorageError, FlatStorageStateStatus, KeyForFlatStateDelta,
};
use crate::{FlatStateDelta, Store, StoreUpdate};
use borsh::BorshSerialize;
use near_primitives::hash::CryptoHash;
Expand All @@ -491,6 +504,7 @@ pub mod store_helper {
use std::sync::Arc;

pub const FETCHING_STATE_STEP_KEY_PREFIX: &[u8; 4] = b"STEP";
pub const CATCHUP_KEY_PREFIX: &[u8; 7] = b"CATCHUP";

pub fn get_delta(
store: &Store,
Expand Down Expand Up @@ -562,31 +576,43 @@ pub mod store_helper {
}
}

fn fetching_state_step_key(shard_id: ShardId) -> Vec<u8> {
fn fetching_state_status_key(shard_id: ShardId) -> Vec<u8> {
let mut fetching_state_step_key = FETCHING_STATE_STEP_KEY_PREFIX.to_vec();
fetching_state_step_key.extend_from_slice(&shard_id.try_to_vec().unwrap());
fetching_state_step_key
}

fn get_fetching_state_step(store: &Store, shard_id: ShardId) -> Option<u64> {
store.get_ser(crate::DBCol::FlatStateMisc, &fetching_state_step_key(shard_id)).expect(
fn get_fetching_state_status(store: &Store, shard_id: ShardId) -> Option<FetchingStateStatus> {
store.get_ser(crate::DBCol::FlatStateMisc, &fetching_state_status_key(shard_id)).expect(
format!("Error reading fetching step for flat state for shard {shard_id}").as_str(),
)
}

pub fn set_fetching_state_step(store_update: &mut StoreUpdate, shard_id: ShardId, value: u64) {
pub fn set_fetching_state_status(
store_update: &mut StoreUpdate,
shard_id: ShardId,
value: FetchingStateStatus,
) {
store_update
.set_ser(crate::DBCol::FlatStateMisc, &fetching_state_step_key(shard_id), &value)
.set_ser(crate::DBCol::FlatStateMisc, &fetching_state_status_key(shard_id), &value)
.expect(
format!("Error setting fetching step for shard {shard_id} to {value}").as_str(),
format!("Error setting fetching step for shard {shard_id} to {:?}", value).as_str(),
);
}

fn get_catchup_status(store: &Store, shard_id: ShardId) -> bool {
let mut catchup_status_key = FETCHING_STATE_STEP_KEY_PREFIX.to_vec();
/// Records in `store_update` the deletion of the fetching-state status entry for `shard_id`
/// from the `FlatStateMisc` column.
pub fn remove_fetching_state_status(store_update: &mut StoreUpdate, shard_id: ShardId) {
    let status_key = fetching_state_status_key(shard_id);
    store_update.delete(crate::DBCol::FlatStateMisc, &status_key);
}

/// Builds the `FlatStateMisc` key of the catchup marker for `shard_id`:
/// `CATCHUP_KEY_PREFIX` followed by the Borsh-serialized shard id.
fn catchup_status_key(shard_id: ShardId) -> Vec<u8> {
    let shard_bytes = shard_id.try_to_vec().unwrap();
    [&CATCHUP_KEY_PREFIX[..], &shard_bytes[..]].concat()
}

fn get_catchup_status(store: &Store, shard_id: ShardId) -> bool {
let status: Option<bool> =
store.get_ser(crate::DBCol::FlatStateMisc, &catchup_status_key).expect(
store.get_ser(crate::DBCol::FlatStateMisc, &catchup_status_key(shard_id)).expect(
format!("Error reading catchup status for flat state for shard {shard_id}")
.as_str(),
);
Expand All @@ -603,15 +629,25 @@ pub mod store_helper {
}
}

/// Records in `store_update` that flat storage catchup has started for `shard_id` by
/// writing `true` under the shard's catchup key in the `FlatStateMisc` column.
///
/// # Panics
///
/// Panics if serializing the value into the store update fails.
pub fn start_catchup(store_update: &mut StoreUpdate, shard_id: ShardId) {
    store_update
        .set_ser(crate::DBCol::FlatStateMisc, &catchup_status_key(shard_id), &true)
        // Build the panic message lazily: `expect(format!(..).as_str())` would allocate
        // the string on every call, including the successful path.
        .unwrap_or_else(|err| {
            panic!("Error setting catchup status for shard {shard_id}: {err:?}")
        });
}

/// Records in `store_update` the deletion of the catchup marker for `shard_id`
/// from the `FlatStateMisc` column.
pub fn finish_catchup(store_update: &mut StoreUpdate, shard_id: ShardId) {
    let marker_key = catchup_status_key(shard_id);
    store_update.delete(crate::DBCol::FlatStateMisc, &marker_key);
}

pub fn get_flat_storage_state_status(
store: &Store,
shard_id: ShardId,
) -> FlatStorageStateStatus {
match get_flat_head(store, shard_id) {
None => FlatStorageStateStatus::SavingDeltas,
Some(block_hash) => {
if let Some(fetching_state_step) = get_fetching_state_step(store, shard_id) {
FlatStorageStateStatus::FetchingState((block_hash, fetching_state_step))
Some(_) => {
if let Some(fetching_state_status) = get_fetching_state_status(store, shard_id) {
FlatStorageStateStatus::FetchingState(fetching_state_status)
} else if get_catchup_status(store, shard_id) {
FlatStorageStateStatus::CatchingUp
} else {
Expand Down
5 changes: 3 additions & 2 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ use crate::db::{
refcount, DBIterator, DBOp, DBSlice, DBTransaction, Database, StoreStatistics,
GENESIS_JSON_HASH_KEY, GENESIS_STATE_ROOTS_KEY,
};
pub use crate::trie::iterator::TrieIterator;
pub use crate::trie::iterator::{TrieIterator, TrieTraversalItem};
pub use crate::trie::update::{TrieUpdate, TrieUpdateIterator, TrieUpdateValuePtr};
pub use crate::trie::{
estimator, split_state, ApplyStatePartResult, KeyForStateChanges, KeyLookupMode, NibbleSlice,
PartialStorage, PrefetchApi, RawTrieNode, RawTrieNodeWithSize, ShardTries, Trie, TrieAccess,
TrieCache, TrieCachingStorage, TrieChanges, TrieConfig, TrieStorage, WrappedTrieChanges,
TrieCache, TrieCachingStorage, TrieChanges, TrieConfig, TrieDBStorage, TrieStorage,
WrappedTrieChanges,
};
pub use flat_state::FlatStateDelta;

Expand Down
2 changes: 1 addition & 1 deletion core/store/src/trie/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl<'a> TrieIterator<'a> {
/// Visits all nodes belonging to the interval [path_begin, path_end) in depth-first search
/// order and return TrieTraversalItem for each visited node.
/// Used to generate and apply state parts for state sync.
pub(crate) fn visit_nodes_interval(
pub fn visit_nodes_interval(
&mut self,
path_begin: &[u8],
path_end: &[u8],
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::trie::iterator::TrieIterator;
pub use crate::trie::nibble_slice::NibbleSlice;
pub use crate::trie::prefetching_trie_storage::PrefetchApi;
pub use crate::trie::shard_tries::{KeyForStateChanges, ShardTries, WrappedTrieChanges};
pub use crate::trie::trie_storage::{TrieCache, TrieCachingStorage, TrieStorage};
pub use crate::trie::trie_storage::{TrieCache, TrieCachingStorage, TrieDBStorage, TrieStorage};
use crate::trie::trie_storage::{TrieMemoryPartialStorage, TrieRecordingStorage};
use crate::StorageError;
pub use near_primitives::types::TrieNodesCount;
Expand Down
2 changes: 1 addition & 1 deletion core/store/src/trie/state_parts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Trie {
/// Part part_id has nodes with paths `[path(part_id), path(part_id + 1))`
/// path is returned as nibbles, last path is `vec![16]`, previous paths end
/// in nodes
pub(crate) fn find_path_for_part_boundary(
pub fn find_path_for_part_boundary(
&self,
part_id: u64,
num_parts: u64,
Expand Down
57 changes: 52 additions & 5 deletions integration-tests/src/tests/client/flat_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@ use assert_matches::assert_matches;
use near_chain::{ChainGenesis, RuntimeAdapter};
use near_chain_configs::Genesis;
use near_client::test_utils::TestEnv;
use near_o11y::testonly::init_test_logger;
use near_store::flat_state::{store_helper, FlatStorageStateStatus};
// use near_o11y::testonly::init_test_logger;
use near_primitives_core::types::BlockHeight;
use near_store::flat_state::{
store_helper, FetchingStateStatus, FlatStorageStateStatus, NUM_PARTS_IN_ONE_STEP,
};
use near_store::test_utils::create_test_store;
use nearcore::config::GenesisExt;
use std::path::Path;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Check correctness of flat storage creation.
#[test]
fn test_flat_storage_creation() {
init_test_logger();
// init_test_logger();
let genesis = Genesis::test(vec!["test0".parse().unwrap()], 1);
let chain_genesis = ChainGenesis::new(&genesis);
let store = create_test_store();
Expand Down Expand Up @@ -104,10 +109,52 @@ fn test_flat_storage_creation() {
// We started the node from height 3, and now final head should move to height 4.
env.produce_block(0, 6);
let final_block_hash = env.clients[0].chain.get_block_hash_by_height(4).unwrap();
assert_eq!(store_helper::get_flat_head(&store, 0), Some(final_block_hash));
assert_eq!(
store_helper::get_flat_storage_state_status(&store, 0),
FlatStorageStateStatus::FetchingState((final_block_hash, 0))
FlatStorageStateStatus::FetchingState(FetchingStateStatus {
part_id: 0,
num_parts_in_step: NUM_PARTS_IN_ONE_STEP,
num_parts: 1,
})
);

// TODO: support next statuses once their logic is implemented.
// Run chain for a couple of blocks and check that statuses switch to `CatchingUp` and then to `Ready`.
// State is being fetched in rayon threads, but we expect it to finish in <30s because state is small and there is
// only one state part.
const BLOCKS_TIMEOUT: BlockHeight = 30;
let start_height = 8;
let mut next_height = start_height;
let mut was_catching_up = false;
while next_height < start_height + BLOCKS_TIMEOUT {
println!("producing block {next_height}");
env.produce_block(0, next_height);
next_height += 1;
match store_helper::get_flat_storage_state_status(&store, 0) {
FlatStorageStateStatus::FetchingState(..) => {
assert!(!was_catching_up, "Flat storage state status inconsistency: it was catching up before fetching state");
}
FlatStorageStateStatus::CatchingUp => {
was_catching_up = true;
}
FlatStorageStateStatus::Ready => {
assert!(
was_catching_up,
"Flat storage state is ready but there was no flat storage catchup observed"
);
break;
}
status @ _ => {
panic!(
"Unexpected flat storage state status for height {next_height}: {:?}",
status
);
}
}
thread::sleep(Duration::from_secs(1));
}
if next_height == start_height + BLOCKS_TIMEOUT {
let status = store_helper::get_flat_storage_state_status(&store, 0);
panic!("Apparently, node didn't fetch the whole state in {BLOCKS_TIMEOUT} blocks. Current status: {:?}", status);
}
}
4 changes: 3 additions & 1 deletion nearcore/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ impl<'a> near_store::StoreMigrator for Migrator<'a> {
}
#[cfg(feature = "protocol_feature_flat_state")]
34 => {
panic!("Trying to use a DB that has no been migrated to flat state. This binary does not support migrating.")
tracing::info!(target: "migrations", "Migrating DB version from 34 to 35. Flat storage data will be created on disk.");
tracing::info!(target: "migrations", "It will happen in parallel with regular block processing. ETA is 5h for RPC node and 10h for archival node.");
Ok(())
}
DB_VERSION.. => unreachable!(),
}
Expand Down

0 comments on commit e5a4638

Please sign in to comment.