diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index c9bf8a31630..85d43a95a37 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -2699,17 +2699,12 @@ impl Chain {
         )
     }

-    pub fn get_state_response_header(
+    /// Computes ShardStateSyncResponseHeader.
+    pub fn compute_state_response_header(
         &self,
         shard_id: ShardId,
         sync_hash: CryptoHash,
     ) -> Result<ShardStateSyncResponseHeader, Error> {
-        // Check cache
-        let key = StateHeaderKey(shard_id, sync_hash).try_to_vec()?;
-        if let Ok(Some(header)) = self.store.store().get_ser(DBCol::StateHeaders, &key) {
-            return Ok(header);
-        }
-
         // Consistency rules:
         // 1. Everything prefixed with `sync_` indicates new epoch, for which we are syncing.
         // 1a. `sync_prev` means the last of the prev epoch.
@@ -2875,6 +2870,24 @@ impl Chain {
                 })
             }
         };
+        Ok(shard_state_header)
+    }
+
+    /// Returns ShardStateSyncResponseHeader for the given epoch and shard.
+    /// If the header is already available in the DB, returns the cached version and doesn't recompute it.
+    /// If the header was computed then it also gets cached in the DB.
+    pub fn get_state_response_header(
+        &self,
+        shard_id: ShardId,
+        sync_hash: CryptoHash,
+    ) -> Result<ShardStateSyncResponseHeader, Error> {
+        // Check cache
+        let key = StateHeaderKey(shard_id, sync_hash).try_to_vec()?;
+        if let Ok(Some(header)) = self.store.store().get_ser(DBCol::StateHeaders, &key) {
+            return Ok(header);
+        }
+
+        let shard_state_header = self.compute_state_response_header(shard_id, sync_hash)?;

         // Saving the header data
         let mut store_update = self.store.store().store_update();
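
For context on the shape of this refactor: get_state_response_header is now a plain read-through cache over compute_state_response_header. A minimal sketch of that pattern, with a hypothetical in-memory Store standing in for nearcore's DBCol::StateHeaders column:

    use std::collections::HashMap;

    // Hypothetical stand-in for the real Store / DBCol::StateHeaders column.
    struct Store {
        state_headers: HashMap<Vec<u8>, Vec<u8>>,
    }

    impl Store {
        fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
            self.state_headers.get(key).cloned()
        }
        fn set(&mut self, key: Vec<u8>, value: Vec<u8>) {
            self.state_headers.insert(key, value);
        }
    }

    // Read-through cache: return the cached value if present, otherwise run the
    // expensive computation once (compute_state_response_header in the PR),
    // persist the result, and return it.
    fn get_or_compute(
        store: &mut Store,
        key: Vec<u8>,
        compute: impl FnOnce() -> Result<Vec<u8>, String>,
    ) -> Result<Vec<u8>, String> {
        if let Some(cached) = store.get(&key) {
            return Ok(cached);
        }
        let value = compute()?;
        store.set(key, value.clone());
        Ok(value)
    }

The benefit is that callers that only need the header (such as the state dumper below) share one cached computation with the state-sync request path.
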
diff --git a/core/primitives/src/syncing.rs b/core/primitives/src/syncing.rs
index 4dc68f3b7a6..a1450fceb8d 100644
--- a/core/primitives/src/syncing.rs
+++ b/core/primitives/src/syncing.rs
@@ -249,10 +249,7 @@ pub enum StateSyncDumpProgress {
         /// Block hash of the first block of the epoch.
         /// The dumped state corresponds to the state before applying this block.
         sync_hash: CryptoHash,
-        /// Root of the state being dumped.
-        state_root: StateRoot,
         /// Progress made.
         parts_dumped: u64,
-        num_parts: u64,
     },
 }
diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs
index 4fd4fa910c2..60363424afd 100644
--- a/nearcore/src/metrics.rs
+++ b/nearcore/src/metrics.rs
@@ -50,15 +50,6 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
     )
     .unwrap()
 });
-pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
-    try_create_histogram_vec(
-        "near_state_sync_dump_obtain_part_elapsed_sec",
-        "Time needed to obtain a part",
-        &["shard_id"],
-        Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
-    )
-    .unwrap()
-});
 pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
     try_create_int_gauge_vec(
         "near_state_sync_dump_num_parts_total",
@@ -91,3 +82,21 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+pub static STATE_SYNC_APPLY_PART_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_state_sync_apply_part_delay_sec",
+        "Time needed to apply a state part",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
+    )
+    .unwrap()
+});
+pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy<HistogramVec> = Lazy::new(|| {
+    try_create_histogram_vec(
+        "near_state_sync_obtain_part_delay_sec",
+        "Time needed to obtain a part",
+        &["shard_id"],
+        Some(exponential_buckets(0.001, 2.0, 20).unwrap()),
+    )
+    .unwrap()
+});
diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs
index aeff68e8221..26f603c56db 100644
--- a/nearcore/src/runtime/mod.rs
+++ b/nearcore/src/runtime/mod.rs
@@ -1246,6 +1246,10 @@ impl RuntimeAdapter for NightshadeRuntime {
             %block_hash,
             num_parts = part_id.total)
         .entered();
+        let _timer = metrics::STATE_SYNC_OBTAIN_PART_DELAY
+            .with_label_values(&[&shard_id.to_string()])
+            .start_timer();
+
         let epoch_id = self.get_epoch_id(block_hash)?;
         let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, &epoch_id)?;
         let trie = self.tries.get_view_trie_for_shard(shard_uid, *state_root);
@@ -1270,11 +1274,27 @@ impl RuntimeAdapter for NightshadeRuntime {
                 match Trie::validate_trie_nodes_for_part(state_root, part_id, trie_nodes) {
                     Ok(_) => true,
                     // Storage error should not happen
-                    Err(_) => false,
+                    Err(err) => {
+                        tracing::error!(
+                            target: "state-parts",
+                            ?state_root,
+                            ?part_id,
+                            ?err,
+                            "State part storage error");
+                        false
+                    }
                 }
             }
             // Deserialization error means we've got the data from malicious peer
-            Err(_) => false,
+            Err(err) => {
+                tracing::error!(
+                    target: "state-parts",
+                    ?state_root,
+                    ?part_id,
+                    ?err,
+                    "State part deserialization error");
+                false
+            }
         }
     }

@@ -1371,6 +1391,10 @@ impl RuntimeAdapter for NightshadeRuntime {
         data: &[u8],
         epoch_id: &EpochId,
     ) -> Result<(), Error> {
+        let _timer = metrics::STATE_SYNC_APPLY_PART_DELAY
+            .with_label_values(&[&shard_id.to_string()])
+            .start_timer();
+
         let part = BorshDeserialize::try_from_slice(data)
            .expect("Part was already validated earlier, so could never fail here");
         let ApplyStatePartResult { trie_changes, flat_state_delta, contract_codes } =
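
The two new histograms follow the standard Prometheus pattern: one label per shard, exponential buckets, and a scope-guard timer that records on drop, which is why a single `let _timer = ...` at the top of the instrumented function is enough. A self-contained sketch using the prometheus crate directly (nearcore wraps the same calls in its own try_create_histogram_vec helper):

    use prometheus::{exponential_buckets, HistogramOpts, HistogramVec};

    fn main() {
        // 20 exponential buckets starting at 1ms and doubling each step,
        // mirroring Some(exponential_buckets(0.001, 2.0, 20)) in the diff.
        let histogram = HistogramVec::new(
            HistogramOpts::new(
                "state_sync_apply_part_delay_sec",
                "Time needed to apply a state part",
            )
            .buckets(exponential_buckets(0.001, 2.0, 20).unwrap()),
            &["shard_id"],
        )
        .unwrap();

        {
            // The guard observes the elapsed time into the histogram when dropped.
            let _timer = histogram.with_label_values(&["0"]).start_timer();
            // ... the work being measured ...
        }
    }
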
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
index 458bff4e27e..0df51e42802 100644
--- a/nearcore/src/state_sync.rs
+++ b/nearcore/src/state_sync.rs
@@ -161,63 +161,75 @@ async fn state_sync_dump(
                 epoch_id,
                 epoch_height,
                 sync_hash,
-                state_root,
                 parts_dumped,
-                num_parts,
             })) => {
-                // The actual dumping of state to S3.
-                tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, %state_root, parts_dumped, num_parts, "Creating parts and dumping them");
-                let mut res = None;
-                for part_id in parts_dumped..num_parts {
-                    // Dump parts sequentially synchronously.
-                    // TODO: How to make it possible to dump state more effectively using multiple nodes?
-                    let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED
-                        .with_label_values(&[&shard_id.to_string()])
-                        .start_timer();
+                let state_header = chain.get_state_response_header(shard_id, sync_hash);
+                match state_header {
+                    Ok(state_header) => {
+                        let state_root = state_header.chunk_prev_state_root();
+                        let num_parts =
+                            get_num_state_parts(state_header.state_root_node().memory_usage);

-                    let state_part = match get_state_part(
-                        &runtime,
-                        &shard_id,
-                        &sync_hash,
-                        &state_root,
-                        part_id,
-                        num_parts,
-                        &chain,
-                    ) {
-                        Ok(state_part) => state_part,
-                        Err(err) => {
-                            res = Some(err);
-                            break;
+                        let mut res = None;
+                        // The actual dumping of state to S3.
+                        tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, parts_dumped, "Creating parts and dumping them");
+                        for part_id in parts_dumped..num_parts {
+                            // Dump parts sequentially synchronously.
+                            // TODO: How to make it possible to dump state more effectively using multiple nodes?
+                            let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED
+                                .with_label_values(&[&shard_id.to_string()])
+                                .start_timer();
+
+                            let state_part = match obtain_and_store_state_part(
+                                &runtime,
+                                &shard_id,
+                                &sync_hash,
+                                &state_root,
+                                part_id,
+                                num_parts,
+                                &chain,
+                            ) {
+                                Ok(state_part) => state_part,
+                                Err(err) => {
+                                    res = Some(err);
+                                    break;
+                                }
+                            };
+                            let location = s3_location(
+                                &config.chain_id,
+                                epoch_height,
+                                shard_id,
+                                part_id,
+                                num_parts,
+                            );
+                            if let Err(err) =
+                                put_state_part(&location, &state_part, &shard_id, &bucket).await
+                            {
+                                res = Some(err);
+                                break;
+                            }
+                            update_progress(
+                                &shard_id,
+                                &epoch_id,
+                                epoch_height,
+                                &sync_hash,
+                                part_id,
+                                num_parts,
+                                state_part.len(),
+                                &chain,
+                            );
+                        }
+                        if let Some(err) = res {
+                            Err(err)
+                        } else {
+                            Ok(Some(StateSyncDumpProgress::AllDumped {
+                                epoch_id,
+                                epoch_height,
+                                num_parts: Some(num_parts),
+                            }))
+                        }
                         }
-                    };
-                    let location =
-                        s3_location(&config.chain_id, epoch_height, shard_id, part_id, num_parts);
-                    if let Err(err) =
-                        put_state_part(&location, &state_part, &shard_id, &bucket).await
-                    {
-                        res = Some(err);
-                        break;
-                    }
-                    update_progress(
-                        &shard_id,
-                        &epoch_id,
-                        epoch_height,
-                        &sync_hash,
-                        &state_root,
-                        part_id,
-                        num_parts,
-                        state_part.len(),
-                        &chain,
-                    );
-                }
-                if let Some(err) = res {
-                    Err(err)
-                } else {
-                    Ok(Some(StateSyncDumpProgress::AllDumped {
-                        epoch_id,
-                        epoch_height,
-                        num_parts: Some(num_parts),
-                    }))
+                    Err(err) => Err(err),
                 }
             }
         };
@@ -268,7 +280,6 @@ fn update_progress(
     epoch_id: &EpochId,
     epoch_height: EpochHeight,
     sync_hash: &CryptoHash,
-    state_root: &StateRoot,
     part_id: u64,
     num_parts: u64,
     part_len: usize,
@@ -282,9 +293,7 @@ fn update_progress(
         epoch_id: epoch_id.clone(),
         epoch_height,
         sync_hash: *sync_hash,
-        state_root: *state_root,
         parts_dumped: part_id + 1,
-        num_parts,
     };
     match chain.store().set_state_sync_dump_progress(*shard_id, Some(next_progress.clone())) {
         Ok(_) => {
@@ -328,7 +337,8 @@ fn set_metrics(
     }
 }

-fn get_state_part(
+/// Obtains and then saves the part data.
+fn obtain_and_store_state_part(
     runtime: &Arc<NightshadeRuntime>,
     shard_id: &ShardId,
     sync_hash: &CryptoHash,
@@ -337,19 +347,13 @@ fn get_state_part(
     num_parts: u64,
     chain: &Chain,
 ) -> Result<Vec<u8>, Error> {
-    let state_part = {
-        let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED
-            .with_label_values(&[&shard_id.to_string()])
-            .start_timer();
-        runtime.obtain_state_part(
-            *shard_id,
-            &sync_hash,
-            &state_root,
-            PartId::new(part_id, num_parts),
-        )?
-    };
+    let state_part = runtime.obtain_state_part(
+        *shard_id,
+        &sync_hash,
+        &state_root,
+        PartId::new(part_id, num_parts),
+    )?;

-    // Save the part data.
     let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?;
     let mut store_update = chain.store().store().store_update();
     store_update.set(DBCol::StateParts, &key, &state_part);
@@ -367,14 +371,13 @@ fn start_dumping(
 ) -> Result<Option<StateSyncDumpProgress>, Error> {
     let epoch_info = runtime.get_epoch_info(&epoch_id)?;
     let epoch_height = epoch_info.epoch_height();
-    let num_shards = runtime.num_shards(&epoch_id)?;
-    let sync_hash_block = chain.get_block(&sync_hash)?;
-    if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) {
-        assert_eq!(num_shards, sync_hash_block.chunks().len() as u64);
-        let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root();
-        let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?;
-        let num_parts = get_num_state_parts(state_root_node.memory_usage);
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
+    let sync_prev_header = chain.get_block_header(&sync_hash)?;
+    let sync_prev_hash = sync_prev_header.hash();
+
+    let state_header = chain.get_state_response_header(shard_id, sync_hash)?;
+    let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage);
+    if runtime.cares_about_shard(None, sync_prev_hash, shard_id, false) {
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Initialize dumping state of Epoch");
         // Note that first the state of the state machines gets changes to
         // `InProgress` and it starts dumping state after a short interval.
         set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height));
@@ -382,12 +385,10 @@ fn start_dumping(
             epoch_id,
             epoch_height,
             sync_hash,
-            state_root,
             parts_dumped: 0,
-            num_parts,
         }))
     } else {
-        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch");
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch");
         Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) }))
     }
 }
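
Note what the slimmer StateSyncDumpProgress buys: the persisted checkpoint is now only the epoch plus parts_dumped, and state_root/num_parts are rederived from the state header on every resume, so a stale checkpoint can never disagree with the header. A simplified sketch of that resume flow (hypothetical types, not the actual nearcore enum):

    // Hypothetical, simplified mirror of the progress enum after this PR:
    // only the cursor survives restarts; roots and part counts are derived again.
    #[derive(Clone, Debug)]
    enum DumpProgress {
        InProgress { epoch_height: u64, parts_dumped: u64 },
        AllDumped { epoch_height: u64, num_parts: Option<u64> },
    }

    fn resume(progress: DumpProgress, recompute_num_parts: impl Fn() -> u64) -> DumpProgress {
        match progress {
            DumpProgress::InProgress { epoch_height, parts_dumped } => {
                // num_parts is no longer trusted from the checkpoint; it is
                // recomputed from the state header once per resume.
                let num_parts = recompute_num_parts();
                for part_id in parts_dumped..num_parts {
                    // ... obtain, store and upload part `part_id`, then persist
                    // parts_dumped = part_id + 1 ...
                    let _ = part_id;
                }
                DumpProgress::AllDumped { epoch_height, num_parts: Some(num_parts) }
            }
            done @ DumpProgress::AllDumped { .. } => done,
        }
    }
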
diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs
index 27e117c1332..6bd4afef858 100644
--- a/tools/state-viewer/src/state_parts.rs
+++ b/tools/state-viewer/src/state_parts.rs
@@ -1,7 +1,6 @@
 use crate::epoch_info::iterate_and_filter;
-use near_chain::types::RuntimeAdapter;
-use near_chain::{ChainStore, ChainStoreAccess};
-use near_epoch_manager::EpochManager;
+use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode};
+use near_client::sync::state::StateSync;
 use near_primitives::epoch_manager::epoch_info::EpochInfo;
 use near_primitives::state_part::PartId;
 use near_primitives::syncing::get_num_state_parts;
@@ -15,7 +14,6 @@ use std::fs::DirEntry;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
-use std::sync::Arc;
 use std::time::Instant;

 #[derive(clap::Subcommand, Debug, Clone)]
@@ -49,6 +47,12 @@ pub(crate) enum StatePartsSubCommand {
         #[clap(subcommand)]
         epoch_selection: EpochSelection,
     },
+    /// Read State Header from the DB
+    ReadStateHeader {
+        /// Select an epoch to work on.
+        #[clap(subcommand)]
+        epoch_selection: EpochSelection,
+    },
 }

 impl StatePartsSubCommand {
@@ -62,6 +66,16 @@ impl StatePartsSubCommand {
         near_config: NearConfig,
         store: Store,
     ) {
+        let runtime = NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
+        let chain_genesis = ChainGenesis::new(&near_config.genesis);
+        let mut chain = Chain::new_for_view_client(
+            runtime,
+            &chain_genesis,
+            DoomslugThresholdMode::TwoThirds,
+            false,
+        )
+        .unwrap();
+        let chain_id = &near_config.genesis.config.chain_id;
         match self {
             StatePartsSubCommand::Apply { dry_run, state_root, part_id, epoch_selection } => {
                 apply_state_parts(
@@ -70,8 +84,8 @@ impl StatePartsSubCommand {
                     part_id,
                     dry_run,
                     state_root,
-                    home_dir,
-                    near_config,
+                    &mut chain,
+                    chain_id,
                     store,
                     Location::new(root_dir, (s3_bucket, s3_region)),
                 );
@@ -82,12 +96,15 @@ impl StatePartsSubCommand {
                     shard_id,
                     part_from,
                     part_to,
-                    home_dir,
-                    near_config,
+                    &chain,
+                    chain_id,
                     store,
                     Location::new(root_dir, (s3_bucket, s3_region)),
                 );
             }
+            StatePartsSubCommand::ReadStateHeader { epoch_selection } => {
+                read_state_header(epoch_selection, shard_id, &chain, store)
+            }
         }
     }
 }
@@ -107,15 +124,10 @@ pub(crate) enum EpochSelection {
 }

 impl EpochSelection {
-    fn to_epoch_id(
-        &self,
-        store: Store,
-        chain_store: &ChainStore,
-        epoch_manager: &EpochManager,
-    ) -> EpochId {
+    fn to_epoch_id(&self, store: Store, chain: &Chain) -> EpochId {
         match self {
             EpochSelection::Current => {
-                epoch_manager.get_epoch_id(&chain_store.head().unwrap().last_block_hash).unwrap()
+                chain.runtime_adapter.get_epoch_id(&chain.head().unwrap().last_block_hash).unwrap()
             }
             EpochSelection::EpochId { epoch_id } => {
                 EpochId(CryptoHash::from_str(&epoch_id).unwrap())
             }
@@ -132,12 +144,12 @@ impl EpochSelection {
             }
             EpochSelection::BlockHash { block_hash } => {
                 let block_hash = CryptoHash::from_str(&block_hash).unwrap();
-                epoch_manager.get_epoch_id(&block_hash).unwrap()
+                chain.runtime_adapter.get_epoch_id(&block_hash).unwrap()
             }
             EpochSelection::BlockHeight { block_height } => {
                 // Fetch an epoch containing the given block height.
-                let block_hash = chain_store.get_block_hash_by_height(*block_height).unwrap();
-                epoch_manager.get_epoch_id(&block_hash).unwrap()
+                let block_hash = chain.store().get_block_hash_by_height(*block_height).unwrap();
+                chain.runtime_adapter.get_epoch_id(&block_hash).unwrap()
             }
         }
     }
@@ -172,21 +184,18 @@ impl Location {
     }
 }

-/// Returns block hash of the last block of an epoch preceding the given `epoch_info`.
-fn get_prev_hash_of_epoch(
-    epoch_info: &EpochInfo,
-    chain_store: &ChainStore,
-    epoch_manager: &EpochManager,
-) -> CryptoHash {
-    let head = chain_store.head().unwrap();
-    let mut cur_block_info = epoch_manager.get_block_info(&head.last_block_hash).unwrap();
+/// Returns block hash of some block of the given `epoch_info` epoch.
+fn get_any_block_hash_of_epoch(epoch_info: &EpochInfo, chain: &Chain) -> CryptoHash {
+    let head = chain.store().head().unwrap();
+    let mut cur_block_info = chain.runtime_adapter.get_block_info(&head.last_block_hash).unwrap();
     // EpochManager doesn't have an API that maps EpochId to Blocks, and this function works
     // around that limitation by iterating over the epochs.
     // This workaround is acceptable because:
     // 1) Extending EpochManager's API is a major change.
     // 2) This use case is not critical at all.
     loop {
-        let cur_epoch_info = epoch_manager.get_epoch_info(cur_block_info.epoch_id()).unwrap();
+        let cur_epoch_info =
+            chain.runtime_adapter.get_epoch_info(cur_block_info.epoch_id()).unwrap();
         let cur_epoch_height = cur_epoch_info.epoch_height();
         assert!(
             cur_epoch_height >= epoch_info.epoch_height(),
@@ -195,12 +204,12 @@ fn get_prev_hash_of_epoch(
             epoch_info.epoch_height()
         );
         let epoch_first_block_info =
-            epoch_manager.get_block_info(cur_block_info.epoch_first_block()).unwrap();
+            chain.runtime_adapter.get_block_info(cur_block_info.epoch_first_block()).unwrap();
         let prev_epoch_last_block_info =
-            epoch_manager.get_block_info(epoch_first_block_info.prev_hash()).unwrap();
+            chain.runtime_adapter.get_block_info(epoch_first_block_info.prev_hash()).unwrap();
         if cur_epoch_height == epoch_info.epoch_height() {
-            return *prev_epoch_last_block_info.hash();
+            return *cur_block_info.hash();
         }
         cur_block_info = prev_epoch_last_block_info;
     }
 }
@@ -213,59 +222,40 @@ fn apply_state_parts(
     part_id: Option<u64>,
     dry_run: bool,
     maybe_state_root: Option<StateRoot>,
-    home_dir: &Path,
-    near_config: NearConfig,
+    chain: &mut Chain,
+    chain_id: &str,
     store: Store,
     location: Location,
 ) {
-    let runtime_adapter: Arc<dyn RuntimeAdapter> =
-        NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
-    let epoch_manager =
-        EpochManager::new_from_genesis_config(store.clone(), &near_config.genesis.config)
-            .expect("Failed to start Epoch Manager");
-    let chain_store = ChainStore::new(
-        store.clone(),
-        near_config.genesis.config.genesis_height,
-        near_config.client_config.save_trie_changes,
-    );
+    let (state_root, epoch_height, epoch_id, sync_hash) =
+        if let (Some(state_root), EpochSelection::EpochHeight { epoch_height }) =
+            (maybe_state_root, &epoch_selection)
+        {
+            (state_root, *epoch_height, None, None)
+        } else {
+            let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+            let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();

-    let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager);
-    let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap();
+            let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+            let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();

-    let (state_root, sync_prev_hash) = if let Some(state_root) = maybe_state_root {
-        (state_root, None)
-    } else {
-        let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager);
-        let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap();
+            let state_header = chain.get_state_response_header(shard_id, sync_hash).unwrap();
+            let state_root = state_header.chunk_prev_state_root();

-        assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap());
-        assert!(
-            shard_id < sync_prev_block.chunks().len() as u64,
-            "shard_id: {}, #shards: {}",
-            shard_id,
-            sync_prev_block.chunks().len()
-        );
-        let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root();
-        (state_root, Some(sync_prev_hash))
-    };
-
-    let part_storage = get_state_part_reader(
-        location,
-        &near_config.client_config.chain_id,
-        epoch.epoch_height(),
-        shard_id,
-    );
+            (state_root, epoch.epoch_height(), Some(epoch_id), Some(sync_hash))
+        };
+
+    let part_storage = get_state_part_reader(location, &chain_id, epoch_height, shard_id);
     let num_parts = part_storage.num_parts();
     assert_ne!(num_parts, 0, "Too few num_parts: {}", num_parts);
     let part_ids = get_part_ids(part_id, part_id.map(|x| x + 1), num_parts);
     tracing::info!(
         target: "state-parts",
-        epoch_height = epoch.epoch_height(),
-        epoch_id = ?epoch_id.0,
+        epoch_height,
         shard_id,
         num_parts,
-        ?sync_prev_hash,
+        ?sync_hash,
         ?part_ids,
         "Applying state as seen at the beginning of the specified epoch.",
     );
@@ -277,20 +267,29 @@ fn apply_state_parts(
         let part = part_storage.read(part_id, num_parts);

         if dry_run {
-            assert!(runtime_adapter.validate_state_part(
+            assert!(chain.runtime_adapter.validate_state_part(
                 &state_root,
                 PartId::new(part_id, num_parts),
                 &part
             ));
             tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Validated a state part");
         } else {
-            runtime_adapter
+            chain
+                .set_state_part(
+                    shard_id,
+                    sync_hash.unwrap(),
+                    PartId::new(part_id, num_parts),
+                    &part,
+                )
+                .unwrap();
+            chain
+                .runtime_adapter
                 .apply_state_part(
                     shard_id,
                     &state_root,
                     PartId::new(part_id, num_parts),
                     &part,
-                    &epoch_id,
+                    epoch_id.as_ref().unwrap(),
                 )
                 .unwrap();
             tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Applied a state part");
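
Both the apply path above and the dump path below now resolve an arbitrary user-supplied epoch selection to the epoch's first block, via get_any_block_hash_of_epoch followed by StateSync::get_epoch_start_sync_hash. Conceptually, the second step walks back through headers until the parent falls in a different epoch; a toy sketch of that idea (hypothetical types, not nearcore's API):

    use std::collections::HashMap;

    // Hypothetical header: just enough to walk the chain backwards.
    struct Header {
        prev_hash: u64,
        epoch_id: u32,
    }

    // Resolve "any block hash in the epoch" to the hash of the epoch's first
    // block: the first block is the one whose parent belongs to a different
    // epoch (or doesn't exist, i.e. genesis).
    fn epoch_start_hash(headers: &HashMap<u64, Header>, mut hash: u64) -> u64 {
        loop {
            let header = &headers[&hash];
            match headers.get(&header.prev_hash) {
                Some(prev) if prev.epoch_id == header.epoch_id => hash = header.prev_hash,
                _ => return hash,
            }
        }
    }
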
@@ -304,39 +303,19 @@ fn dump_state_parts(
     shard_id: ShardId,
     part_from: Option<u64>,
     part_to: Option<u64>,
-    home_dir: &Path,
-    near_config: NearConfig,
+    chain: &Chain,
+    chain_id: &str,
     store: Store,
     location: Location,
 ) {
-    let runtime_adapter: Arc<dyn RuntimeAdapter> =
-        NightshadeRuntime::from_config(home_dir, store.clone(), &near_config);
-    let epoch_manager =
-        EpochManager::new_from_genesis_config(store.clone(), &near_config.genesis.config)
-            .expect("Failed to start Epoch Manager");
-    let chain_store = ChainStore::new(
-        store.clone(),
-        near_config.genesis.config.genesis_height,
-        near_config.client_config.save_trie_changes,
-    );
-
-    let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager);
-    let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap();
-    let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager);
-    let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap();
-
-    assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap());
-    assert!(
-        shard_id < sync_prev_block.chunks().len() as u64,
-        "shard_id: {}, #shards: {}",
-        shard_id,
-        sync_prev_block.chunks().len()
-    );
-    let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root();
-    let state_root_node =
-        runtime_adapter.get_state_root_node(shard_id, &sync_prev_hash, &state_root).unwrap();
-
-    let num_parts = get_num_state_parts(state_root_node.memory_usage);
+    let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+    let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
+    let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+    let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
+
+    let state_header = chain.compute_state_response_header(shard_id, sync_hash).unwrap();
+    let state_root = state_header.chunk_prev_state_root();
+    let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage);
     let part_ids = get_part_ids(part_from, part_to, num_parts);

     tracing::info!(
@@ -345,29 +324,21 @@ fn dump_state_parts(
         target: "state-parts",
         epoch_height = epoch.epoch_height(),
         epoch_id = ?epoch_id.0,
         shard_id,
         num_parts,
-        ?sync_prev_hash,
+        ?sync_hash,
         ?part_ids,
+        ?state_root,
         "Dumping state as seen at the beginning of the specified epoch.",
     );

-    let part_storage = get_state_part_writer(
-        location,
-        &near_config.client_config.chain_id,
-        epoch.epoch_height(),
-        shard_id,
-    );
+    let part_storage = get_state_part_writer(location, chain_id, epoch.epoch_height(), shard_id);

     let timer = Instant::now();
     for part_id in part_ids {
         let timer = Instant::now();
         assert!(part_id < num_parts, "part_id: {}, num_parts: {}", part_id, num_parts);
-        let state_part = runtime_adapter
-            .obtain_state_part(
-                shard_id,
-                &sync_prev_hash,
-                &state_root,
-                PartId::new(part_id, num_parts),
-            )
+        let state_part = chain
+            .runtime_adapter
+            .obtain_state_part(shard_id, &sync_hash, &state_root, PartId::new(part_id, num_parts))
             .unwrap();
         part_storage.write(&state_part, part_id, num_parts);
         tracing::info!(target: "state-parts", part_id, part_length = state_part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote a state part");
     }
@@ -375,10 +346,28 @@ fn dump_state_parts(
     tracing::info!(target: "state-parts", total_elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote all requested state parts");
 }

+/// Reads `StateHeader` stored in the DB.
+fn read_state_header(
+    epoch_selection: EpochSelection,
+    shard_id: ShardId,
+    chain: &Chain,
+    store: Store,
+) {
+    let epoch_id = epoch_selection.to_epoch_id(store, &chain);
+    let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap();
+
+    let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain);
+    let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap();
+
+    let state_header = chain.store().get_state_header(shard_id, sync_hash);
+    tracing::info!(target: "state-parts", ?epoch_id, ?sync_hash, ?state_header);
+}
+
 fn get_part_ids(part_from: Option<u64>, part_to: Option<u64>, num_parts: u64) -> Range<u64> {
     part_from.unwrap_or(0)..part_to.unwrap_or(num_parts)
 }

+// Needs to be in sync with `fn s3_location()`.
 fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String {
     format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id)
 }
@@ -466,6 +455,7 @@ impl FileSystemStorage {
             tracing::info!(target: "state-parts", ?root_dir, ?prefix, ?state_parts_dir, "Ensuring the directory exists");
             std::fs::create_dir_all(&state_parts_dir).unwrap();
         }
+        tracing::info!(target: "state-parts", ?state_parts_dir, "Initialized FileSystemStorage");
         Self { state_parts_dir }
     }

@@ -485,21 +475,38 @@ impl StatePartWriter for FileSystemStorage {
 impl StatePartReader for FileSystemStorage {
     fn read(&self, part_id: u64, num_parts: u64) -> Vec<u8> {
         let filename = self.get_location(part_id, num_parts);
+        tracing::debug!(target: "state-parts", part_id, num_parts, ?filename, "Reading state part file");
         let part = std::fs::read(filename).unwrap();
         part
     }

     fn num_parts(&self) -> u64 {
         let paths = std::fs::read_dir(&self.state_parts_dir).unwrap();
-        let num_parts = paths
+        let mut known_num_parts = None;
+        let num_files = paths
             .filter(|path| {
                 let full_path = path.as_ref().unwrap();
                 tracing::debug!(target: "state-parts", ?full_path);
-                is_part_filename(full_path.file_name().to_str().unwrap())
+                let filename = full_path.file_name().to_str().unwrap().to_string();
+                if let Some(num_parts) = get_num_parts_from_filename(&filename) {
+                    if let Some(known_num_parts) = known_num_parts {
+                        assert_eq!(known_num_parts, num_parts);
+                    }
+                    known_num_parts = Some(num_parts);
+                }
+                is_part_filename(&filename)
             })
             .collect::<Vec<std::io::Result<DirEntry>>>()
             .len();
-        num_parts as u64
+        if known_num_parts != Some(num_files as u64) {
+            // This is expected when a user saves time and downloads a few parts instead of all parts.
+            tracing::warn!(target: "state-parts",
+                dir = ?self.state_parts_dir,
+                ?known_num_parts,
+                num_files,
+                "Filename indicates that number of files expected doesn't match the number of files available");
+        }
+        known_num_parts.unwrap()
     }
 }
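
Finally, FileSystemStorage::num_parts now trusts the part filenames for the expected total and merely warns when fewer files are present, since downloading a subset of parts is a supported workflow. Assuming a state_part_<id>_of_<total> naming scheme (is_part_filename and get_num_parts_from_filename are defined elsewhere in this file, so the exact format below is an assumption for illustration), the parsing amounts to:

    // Illustrative parser for a "state_part_<id>_of_<total>" filename scheme;
    // the real helpers in state_parts.rs may differ in detail.
    fn get_num_parts_from_filename(filename: &str) -> Option<u64> {
        let rest = filename.strip_prefix("state_part_")?;
        let (part_id, num_parts) = rest.split_once("_of_")?;
        // Both components must be numeric for this to be a part file at all.
        part_id.parse::<u64>().ok()?;
        num_parts.parse::<u64>().ok()
    }

    fn is_part_filename(filename: &str) -> bool {
        get_num_parts_from_filename(filename).is_some()
    }

    fn main() {
        assert_eq!(get_num_parts_from_filename("state_part_000042_of_000100"), Some(100));
        assert!(!is_part_filename("README.md"));
    }
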