diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e4317a58bc..d0911247116 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Protocol Changes ### Non-protocol Changes +* Dump state by multiple nodes, each node will refer to s3 for which parts need to be dumped. [#9049](https://github.com/near/nearcore/pull/9049) ## 1.34.0 diff --git a/Cargo.lock b/Cargo.lock index 0abc68e83a4..f51586bd264 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4482,6 +4482,7 @@ dependencies = [ "primitive-types", "rand 0.8.5", "rayon", + "regex", "rlimit", "rust-s3", "serde", diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index 2f81e55f8be..5dfb5945368 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -487,3 +487,13 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy = Lazy: ) .unwrap() }); + +pub(crate) static STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_dump_list_object_elapsed_sec", + "Latency of ls in external storage", + &["shard_id"], + Some(exponential_buckets(0.001, 1.6, 25).unwrap()), + ) + .unwrap() +}); diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index c2bb1718976..39a4d338ea8 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -44,7 +44,7 @@ use near_primitives::shard_layout::ShardUId; use near_primitives::state_part::PartId; use near_primitives::static_clock::StaticClock; use near_primitives::syncing::{get_num_state_parts, ShardStateSyncResponse}; -use near_primitives::types::{AccountId, EpochHeight, ShardId, StateRoot}; +use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; use rand::seq::SliceRandom; use rand::{thread_rng, Rng}; use std::collections::HashMap; @@ -60,6 +60,9 @@ pub const MAX_STATE_PART_REQUEST: u64 = 16; /// Number of state parts already requested stored as pending. /// This number should not exceed MAX_STATE_PART_REQUEST times (number of peers in the network). pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000; +/// Time limit per state dump iteration. +/// A node must check external storage for parts to dump again once time is up. +pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300; pub enum StateSyncResult { /// No shard has changed its status @@ -179,6 +182,50 @@ impl ExternalConnection { } } } + + fn extract_file_name_from_full_path(full_path: String) -> String { + return Self::extract_file_name_from_path_buf(PathBuf::from(full_path)); + } + + fn extract_file_name_from_path_buf(path_buf: PathBuf) -> String { + return path_buf.file_name().unwrap().to_str().unwrap().to_string(); + } + + pub async fn list_state_parts( + &self, + shard_id: ShardId, + directory_path: &str, + ) -> Result, anyhow::Error> { + let _timer = metrics::STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + match self { + ExternalConnection::S3 { bucket } => { + let prefix = format!("{}/", directory_path); + let list_results = bucket.list(prefix.clone(), Some("/".to_string())).await?; + tracing::debug!(target: "state_sync_dump", shard_id, ?directory_path, "List state parts in s3"); + let mut file_names = vec![]; + for res in list_results { + for obj in res.contents { + file_names.push(Self::extract_file_name_from_full_path(obj.key)) + } + } + Ok(file_names) + } + ExternalConnection::Filesystem { root_dir } => { + let path = root_dir.join(directory_path); + tracing::debug!(target: "state_sync_dump", shard_id, ?path, "List state parts in local directory"); + std::fs::create_dir_all(&path)?; + let mut file_names = vec![]; + let files = std::fs::read_dir(&path)?; + for file in files { + let file_name = Self::extract_file_name_from_path_buf(file?.path()); + file_names.push(file_name); + } + Ok(file_names) + } + } + } } /// Helper to track state sync. @@ -726,6 +773,7 @@ impl StateSync { part_id, download, shard_id, + epoch_id, epoch_height, state_num_parts, &chain_id.clone(), @@ -1172,6 +1220,7 @@ fn request_part_from_external_storage( part_id: u64, download: &mut DownloadStatus, shard_id: ShardId, + epoch_id: &EpochId, epoch_height: EpochHeight, num_parts: u64, chain_id: &str, @@ -1189,7 +1238,8 @@ fn request_part_from_external_storage( download.state_requests_count += 1; download.last_target = None; - let location = external_storage_location(chain_id, epoch_height, shard_id, part_id, num_parts); + let location = + external_storage_location(chain_id, epoch_id, epoch_height, shard_id, part_id, num_parts); let download_response = download.response.clone(); near_performance_metrics::actix::spawn("StateSync", { async move { @@ -1454,6 +1504,7 @@ impl Iterator for SamplerLimited { /// Construct a location on the external storage. pub fn external_storage_location( chain_id: &str, + epoch_id: &EpochId, epoch_height: u64, shard_id: u64, part_id: u64, @@ -1461,13 +1512,30 @@ pub fn external_storage_location( ) -> String { format!( "{}/{}", - location_prefix(chain_id, epoch_height, shard_id), + location_prefix(chain_id, epoch_height, epoch_id, shard_id), part_filename(part_id, num_parts) ) } -pub fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String { - format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id) +pub fn external_storage_location_directory( + chain_id: &str, + epoch_id: &EpochId, + epoch_height: u64, + shard_id: u64, +) -> String { + location_prefix(chain_id, epoch_height, epoch_id, shard_id) +} + +pub fn location_prefix( + chain_id: &str, + epoch_height: u64, + epoch_id: &EpochId, + shard_id: u64, +) -> String { + format!( + "chain_id={}/epoch_height={}/epoch_id={}/shard_id={}", + chain_id, epoch_height, epoch_id.0, shard_id + ) } pub fn part_filename(part_id: u64, num_parts: u64) -> String { @@ -1494,6 +1562,17 @@ pub fn get_num_parts_from_filename(s: &str) -> Option { None } +pub fn get_part_id_from_filename(s: &str) -> Option { + if let Some(captures) = match_filename(s) { + if let Some(part_id) = captures.get(1) { + if let Ok(part_id) = part_id.as_str().parse::() { + return Some(part_id); + } + } + } + None +} + #[cfg(test)] mod test { use super::*; @@ -1637,5 +1716,8 @@ mod test { assert_eq!(get_num_parts_from_filename(&filename), Some(15)); assert_eq!(get_num_parts_from_filename("123123"), None); + + assert_eq!(get_part_id_from_filename(&filename), Some(5)); + assert_eq!(get_part_id_from_filename("123123"), None); } } diff --git a/core/primitives/src/syncing.rs b/core/primitives/src/syncing.rs index a1450fceb8d..e7a64497856 100644 --- a/core/primitives/src/syncing.rs +++ b/core/primitives/src/syncing.rs @@ -249,7 +249,5 @@ pub enum StateSyncDumpProgress { /// Block hash of the first block of the epoch. /// The dumped state corresponds to the state before applying this block. sync_hash: CryptoHash, - /// Progress made. - parts_dumped: u64, }, } diff --git a/integration-tests/src/tests/nearcore/sync_state_nodes.rs b/integration-tests/src/tests/nearcore/sync_state_nodes.rs index 94f44b4f998..8f21986fbdc 100644 --- a/integration-tests/src/tests/nearcore/sync_state_nodes.rs +++ b/integration-tests/src/tests/nearcore/sync_state_nodes.rs @@ -398,10 +398,10 @@ fn sync_empty_state() { }); } -/// Runs one node for some time, which dumps state to a temp directory. -/// Start the second node which gets state parts from that temp directory. #[test] #[cfg_attr(not(feature = "expensive_tests"), ignore)] +/// Runs one node for some time, which dumps state to a temp directory. +/// Start the second node which gets state parts from that temp directory. fn sync_state_dump() { heavy_test(|| { init_integration_logger(); diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index 7c91075bc96..a9dce1e2cfb 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -26,6 +26,7 @@ num-rational.workspace = true once_cell.workspace = true rand.workspace = true rayon.workspace = true +regex.workspace = true rlimit.workspace = true rust-s3.workspace = true serde.workspace = true diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 2aee4b9f699..a68e4ad3f5d 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -3,7 +3,10 @@ use borsh::BorshSerialize; use near_chain::types::RuntimeAdapter; use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error}; use near_chain_configs::{ClientConfig, ExternalStorageLocation}; -use near_client::sync::state::{external_storage_location, ExternalConnection, StateSync}; +use near_client::sync::state::{ + external_storage_location, external_storage_location_directory, get_part_id_from_filename, + is_part_filename, ExternalConnection, StateSync, STATE_DUMP_ITERATION_TIME_LIMIT_SECS, +}; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManagerAdapter; use near_primitives::hash::CryptoHash; @@ -11,9 +14,11 @@ use near_primitives::state_part::PartId; use near_primitives::syncing::{get_num_state_parts, StatePartKey, StateSyncDumpProgress}; use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot}; use near_store::DBCol; +use rand::{thread_rng, Rng}; +use std::collections::HashSet; use std::sync::atomic::AtomicBool; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; /// Starts one a thread per tracked shard. /// Each started thread will be dumping state parts of a single epoch to external storage. @@ -128,9 +133,47 @@ impl StateSyncDumpHandle { } } -/// A thread loop that dumps state of the latest available epoch to S3. -/// Operates as a state machine. Persists its state in the Misc column. -/// Shards store the state-machine state separately. +fn extract_part_id_from_part_file_name(file_name: &String) -> u64 { + assert!(is_part_filename(file_name)); + return get_part_id_from_filename(file_name).unwrap(); +} + +async fn get_missing_part_ids_for_epoch( + shard_id: ShardId, + chain_id: &String, + epoch_id: &EpochId, + epoch_height: u64, + total_parts: u64, + external: &ExternalConnection, +) -> Result, anyhow::Error> { + let directory_path = + external_storage_location_directory(chain_id, epoch_id, epoch_height, shard_id); + let file_names = external.list_state_parts(shard_id, &directory_path).await?; + if !file_names.is_empty() { + let existing_nums: HashSet<_> = file_names + .iter() + .map(|file_name| extract_part_id_from_part_file_name(file_name)) + .collect(); + let missing_nums: Vec = + (0..total_parts).filter(|i| !existing_nums.contains(i)).collect(); + let num_missing = missing_nums.len(); + tracing::debug!(target: "state_sync_dump", ?num_missing, ?directory_path, "Some parts have already been dumped."); + Ok(missing_nums) + } else { + tracing::debug!(target: "state_sync_dump", ?total_parts, ?directory_path, "No part has been dumped."); + let missing_nums = (0..total_parts).collect::>(); + Ok(missing_nums) + } +} + +fn select_random_part_id_with_index(parts_to_be_dumped: &Vec) -> (u64, usize) { + let mut rng = thread_rng(); + let selected_idx = rng.gen_range(0..parts_to_be_dumped.len()); + let selected_element = parts_to_be_dumped[selected_idx]; + tracing::debug!(target: "state_sync_dump", ?selected_element, "selected parts to dump: "); + (selected_element, selected_idx) +} + async fn state_sync_dump( shard_id: ShardId, chain: Chain, @@ -151,11 +194,14 @@ async fn state_sync_dump( chain.store().set_state_sync_dump_progress(shard_id, None).unwrap(); } + // Stop if the node is stopped. + // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. while keep_running.load(std::sync::atomic::Ordering::Relaxed) { + // TODO (ND-437): Start every iteration of the state dumping loop with checking if a new epoch is available. let progress = chain.store().get_state_sync_dump_progress(shard_id); tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration"); // The `match` returns the next state of the state machine. - let next_state = match progress { + let next_state: Result, Error> = match progress { Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts })) => { // The latest epoch was dumped. Check if a newer epoch is available. check_new_epoch( @@ -190,83 +236,110 @@ async fn state_sync_dump( } Ok(None) } - Ok(Some(StateSyncDumpProgress::InProgress { - epoch_id, - epoch_height, - sync_hash, - parts_dumped, - })) => { + Ok(Some(StateSyncDumpProgress::InProgress { epoch_id, epoch_height, sync_hash })) => { let in_progress_data = get_in_progress_data(shard_id, sync_hash, &chain); - let mut res = None; match in_progress_data { + Err(error) => Err(error), Ok((state_root, num_parts, sync_prev_hash)) => { - // The actual dumping of state to S3. - tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, ?state_root, parts_dumped, "Creating parts and dumping them"); - for part_id in parts_dumped..num_parts { - // Dump parts sequentially synchronously. - // TODO: How to make it possible to dump state more effectively using multiple nodes? - let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - - let state_part = match obtain_and_store_state_part( - runtime.as_ref(), - shard_id, - sync_hash, - &sync_prev_hash, - &state_root, - part_id, - num_parts, - &chain, - ) { - Ok(state_part) => state_part, - Err(err) => { - res = Some(err); - break; - } - }; - let location = external_storage_location( - &chain_id, - epoch_height, - shard_id, - part_id, - num_parts, - ); - if let Err(err) = - external.put_state_part(&state_part, shard_id, &location).await - { - res = Some(Error::Other(err.to_string())); - break; + let missing_parts = get_missing_part_ids_for_epoch( + shard_id, + &chain_id, + &epoch_id, + epoch_height, + num_parts, + &external, + ) + .await; + + match missing_parts { + Err(err) => { + tracing::debug!(target: "state_sync_dump", shard_id, ?err, "get_missing_state_parts_for_epoch error"); + Err(Error::Other(format!( + "get_missing_state_parts_for_epoch failed" + ))) } - update_progress( - &shard_id, - &epoch_id, - epoch_height, - &sync_hash, - part_id, - num_parts, - state_part.len(), - &chain, - ); - - // Stop if the node is stopped. - // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. - if !keep_running.load(std::sync::atomic::Ordering::Relaxed) { - res = Some(Error::Other("Stopped".to_owned())); - break; + Ok(parts_not_dumped) if parts_not_dumped.is_empty() => { + Ok(Some(StateSyncDumpProgress::AllDumped { + epoch_id, + epoch_height, + num_parts: Some(num_parts), + })) + } + Ok(parts_not_dumped) => { + let mut parts_to_dump = parts_not_dumped.clone(); + let timer = Instant::now(); + // Stop if the node is stopped. + // Note that without this check the state dumping thread is unstoppable, i.e. non-interruptable. + while keep_running.load(std::sync::atomic::Ordering::Relaxed) + && timer.elapsed().as_secs() + <= STATE_DUMP_ITERATION_TIME_LIMIT_SECS + && !parts_to_dump.is_empty() + { + let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + + let (part_id, selected_idx) = + select_random_part_id_with_index(&parts_to_dump); + + let state_part = match obtain_and_store_state_part( + runtime.as_ref(), + shard_id, + sync_hash, + &sync_prev_hash, + &state_root, + part_id, + num_parts, + &chain, + ) { + Ok(state_part) => state_part, + Err(err) => { + tracing::warn!(target: "state_sync_dump", shard_id, epoch_height, part_id, ?err, "Failed to obtain and store part. Will skip this part."); + break; + } + }; + let location = external_storage_location( + &chain_id, + &epoch_id, + epoch_height, + shard_id, + part_id, + num_parts, + ); + if let Err(_) = external + .put_state_part(&state_part, shard_id, &location) + .await + { + // no need to break if there's an error, we should keep dumping other parts. + // reason is we are dumping random selected parts, so it's fine if we are not able to finish all of them + continue; + } + + // remove the dumped part from parts_to_dump so that we draw without replacement + parts_to_dump.swap_remove(selected_idx); + update_dumped_size_and_cnt_metrics( + &shard_id, + epoch_height, + state_part.len(), + ); + } + + if parts_to_dump.is_empty() { + Ok(Some(StateSyncDumpProgress::AllDumped { + epoch_id, + epoch_height, + num_parts: Some(num_parts), + })) + } else { + Ok(Some(StateSyncDumpProgress::InProgress { + epoch_id, + epoch_height, + sync_hash, + })) + } } - } - if let Some(err) = res { - Err(err) - } else { - Ok(Some(StateSyncDumpProgress::AllDumped { - epoch_id, - epoch_height, - num_parts: Some(num_parts), - })) } } - Err(err) => Err(err), } } }; @@ -285,7 +358,7 @@ async fn state_sync_dump( } } Ok(None) => { - // Do nothing. + // Will retry. tracing::debug!(target: "state_sync_dump", shard_id, "Idle"); false } @@ -319,35 +392,16 @@ fn get_in_progress_data( Ok((state_root, num_parts, *sync_prev_hash)) } -fn update_progress( +fn update_dumped_size_and_cnt_metrics( shard_id: &ShardId, - epoch_id: &EpochId, epoch_height: EpochHeight, - sync_hash: &CryptoHash, - part_id: u64, - num_parts: u64, part_len: usize, - chain: &Chain, ) { - // Record that a part was obtained and dumped. metrics::STATE_SYNC_DUMP_SIZE_TOTAL .with_label_values(&[&epoch_height.to_string(), &shard_id.to_string()]) .inc_by(part_len as u64); - let next_progress = StateSyncDumpProgress::InProgress { - epoch_id: epoch_id.clone(), - epoch_height, - sync_hash: *sync_hash, - parts_dumped: part_id + 1, - }; - match chain.store().set_state_sync_dump_progress(*shard_id, Some(next_progress.clone())) { - Ok(_) => { - tracing::debug!(target: "state_sync_dump", shard_id, ?next_progress, "Updated dump progress"); - } - Err(err) => { - tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to update dump progress, continue"); - } - } - set_metrics(shard_id, Some(part_id + 1), Some(num_parts), Some(epoch_height)); + + metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED.with_label_values(&[&shard_id.to_string()]).inc(); } fn set_metrics( @@ -436,12 +490,7 @@ fn start_dumping( // Note that first the state of the state machines gets changes to // `InProgress` and it starts dumping state after a short interval. set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height)); - Ok(Some(StateSyncDumpProgress::InProgress { - epoch_id, - epoch_height, - sync_hash, - parts_dumped: 0, - })) + Ok(Some(StateSyncDumpProgress::InProgress { epoch_id, epoch_height, sync_hash })) } else { tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch"); Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) })) @@ -537,13 +586,12 @@ mod tests { Some("test0".parse().unwrap()), ) .unwrap(); - let mut last_block_hash = None; for i in 1..=MAX_HEIGHT { let block = env.clients[0].produce_block(i as u64).unwrap().unwrap(); - last_block_hash = Some(*block.hash()); env.process_block(0, block, Provenance::PRODUCED); } - let epoch_id = epoch_manager.get_epoch_id(last_block_hash.as_ref().unwrap()).unwrap(); + let head = &env.clients[0].chain.head().unwrap(); + let epoch_id = head.clone().epoch_id; let epoch_info = epoch_manager.get_epoch_info(&epoch_id).unwrap(); let epoch_height = epoch_info.epoch_height(); @@ -558,6 +606,7 @@ mod tests { for part_id in 0..num_parts { let path = root_dir.path().join(external_storage_location( "unittest", + &epoch_id, epoch_height, shard_id, part_id, diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs index ca23d0da6a2..b49b1f5c7eb 100644 --- a/tools/state-viewer/src/state_parts.rs +++ b/tools/state-viewer/src/state_parts.rs @@ -254,13 +254,13 @@ fn load_state_parts( store: Store, location: Location, ) { + let epoch_id = epoch_selection.to_epoch_id(store, chain); let (state_root, epoch_height, epoch_id, sync_hash) = if let (Some(state_root), EpochSelection::EpochHeight { epoch_height }) = (maybe_state_root, &epoch_selection) { - (state_root, *epoch_height, None, None) + (state_root, *epoch_height, epoch_id, None) } else { - let epoch_id = epoch_selection.to_epoch_id(store, chain); let epoch = chain.epoch_manager.get_epoch_info(&epoch_id).unwrap(); let sync_hash = get_any_block_hash_of_epoch(&epoch, chain); @@ -269,10 +269,10 @@ fn load_state_parts( let state_header = chain.get_state_response_header(shard_id, sync_hash).unwrap(); let state_root = state_header.chunk_prev_state_root(); - (state_root, epoch.epoch_height(), Some(epoch_id), Some(sync_hash)) + (state_root, epoch.epoch_height(), epoch_id, Some(sync_hash)) }; - let part_storage = get_state_part_reader(location, chain_id, epoch_height, shard_id); + let part_storage = get_state_part_reader(location, chain_id, &epoch_id, epoch_height, shard_id); let num_parts = part_storage.num_parts(); assert_ne!(num_parts, 0, "Too few num_parts: {}", num_parts); @@ -310,7 +310,7 @@ fn load_state_parts( &state_root, PartId::new(part_id, num_parts), &part, - epoch_id.as_ref().unwrap(), + &epoch_id, ) .unwrap(); tracing::info!(target: "state-parts", part_id, part_length = part.len(), elapsed_sec = timer.elapsed().as_secs_f64(), "Loaded a state part"); @@ -371,7 +371,8 @@ fn dump_state_parts( "Dumping state as seen at the beginning of the specified epoch.", ); - let part_storage = get_state_part_writer(location, chain_id, epoch.epoch_height(), shard_id); + let part_storage = + get_state_part_writer(location, chain_id, &epoch_id, epoch.epoch_height(), shard_id); let timer = Instant::now(); for part_id in part_ids { @@ -446,15 +447,21 @@ trait StatePartReader { fn get_state_part_reader( location: Location, chain_id: &str, + epoch_id: &EpochId, epoch_height: u64, shard_id: ShardId, ) -> Box { match location { - Location::Files(root_dir) => { - Box::new(FileSystemStorage::new(root_dir, false, chain_id, epoch_height, shard_id)) - } + Location::Files(root_dir) => Box::new(FileSystemStorage::new( + root_dir, + false, + chain_id, + epoch_id, + epoch_height, + shard_id, + )), Location::S3 { bucket, region } => { - Box::new(S3Storage::new(&bucket, ®ion, chain_id, epoch_height, shard_id)) + Box::new(S3Storage::new(&bucket, ®ion, chain_id, epoch_id, epoch_height, shard_id)) } } } @@ -462,15 +469,21 @@ fn get_state_part_reader( fn get_state_part_writer( location: Location, chain_id: &str, + epoch_id: &EpochId, epoch_height: u64, shard_id: ShardId, ) -> Box { match location { - Location::Files(root_dir) => { - Box::new(FileSystemStorage::new(root_dir, true, chain_id, epoch_height, shard_id)) - } + Location::Files(root_dir) => Box::new(FileSystemStorage::new( + root_dir, + true, + chain_id, + epoch_id, + epoch_height, + shard_id, + )), Location::S3 { bucket, region } => { - Box::new(S3Storage::new(&bucket, ®ion, chain_id, epoch_height, shard_id)) + Box::new(S3Storage::new(&bucket, ®ion, chain_id, epoch_id, epoch_height, shard_id)) } } } @@ -484,10 +497,11 @@ impl FileSystemStorage { root_dir: PathBuf, create_dir: bool, chain_id: &str, + epoch_id: &EpochId, epoch_height: u64, shard_id: u64, ) -> Self { - let prefix = location_prefix(chain_id, epoch_height, shard_id); + let prefix = location_prefix(chain_id, epoch_height, epoch_id, shard_id); let state_parts_dir = root_dir.join(&prefix); if create_dir { tracing::info!(target: "state-parts", ?root_dir, ?prefix, ?state_parts_dir, "Ensuring the directory exists"); @@ -557,10 +571,11 @@ impl S3Storage { s3_bucket: &str, s3_region: &str, chain_id: &str, + epoch_id: &EpochId, epoch_height: u64, shard_id: u64, ) -> Self { - let location = location_prefix(chain_id, epoch_height, shard_id); + let location = location_prefix(chain_id, epoch_height, epoch_id, shard_id); let bucket = s3::Bucket::new( s3_bucket, s3_region.parse::().unwrap(),