From 3ae1be373d13825f15736dea290e0f76938b299a Mon Sep 17 00:00:00 2001 From: Nikolay Kurtov Date: Fri, 24 Mar 2023 12:39:16 +0100 Subject: [PATCH] Move some code to #8794 --- nearcore/src/metrics.rs | 27 +-- nearcore/src/runtime/mod.rs | 18 +- nearcore/src/state_sync.rs | 56 +++--- tools/state-viewer/src/state_parts.rs | 239 +++++++++++--------------- 4 files changed, 146 insertions(+), 194 deletions(-) diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs index 19605ceeca1..4fd4fa910c2 100644 --- a/nearcore/src/metrics.rs +++ b/nearcore/src/metrics.rs @@ -50,6 +50,15 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy = Lazy: ) .unwrap() }); +pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_dump_obtain_part_elapsed_sec", + "Time needed to obtain a part", + &["shard_id"], + Some(exponential_buckets(0.001, 1.6, 25).unwrap()), + ) + .unwrap() +}); pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy = Lazy::new(|| { try_create_int_gauge_vec( "near_state_sync_dump_num_parts_total", @@ -82,21 +91,3 @@ pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy = Lazy::new(|| ) .unwrap() }); -pub static STATE_SYNC_APPLY_PART_DELAY: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_apply_part_delay_sec", - "Latency of applying a state part", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 20).unwrap()), - ) - .unwrap() -}); -pub static STATE_SYNC_OBTAIN_PART_DELAY: Lazy = Lazy::new(|| { - try_create_histogram_vec( - "near_state_sync_obtain_part_delay_sec", - "Latency of applying a state part", - &["shard_id"], - Some(exponential_buckets(0.001, 2.0, 20).unwrap()), - ) - .unwrap() -}); diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs index 7c55c565d48..aeff68e8221 100644 --- a/nearcore/src/runtime/mod.rs +++ b/nearcore/src/runtime/mod.rs @@ -1246,10 +1246,6 @@ impl RuntimeAdapter for NightshadeRuntime { %block_hash, num_parts = part_id.total) .entered(); - let _timer = metrics::STATE_SYNC_OBTAIN_PART_DELAY - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - let epoch_id = self.get_epoch_id(block_hash)?; let shard_uid = self.get_shard_uid_from_epoch_id(shard_id, &epoch_id)?; let trie = self.tries.get_view_trie_for_shard(shard_uid, *state_root); @@ -1274,17 +1270,11 @@ impl RuntimeAdapter for NightshadeRuntime { match Trie::validate_trie_nodes_for_part(state_root, part_id, trie_nodes) { Ok(_) => true, // Storage error should not happen - Err(err) => { - tracing::error!(target: "state-parts", ?err, "State part storage error"); - false - } + Err(_) => false, } } // Deserialization error means we've got the data from malicious peer - Err(err) => { - tracing::error!(target: "state-parts", ?err, "State part deserialization error"); - false - } + Err(_) => false, } } @@ -1381,10 +1371,6 @@ impl RuntimeAdapter for NightshadeRuntime { data: &[u8], epoch_id: &EpochId, ) -> Result<(), Error> { - let _timer = metrics::STATE_SYNC_APPLY_PART_DELAY - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - let part = BorshDeserialize::try_from_slice(data) .expect("Part was already validated earlier, so could never fail here"); let ApplyStatePartResult { trie_changes, flat_state_delta, contract_codes } = diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 465fb987841..458bff4e27e 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -3,7 +3,7 @@ use borsh::BorshSerialize; use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error}; use near_chain_configs::ClientConfig; -use near_client::sync::state::{s3_location, StateSync}; +use near_client::sync::state::StateSync; use near_crypto::PublicKey; use near_epoch_manager::EpochManagerAdapter; use near_primitives::hash::CryptoHash; @@ -175,7 +175,7 @@ async fn state_sync_dump( .with_label_values(&[&shard_id.to_string()]) .start_timer(); - let state_part = match obtain_and_store_state_part( + let state_part = match get_state_part( &runtime, &shard_id, &sync_hash, @@ -328,8 +328,7 @@ fn set_metrics( } } -/// Obtains and then saves the part data. -fn obtain_and_store_state_part( +fn get_state_part( runtime: &Arc, shard_id: &ShardId, sync_hash: &CryptoHash, @@ -338,13 +337,19 @@ fn obtain_and_store_state_part( num_parts: u64, chain: &Chain, ) -> Result, Error> { - let state_part = runtime.obtain_state_part( - *shard_id, - &sync_hash, - &state_root, - PartId::new(part_id, num_parts), - )?; + let state_part = { + let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + runtime.obtain_state_part( + *shard_id, + &sync_hash, + &state_root, + PartId::new(part_id, num_parts), + )? + }; + // Save the part data. let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?; let mut store_update = chain.store().store().store_update(); store_update.set(DBCol::StateParts, &key, &state_part); @@ -363,17 +368,13 @@ fn start_dumping( let epoch_info = runtime.get_epoch_info(&epoch_id)?; let epoch_height = epoch_info.epoch_height(); let num_shards = runtime.num_shards(&epoch_id)?; - let sync_prev_header = chain.get_block_header(&sync_hash)?; - let sync_prev_hash = sync_prev_header.prev_hash(); - let prev_sync_block = chain.get_block(&sync_prev_hash)?; - if runtime.cares_about_shard(None, prev_sync_block.header().prev_hash(), shard_id, false) { - assert_eq!(num_shards, prev_sync_block.chunks().len() as u64); - let state_root = prev_sync_block.chunks()[shard_id as usize].prev_state_root(); - // See `get_state_response_header()` for reference. - let state_root_node = - runtime.get_state_root_node(shard_id, &sync_prev_hash, &state_root)?; + let sync_hash_block = chain.get_block(&sync_hash)?; + if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) { + assert_eq!(num_shards, sync_hash_block.chunks().len() as u64); + let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root(); + let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?; let num_parts = get_num_state_parts(state_root_node.memory_usage); - tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch"); + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch"); // Note that first the state of the state machines gets changes to // `InProgress` and it starts dumping state after a short interval. set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height)); @@ -386,7 +387,7 @@ fn start_dumping( num_parts, })) } else { - tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_prev_hash, %sync_hash, "Shard is not tracked, skip the epoch"); + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch"); Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) })) } } @@ -421,3 +422,16 @@ fn check_new_epoch( } } } + +fn s3_location( + chain_id: &str, + epoch_height: u64, + shard_id: u64, + part_id: u64, + num_parts: u64, +) -> String { + format!( + "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}", + chain_id, epoch_height, shard_id, part_id, num_parts + ) +} diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs index c44943dede1..27e117c1332 100644 --- a/tools/state-viewer/src/state_parts.rs +++ b/tools/state-viewer/src/state_parts.rs @@ -1,6 +1,7 @@ use crate::epoch_info::iterate_and_filter; -use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode}; -use near_client::sync::state::StateSync; +use near_chain::types::RuntimeAdapter; +use near_chain::{ChainStore, ChainStoreAccess}; +use near_epoch_manager::EpochManager; use near_primitives::epoch_manager::epoch_info::EpochInfo; use near_primitives::state_part::PartId; use near_primitives::syncing::get_num_state_parts; @@ -14,6 +15,7 @@ use std::fs::DirEntry; use std::ops::Range; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::Arc; use std::time::Instant; #[derive(clap::Subcommand, Debug, Clone)] @@ -47,12 +49,6 @@ pub(crate) enum StatePartsSubCommand { #[clap(subcommand)] epoch_selection: EpochSelection, }, - /// Read State Header from the DB - ReadStateHeader { - /// Select an epoch to work on. - #[clap(subcommand)] - epoch_selection: EpochSelection, - }, } impl StatePartsSubCommand { @@ -66,16 +62,6 @@ impl StatePartsSubCommand { near_config: NearConfig, store: Store, ) { - let runtime = NightshadeRuntime::from_config(home_dir, store.clone(), &near_config); - let chain_genesis = ChainGenesis::new(&near_config.genesis); - let mut chain = Chain::new_for_view_client( - runtime.clone(), - &chain_genesis, - DoomslugThresholdMode::TwoThirds, - false, - ) - .unwrap(); - let chain_id = &near_config.genesis.config.chain_id; match self { StatePartsSubCommand::Apply { dry_run, state_root, part_id, epoch_selection } => { apply_state_parts( @@ -84,8 +70,8 @@ impl StatePartsSubCommand { part_id, dry_run, state_root, - &mut chain, - chain_id, + home_dir, + near_config, store, Location::new(root_dir, (s3_bucket, s3_region)), ); @@ -96,15 +82,12 @@ impl StatePartsSubCommand { shard_id, part_from, part_to, - &chain, - chain_id, + home_dir, + near_config, store, Location::new(root_dir, (s3_bucket, s3_region)), ); } - StatePartsSubCommand::ReadStateHeader { epoch_selection } => { - read_state_header(epoch_selection, shard_id, &chain, store) - } } } } @@ -124,10 +107,15 @@ pub(crate) enum EpochSelection { } impl EpochSelection { - fn to_epoch_id(&self, store: Store, chain: &Chain) -> EpochId { + fn to_epoch_id( + &self, + store: Store, + chain_store: &ChainStore, + epoch_manager: &EpochManager, + ) -> EpochId { match self { EpochSelection::Current => { - chain.runtime_adapter.get_epoch_id(&chain.head().unwrap().last_block_hash).unwrap() + epoch_manager.get_epoch_id(&chain_store.head().unwrap().last_block_hash).unwrap() } EpochSelection::EpochId { epoch_id } => { EpochId(CryptoHash::from_str(&epoch_id).unwrap()) @@ -144,12 +132,12 @@ impl EpochSelection { } EpochSelection::BlockHash { block_hash } => { let block_hash = CryptoHash::from_str(&block_hash).unwrap(); - chain.runtime_adapter.get_epoch_id(&block_hash).unwrap() + epoch_manager.get_epoch_id(&block_hash).unwrap() } EpochSelection::BlockHeight { block_height } => { // Fetch an epoch containing the given block height. - let block_hash = chain.store().get_block_hash_by_height(*block_height).unwrap(); - chain.runtime_adapter.get_epoch_id(&block_hash).unwrap() + let block_hash = chain_store.get_block_hash_by_height(*block_height).unwrap(); + epoch_manager.get_epoch_id(&block_hash).unwrap() } } } @@ -184,18 +172,21 @@ impl Location { } } -/// Returns block hash of some block of the given `epoch_info` epoch. -fn get_any_block_hash_of_epoch(epoch_info: &EpochInfo, chain: &Chain) -> CryptoHash { - let head = chain.store().head().unwrap(); - let mut cur_block_info = chain.runtime_adapter.get_block_info(&head.last_block_hash).unwrap(); +/// Returns block hash of the last block of an epoch preceding the given `epoch_info`. +fn get_prev_hash_of_epoch( + epoch_info: &EpochInfo, + chain_store: &ChainStore, + epoch_manager: &EpochManager, +) -> CryptoHash { + let head = chain_store.head().unwrap(); + let mut cur_block_info = epoch_manager.get_block_info(&head.last_block_hash).unwrap(); // EpochManager doesn't have an API that maps EpochId to Blocks, and this function works // around that limitation by iterating over the epochs. // This workaround is acceptable because: // 1) Extending EpochManager's API is a major change. // 2) This use case is not critical at all. loop { - let cur_epoch_info = - chain.runtime_adapter.get_epoch_info(cur_block_info.epoch_id()).unwrap(); + let cur_epoch_info = epoch_manager.get_epoch_info(cur_block_info.epoch_id()).unwrap(); let cur_epoch_height = cur_epoch_info.epoch_height(); assert!( cur_epoch_height >= epoch_info.epoch_height(), @@ -204,12 +195,12 @@ fn get_any_block_hash_of_epoch(epoch_info: &EpochInfo, chain: &Chain) -> CryptoH epoch_info.epoch_height() ); let epoch_first_block_info = - chain.runtime_adapter.get_block_info(cur_block_info.epoch_first_block()).unwrap(); + epoch_manager.get_block_info(cur_block_info.epoch_first_block()).unwrap(); let prev_epoch_last_block_info = - chain.runtime_adapter.get_block_info(epoch_first_block_info.prev_hash()).unwrap(); + epoch_manager.get_block_info(epoch_first_block_info.prev_hash()).unwrap(); if cur_epoch_height == epoch_info.epoch_height() { - return *cur_block_info.hash(); + return *prev_epoch_last_block_info.hash(); } cur_block_info = prev_epoch_last_block_info; @@ -222,36 +213,32 @@ fn apply_state_parts( part_id: Option, dry_run: bool, maybe_state_root: Option, - chain: &mut Chain, - chain_id: &str, + home_dir: &Path, + near_config: NearConfig, store: Store, location: Location, ) { - let (state_root, epoch_height, epoch_id, sync_hash, sync_prev_hash) = if let ( - Some(state_root), - EpochSelection::EpochHeight { epoch_height }, - ) = - (maybe_state_root, &epoch_selection) - { - (state_root, *epoch_height, None, None, None) + let runtime_adapter: Arc = + NightshadeRuntime::from_config(home_dir, store.clone(), &near_config); + let epoch_manager = + EpochManager::new_from_genesis_config(store.clone(), &near_config.genesis.config) + .expect("Failed to start Epoch Manager"); + let chain_store = ChainStore::new( + store.clone(), + near_config.genesis.config.genesis_height, + near_config.client_config.save_trie_changes, + ); + + let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager); + let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap(); + + let (state_root, sync_prev_hash) = if let Some(state_root) = maybe_state_root { + (state_root, None) } else { - let epoch_id = epoch_selection.to_epoch_id(store, &chain); - let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap(); - - let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain); - let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap(); - let sync_header = chain.get_block_header(&sync_hash).unwrap(); - // See `get_state_response_header()`. - let sync_prev_block = chain.get_block(sync_header.prev_hash()).unwrap(); - let sync_prev_hash = sync_prev_block.hash(); - tracing::info!( - target: "state-parts", - ?sync_hash, - ?sync_prev_hash, - height = sync_prev_block.header().height(), - state_roots = ?sync_prev_block.chunks().iter().map(|chunk| chunk.prev_state_root()).collect::>()); - - assert!(chain.runtime_adapter.is_next_block_epoch_start(&sync_prev_hash).unwrap()); + let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager); + let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap(); + + assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap()); assert!( shard_id < sync_prev_block.chunks().len() as u64, "shard_id: {}, #shards: {}", @@ -259,21 +246,26 @@ fn apply_state_parts( sync_prev_block.chunks().len() ); let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root(); - (state_root, epoch.epoch_height(), Some(epoch_id), Some(sync_hash), Some(*sync_prev_hash)) + (state_root, Some(sync_prev_hash)) }; - let part_storage = get_state_part_reader(location, &chain_id, epoch_height, shard_id); + let part_storage = get_state_part_reader( + location, + &near_config.client_config.chain_id, + epoch.epoch_height(), + shard_id, + ); let num_parts = part_storage.num_parts(); assert_ne!(num_parts, 0, "Too few num_parts: {}", num_parts); let part_ids = get_part_ids(part_id, part_id.map(|x| x + 1), num_parts); tracing::info!( target: "state-parts", - epoch_height, + epoch_height = epoch.epoch_height(), + epoch_id = ?epoch_id.0, shard_id, num_parts, ?sync_prev_hash, - ?sync_hash, ?part_ids, "Applying state as seen at the beginning of the specified epoch.", ); @@ -285,29 +277,20 @@ fn apply_state_parts( let part = part_storage.read(part_id, num_parts); if dry_run { - assert!(chain.runtime_adapter.validate_state_part( + assert!(runtime_adapter.validate_state_part( &state_root, PartId::new(part_id, num_parts), &part )); tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Validated a state part"); } else { - chain - .set_state_part( - shard_id, - sync_hash.unwrap(), - PartId::new(part_id, num_parts), - &part, - ) - .unwrap(); - chain - .runtime_adapter + runtime_adapter .apply_state_part( shard_id, &state_root, PartId::new(part_id, num_parts), &part, - epoch_id.as_ref().unwrap(), + &epoch_id, ) .unwrap(); tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Applied a state part"); @@ -321,21 +304,28 @@ fn dump_state_parts( shard_id: ShardId, part_from: Option, part_to: Option, - chain: &Chain, - chain_id: &str, + home_dir: &Path, + near_config: NearConfig, store: Store, location: Location, ) { - let epoch_id = epoch_selection.to_epoch_id(store, &chain); - let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap(); - let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain); - let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap(); - let sync_header = chain.get_block_header(&sync_hash).unwrap(); - // See `get_state_response_header()`. - let sync_prev_block = chain.get_block(sync_header.prev_hash()).unwrap(); - let sync_prev_hash = sync_prev_block.hash(); - - assert!(chain.runtime_adapter.is_next_block_epoch_start(&sync_prev_hash).unwrap()); + let runtime_adapter: Arc = + NightshadeRuntime::from_config(home_dir, store.clone(), &near_config); + let epoch_manager = + EpochManager::new_from_genesis_config(store.clone(), &near_config.genesis.config) + .expect("Failed to start Epoch Manager"); + let chain_store = ChainStore::new( + store.clone(), + near_config.genesis.config.genesis_height, + near_config.client_config.save_trie_changes, + ); + + let epoch_id = epoch_selection.to_epoch_id(store, &chain_store, &epoch_manager); + let epoch = epoch_manager.get_epoch_info(&epoch_id).unwrap(); + let sync_prev_hash = get_prev_hash_of_epoch(&epoch, &chain_store, &epoch_manager); + let sync_prev_block = chain_store.get_block(&sync_prev_hash).unwrap(); + + assert!(epoch_manager.is_next_block_epoch_start(&sync_prev_hash).unwrap()); assert!( shard_id < sync_prev_block.chunks().len() as u64, "shard_id: {}, #shards: {}", @@ -344,7 +334,7 @@ fn dump_state_parts( ); let state_root = sync_prev_block.chunks()[shard_id as usize].prev_state_root(); let state_root_node = - chain.runtime_adapter.get_state_root_node(shard_id, &sync_prev_hash, &state_root).unwrap(); + runtime_adapter.get_state_root_node(shard_id, &sync_prev_hash, &state_root).unwrap(); let num_parts = get_num_state_parts(state_root_node.memory_usage); let part_ids = get_part_ids(part_from, part_to, num_parts); @@ -355,22 +345,29 @@ fn dump_state_parts( epoch_id = ?epoch_id.0, shard_id, num_parts, - ?sync_hash, ?sync_prev_hash, ?part_ids, - ?state_root, "Dumping state as seen at the beginning of the specified epoch.", ); - let part_storage = get_state_part_writer(location, chain_id, epoch.epoch_height(), shard_id); + let part_storage = get_state_part_writer( + location, + &near_config.client_config.chain_id, + epoch.epoch_height(), + shard_id, + ); let timer = Instant::now(); for part_id in part_ids { let timer = Instant::now(); assert!(part_id < num_parts, "part_id: {}, num_parts: {}", part_id, num_parts); - let state_part = chain - .runtime_adapter - .obtain_state_part(shard_id, &sync_hash, &state_root, PartId::new(part_id, num_parts)) + let state_part = runtime_adapter + .obtain_state_part( + shard_id, + &sync_prev_hash, + &state_root, + PartId::new(part_id, num_parts), + ) .unwrap(); part_storage.write(&state_part, part_id, num_parts); tracing::info!(target: "state-parts", part_id, part_length = state_part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote a state part"); @@ -378,28 +375,10 @@ fn dump_state_parts( tracing::info!(target: "state-parts", total_elapsed_sec = timer.elapsed().as_secs_f64(), "Wrote all requested state parts"); } -/// Reads `StateHeader` stored in the DB. -fn read_state_header( - epoch_selection: EpochSelection, - shard_id: ShardId, - chain: &Chain, - store: Store, -) { - let epoch_id = epoch_selection.to_epoch_id(store, &chain); - let epoch = chain.runtime_adapter.get_epoch_info(&epoch_id).unwrap(); - - let sync_hash = get_any_block_hash_of_epoch(&epoch, &chain); - let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash).unwrap(); - - let state_header = chain.store().get_state_header(shard_id, sync_hash); - tracing::info!(target: "state-parts", ?epoch_id, ?sync_hash, ?state_header); -} - fn get_part_ids(part_from: Option, part_to: Option, num_parts: u64) -> Range { part_from.unwrap_or(0)..part_to.unwrap_or(num_parts) } -// Needs to be in sync with `fn s3_location()`. fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String { format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id) } @@ -487,7 +466,6 @@ impl FileSystemStorage { tracing::info!(target: "state-parts", ?root_dir, ?prefix, ?state_parts_dir, "Ensuring the directory exists"); std::fs::create_dir_all(&state_parts_dir).unwrap(); } - tracing::info!(target: "state-parts", ?state_parts_dir, "Initialized FileSystemStorage"); Self { state_parts_dir } } @@ -507,38 +485,21 @@ impl StatePartWriter for FileSystemStorage { impl StatePartReader for FileSystemStorage { fn read(&self, part_id: u64, num_parts: u64) -> Vec { let filename = self.get_location(part_id, num_parts); - tracing::debug!(target: "state-parts", part_id, num_parts, ?filename, "Reading state part file"); let part = std::fs::read(filename).unwrap(); part } fn num_parts(&self) -> u64 { let paths = std::fs::read_dir(&self.state_parts_dir).unwrap(); - let mut known_num_parts = None; - let num_files = paths + let num_parts = paths .filter(|path| { let full_path = path.as_ref().unwrap(); tracing::debug!(target: "state-parts", ?full_path); - let filename = full_path.file_name().to_str().unwrap().to_string(); - if let Some(num_parts) = get_num_parts_from_filename(&filename) { - if let Some(known_num_parts) = known_num_parts { - assert_eq!(known_num_parts, num_parts); - } - known_num_parts = Some(num_parts); - } - is_part_filename(&filename) + is_part_filename(full_path.file_name().to_str().unwrap()) }) .collect::>>() .len(); - if known_num_parts != Some(num_files as u64) { - // This is expected when a user saves time and downloads a few parts instead of all parts. - tracing::warn!(target: "state-parts", - dir = ?self.state_parts_dir, - ?known_num_parts, - num_files, - "Filename indicates that number of files expected doesn't match the number of files available"); - } - known_num_parts.unwrap() + num_parts as u64 } }