feat: Dump state of every epoch to S3 (near#8661)
* Start a thread per shard to do the dumping
* AWS credentials are provided as environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
* In `config.json` specify both `config.state_sync.s3_bucket` and `config.state_sync.s3_region` to enable the new behavior.
* No changes to the behavior of the node if those options are not enabled in `config.json`.
* State is persisted to RocksDB such that restarts of the node are well handled.
* Some useful metrics are exported.
* The node assumes it's the only node in this and all alternative universes that does the dumping.
  * Unclear how to use multiple nodes to complete the dump faster
* TODO: Speed this up by doing things in parallel: obtain parts, upload parts, set tags
  * Do we even need tags?
nikurt committed Mar 15, 2023
1 parent d64aaae commit ad4e87c
Showing 9 changed files with 266 additions and 134 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,9 @@

### Non-protocol Changes

* Experimental option to dump state of every epoch to external storage. [#8661](https://github.com/near/nearcore/pull/8661)
* State-viewer tool to dump and apply state changes from/to a range of blocks [#8628](https://github.com/near/nearcore/pull/8628)

## 1.32.0

### Protocol Changes
1 change: 1 addition & 0 deletions chain/chain/src/store.rs
@@ -841,6 +841,7 @@ impl ChainStore {

/// Constructs key 'STATE_SYNC_DUMP:<ShardId>',
/// for example 'STATE_SYNC_DUMP:2' for shard_id=2.
/// Doesn't contain epoch_id, because only one dump process per shard is allowed.
fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec<u8> {
let mut key = b"STATE_SYNC_DUMP:".to_vec();
key.extend(shard_id.to_le_bytes());
17 changes: 7 additions & 10 deletions core/chain-configs/src/client_config.rs
@@ -167,17 +167,15 @@ pub struct ClientConfig {
pub client_background_migration_threads: usize,
/// Duration to perform background flat storage creation step.
pub flat_storage_creation_period: Duration,
/// Whether to enable dumping state of every epoch to S3.
pub state_dump_enabled: bool,
/// If enabled, will dump state of every epoch to external storage.
pub state_sync_dump_enabled: bool,
/// S3 bucket for storing state dumps.
pub state_sync_s3_bucket: String,
/// S3 region for storing state dumps.
pub state_sync_s3_region: String,
/// Discard the existing progress of dumping an epoch state to S3.
pub state_sync_dump_drop_state: Vec<ShardId>,
/// Whether to enable state sync from S3.
/// If disabled will perform state sync from the peers.
pub state_sync_from_s3_enabled: bool,
/// Restart dumping state of selected shards.
/// Use for troubleshooting of the state dumping process.
pub state_sync_restart_dump_for_shards: Vec<ShardId>,
}

impl ClientConfig {
@@ -248,11 +246,10 @@ impl ClientConfig {
enable_statistics_export: true,
client_background_migration_threads: 1,
flat_storage_creation_period: Duration::from_secs(1),
state_dump_enabled: false,
state_sync_from_s3_enabled: false,
state_sync_dump_enabled: false,
state_sync_s3_bucket: String::new(),
state_sync_s3_region: String::new(),
state_sync_dump_drop_state: vec![],
state_sync_restart_dump_for_shards: vec![],
}
}
}
13 changes: 12 additions & 1 deletion core/primitives/src/syncing.rs
@@ -229,18 +229,29 @@ pub fn get_num_state_parts(memory_usage: u64) -> u64 {
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)]
/// Represents the state of the state machine that dumps state.
/// Represents the progress of dumping the state of a shard.
pub enum StateSyncDumpProgress {
/// Represents two cases:
/// * An epoch dump is complete
/// * The node is running its first epoch and there is nothing to dump.
AllDumped {
/// The dumped state corresponds to the state at the beginning of the specified epoch.
epoch_id: EpochId,
epoch_height: EpochHeight,
// Missing in case of a node running the first epoch.
num_parts: Option<u64>,
},
/// Represents the case of an epoch being partially dumped.
InProgress {
/// The dumped state corresponds to the state at the beginning of the specified epoch.
epoch_id: EpochId,
epoch_height: EpochHeight,
/// Block hash of the first block of the epoch.
/// The dumped state corresponds to the state before applying this block.
sync_hash: CryptoHash,
/// Root of the state being dumped.
state_root: StateRoot,
/// Progress made.
parts_dumped: u64,
num_parts: u64,
},
73 changes: 73 additions & 0 deletions docs/misc/state_sync_dump.md
@@ -0,0 +1,73 @@
# Experimental: Dump of state to External Storage

## Purpose

The current implementation of state sync (see
https://github.com/near/nearcore/blob/master/docs/architecture/how/sync.md for
details) doesn't allow nodes to reliably perform state sync for testnet or
mainnet.

That's why a new solution for state sync is being designed.
The experimental code is likely going to be part of that solution, and should
greatly improve both the reliability and the speed of state sync.

The new solution will probably involve making the state available on external
storage, which makes downloading the state a low-latency and reliable process,
thanks to the robust infrastructure of external storage such as S3.

## How-to

[#8661](https://github.com/near/nearcore/pull/8661) adds an experimental option
to dump the state of every epoch to external storage. At the moment, only S3 is
supported as external storage.

To enable, add this to your `config.json` file:

```json
"state_sync": {
"s3_bucket": "my-bucket",
"s3_region": "eu-central-1",
"dump_enabled": true
}
```

And run your node with environment variables `AWS_ACCESS_KEY_ID` and
`AWS_SECRET_ACCESS_KEY`:
```shell
AWS_ACCESS_KEY_ID="MY_ACCESS_KEY" AWS_SECRET_ACCESS_KEY="MY_AWS_SECRET_ACCESS_KEY" ./neard run
```
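
For illustration only, here is a minimal Rust sketch of how a node could
combine these settings. The `S3DumpSettings` type and the `from_config` helper
are hypothetical; the environment variable names and the `s3_bucket` /
`s3_region` values mirror the configuration above.

```rust
use std::env;

/// Hypothetical container for the S3 dump settings: bucket and region come
/// from `config.json`, the credentials from the environment.
struct S3DumpSettings {
    bucket: String,
    region: String,
    access_key_id: String,
    secret_access_key: String,
}

impl S3DumpSettings {
    /// Returns `None` if either AWS credential variable is not set.
    fn from_config(s3_bucket: &str, s3_region: &str) -> Option<Self> {
        Some(Self {
            bucket: s3_bucket.to_string(),
            region: s3_region.to_string(),
            access_key_id: env::var("AWS_ACCESS_KEY_ID").ok()?,
            secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").ok()?,
        })
    }
}

fn main() {
    match S3DumpSettings::from_config("my-bucket", "eu-central-1") {
        Some(settings) => println!("dumping to s3://{} in {}", settings.bucket, settings.region),
        None => eprintln!("AWS credentials are not set; state dump stays disabled"),
    }
}
```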

## Implementation Details

The experimental option spawns a thread for each of the shards tracked by the
node. Each thread acts independently: it determines the last complete epoch and
starts the process of dumping the state.

To dump the state, a thread does the following (see the sketch after this list):
* Get the size of the trie to determine the number of state parts
* Obtain each state part
* Upload each state part to S3
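
Below is a minimal Rust sketch of this per-shard loop, assuming hypothetical
helpers (`get_num_state_parts_for_shard`, `obtain_state_part`,
`upload_part_to_s3`) and plain `std::thread` spawning; the actual
implementation is the new `state_sync` module in `nearcore` (see
`spawn_state_sync_dump` in the diff below).

```rust
use std::thread;

// Simplified stand-ins for the real chain/runtime types.
type ShardId = u64;
type StatePart = Vec<u8>;

/// Sketch of the work done by one dumping thread for a single shard.
fn dump_shard_state(shard_id: ShardId) {
    // Determine how many parts the state of the last complete epoch splits into.
    let num_parts = get_num_state_parts_for_shard(shard_id);
    // Obtain and upload every part as an individual S3 object.
    for part_id in 0..num_parts {
        let part = obtain_state_part(shard_id, part_id, num_parts);
        upload_part_to_s3(shard_id, part_id, num_parts, &part);
    }
}

/// Spawn one independent dumping thread per tracked shard.
fn spawn_state_dump_threads(tracked_shards: Vec<ShardId>) -> Vec<thread::JoinHandle<()>> {
    tracked_shards
        .into_iter()
        .map(|shard_id| thread::spawn(move || dump_shard_state(shard_id)))
        .collect()
}

// Placeholder bodies so the sketch compiles; the real logic uses the chain,
// the runtime and an S3 client.
fn get_num_state_parts_for_shard(_shard_id: ShardId) -> u64 { 0 }
fn obtain_state_part(_shard_id: ShardId, _part_id: u64, _num_parts: u64) -> StatePart { Vec::new() }
fn upload_part_to_s3(_shard_id: ShardId, _part_id: u64, _num_parts: u64, _part: &StatePart) {}
```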

State parts are uploaded as individual objects. The location of those objects
is computed as follows:
```
"chain_id={chain_id}/epoch_height={epoch_height}/shard_id={shard_id}/state_part_{part_id:06}_of_{num_parts:06}",
```
for example `chain_id=testnet/epoch_height=1790/shard_id=2/state_part_032642_of_065402`
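
In Rust, this location is a plain formatted string. The following sketch (the
function name is made up for illustration) reproduces the example above:

```rust
fn state_part_s3_location(
    chain_id: &str,
    epoch_height: u64,
    shard_id: u64,
    part_id: u64,
    num_parts: u64,
) -> String {
    format!(
        "chain_id={}/epoch_height={}/shard_id={}/state_part_{:06}_of_{:06}",
        chain_id, epoch_height, shard_id, part_id, num_parts
    )
}

fn main() {
    // Prints "chain_id=testnet/epoch_height=1790/shard_id=2/state_part_032642_of_065402".
    println!("{}", state_part_s3_location("testnet", 1790, 2, 32642, 65402));
}
```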

Currently, using multiple nodes for dumping state doesn't make the process go
any faster. The nodes will simply duplicate the work, overwriting the files
created by each other.

A future improvement could be to make the nodes cooperate. To avoid introducing
a complicated consensus process, we suggest the following simple process (a
small sketch follows the list):
* Get a list of state parts already dumped for an epoch
* Pick 100 random state parts that are not yet dumped
* Obtain and upload those 100 state parts
* Repeat until all state parts are dumped
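
A small Rust sketch of the part-picking step under these assumptions (all names
are hypothetical, since this is future work; the random sampling is replaced by
a simple prefix to keep the example self-contained):

```rust
/// One round of the proposed cooperative process: given the parts already
/// present in external storage, pick up to `batch_size` parts that still need
/// to be dumped. Purely illustrative; no such helper exists in this change.
fn pick_parts_to_dump(num_parts: u64, already_dumped: &[u64], batch_size: usize) -> Vec<u64> {
    // The real proposal would sample the missing parts at random, so that
    // independent nodes are unlikely to pick the same batch.
    (0..num_parts).filter(|part_id| !already_dumped.contains(part_id)).take(batch_size).collect()
}

fn main() {
    // Parts 0 and 2 are already in external storage; pick up to 100 of the rest.
    let batch = pick_parts_to_dump(6, &[0, 2], 100);
    assert_eq!(batch, vec![1, 3, 4, 5]);
    println!("next batch to dump: {:?}", batch);
}
```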

The process of dumping state parts is managed as a state machine with 2
possible states. The state is stored in the `BlockMisc` column with row key
`STATE_SYNC_DUMP:X` for shard X. Note that the epoch id is not included in the
row key: it is not needed for managing the state machine, because only one
epoch per shard can be dumped at a time.
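
For reference, below is a minimal sketch of how a progress record maps onto
that row key. The key construction copies the `state_sync_dump_progress_key`
helper added in `chain/chain/src/store.rs`; the `DumpProgress` enum and `main`
are simplified illustrations (assuming the `borsh` crate and its `try_to_vec`
method), not the actual storage code.

```rust
use borsh::{BorshDeserialize, BorshSerialize};

type ShardId = u64;

/// Same key layout as `ChainStore::state_sync_dump_progress_key` in this change:
/// "STATE_SYNC_DUMP:" followed by the little-endian shard id.
fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec<u8> {
    let mut key = b"STATE_SYNC_DUMP:".to_vec();
    key.extend(shard_id.to_le_bytes());
    key
}

/// Simplified stand-in for the real `StateSyncDumpProgress` enum from
/// `core/primitives/src/syncing.rs`; some fields are omitted here.
#[derive(BorshSerialize, BorshDeserialize, Debug)]
enum DumpProgress {
    AllDumped { epoch_height: u64, num_parts: Option<u64> },
    InProgress { epoch_height: u64, parts_dumped: u64, num_parts: u64 },
}

fn main() {
    // Borsh-serialize the progress as it would be stored under the per-shard key.
    let progress =
        DumpProgress::InProgress { epoch_height: 1790, parts_dumped: 123, num_parts: 65402 };
    let key = state_sync_dump_progress_key(2);
    let value = progress.try_to_vec().expect("borsh serialization");
    println!("key = {:?}, value = {} bytes", key, value.len());
}
```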
4 changes: 2 additions & 2 deletions integration-tests/src/tests/client/process_blocks.rs
@@ -1475,11 +1475,11 @@ fn test_gc_with_epoch_length_common(epoch_length: NumBlocks) {
let block_hash = *blocks[i as usize].hash();
assert_matches!(
env.clients[0].chain.get_block(&block_hash).unwrap_err(),
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string()
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == format!("BLOCK: {}", block_hash)
);
assert_matches!(
env.clients[0].chain.get_block_by_height(i).unwrap_err(),
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string()
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == format!("BLOCK: {}", block_hash)
);
assert!(env.clients[0]
.chain
20 changes: 10 additions & 10 deletions nearcore/src/config.rs
@@ -701,7 +701,10 @@ impl NearConfig {
enable_statistics_export: config.store.enable_statistics_export,
client_background_migration_threads: config.store.background_migration_threads,
flat_storage_creation_period: config.store.flat_storage_creation_period,
state_dump_enabled: config.state_sync.as_ref().map_or(false, |x| x.dump_enabled),
state_sync_dump_enabled: config
.state_sync
.as_ref()
.map_or(false, |x| x.dump_enabled.unwrap_or(false)),
state_sync_s3_bucket: config
.state_sync
.as_ref()
@@ -710,14 +713,10 @@
.state_sync
.as_ref()
.map_or(String::new(), |x| x.s3_region.clone()),
state_sync_dump_drop_state: config
.state_sync
.as_ref()
.map_or(vec![], |x| x.drop_state_of_dump.clone()),
state_sync_from_s3_enabled: config
state_sync_restart_dump_for_shards: config
.state_sync
.as_ref()
.map_or(false, |x| x.sync_from_s3_enabled),
.map_or(vec![], |x| x.drop_state_of_dump.clone().unwrap_or(vec![])),
},
network_config: NetworkConfig::new(
config.network,
@@ -1546,9 +1545,10 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) -
pub struct StateSyncConfig {
pub s3_bucket: String,
pub s3_region: String,
pub dump_enabled: bool,
pub drop_state_of_dump: Vec<ShardId>,
pub sync_from_s3_enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub dump_enabled: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drop_state_of_dump: Option<Vec<ShardId>>,
}

#[test]
24 changes: 18 additions & 6 deletions nearcore/src/lib.rs
@@ -2,6 +2,7 @@ use crate::cold_storage::spawn_cold_store_loop;
pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE};
pub use crate::runtime::NightshadeRuntime;
pub use crate::shard_tracker::TrackedConfig;
use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle};
use actix::{Actor, Addr};
use actix_rt::ArbiterHandle;
use actix_web;
@@ -12,17 +13,17 @@ use near_async::messaging::{IntoSender, LateBoundSender};
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_primitives::time;

use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_primitives::time;
use near_store::metadata::DbKind;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
use near_telemetry::TelemetryActor;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{info, trace};
use tracing::info;

pub mod append_only_map;
mod cold_storage;
pub mod config;
@@ -33,6 +34,7 @@ mod metrics;
pub mod migrations;
mod runtime;
mod shard_tracker;
mod state_sync;

pub fn get_default_home() -> PathBuf {
if let Ok(near_home) = std::env::var("NEAR_HOME") {
@@ -188,6 +190,8 @@ pub struct NearNode {
/// The cold_store_loop_handle will only be set if the cold store is configured.
/// It's a handle to a background thread that copies data from the hot store to the cold store.
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
@@ -242,7 +246,7 @@ pub fn start_with_config_and_synchronization(
);
let (client_actor, client_arbiter_handle) = start_client(
config.client_config.clone(),
chain_genesis,
chain_genesis.clone(),
runtime.clone(),
node_id,
network_adapter.clone().into(),
@@ -255,7 +259,7 @@
);
client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context());
let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager(
runtime,
runtime.clone(),
network_adapter.as_sender(),
client_adapter_for_shards_manager.as_sender(),
config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
@@ -264,6 +268,13 @@
);
shards_manager_adapter.bind(shards_manager_actor);

let state_sync_dump_handle = spawn_state_sync_dump(
&config,
chain_genesis,
runtime,
config.network_config.node_id().public_key(),
)?;

#[allow(unused_mut)]
let mut rpc_servers = Vec::new();
let network_actor = PeerManagerActor::spawn(
@@ -304,14 +315,15 @@

rpc_servers.shrink_to_fit();

trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated");
tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated");

Ok(NearNode {
client: client_actor,
view_client,
rpc_servers,
arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle],
cold_store_loop_handle,
state_sync_dump_handle,
})
}

