feat: Dump state of every epoch to S3 #8661

Merged: 20 commits, Mar 10, 2023

Changes from 12 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

34 changes: 33 additions & 1 deletion chain/chain/src/store.rs
@@ -19,7 +19,7 @@ use near_primitives::sharding::{
};
use near_primitives::syncing::{
get_num_state_parts, ReceiptProofResponse, ShardStateSyncResponseHeader, StateHeaderKey,
StatePartKey,
StatePartKey, StateSyncDumpProgress,
};
use near_primitives::transaction::{
ExecutionOutcomeWithId, ExecutionOutcomeWithIdAndProof, ExecutionOutcomeWithProof,
@@ -839,6 +839,38 @@ impl ChainStore {
}
Ok(None)
}

/// Constructs key 'STATE_SYNC_DUMP:<ShardId>',
/// for example 'STATE_SYNC_DUMP:2' for shard_id=2.
fn state_sync_dump_progress_key(shard_id: ShardId) -> Vec<u8> {
let mut key = b"STATE_SYNC_DUMP:".to_vec();
key.extend(shard_id.to_le_bytes());
Contributor:

I also wonder if this key shouldn't contain the epoch id information

Contributor (PR author):

Not including epoch id in the key, because only one epoch dump per shard is possible at a time.
I can think of adding epoch id to the key for the purpose of storing the history of dumping epochs to external storage, but that doesn't seem necessary.

key
}

/// Retrieves STATE_SYNC_DUMP for the given shard.
pub fn get_state_sync_dump_progress(
&self,
shard_id: ShardId,
) -> Result<Option<StateSyncDumpProgress>, Error> {
self.store
.get_ser(DBCol::BlockMisc, &ChainStore::state_sync_dump_progress_key(shard_id))
.map_err(|err| err.into())
}

/// Updates STATE_SYNC_DUMP for the given shard.
pub fn set_state_sync_dump_progress(
&self,
shard_id: ShardId,
value: Option<StateSyncDumpProgress>,
) -> Result<(), Error> {
let mut store_update = self.store.store_update();
let key = ChainStore::state_sync_dump_progress_key(shard_id);
store_update.set_ser(DBCol::BlockMisc, &key, &value)?;
store_update.commit().map_err(|err| err.into())
}
}

impl ChainStoreAccess for ChainStore {
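To make the new ChainStore accessors concrete, here is a minimal sketch (not part of this diff) of how a per-shard dump task could read and act on the persisted progress record. The function name, the `chain_store` binding, and the log lines are illustrative; the `StateSyncDumpProgress` variants are the ones added in core/primitives/src/syncing.rs further down.

```rust
use near_chain::{ChainStore, Error};
use near_primitives::syncing::StateSyncDumpProgress;
use near_primitives::types::ShardId;

/// Sketch: decide what to do for one shard based on the persisted progress record.
/// The key read here is the bytes of "STATE_SYNC_DUMP:" followed by the
/// little-endian encoding of the shard id.
fn resume_or_start(chain_store: &ChainStore, shard_id: ShardId) -> Result<(), Error> {
    match chain_store.get_state_sync_dump_progress(shard_id)? {
        Some(StateSyncDumpProgress::InProgress { parts_dumped, num_parts, .. }) => {
            // An interrupted dump: continue uploading from `parts_dumped`.
            tracing::info!(target: "state_sync_dump", shard_id, parts_dumped, num_parts, "resuming");
        }
        Some(StateSyncDumpProgress::AllDumped { epoch_height, .. }) => {
            // This epoch is already fully dumped; nothing to do until the next epoch.
            tracing::info!(target: "state_sync_dump", shard_id, epoch_height, "already dumped");
        }
        None => {
            // No record yet: pick the current epoch's sync block, compute the number
            // of parts, and persist an initial `InProgress` entry via
            // set_state_sync_dump_progress.
        }
    }
    Ok(())
}
```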
12 changes: 12 additions & 0 deletions core/chain-configs/src/client_config.rs
@@ -167,6 +167,14 @@ pub struct ClientConfig {
pub client_background_migration_threads: usize,
/// Duration to perform background flat storage creation step.
pub flat_storage_creation_period: Duration,
/// Whether to enable dumping state of every epoch to S3.
pub state_sync_dump_enabled: bool,
/// S3 bucket for storing state dumps.
pub state_sync_s3_bucket: String,
/// S3 region for storing state dumps.
pub state_sync_s3_region: String,
/// Discard the existing progress of dumping an epoch state to S3.
pub state_sync_dump_drop_state: Vec<ShardId>,
}

impl ClientConfig {
@@ -237,6 +245,10 @@ impl ClientConfig {
enable_statistics_export: true,
client_background_migration_threads: 1,
flat_storage_creation_period: Duration::from_secs(1),
state_sync_dump_enabled: false,
state_sync_s3_bucket: String::new(),
state_sync_s3_region: String::new(),
state_sync_dump_drop_state: vec![],
}
}
}
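A sketch of how the `state_sync_dump_drop_state` option could be honored, for example at startup, assuming the dump task clears the persisted record via the `ChainStore::set_state_sync_dump_progress` accessor added above. The helper name and bindings are illustrative, not code from this PR.

```rust
use near_chain::{ChainStore, Error};
use near_chain_configs::ClientConfig;

/// Sketch: discard persisted dump progress for the shards listed in
/// `state_sync_dump_drop_state`, so those shards restart their epoch dump.
fn drop_requested_dump_state(
    client_config: &ClientConfig,
    chain_store: &ChainStore,
) -> Result<(), Error> {
    for shard_id in &client_config.state_sync_dump_drop_state {
        // Writing `None` removes the stored StateSyncDumpProgress for this shard.
        chain_store.set_state_sync_dump_progress(*shard_id, None)?;
    }
    Ok(())
}
```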
21 changes: 20 additions & 1 deletion core/primitives/src/syncing.rs
@@ -1,6 +1,7 @@
use std::sync::Arc;

use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives_core::types::EpochHeight;

use crate::block_header::BlockHeader;
use crate::epoch_manager::block_info::BlockInfo;
@@ -10,7 +11,7 @@ use crate::merkle::{MerklePath, PartialMerkleTree};
use crate::sharding::{
ReceiptProof, ShardChunk, ShardChunkHeader, ShardChunkHeaderV1, ShardChunkV1,
};
use crate::types::{BlockHeight, ShardId, StateRoot, StateRootNode};
use crate::types::{BlockHeight, EpochId, ShardId, StateRoot, StateRootNode};
use crate::views::LightClientBlockView;

#[derive(PartialEq, Eq, Clone, Debug, BorshSerialize, BorshDeserialize)]
@@ -226,3 +227,21 @@ pub fn get_num_state_parts(memory_usage: u64) -> u64 {
// TODO #1708
memory_usage / STATE_PART_MEMORY_LIMIT.as_u64() + 3
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone)]
/// Represents the state of the state machine that dumps state.
pub enum StateSyncDumpProgress {
AllDumped {
epoch_id: EpochId,
epoch_height: EpochHeight,
num_parts: Option<u64>,
},
InProgress {
epoch_id: EpochId,
epoch_height: EpochHeight,
sync_hash: CryptoHash,
state_root: StateRoot,
parts_dumped: u64,
num_parts: u64,
},
}
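Illustrative only: the transition this state machine is presumably meant to make after each successful part upload, written out as a standalone function. Field handling follows the variant definitions above; the function itself is not part of the diff.

```rust
use near_primitives::syncing::StateSyncDumpProgress;

/// Sketch: advance the per-shard record after one more part has been uploaded.
fn record_part_dumped(progress: StateSyncDumpProgress) -> StateSyncDumpProgress {
    match progress {
        // Uploading the last part completes the epoch: collapse into `AllDumped`.
        StateSyncDumpProgress::InProgress { epoch_id, epoch_height, parts_dumped, num_parts, .. }
            if parts_dumped + 1 >= num_parts =>
        {
            StateSyncDumpProgress::AllDumped { epoch_id, epoch_height, num_parts: Some(num_parts) }
        }
        // Otherwise just advance the counter and keep the rest of the record.
        StateSyncDumpProgress::InProgress {
            epoch_id,
            epoch_height,
            sync_hash,
            state_root,
            parts_dumped,
            num_parts,
        } => StateSyncDumpProgress::InProgress {
            epoch_id,
            epoch_height,
            sync_hash,
            state_root,
            parts_dumped: parts_dumped + 1,
            num_parts,
        },
        // A finished epoch stays finished until a new epoch starts.
        done @ StateSyncDumpProgress::AllDumped { .. } => done,
    }
}
```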
4 changes: 2 additions & 2 deletions integration-tests/src/tests/client/process_blocks.rs
@@ -1475,11 +1475,11 @@ fn test_gc_with_epoch_length_common(epoch_length: NumBlocks) {
let block_hash = *blocks[i as usize].hash();
assert_matches!(
env.clients[0].chain.get_block(&block_hash).unwrap_err(),
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string()
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + block_hash.to_string()
);
assert_matches!(
env.clients[0].chain.get_block_by_height(i).unwrap_err(),
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + &block_hash.to_string()
Error::DBNotFoundErr(missing_block_hash) if missing_block_hash == "BLOCK: ".to_owned() + block_hash.to_string()
);
assert!(env.clients[0]
.chain
1 change: 1 addition & 0 deletions nearcore/Cargo.toml
@@ -25,6 +25,7 @@ once_cell.workspace = true
rand.workspace = true
rayon.workspace = true
rlimit.workspace = true
rust-s3.workspace = true
serde.workspace = true
serde_ignored.workspace = true
serde_json.workspace = true
29 changes: 29 additions & 0 deletions nearcore/src/config.rs
@@ -353,6 +353,9 @@ pub struct Config {
pub db_migration_snapshot_path: Option<PathBuf>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expected_shutdown: Option<BlockHeight>,
/// Options for dumping state of every epoch to S3.
#[serde(skip_serializing_if = "Option::is_none")]
pub state_sync: Option<StateSyncConfig>,
}

fn is_false(value: &bool) -> bool {
@@ -389,6 +392,7 @@ impl Default for Config {
cold_store: None,
split_storage: None,
expected_shutdown: None,
state_sync: None,
}
}
}
@@ -697,6 +701,22 @@ impl NearConfig {
enable_statistics_export: config.store.enable_statistics_export,
client_background_migration_threads: config.store.background_migration_threads,
flat_storage_creation_period: config.store.flat_storage_creation_period,
state_sync_dump_enabled: config
.state_sync
.as_ref()
.map_or(false, |x| x.dump_enabled),
state_sync_s3_bucket: config
.state_sync
.as_ref()
.map_or(String::new(), |x| x.s3_bucket.clone()),
state_sync_s3_region: config
.state_sync
.as_ref()
.map_or(String::new(), |x| x.s3_region.clone()),
state_sync_dump_drop_state: config
.state_sync
.as_ref()
.map_or(vec![], |x| x.drop_state_of_dump.clone()),
},
network_config: NetworkConfig::new(
config.network,
@@ -1520,6 +1540,15 @@ pub fn load_test_config(seed: &str, addr: tcp::ListenerAddr, genesis: Genesis) -
NearConfig::new(config, genesis, signer.into(), validator_signer).unwrap()
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
/// Options for dumping state to S3.
pub struct StateSyncConfig {
pub s3_bucket: String,
pub s3_region: String,
pub dump_enabled: bool,
pub drop_state_of_dump: Vec<ShardId>,
}

#[test]
fn test_init_config_localnet() {
// Check that we can initialize the config with multiple shards.
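The exact config.json layout is not shown in this diff, but the serde derives on `StateSyncConfig` above imply a JSON object with these four fields. A small sketch with made-up values (bucket name, region, and shard ids are illustrative):

```rust
use nearcore::config::StateSyncConfig;

/// Sketch: the `state_sync` section that the derives above would accept.
fn parse_example_state_sync_section() -> StateSyncConfig {
    let raw = r#"{
        "s3_bucket": "my-state-dump-bucket",
        "s3_region": "eu-central-1",
        "dump_enabled": true,
        "drop_state_of_dump": [0, 3]
    }"#;
    let parsed: StateSyncConfig =
        serde_json::from_str(raw).expect("state_sync section should deserialize");
    assert!(parsed.dump_enabled);
    assert_eq!(parsed.drop_state_of_dump, vec![0, 3]);
    parsed
}
```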
24 changes: 18 additions & 6 deletions nearcore/src/lib.rs
@@ -2,6 +2,7 @@ use crate::cold_storage::spawn_cold_store_loop;
pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE};
pub use crate::runtime::NightshadeRuntime;
pub use crate::shard_tracker::TrackedConfig;
use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle};
use actix::{Actor, Addr};
use actix_rt::ArbiterHandle;
use actix_web;
@@ -12,17 +13,17 @@ use near_async::messaging::{IntoSender, LateBoundSender};
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_primitives::time;

use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_primitives::time;
use near_store::metadata::DbKind;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
use near_telemetry::TelemetryActor;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{info, trace};
use tracing::info;

pub mod append_only_map;
mod cold_storage;
pub mod config;
@@ -33,6 +34,7 @@ mod metrics;
pub mod migrations;
mod runtime;
mod shard_tracker;
mod state_sync;

pub fn get_default_home() -> PathBuf {
if let Ok(near_home) = std::env::var("NEAR_HOME") {
@@ -188,6 +190,8 @@ pub struct NearNode {
/// The cold_store_loop_handle will only be set if the cold store is configured.
/// It's a handle to a background thread that copies data from the hot store to the cold store.
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
@@ -242,7 +246,7 @@ pub fn start_with_config_and_synchronization(
);
let (client_actor, client_arbiter_handle) = start_client(
config.client_config.clone(),
chain_genesis,
chain_genesis.clone(),
runtime.clone(),
node_id,
network_adapter.clone().into(),
@@ -255,7 +259,7 @@
);
client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context());
let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager(
runtime,
runtime.clone(),
network_adapter.as_sender(),
client_adapter_for_shards_manager.as_sender(),
config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
@@ -264,6 +268,13 @@
);
shards_manager_adapter.bind(shards_manager_actor);

let state_sync_dump_handle = spawn_state_sync_dump(
&config,
chain_genesis.clone(),
runtime.clone(),
config.network_config.node_id().public_key(),
)?;

#[allow(unused_mut)]
let mut rpc_servers = Vec::new();
let network_actor = PeerManagerActor::spawn(
@@ -304,14 +315,15 @@

rpc_servers.shrink_to_fit();

trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated");
tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated");

Ok(NearNode {
client: client_actor,
view_client,
rpc_servers,
arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle],
cold_store_loop_handle,
state_sync_dump_handle,
})
}

65 changes: 63 additions & 2 deletions nearcore/src/metrics.rs
@@ -1,6 +1,7 @@
use near_o11y::metrics::{
linear_buckets, try_create_histogram_vec, try_create_int_counter_vec, try_create_int_gauge,
HistogramVec, IntCounterVec, IntGauge,
exponential_buckets, linear_buckets, try_create_histogram_vec, try_create_int_counter_vec,
try_create_int_gauge, try_create_int_gauge_vec, HistogramVec, IntCounterVec, IntGauge,
IntGaugeVec,
};
use once_cell::sync::Lazy;

@@ -44,3 +45,63 @@ pub(crate) static COLD_STORE_COPY_RESULT: Lazy<IntCounterVec> = Lazy::new(|| {
)
.unwrap()
});

pub(crate) static STATE_SYNC_DUMP_ITERATION_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_iteration_elapsed_sec",
"Time needed to obtain and write a part",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_put_object_elapsed_sec",
"Time needed to write a part",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_OBTAIN_PART_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_obtain_part_elapsed_sec",
"Time needed to obtain a part",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_TOTAL: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_dump_num_parts_total",
"Total number of parts in the epoch that being dumped",
&["shard_id"],
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_NUM_PARTS_DUMPED: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_dump_num_parts_dumped",
"Number of parts dumped in the epoch that is being dumped",
&["shard_id"],
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_SIZE_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_state_sync_dump_size_total",
"Total size of parts written to S3",
&["shard_id"],
)
.unwrap()
});
pub(crate) static STATE_SYNC_DUMP_EPOCH_HEIGHT: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_dump_epoch_height",
"Epoch Height of an epoch being dumped",
&["shard_id"],
)
.unwrap()
});
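A sketch (not from this PR) of how the dump loop might feed these metrics for a single shard. The function, its arguments, and the placement of the timing scope are assumptions based only on the metric names and label sets defined above.

```rust
/// Sketch: record metrics around uploading one state part for `shard_id`.
/// Label values are the shard id rendered as a string, matching the
/// `&["shard_id"]` label declared for each metric above.
fn report_part_metrics(shard_id: u64, part: &[u8], parts_dumped: u64, num_parts: u64) {
    let shard_label = shard_id.to_string();

    crate::metrics::STATE_SYNC_DUMP_NUM_PARTS_TOTAL
        .with_label_values(&[&shard_label])
        .set(num_parts as i64);
    crate::metrics::STATE_SYNC_DUMP_NUM_PARTS_DUMPED
        .with_label_values(&[&shard_label])
        .set(parts_dumped as i64);
    crate::metrics::STATE_SYNC_DUMP_SIZE_TOTAL
        .with_label_values(&[&shard_label])
        .inc_by(part.len() as u64);

    // Time the S3 upload: the histogram records the elapsed time when `_timer` drops.
    let _timer = crate::metrics::STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED
        .with_label_values(&[&shard_label])
        .start_timer();
    // ... the actual put-object call would run here ...
}
```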