diff --git a/CHANGELOG.md b/CHANGELOG.md index 61a36fb7943..044ca48c442 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ ### Non-protocol Changes +* Experimental option to dump state of every epoch to external storage. [#8661](https://github.com/near/nearcore/pull/8661) +* State-viewer tool to dump and apply state changes from/to a range of blocks [#8628](https://github.com/near/nearcore/pull/8628) + ## 1.32.0 ### Protocol Changes diff --git a/Cargo.lock b/Cargo.lock index d6a043304b9..623574d210d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4048,6 +4048,7 @@ dependencies = [ "rand 0.8.5", "rayon", "rlimit", + "rust-s3", "serde", "serde_ignored", "serde_json", diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 9142b1e5bfd..b5a6fdfa7da 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -19,7 +19,7 @@ use near_primitives::sharding::{ }; use near_primitives::syncing::{ get_num_state_parts, ReceiptProofResponse, ShardStateSyncResponseHeader, StateHeaderKey, - StatePartKey, + StatePartKey, StateSyncDumpProgress, }; use near_primitives::transaction::{ ExecutionOutcomeWithId, ExecutionOutcomeWithIdAndProof, ExecutionOutcomeWithProof, @@ -838,6 +838,39 @@ impl ChainStore { } Ok(None) } + + /// Constructs key 'STATE_SYNC_DUMP:', + /// for example 'STATE_SYNC_DUMP:2' for shard_id=2. + /// Doesn't contain epoch_id, because only one dump process per shard is allowed. + fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec { + let mut key = b"STATE_SYNC_DUMP:".to_vec(); + key.extend(shard_id.to_le_bytes()); + key + } + + /// Retrieves STATE_SYNC_DUMP for the given shard. + pub fn get_state_sync_dump_progress( + &self, + shard_id: ShardId, + ) -> Result, Error> { + option_to_not_found( + self.store + .get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id)), + "STATE_SYNC_DUMP", + ) + } + + /// Updates STATE_SYNC_DUMP for the given shard. + pub fn set_state_sync_dump_progress( + &self, + shard_id: ShardId, + value: Option, + ) -> Result<(), Error> { + let mut store_update = self.store.store_update(); + let key = ChainStore::state_sync_dump_progress_key(shard_id); + store_update.set_ser(DBCol::BlockMisc, &key, &value)?; + store_update.commit().map_err(|err| err.into()) + } } impl ChainStoreAccess for ChainStore { diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index 85d2d3c7bad..898f8e8cb19 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -167,6 +167,15 @@ pub struct ClientConfig { pub client_background_migration_threads: usize, /// Duration to perform background flat storage creation step. pub flat_storage_creation_period: Duration, + /// If enabled, will dump state of every epoch to external storage. + pub state_sync_dump_enabled: bool, + /// S3 bucket for storing state dumps. + pub state_sync_s3_bucket: String, + /// S3 region for storing state dumps. + pub state_sync_s3_region: String, + /// Restart dumping state of selected shards. + /// Use for troubleshooting of the state dumping process. 
+    pub state_sync_restart_dump_for_shards: Vec<ShardId>,
 }
 
 impl ClientConfig {
@@ -237,6 +246,10 @@ impl ClientConfig {
             enable_statistics_export: true,
             client_background_migration_threads: 1,
             flat_storage_creation_period: Duration::from_secs(1),
+            state_sync_dump_enabled: false,
+            state_sync_s3_bucket: String::new(),
+            state_sync_s3_region: String::new(),
+            state_sync_restart_dump_for_shards: vec![],
         }
     }
 }
diff --git a/core/primitives/src/syncing.rs b/core/primitives/src/syncing.rs
index 75ad582f8dc..4dc68f3b7a6 100644
--- a/core/primitives/src/syncing.rs
+++ b/core/primitives/src/syncing.rs
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use borsh::{BorshDeserialize, BorshSerialize};
+use near_primitives_core::types::EpochHeight;
 
 use crate::block_header::BlockHeader;
 use crate::epoch_manager::block_info::BlockInfo;
@@ -10,7 +11,7 @@ use crate::merkle::{MerklePath, PartialMerkleTree};
 use crate::sharding::{
     ReceiptProof, ShardChunk, ShardChunkHeader, ShardChunkHeaderV1, ShardChunkV1,
 };
-use crate::types::{BlockHeight, ShardId, StateRoot, StateRootNode};
+use crate::types::{BlockHeight, EpochId, ShardId, StateRoot, StateRootNode};
 use crate::views::LightClientBlockView;
 
 #[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)]
@@ -226,3 +227,32 @@ pub fn get_num_state_parts(memory_usage: u64) -> u64 {
     // TODO #1708
     memory_usage / STATE_PART_MEMORY_LIMIT.as_u64() + 3
 }
+
+#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)]
+/// Represents the progress of dumping the state of a shard.
+pub enum StateSyncDumpProgress {
+    /// Represents two cases:
+    /// * An epoch dump is complete
+    /// * The node is running its first epoch and there is nothing to dump.
+    AllDumped {
+        /// The dumped state corresponds to the state at the beginning of the specified epoch.
+        epoch_id: EpochId,
+        epoch_height: EpochHeight,
+        // Missing in case of a node running the first epoch.
+        num_parts: Option<u64>,
+    },
+    /// Represents the case of an epoch being partially dumped.
+    InProgress {
+        /// The dumped state corresponds to the state at the beginning of the specified epoch.
+        epoch_id: EpochId,
+        epoch_height: EpochHeight,
+        /// Block hash of the first block of the epoch.
+        /// The dumped state corresponds to the state before applying this block.
+        sync_hash: CryptoHash,
+        /// Root of the state being dumped.
+        state_root: StateRoot,
+        /// Progress made.
+        parts_dumped: u64,
+        num_parts: u64,
+    },
+}
diff --git a/docs/misc/state_sync_dump.md b/docs/misc/state_sync_dump.md
new file mode 100644
index 00000000000..08bb1e96c2a
--- /dev/null
+++ b/docs/misc/state_sync_dump.md
@@ -0,0 +1,73 @@
+# Experimental: Dump of state to External Storage
+
+## Purpose
+
+The current implementation of state sync (see
+https://github.com/near/nearcore/blob/master/docs/architecture/how/sync.md for
+details) doesn't allow the nodes to reliably perform state sync for testnet or
+mainnet.
+
+That's why a new solution for state sync is being designed.
+The experimental code is likely going to be a part of the solution to greatly
+improve both the reliability and the speed of state sync.
+
+The new solution will probably involve making the state available on external
+storage, making downloading the state a low-latency and reliable process,
+thanks to the robust infrastructure of external storage such as S3.
+
+## How-to
+
+[#8661](https://github.com/near/nearcore/pull/8661) adds an experimental option
+to dump state of every epoch to external storage. At the moment only S3 is
+supported as external storage.
+
+To enable, add this to your `config.json` file:
+
+```json
+"state_sync": {
+  "s3_bucket": "my-bucket",
+  "s3_region": "eu-central-1",
+  "dump_enabled": true
+}
+```
+
+And run your node with environment variables `AWS_ACCESS_KEY_ID` and
+`AWS_SECRET_ACCESS_KEY`:
+```shell
+AWS_ACCESS_KEY_ID="MY_ACCESS_KEY" AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY" ./neard run
+```
+
+## Implementation Details
+
+The experimental option spawns a thread for each of the shards tracked by a node.
+Each of the threads acts independently. Each thread determines the last
+complete epoch, and starts the process of dumping the state.
+
+To dump the state, a thread does the following:
+* Get the size of the trie to determine the number of state parts
+* Obtain each state part
+* Upload each state part to S3
+
+State parts are uploaded as individual objects. The location of those objects is
+computed as follows:
+```
+"chain_id={chain_id}/epoch_height={epoch_height}/shard_id={shard_id}/state_part_{part_id:06}_of_{num_parts:06}",
+```
+for example `chain_id=testnet/epoch_height=1790/shard_id=2/state_part_032642_of_065402`
+
+Currently, using multiple nodes for dumping state doesn't make the process go
+any faster. The nodes will simply duplicate the work, overwriting the files
+created by each other.
+
+A future improvement could be to make the nodes cooperate. To avoid introducing
+a complicated consensus process, we suggest the following simple process:
+* Get a list of state parts already dumped for an epoch
+* Pick 100 random state parts that are not yet dumped
+* Obtain and upload those 100 state parts
+* Repeat until all state parts are dumped
+
+The process of dumping state parts is managed as a state machine with two
+possible states. The state is stored in the `BlockMisc` column with row key
+`STATE_SYNC_DUMP:X` for shard X. Note that the epoch id is not included in the
+row key: only one epoch per shard can be dumped at a time, so the epoch id is
+not needed to manage the state machine.
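+
+For illustration, here is a minimal sketch of how that row key is built. It
+mirrors the `state_sync_dump_progress_key` helper added to
+`chain/chain/src/store.rs` in this change; `ShardId` is written as a plain
+`u64` to keep the example self-contained:
+
+```rust
+/// Builds the `BlockMisc` row key for a shard's dump progress:
+/// the prefix "STATE_SYNC_DUMP:" followed by the little-endian bytes of the shard id.
+fn state_sync_dump_progress_key(shard_id: u64) -> Vec<u8> {
+    let mut key = b"STATE_SYNC_DUMP:".to_vec();
+    key.extend(shard_id.to_le_bytes());
+    key
+}
+
+fn main() {
+    // Key for shard 2: 16 prefix bytes plus 8 bytes of shard id.
+    let key = state_sync_dump_progress_key(2);
+    assert!(key.starts_with(b"STATE_SYNC_DUMP:"));
+    assert_eq!(key.len(), b"STATE_SYNC_DUMP:".len() + 8);
+}
+```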
diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index 4d68e2cae9e..de35d1fc8c0 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -1475,11 +1475,11 @@ fn test_gc_with_epoch_length_common(epoch_length: NumBlocks) { let block_hash = *blocks[i as usize].hash(); assert_matches!( env.clients[0].chain.get_block(&block_hash).unwrap_err(), - Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string() + Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == format!("BLOCK: {}", block_hash) ); assert_matches!( env.clients[0].chain.get_block_by_height(i).unwrap_err(), - Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string() + Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == format!("BLOCK: {}", block_hash) ); assert!(env.clients[0] .chain diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index 9ed1f0c5887..f4d09855d0c 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -25,6 +25,7 @@ once_cell.workspace = true rand.workspace = true rayon.workspace = true rlimit.workspace = true +rust-s3.workspace = true serde.workspace = true serde_ignored.workspace = true serde_json.workspace = true diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 06f9c895821..a1533c77de1 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -353,6 +353,9 @@ pub struct Config { pub db_migration_snapshot_path: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub expected_shutdown: Option, + /// Options for dumping state of every epoch to S3. + #[serde(skip_serializing_if = "Option::is_none")] + pub state_sync: Option, } fn is_false(value: &bool) -> bool { @@ -389,6 +392,7 @@ impl Default for Config { cold_store: None, split_storage: None, expected_shutdown: None, + state_sync: None, } } } @@ -697,6 +701,22 @@ impl NearConfig { enable_statistics_export: config.store.enable_statistics_export, client_background_migration_threads: config.store.background_migration_threads, flat_storage_creation_period: config.store.flat_storage_creation_period, + state_sync_dump_enabled: config + .state_sync + .as_ref() + .map_or(false, |x| x.dump_enabled.unwrap_or(false)), + state_sync_s3_bucket: config + .state_sync + .as_ref() + .map_or(String::new(), |x| x.s3_bucket.clone()), + state_sync_s3_region: config + .state_sync + .as_ref() + .map_or(String::new(), |x| x.s3_region.clone()), + state_sync_restart_dump_for_shards: config + .state_sync + .as_ref() + .map_or(vec![], |x| x.drop_state_of_dump.clone().unwrap_or(vec![])), }, network_config: NetworkConfig::new( config.network, @@ -1520,6 +1540,17 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) - NearConfig::new(config, genesis, signer.into(), validator_signer).unwrap() } +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)] +/// Options for dumping state to S3. +pub struct StateSyncConfig { + pub s3_bucket: String, + pub s3_region: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub dump_enabled: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub drop_state_of_dump: Option>, +} + #[test] fn test_init_config_localnet() { // Check that we can initialize the config with multiple shards. 
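For reference, the new `state_sync` section needs only `s3_bucket` and `s3_region`; `dump_enabled` and `drop_state_of_dump` are optional and deserialize to `None` when omitted. Below is a minimal, self-contained sketch of that behavior: the struct is copied from the diff above with `ShardId` simplified to `u64`, and `serde`/`serde_json` are assumed as dependencies.

```rust
use serde::{Deserialize, Serialize};

// Standalone copy of the `StateSyncConfig` struct added in nearcore/src/config.rs,
// with `ShardId` simplified to `u64` for this sketch.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct StateSyncConfig {
    pub s3_bucket: String,
    pub s3_region: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dump_enabled: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub drop_state_of_dump: Option<Vec<u64>>,
}

fn main() {
    // Mirrors the `state_sync` block from docs/misc/state_sync_dump.md.
    let json = r#"{"s3_bucket": "my-bucket", "s3_region": "eu-central-1", "dump_enabled": true}"#;
    let config: StateSyncConfig = serde_json::from_str(json).unwrap();
    assert_eq!(config.dump_enabled, Some(true));
    // Omitted optional fields deserialize to `None`.
    assert!(config.drop_state_of_dump.is_none());
}
```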
diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 7498a01830d..3da5e6a2766 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -2,6 +2,7 @@ use crate::cold_storage::spawn_cold_store_loop; pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE}; pub use crate::runtime::NightshadeRuntime; pub use crate::shard_tracker::TrackedConfig; +use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle}; use actix::{Actor, Addr}; use actix_rt::ArbiterHandle; use actix_web; @@ -12,17 +13,17 @@ use near_async::messaging::{IntoSender, LateBoundSender}; use near_chain::{Chain, ChainGenesis}; use near_chunks::shards_manager_actor::start_shards_manager; use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor}; -use near_primitives::time; - use near_network::PeerManagerActor; use near_primitives::block::GenesisId; +use near_primitives::time; use near_store::metadata::DbKind; use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError}; use near_telemetry::TelemetryActor; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::{info, trace}; +use tracing::info; + pub mod append_only_map; mod cold_storage; pub mod config; @@ -33,6 +34,7 @@ mod metrics; pub mod migrations; mod runtime; mod shard_tracker; +mod state_sync; pub fn get_default_home() -> PathBuf { if let Ok(near_home) = std::env::var("NEAR_HOME") { @@ -188,6 +190,8 @@ pub struct NearNode { /// The cold_store_loop_handle will only be set if the cold store is configured. /// It's a handle to a background thread that copies data from the hot store to the cold store. pub cold_store_loop_handle: Option, + /// Contains handles to background threads that may be dumping state to S3. 
+ pub state_sync_dump_handle: Option, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -242,7 +246,7 @@ pub fn start_with_config_and_synchronization( ); let (client_actor, client_arbiter_handle) = start_client( config.client_config.clone(), - chain_genesis, + chain_genesis.clone(), runtime.clone(), node_id, network_adapter.clone().into(), @@ -255,7 +259,7 @@ pub fn start_with_config_and_synchronization( ); client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context()); let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager( - runtime, + runtime.clone(), network_adapter.as_sender(), client_adapter_for_shards_manager.as_sender(), config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()), @@ -264,6 +268,13 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); + let state_sync_dump_handle = spawn_state_sync_dump( + &config, + chain_genesis, + runtime, + config.network_config.node_id().public_key(), + )?; + #[allow(unused_mut)] let mut rpc_servers = Vec::new(); let network_actor = PeerManagerActor::spawn( @@ -304,7 +315,7 @@ pub fn start_with_config_and_synchronization( rpc_servers.shrink_to_fit(); - trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated"); + tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated"); Ok(NearNode { client: client_actor, @@ -312,6 +323,7 @@ pub fn start_with_config_and_synchronization( rpc_servers, arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle], cold_store_loop_handle, + state_sync_dump_handle, }) } diff --git a/nearcore/src/metrics.rs b/nearcore/src/metrics.rs index 105397d11f2..ee53c9c2422 100644 --- a/nearcore/src/metrics.rs +++ b/nearcore/src/metrics.rs @@ -1,6 +1,7 @@ use near_o11y::metrics::{ - linear_buckets, try_create_histogram_vec, try_create_int_counter_vec, try_create_int_gauge, - HistogramVec, IntCounterVec, IntGauge, + exponential_buckets, linear_buckets, try_create_histogram_vec, try_create_int_counter_vec, + try_create_int_gauge, try_create_int_gauge_vec, HistogramVec, IntCounterVec, IntGauge, + IntGaugeVec, }; use once_cell::sync::Lazy; @@ -44,3 +45,63 @@ pub(crate) static COLD_STORE_COPY_RESULT: Lazy = Lazy::new(|| { ) .unwrap() }); + +pub(crate) static STATE_SYNC_DUMP_ITERATION_ELAPSED: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_dump_iteration_elapsed_sec", + "Time needed to obtain and write a part", + &["shard_id"], + Some(exponential_buckets(0.001, 1.6, 25).unwrap()), + ) + .unwrap() +}); +pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_dump_put_object_elapsed_sec", + "Time needed to write a part", + &["shard_id"], + Some(exponential_buckets(0.001, 1.6, 25).unwrap()), + ) + .unwrap() +}); +pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy = Lazy::new(|| { + try_create_histogram_vec( + "near_state_sync_dump_obtain_part_elapsed_sec", + "Time needed to obtain a part", + &["shard_id"], + Some(exponential_buckets(0.001, 1.6, 25).unwrap()), + ) + .unwrap() +}); +pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_state_sync_dump_num_parts_total", + "Total number of parts in the epoch that being dumped", + &["shard_id"], + ) + .unwrap() +}); +pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_DUMPED: Lazy = Lazy::new(|| { + 
    try_create_int_gauge_vec(
+        "near_state_sync_dump_num_parts_dumped",
+        "Number of parts dumped in the epoch that is being dumped",
+        &["shard_id"],
+    )
+    .unwrap()
+});
+pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "near_state_sync_dump_size_total",
+        "Total size of parts written to S3",
+        &["shard_id"],
+    )
+    .unwrap()
+});
+pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(|| {
+    try_create_int_gauge_vec(
+        "near_state_sync_dump_epoch_height",
+        "Epoch Height of an epoch being dumped",
+        &["shard_id"],
+    )
+    .unwrap()
+});
diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs
new file mode 100644
index 00000000000..458bff4e27e
--- /dev/null
+++ b/nearcore/src/state_sync.rs
@@ -0,0 +1,437 @@
+use crate::{metrics, NearConfig, NightshadeRuntime};
+use borsh::BorshSerialize;
+use near_chain::types::RuntimeAdapter;
+use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error};
+use near_chain_configs::ClientConfig;
+use near_client::sync::state::StateSync;
+use near_crypto::PublicKey;
+use near_epoch_manager::EpochManagerAdapter;
+use near_primitives::hash::CryptoHash;
+use near_primitives::state_part::PartId;
+use near_primitives::syncing::{get_num_state_parts, StatePartKey, StateSyncDumpProgress};
+use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot};
+use near_store::DBCol;
+use std::sync::Arc;
+
+/// Starts one thread per tracked shard.
+/// Each started thread will be dumping state parts of a single epoch to external storage.
+pub fn spawn_state_sync_dump(
+    config: &NearConfig,
+    chain_genesis: ChainGenesis,
+    runtime: Arc<NightshadeRuntime>,
+    node_key: &PublicKey,
+) -> anyhow::Result<Option<StateSyncDumpHandle>> {
+    if !config.client_config.state_sync_dump_enabled {
+        return Ok(None);
+    }
+    if config.client_config.state_sync_s3_bucket.is_empty()
+        || config.client_config.state_sync_s3_region.is_empty()
+    {
+        panic!("Enabled dumps of state to external storage. Please specify state_sync.s3_bucket and state_sync.s3_region");
+    }
+    tracing::info!(target: "state_sync_dump", "Spawning the state sync dump loop");
+
+    // Create a connection to S3.
+    let s3_bucket = config.client_config.state_sync_s3_bucket.clone();
+    let s3_region = config.client_config.state_sync_s3_region.clone();
+
+    // Credentials to establish a connection are taken from environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
+    let bucket = s3::Bucket::new(
+        &s3_bucket,
+        s3_region
+            .parse::<s3::Region>()
+            .map_err(|err| >::into(err))?,
+        s3::creds::Credentials::default().map_err(|err| {
+            tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?");
+            >::into(err)
+        })?,
+    ).map_err(|err| >::into(err))?;
+
+    // Determine how many threads to start.
+    // TODO: Handle the case of changing the shard layout.
+    let num_shards = {
+        // Sadly, `Chain` is not `Send` and each thread needs to create its own `Chain` instance.
+        let chain = Chain::new_for_view_client(
+            runtime.clone(),
+            &chain_genesis,
+            DoomslugThresholdMode::TwoThirds,
+            false,
+        )?;
+        let epoch_id = chain.head()?.epoch_id;
+        runtime.num_shards(&epoch_id)
+    }?;
+
+    // Start a thread for each shard.
+ let handles = (0..num_shards as usize) + .map(|shard_id| { + let client_config = config.client_config.clone(); + let runtime = runtime.clone(); + let chain_genesis = chain_genesis.clone(); + let chain = Chain::new_for_view_client( + runtime.clone(), + &chain_genesis, + DoomslugThresholdMode::TwoThirds, + false, + ) + .unwrap(); + let arbiter_handle = actix_rt::Arbiter::new().handle(); + assert!(arbiter_handle.spawn(state_sync_dump( + shard_id as ShardId, + chain, + runtime, + client_config, + bucket.clone(), + node_key.clone(), + ))); + arbiter_handle + }) + .collect(); + + Ok(Some(StateSyncDumpHandle { handles })) +} + +/// Holds arbiter handles controlling the lifetime of the spawned threads. +pub struct StateSyncDumpHandle { + pub handles: Vec, +} + +impl Drop for StateSyncDumpHandle { + fn drop(&mut self) { + self.stop() + } +} + +impl StateSyncDumpHandle { + pub fn stop(&self) { + let _: Vec = self.handles.iter().map(|handle| handle.stop()).collect(); + } +} + +/// A thread loop that dumps state of the latest available epoch to S3. +/// Operates as a state machine. Persists its state in the Misc column. +/// Shards store the state-machine state separately. +async fn state_sync_dump( + shard_id: ShardId, + chain: Chain, + runtime: Arc, + config: ClientConfig, + bucket: s3::Bucket, + _node_key: PublicKey, +) { + tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop"); + let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(10)); + + if config.state_sync_restart_dump_for_shards.contains(&shard_id) { + tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress"); + chain.store().set_state_sync_dump_progress(shard_id, None).unwrap(); + } + + loop { + // Avoid a busy-loop when there is nothing to do. + interval.tick().await; + + let progress = chain.store().get_state_sync_dump_progress(shard_id); + tracing::debug!(target: "state_sync_dump", shard_id, ?progress, "Running StateSyncDump loop iteration"); + // The `match` returns the next state of the state machine. + let next_state = match progress { + Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts })) => { + // The latest epoch was dumped. Check if a newer epoch is available. + check_new_epoch( + Some(epoch_id), + Some(epoch_height), + num_parts, + shard_id, + &chain, + &runtime, + ) + } + Err(Error::DBNotFoundErr(_)) | Ok(None) => { + // First invocation of this state-machine. See if at least one epoch is available for dumping. + check_new_epoch(None, None, None, shard_id, &chain, &runtime) + } + Err(err) => { + // Something went wrong, let's retry. + tracing::warn!(target: "state_sync_dump", shard_id, ?err, "Failed to read the progress, will now delete and retry"); + if let Err(err) = chain.store().set_state_sync_dump_progress(shard_id, None) { + tracing::warn!(target: "state_sync_dump", shard_id, ?err, "and failed to delete the progress. Will later retry."); + } + Ok(None) + } + Ok(Some(StateSyncDumpProgress::InProgress { + epoch_id, + epoch_height, + sync_hash, + state_root, + parts_dumped, + num_parts, + })) => { + // The actual dumping of state to S3. + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, %state_root, parts_dumped, num_parts, "Creating parts and dumping them"); + let mut res = None; + for part_id in parts_dumped..num_parts { + // Dump parts sequentially synchronously. + // TODO: How to make it possible to dump state more effectively using multiple nodes? 
+ let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + + let state_part = match get_state_part( + &runtime, + &shard_id, + &sync_hash, + &state_root, + part_id, + num_parts, + &chain, + ) { + Ok(state_part) => state_part, + Err(err) => { + res = Some(err); + break; + } + }; + let location = + s3_location(&config.chain_id, epoch_height, shard_id, part_id, num_parts); + if let Err(err) = + put_state_part(&location, &state_part, &shard_id, &bucket).await + { + res = Some(err); + break; + } + update_progress( + &shard_id, + &epoch_id, + epoch_height, + &sync_hash, + &state_root, + part_id, + num_parts, + state_part.len(), + &chain, + ); + } + if let Some(err) = res { + Err(err) + } else { + Ok(Some(StateSyncDumpProgress::AllDumped { + epoch_id, + epoch_height, + num_parts: Some(num_parts), + })) + } + } + }; + + // Record the next state of the state machine. + match next_state { + Ok(Some(next_state)) => { + tracing::debug!(target: "state_sync_dump", shard_id, ?next_state); + match chain.store().set_state_sync_dump_progress(shard_id, Some(next_state)) { + Ok(_) => {} + Err(err) => { + // This will be retried. + tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to set progress"); + } + } + } + Ok(None) => { + // Do nothing. + tracing::debug!(target: "state_sync_dump", shard_id, "Idle"); + } + Err(err) => { + // Will retry. + tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to determine what to do"); + } + } + } +} + +async fn put_state_part( + location: &str, + state_part: &[u8], + shard_id: &ShardId, + bucket: &s3::Bucket, +) -> Result { + let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + let put = bucket + .put_object(&location, &state_part) + .await + .map_err(|err| Error::Other(err.to_string())); + tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); + put +} + +fn update_progress( + shard_id: &ShardId, + epoch_id: &EpochId, + epoch_height: EpochHeight, + sync_hash: &CryptoHash, + state_root: &StateRoot, + part_id: u64, + num_parts: u64, + part_len: usize, + chain: &Chain, +) { + // Record that a part was obtained and dumped. 
+    metrics::STATE_SYNC_DUMP_SIZE_TOTAL
+        .with_label_values(&[&shard_id.to_string()])
+        .inc_by(part_len as u64);
+    let next_progress = StateSyncDumpProgress::InProgress {
+        epoch_id: epoch_id.clone(),
+        epoch_height,
+        sync_hash: *sync_hash,
+        state_root: *state_root,
+        parts_dumped: part_id + 1,
+        num_parts,
+    };
+    match chain.store().set_state_sync_dump_progress(*shard_id, Some(next_progress.clone())) {
+        Ok(_) => {
+            tracing::debug!(target: "state_sync_dump", shard_id, ?next_progress, "Updated dump progress");
+        }
+        Err(err) => {
+            tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to update dump progress, continue");
+        }
+    }
+    set_metrics(shard_id, Some(part_id + 1), Some(num_parts), Some(epoch_height));
+}
+
+fn set_metrics(
+    shard_id: &ShardId,
+    parts_dumped: Option<u64>,
+    num_parts: Option<u64>,
+    epoch_height: Option<EpochHeight>,
+) {
+    if let Some(parts_dumped) = parts_dumped {
+        metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED
+            .with_label_values(&[&shard_id.to_string()])
+            .set(parts_dumped as i64);
+    }
+    if let Some(num_parts) = num_parts {
+        metrics::STATE_SYNC_DUMP_NUM_PARTS_TOTAL
+            .with_label_values(&[&shard_id.to_string()])
+            .set(num_parts as i64);
+    }
+    if let Some(epoch_height) = epoch_height {
+        assert!(
+            epoch_height < 10000,
+            "Impossible: {:?} {:?} {:?} {:?}",
+            shard_id,
+            parts_dumped,
+            num_parts,
+            epoch_height
+        );
+        metrics::STATE_SYNC_DUMP_EPOCH_HEIGHT
+            .with_label_values(&[&shard_id.to_string()])
+            .set(epoch_height as i64);
+    }
+}
+
+fn get_state_part(
+    runtime: &Arc<NightshadeRuntime>,
+    shard_id: &ShardId,
+    sync_hash: &CryptoHash,
+    state_root: &StateRoot,
+    part_id: u64,
+    num_parts: u64,
+    chain: &Chain,
+) -> Result<Vec<u8>, Error> {
+    let state_part = {
+        let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED
+            .with_label_values(&[&shard_id.to_string()])
+            .start_timer();
+        runtime.obtain_state_part(
+            *shard_id,
+            &sync_hash,
+            &state_root,
+            PartId::new(part_id, num_parts),
+        )?
+    };
+
+    // Save the part data.
+    let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?;
+    let mut store_update = chain.store().store().store_update();
+    store_update.set(DBCol::StateParts, &key, &state_part);
+    store_update.commit()?;
+    Ok(state_part)
+}
+
+/// Gets basic information about the epoch to be dumped.
+fn start_dumping(
+    epoch_id: EpochId,
+    sync_hash: CryptoHash,
+    shard_id: ShardId,
+    chain: &Chain,
+    runtime: &Arc<NightshadeRuntime>,
+) -> Result<Option<StateSyncDumpProgress>, Error> {
+    let epoch_info = runtime.get_epoch_info(&epoch_id)?;
+    let epoch_height = epoch_info.epoch_height();
+    let num_shards = runtime.num_shards(&epoch_id)?;
+    let sync_hash_block = chain.get_block(&sync_hash)?;
+    if runtime.cares_about_shard(None, sync_hash_block.header().prev_hash(), shard_id, false) {
+        assert_eq!(num_shards, sync_hash_block.chunks().len() as u64);
+        let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root();
+        let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?;
+        let num_parts = get_num_state_parts(state_root_node.memory_usage);
+        tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch");
+        // Note that the state of the state machine is first changed to `InProgress`,
+        // and dumping starts after a short interval.
+ set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height)); + Ok(Some(StateSyncDumpProgress::InProgress { + epoch_id, + epoch_height, + sync_hash, + state_root, + parts_dumped: 0, + num_parts, + })) + } else { + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch"); + Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) })) + } +} + +/// Checks what is the latest complete epoch. +/// `epoch_id` represents the last fully dumped epoch. +fn check_new_epoch( + epoch_id: Option, + epoch_height: Option, + num_parts: Option, + shard_id: ShardId, + chain: &Chain, + runtime: &Arc, +) -> Result, Error> { + let head = chain.head()?; + if Some(&head.epoch_id) == epoch_id.as_ref() { + set_metrics(&shard_id, num_parts, num_parts, epoch_height); + Ok(None) + } else { + // Check if the final block is now in the next epoch. + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, "Check if a new complete epoch is available"); + let hash = head.last_block_hash; + let header = chain.get_block_header(&hash)?; + let final_hash = header.last_final_block(); + let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &final_hash)?; + let header = chain.get_block_header(&sync_hash)?; + if Some(header.epoch_id()) == epoch_id.as_ref() { + // Still in the latest dumped epoch. Do nothing. + Ok(None) + } else { + start_dumping(head.epoch_id, sync_hash, shard_id, &chain, runtime) + } + } +} + +fn s3_location( + chain_id: &str, + epoch_height: u64, + shard_id: u64, + part_id: u64, + num_parts: u64, +) -> String { + format!( + "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}", + chain_id, epoch_height, shard_id, part_id, num_parts + ) +} diff --git a/neard/src/cli.rs b/neard/src/cli.rs index 15dd7889877..c6e00ad21b4 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -505,14 +505,18 @@ impl RunCmd { UpdateableConfigLoader::new(updateable_configs.clone(), tx_config_update); let config_updater = ConfigUpdater::new(rx_config_update); - let nearcore::NearNode { rpc_servers, cold_store_loop_handle, .. } = - nearcore::start_with_config_and_synchronization( - home_dir, - near_config, - Some(tx_crash), - Some(config_updater), - ) - .expect("start_with_config"); + let nearcore::NearNode { + rpc_servers, + cold_store_loop_handle, + state_sync_dump_handle, + .. + } = nearcore::start_with_config_and_synchronization( + home_dir, + near_config, + Some(tx_crash), + Some(config_updater), + ) + .expect("start_with_config"); let sig = loop { let sig = wait_for_interrupt_signal(home_dir, &mut rx_crash).await; @@ -526,6 +530,7 @@ impl RunCmd { }; warn!(target: "neard", "{}, stopping... this may take a few minutes.", sig); cold_store_loop_handle.map(|handle| handle.stop()); + state_sync_dump_handle.map(|handle| handle.stop()); futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move { server.stop(true).await; debug!(target: "neard", "{} server stopped", name); diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs index 63ec8569713..934221ac61f 100644 --- a/tools/state-viewer/src/state_parts.rs +++ b/tools/state-viewer/src/state_parts.rs @@ -412,7 +412,7 @@ impl S3Storage { let location = location_prefix(chain_id, epoch_height, shard_id); let bucket = s3::Bucket::new( &s3_bucket, - s3_region.parse().unwrap(), + s3_region.parse::().unwrap(), s3::creds::Credentials::default().unwrap(), ) .unwrap();