From ad4e87c8f95556d4e67c3821c7383620965064a7 Mon Sep 17 00:00:00 2001 From: nikurt <86772482+nikurt@users.noreply.github.com> Date: Fri, 10 Mar 2023 10:39:14 +0100 Subject: [PATCH] feat: Dump state of every epoch to S3 (#8661) * Start a thread per shard to do the dumping * AWS credentials are provided as environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` * In `config.json` specify both `config.state_sync.s3_bucket` and `config.state_sync.s3_region` to enable the new behavior. * No changes to the behavior of the node if those options are not enabled in `config.json`. * State is persisted to RocksDB such that restarts of the node are well handled. * Some useful metrics are exported. * The node assumes it's the only node in the this and all alternative universes that does the dumping. * * Unclear how to use multiple nodes to complete the dump faster * TODO: Speed this up by doing things in parallel: obtain parts, upload parts, set tags * * Do we even need tags? --- CHANGELOG.md | 3 + chain/chain/src/store.rs | 1 + core/chain-configs/src/client_config.rs | 17 +- core/primitives/src/syncing.rs | 13 +- docs/misc/state_sync_dump.md | 73 ++++++ .../src/tests/client/process_blocks.rs | 4 +- nearcore/src/config.rs | 20 +- nearcore/src/lib.rs | 24 +- nearcore/src/state_sync.rs | 245 ++++++++++-------- 9 files changed, 266 insertions(+), 134 deletions(-) create mode 100644 docs/misc/state_sync_dump.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 61a36fb7943..044ca48c442 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ ### Non-protocol Changes +* Experimental option to dump state of every epoch to external storage. [#8661](https://github.com/near/nearcore/pull/8661) +* State-viewer tool to dump and apply state changes from/to a range of blocks [#8628](https://github.com/near/nearcore/pull/8628) + ## 1.32.0 ### Protocol Changes diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index fe7e681add1..b5a6fdfa7da 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -841,6 +841,7 @@ impl ChainStore { /// Constructs key 'STATE_SYNC_DUMP:', /// for example 'STATE_SYNC_DUMP:2' for shard_id=2. + /// Doesn't contain epoch_id, because only one dump process per shard is allowed. fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec { let mut key = b"STATE_SYNC_DUMP:".to_vec(); key.extend(shard_id.to_le_bytes()); diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index f3df1c7aa6d..898f8e8cb19 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -167,17 +167,15 @@ pub struct ClientConfig { pub client_background_migration_threads: usize, /// Duration to perform background flat storage creation step. pub flat_storage_creation_period: Duration, - /// Whether to enable dumping state of every epoch to S3. - pub state_dump_enabled: bool, + /// If enabled, will dump state of every epoch to external storage. + pub state_sync_dump_enabled: bool, /// S3 bucket for storing state dumps. pub state_sync_s3_bucket: String, /// S3 region for storing state dumps. pub state_sync_s3_region: String, - /// Discard the existing progress of dumping an epoch state to S3. - pub state_sync_dump_drop_state: Vec, - /// Whether to enable state sync from S3. - /// If disabled will perform state sync from the peers. - pub state_sync_from_s3_enabled: bool, + /// Restart dumping state of selected shards. + /// Use for troubleshooting of the state dumping process. 
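+    /// For example, a (hypothetical) value of `[0, 3]` makes the node discard the recorded
+    /// dump progress for shards 0 and 3 when the dump loop starts, so those shards are
+    /// dumped again from the first state part.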
+ pub state_sync_restart_dump_for_shards: Vec, } impl ClientConfig { @@ -248,11 +246,10 @@ impl ClientConfig { enable_statistics_export: true, client_background_migration_threads: 1, flat_storage_creation_period: Duration::from_secs(1), - state_dump_enabled: false, - state_sync_from_s3_enabled: false, + state_sync_dump_enabled: false, state_sync_s3_bucket: String::new(), state_sync_s3_region: String::new(), - state_sync_dump_drop_state: vec![], + state_sync_restart_dump_for_shards: vec![], } } } diff --git a/core/primitives/src/syncing.rs b/core/primitives/src/syncing.rs index f64e2760432..4dc68f3b7a6 100644 --- a/core/primitives/src/syncing.rs +++ b/core/primitives/src/syncing.rs @@ -229,18 +229,29 @@ pub fn get_num_state_parts(memory_usage: u64) -> u64 { } #[derive(BorshSerialize, BorshDeserialize, Debug, Clone)] -/// Represents the state of the state machine that dumps state. +/// Represents the progress of dumps state of a shard. pub enum StateSyncDumpProgress { + /// Represents two cases: + /// * An epoch dump is complete + /// * The node is running its first epoch and there is nothing to dump. AllDumped { + /// The dumped state corresponds to the state at the beginning of the specified epoch. epoch_id: EpochId, epoch_height: EpochHeight, + // Missing in case of a node running the first epoch. num_parts: Option, }, + /// Represents the case of an epoch being partially dumped. InProgress { + /// The dumped state corresponds to the state at the beginning of the specified epoch. epoch_id: EpochId, epoch_height: EpochHeight, + /// Block hash of the first block of the epoch. + /// The dumped state corresponds to the state before applying this block. sync_hash: CryptoHash, + /// Root of the state being dumped. state_root: StateRoot, + /// Progress made. parts_dumped: u64, num_parts: u64, }, diff --git a/docs/misc/state_sync_dump.md b/docs/misc/state_sync_dump.md new file mode 100644 index 00000000000..08bb1e96c2a --- /dev/null +++ b/docs/misc/state_sync_dump.md @@ -0,0 +1,73 @@ +# Experimental: Dump of state to External Storage + +## Purpose + +Current implementation of state sync (see +https://github.com/near/nearcore/blob/master/docs/architecture/how/sync.md for +details) doesn't allow the nodes to reliably perform state sync for testnet or +mainnet. + +That's why a new solution for state sync is being designed. +The experimental code is likely going to be a part of solution to greatly +improve both reliability and speed of state sync. + +The new solution will probably involve making the state available on external +storage, making downloading the state both low latency and reliable process, +thanks to the robust infrastructure of external storage such as S3. + +## How-to + +[#8661](https://github.com/near/nearcore/pull/8661) adds an experimental option +to dump state of every epoch to external storage. At the moment only S3 is +supported as external storage. + +To enable, add this to your `config.json` file: + +```json +"state_sync": { + "s3_bucket": "my-bucket", + "s3_region": "eu-central-1", + "dump_enabled": true +} +``` + +And run your node with environment variables `AWS_ACCESS_KEY_ID` and +`AWS_SECRET_ACCESS_KEY`: +```shell +AWS_ACCESS_KEY_ID="MY_ACCESS_KEY" AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY" ./neard run +``` + +## Implementation Details + +The experimental option spawns a thread for each of the shards tracked by a node. +Each of the threads acts independently. Each thread determines the last +complete epoch, and starts the process of dumping the state. 
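+The dump progress of each shard is tracked by the two-state
+`StateSyncDumpProgress` machine defined earlier in this patch
+(`core/primitives/src/syncing.rs`) and described in more detail below. The
+following is a minimal, self-contained sketch of how a thread advances that
+state after uploading a part; the field types are reduced to primitives here,
+while the real code additionally carries `epoch_id`, `sync_hash` and
+`state_root`, persists the progress to the store and updates metrics.
+
+```rust
+/// Simplified stand-in for `StateSyncDumpProgress`.
+#[derive(Debug, Clone, PartialEq)]
+enum DumpProgress {
+    AllDumped { epoch_height: u64, num_parts: Option<u64> },
+    InProgress { epoch_height: u64, parts_dumped: u64, num_parts: u64 },
+}
+
+/// Advance the progress after one more part has been uploaded.
+fn on_part_uploaded(progress: DumpProgress) -> DumpProgress {
+    match progress {
+        DumpProgress::InProgress { epoch_height, parts_dumped, num_parts } => {
+            let parts_dumped = parts_dumped + 1;
+            if parts_dumped == num_parts {
+                // Everything is dumped; wait for the next epoch to start over.
+                DumpProgress::AllDumped { epoch_height, num_parts: Some(num_parts) }
+            } else {
+                DumpProgress::InProgress { epoch_height, parts_dumped, num_parts }
+            }
+        }
+        done => done,
+    }
+}
+
+fn main() {
+    let mut progress =
+        DumpProgress::InProgress { epoch_height: 1790, parts_dumped: 0, num_parts: 3 };
+    for _ in 0..3 {
+        progress = on_part_uploaded(progress);
+    }
+    assert_eq!(progress, DumpProgress::AllDumped { epoch_height: 1790, num_parts: Some(3) });
+}
+```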
+ +To dump the state a thread does the following: +* Get the size of the trie to determine the number of state parts +* Obtain each state part +* Upload each state part to S3 + +State parts are uploaded as individual objects. Location of those objects is +computed as follows: +``` +"chain_id={chain_id}/epoch_height={epoch_height}/shard_id={shard_id}/state_part_{part_id:06}_of_{num_parts:06}", +``` +for example `chain_id=testnet/epoch_height=1790/shard_id=2/state_part_032642_of_065402` + +Currently, using multiple nodes for dumping state doesn't make the process go +any faster. The nodes will simpler duplicate the work overwriting files created +by each other. + +Future improvement can be to make the nodes cooperate. To avoid introducing a +complicated consensus process, we can suggest the following simple process: +* Get a list of state parts already dumped for an epoch +* Pick 100 random state parts that are not yet random +* Obtain and upload that 100 state parts +* Repeat until all state parts are complete + +The process of dumping state parts is managed as a state machine with 2 +possible states. The state is stored in the `BlockMisc` column with row key +`STATE_SYNC_DUMP:X` for shard X. Note that epoch id is not included in the row +key, because epoch id is not needed for managing the state machine, because only +one epoch per shard can be dumped at a time. diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index 4d68e2cae9e..de35d1fc8c0 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -1475,11 +1475,11 @@ fn test_gc_with_epoch_length_common(epoch_length: NumBlocks) { let block_hash = *blocks[i as usize].hash(); assert_matches!( env.clients[0].chain.get_block(&block_hash).unwrap_err(), - Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string() + Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == format!("BLOCK: {}", block_hash) ); assert_matches!( env.clients[0].chain.get_block_by_height(i).unwrap_err(), - Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string() + Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == format!("BLOCK: {}", block_hash) ); assert!(env.clients[0] .chain diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index f1d9846763e..a1533c77de1 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -701,7 +701,10 @@ impl NearConfig { enable_statistics_export: config.store.enable_statistics_export, client_background_migration_threads: config.store.background_migration_threads, flat_storage_creation_period: config.store.flat_storage_creation_period, - state_dump_enabled: config.state_sync.as_ref().map_or(false, |x| x.dump_enabled), + state_sync_dump_enabled: config + .state_sync + .as_ref() + .map_or(false, |x| x.dump_enabled.unwrap_or(false)), state_sync_s3_bucket: config .state_sync .as_ref() @@ -710,14 +713,10 @@ impl NearConfig { .state_sync .as_ref() .map_or(String::new(), |x| x.s3_region.clone()), - state_sync_dump_drop_state: config - .state_sync - .as_ref() - .map_or(vec![], |x| x.drop_state_of_dump.clone()), - state_sync_from_s3_enabled: config + state_sync_restart_dump_for_shards: config .state_sync .as_ref() - .map_or(false, |x| x.sync_from_s3_enabled), + .map_or(vec![], |x| x.drop_state_of_dump.clone().unwrap_or(vec![])), }, network_config: 
NetworkConfig::new( config.network, @@ -1546,9 +1545,10 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) - pub struct StateSyncConfig { pub s3_bucket: String, pub s3_region: String, - pub dump_enabled: bool, - pub drop_state_of_dump: Vec, - pub sync_from_s3_enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub dump_enabled: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub drop_state_of_dump: Option>, } #[test] diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 7498a01830d..3da5e6a2766 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -2,6 +2,7 @@ use crate::cold_storage::spawn_cold_store_loop; pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE}; pub use crate::runtime::NightshadeRuntime; pub use crate::shard_tracker::TrackedConfig; +use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle}; use actix::{Actor, Addr}; use actix_rt::ArbiterHandle; use actix_web; @@ -12,17 +13,17 @@ use near_async::messaging::{IntoSender, LateBoundSender}; use near_chain::{Chain, ChainGenesis}; use near_chunks::shards_manager_actor::start_shards_manager; use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor}; -use near_primitives::time; - use near_network::PeerManagerActor; use near_primitives::block::GenesisId; +use near_primitives::time; use near_store::metadata::DbKind; use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError}; use near_telemetry::TelemetryActor; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::broadcast; -use tracing::{info, trace}; +use tracing::info; + pub mod append_only_map; mod cold_storage; pub mod config; @@ -33,6 +34,7 @@ mod metrics; pub mod migrations; mod runtime; mod shard_tracker; +mod state_sync; pub fn get_default_home() -> PathBuf { if let Ok(near_home) = std::env::var("NEAR_HOME") { @@ -188,6 +190,8 @@ pub struct NearNode { /// The cold_store_loop_handle will only be set if the cold store is configured. /// It's a handle to a background thread that copies data from the hot store to the cold store. pub cold_store_loop_handle: Option, + /// Contains handles to background threads that may be dumping state to S3. 
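+    /// `None` unless dumping is enabled in `config.json` (`state_sync.dump_enabled`
+    /// together with `state_sync.s3_bucket` and `state_sync.s3_region`).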
+ pub state_sync_dump_handle: Option, } pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result { @@ -242,7 +246,7 @@ pub fn start_with_config_and_synchronization( ); let (client_actor, client_arbiter_handle) = start_client( config.client_config.clone(), - chain_genesis, + chain_genesis.clone(), runtime.clone(), node_id, network_adapter.clone().into(), @@ -255,7 +259,7 @@ pub fn start_with_config_and_synchronization( ); client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context()); let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager( - runtime, + runtime.clone(), network_adapter.as_sender(), client_adapter_for_shards_manager.as_sender(), config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()), @@ -264,6 +268,13 @@ pub fn start_with_config_and_synchronization( ); shards_manager_adapter.bind(shards_manager_actor); + let state_sync_dump_handle = spawn_state_sync_dump( + &config, + chain_genesis, + runtime, + config.network_config.node_id().public_key(), + )?; + #[allow(unused_mut)] let mut rpc_servers = Vec::new(); let network_actor = PeerManagerActor::spawn( @@ -304,7 +315,7 @@ pub fn start_with_config_and_synchronization( rpc_servers.shrink_to_fit(); - trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated"); + tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated"); Ok(NearNode { client: client_actor, @@ -312,6 +323,7 @@ pub fn start_with_config_and_synchronization( rpc_servers, arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle], cold_store_loop_handle, + state_sync_dump_handle, }) } diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 1f3220057cc..458bff4e27e 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -1,53 +1,61 @@ use crate::{metrics, NearConfig, NightshadeRuntime}; +use borsh::BorshSerialize; use near_chain::types::RuntimeAdapter; -use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode, Error}; +use near_chain::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode, Error}; use near_chain_configs::ClientConfig; use near_client::sync::state::StateSync; use near_crypto::PublicKey; use near_epoch_manager::EpochManagerAdapter; use near_primitives::hash::CryptoHash; use near_primitives::state_part::PartId; -use near_primitives::syncing::{get_num_state_parts, StateSyncDumpProgress}; -use near_primitives::types::{EpochHeight, EpochId, ShardId}; +use near_primitives::syncing::{get_num_state_parts, StatePartKey, StateSyncDumpProgress}; +use near_primitives::types::{EpochHeight, EpochId, ShardId, StateRoot}; +use near_store::DBCol; use std::sync::Arc; +/// Starts one a thread per tracked shard. +/// Each started thread will be dumping state parts of a single epoch to external storage. pub fn spawn_state_sync_dump( config: &NearConfig, - chain_genesis: &ChainGenesis, + chain_genesis: ChainGenesis, runtime: Arc, node_key: &PublicKey, ) -> anyhow::Result> { - if !config.client_config.state_dump_enabled - || config.client_config.state_sync_s3_bucket.is_empty() + if !config.client_config.state_sync_dump_enabled { + return Ok(None); + } + if config.client_config.state_sync_s3_bucket.is_empty() || config.client_config.state_sync_s3_region.is_empty() { - return Ok(None); + panic!("Enabled dumps of state to external storage. 
Please specify state_sync.s3_bucket and state_sync.s3_region"); } tracing::info!(target: "state_sync_dump", "Spawning the state sync dump loop"); // Create a connection to S3. let s3_bucket = config.client_config.state_sync_s3_bucket.clone(); let s3_region = config.client_config.state_sync_s3_region.clone(); + + // Credentials to establish a connection are taken from environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. let bucket = s3::Bucket::new( &s3_bucket, s3_region .parse::() .map_err(|err| >::into(err))?, s3::creds::Credentials::default().map_err(|err| { + tracing::error!(target: "state_sync_dump", "Failed to create a connection to S3. Did you provide environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY?"); >::into(err) })?, - ) - .map_err(|err| >::into(err))?; + ).map_err(|err| >::into(err))?; // Determine how many threads to start. - // Doesn't handle the case of changing the shard layout. + // TODO: Handle the case of changing the shard layout. let num_shards = { // Sadly, `Chain` is not `Send` and each thread needs to create its own `Chain` instance. let chain = Chain::new_for_view_client( runtime.clone(), - chain_genesis, + &chain_genesis, DoomslugThresholdMode::TwoThirds, - config.client_config.save_trie_changes, + false, )?; let epoch_id = chain.head()?.epoch_id; runtime.num_shards(&epoch_id) @@ -58,13 +66,12 @@ pub fn spawn_state_sync_dump( .map(|shard_id| { let client_config = config.client_config.clone(); let runtime = runtime.clone(); - let save_trie_changes = client_config.save_trie_changes; let chain_genesis = chain_genesis.clone(); let chain = Chain::new_for_view_client( runtime.clone(), &chain_genesis, DoomslugThresholdMode::TwoThirds, - save_trie_changes, + false, ) .unwrap(); let arbiter_handle = actix_rt::Arbiter::new().handle(); @@ -114,7 +121,7 @@ async fn state_sync_dump( tracing::info!(target: "state_sync_dump", shard_id, "Running StateSyncDump loop"); let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(10)); - if config.state_sync_dump_drop_state.contains(&shard_id) { + if config.state_sync_restart_dump_for_shards.contains(&shard_id) { tracing::debug!(target: "state_sync_dump", shard_id, "Dropped existing progress"); chain.store().set_state_sync_dump_progress(shard_id, None).unwrap(); } @@ -136,18 +143,17 @@ async fn state_sync_dump( shard_id, &chain, &runtime, - &config, ) } Err(Error::DBNotFoundErr(_)) | Ok(None) => { // First invocation of this state-machine. See if at least one epoch is available for dumping. - check_new_epoch(None, None, None, shard_id, &chain, &runtime, &config) + check_new_epoch(None, None, None, shard_id, &chain, &runtime) } Err(err) => { // Something went wrong, let's retry. - tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to read the progress, delete and retry"); + tracing::warn!(target: "state_sync_dump", shard_id, ?err, "Failed to read the progress, will now delete and retry"); if let Err(err) = chain.store().set_state_sync_dump_progress(shard_id, None) { - tracing::debug!(target: "state_sync_dump", shard_id, ?err, "And failed to delete it too :("); + tracing::warn!(target: "state_sync_dump", shard_id, ?err, "and failed to delete the progress. Will later retry."); } Ok(None) } @@ -160,7 +166,7 @@ async fn state_sync_dump( num_parts, })) => { // The actual dumping of state to S3. 
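+                        // Parts in the range [parts_dumped..num_parts) are obtained and
+                        // uploaded one at a time; the progress is persisted after every part,
+                        // so a restarted node resumes from the last uploaded part rather than
+                        // starting the epoch over.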
- tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, ?sync_hash, ?state_root, parts_dumped, num_parts, "Creating parts and dumping them"); + tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, %sync_hash, %state_root, parts_dumped, num_parts, "Creating parts and dumping them"); let mut res = None; for part_id in parts_dumped..num_parts { // Dump parts sequentially synchronously. @@ -168,18 +174,16 @@ async fn state_sync_dump( let _timer = metrics::STATE_SYNC_DUMP_ITERATION_ELAPSED .with_label_values(&[&shard_id.to_string()]) .start_timer(); - let state_part = { - let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - runtime.obtain_state_part( - shard_id, - &sync_hash, - &state_root, - PartId::new(part_id, num_parts), - ) - }; - let state_part = match state_part { + + let state_part = match get_state_part( + &runtime, + &shard_id, + &sync_hash, + &state_root, + part_id, + num_parts, + &chain, + ) { Ok(state_part) => state_part, Err(err) => { res = Some(err); @@ -188,70 +192,23 @@ async fn state_sync_dump( }; let location = s3_location(&config.chain_id, epoch_height, shard_id, part_id, num_parts); - + if let Err(err) = + put_state_part(&location, &state_part, &shard_id, &bucket).await { - let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED - .with_label_values(&[&shard_id.to_string()]) - .start_timer(); - let put = bucket - .put_object(&location, &state_part) - .await - .map_err(|err| Error::Other(err.to_string())); - if let Err(err) = put { - res = Some(err); - break; - } - - /* - // Optional, we probably don't need this. - let put = bucket - .put_object_tagging( - &location, - &[ - ("chain_id", &config.chain_id), - ("epoch_height", &epoch_height.to_string()), - ("epoch_id", &format!("{:?}", epoch_id.0)), - ("node_key", &format!("{:?}", node_key)), - ("num_parts", &format!("{}", num_parts)), - ("part_id", &format!("{}", part_id)), - ("state_root", &format!("{:?}", state_root)), - ("sync_hash", &format!("{:?}", sync_hash)), - ], - ) - .await - .map_err(|err| Error::Other(err.to_string())); - if let Err(err) = put { - res = Some(err); - break; - } - */ + res = Some(err); + break; } - - // Record that a part was obtained and dumped. 
- tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, epoch_height, ?sync_hash, ?state_root, part_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); - metrics::STATE_SYNC_DUMP_SIZE_TOTAL - .with_label_values(&[&shard_id.to_string()]) - .inc_by(state_part.len() as u64); - let next_progress = StateSyncDumpProgress::InProgress { - epoch_id: epoch_id.clone(), + update_progress( + &shard_id, + &epoch_id, epoch_height, - sync_hash, - state_root, - parts_dumped: part_id + 1, + &sync_hash, + &state_root, + part_id, num_parts, - }; - match chain - .store() - .set_state_sync_dump_progress(shard_id, Some(next_progress.clone())) - { - Ok(_) => { - tracing::debug!(target: "state_sync_dump", shard_id, ?next_progress, "Updated dump progress"); - } - Err(err) => { - tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to update dump progress, continue"); - } - } - set_metrics(shard_id, Some(part_id + 1), Some(num_parts), Some(epoch_height)); + state_part.len(), + &chain, + ); } if let Some(err) = res { Err(err) @@ -289,8 +246,59 @@ async fn state_sync_dump( } } +async fn put_state_part( + location: &str, + state_part: &[u8], + shard_id: &ShardId, + bucket: &s3::Bucket, +) -> Result { + let _timer = metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + let put = bucket + .put_object(&location, &state_part) + .await + .map_err(|err| Error::Other(err.to_string())); + tracing::debug!(target: "state_sync_dump", shard_id, part_length = state_part.len(), ?location, "Wrote a state part to S3"); + put +} + +fn update_progress( + shard_id: &ShardId, + epoch_id: &EpochId, + epoch_height: EpochHeight, + sync_hash: &CryptoHash, + state_root: &StateRoot, + part_id: u64, + num_parts: u64, + part_len: usize, + chain: &Chain, +) { + // Record that a part was obtained and dumped. + metrics::STATE_SYNC_DUMP_SIZE_TOTAL + .with_label_values(&[&shard_id.to_string()]) + .inc_by(part_len as u64); + let next_progress = StateSyncDumpProgress::InProgress { + epoch_id: epoch_id.clone(), + epoch_height, + sync_hash: *sync_hash, + state_root: *state_root, + parts_dumped: part_id + 1, + num_parts, + }; + match chain.store().set_state_sync_dump_progress(*shard_id, Some(next_progress.clone())) { + Ok(_) => { + tracing::debug!(target: "state_sync_dump", shard_id, ?next_progress, "Updated dump progress"); + } + Err(err) => { + tracing::debug!(target: "state_sync_dump", shard_id, ?err, "Failed to update dump progress, continue"); + } + } + set_metrics(shard_id, Some(part_id + 1), Some(num_parts), Some(epoch_height)); +} + fn set_metrics( - shard_id: ShardId, + shard_id: &ShardId, parts_dumped: Option, num_parts: Option, epoch_height: Option, @@ -320,6 +328,35 @@ fn set_metrics( } } +fn get_state_part( + runtime: &Arc, + shard_id: &ShardId, + sync_hash: &CryptoHash, + state_root: &StateRoot, + part_id: u64, + num_parts: u64, + chain: &Chain, +) -> Result, Error> { + let state_part = { + let _timer = metrics::STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED + .with_label_values(&[&shard_id.to_string()]) + .start_timer(); + runtime.obtain_state_part( + *shard_id, + &sync_hash, + &state_root, + PartId::new(part_id, num_parts), + )? + }; + + // Save the part data. 
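+    // The part is also kept locally in the `DBCol::StateParts` column, keyed by the
+    // borsh-serialized `StatePartKey(sync_hash, shard_id, part_id)`, in addition to
+    // being uploaded to external storage by the caller.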
+ let key = StatePartKey(*sync_hash, *shard_id, part_id).try_to_vec()?; + let mut store_update = chain.store().store().store_update(); + store_update.set(DBCol::StateParts, &key, &state_part); + store_update.commit()?; + Ok(state_part) +} + /// Gets basic information about the epoch to be dumped. fn start_dumping( epoch_id: EpochId, @@ -337,10 +374,10 @@ fn start_dumping( let state_root = sync_hash_block.chunks()[shard_id as usize].prev_state_root(); let state_root_node = runtime.get_state_root_node(shard_id, &sync_hash, &state_root)?; let num_parts = get_num_state_parts(state_root_node.memory_usage); - tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, ?sync_hash, ?state_root, num_parts, "Initialize dumping state of Epoch"); + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, %state_root, num_parts, "Initialize dumping state of Epoch"); // Note that first the state of the state machines gets changes to // `InProgress` and it starts dumping state after a short interval. - set_metrics(shard_id, Some(0), Some(num_parts), Some(epoch_height)); + set_metrics(&shard_id, Some(0), Some(num_parts), Some(epoch_height)); Ok(Some(StateSyncDumpProgress::InProgress { epoch_id, epoch_height, @@ -350,7 +387,7 @@ fn start_dumping( num_parts, })) } else { - tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, ?sync_hash, "Shard is not tracked, skip the epoch"); + tracing::debug!(target: "state_sync_dump", shard_id, ?epoch_id, %sync_hash, "Shard is not tracked, skip the epoch"); Ok(Some(StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(0) })) } } @@ -364,26 +401,24 @@ fn check_new_epoch( shard_id: ShardId, chain: &Chain, runtime: &Arc, - config: &ClientConfig, ) -> Result, Error> { let head = chain.head()?; if Some(&head.epoch_id) == epoch_id.as_ref() { - set_metrics(shard_id, num_parts, num_parts, epoch_height); + set_metrics(&shard_id, num_parts, num_parts, epoch_height); Ok(None) } else { + // Check if the final block is now in the next epoch. tracing::info!(target: "state_sync_dump", shard_id, ?epoch_id, "Check if a new complete epoch is available"); - let mut sync_hash = head.prev_block_hash; - // Step back a few blocks to avoid dealing with forks. - for _ in 0..config.state_fetch_horizon { - sync_hash = *chain.get_block_header(&sync_hash)?.prev_hash(); - } - let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &sync_hash)?; + let hash = head.last_block_hash; + let header = chain.get_block_header(&hash)?; + let final_hash = header.last_final_block(); + let sync_hash = StateSync::get_epoch_start_sync_hash(&chain, &final_hash)?; let header = chain.get_block_header(&sync_hash)?; if Some(header.epoch_id()) == epoch_id.as_ref() { // Still in the latest dumped epoch. Do nothing. Ok(None) } else { - start_dumping(head.epoch_id.clone(), sync_hash, shard_id, &chain, runtime) + start_dumping(head.epoch_id, sync_hash, shard_id, &chain, runtime) } } }
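As a worked example of the object naming scheme described in `docs/misc/state_sync_dump.md` above, the sketch below reproduces the `s3_location` string. The signature is an assumption for illustration; the actual helper in `nearcore/src/state_sync.rs` is not shown in this diff and may differ.

```rust
// Hypothetical, simplified version of the `s3_location` helper used above.
fn s3_location(
    chain_id: &str,
    epoch_height: u64,
    shard_id: u64,
    part_id: u64,
    num_parts: u64,
) -> String {
    format!(
        "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}",
        chain_id, epoch_height, shard_id, part_id, num_parts
    )
}

fn main() {
    // Matches the example location given in docs/misc/state_sync_dump.md.
    assert_eq!(
        s3_location("testnet", 1790, 2, 32642, 65402),
        "chain_id=testnet/epoch_height=1790/shard_id=2/state_part_032642_of_065402"
    );
}
```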