Skip to content

Commit

Permalink
feat: use split store in the view client (near#8656)
Browse files Browse the repository at this point in the history
- use split store in the view client - when configured
- removed the get_store(Temperature) method and replaced usages with safer get_cold_store and get_hot_store
- added SplitStorageConfig and used it to configure a few things
  • Loading branch information
wacban authored and nikurt committed Mar 15, 2023
1 parent a355830 commit f3ad60b
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 370 deletions.
48 changes: 47 additions & 1 deletion nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ pub struct Config {
/// This feature is under development, do not use in production.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cold_store: Option<near_store::StoreConfig>,

/// Configuration for the split storage.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub split_storage: Option<SplitStorageConfig>,
// TODO(mina86): Remove those two altogether at some point. We need to be
// somewhat careful though and make sure that we don’t start silently
// ignoring this option without users setting corresponding store option.
Expand Down Expand Up @@ -388,12 +390,56 @@ impl Default for Config {
use_db_migration_snapshot: None,
store: near_store::StoreConfig::default(),
cold_store: None,
split_storage: None,
expected_shutdown: None,
state_sync: None,
}
}
}

/// Serde fallback for `SplitStorageConfig::enable_split_storage_view_client`.
///
/// The split store is opt-in for the view client, so the flag is off unless
/// the config file sets it explicitly.
fn default_enable_split_storage_view_client() -> bool {
    false
}

/// Serde fallback for `SplitStorageConfig::cold_store_initial_migration_batch_size`.
///
/// NOTE(review): the unit of this value is not visible here — confirm against
/// the code that consumes the batch size.
fn default_cold_store_initial_migration_batch_size() -> usize {
    500_000_000
}

/// Serde fallback for
/// `SplitStorageConfig::cold_store_initial_migration_loop_sleep_duration`:
/// thirty seconds.
fn default_cold_store_initial_migration_loop_sleep_duration() -> Duration {
    Duration::from_secs(30)
}

/// Serde fallback for `SplitStorageConfig::cold_store_loop_sleep_duration`:
/// one second.
fn default_cold_store_loop_sleep_duration() -> Duration {
    Duration::from_secs(1)
}

/// Settings for split storage.
///
/// Every field carries its own serde default, so any subset of the fields may
/// be omitted when this section is deserialized.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct SplitStorageConfig {
    /// Whether the view client may use the split store. Defaults to `false`.
    #[serde(default = "default_enable_split_storage_view_client")]
    pub enable_split_storage_view_client: bool,

    /// Batch size for the initial migration to the cold store.
    /// NOTE(review): unit is not visible in this file — confirm with the
    /// consumer of this value.
    #[serde(default = "default_cold_store_initial_migration_batch_size")]
    pub cold_store_initial_migration_batch_size: usize,

    /// Sleep duration for the initial migration loop. Defaults to 30 seconds.
    /// NOTE(review): presumably the pause between loop iterations — confirm.
    #[serde(default = "default_cold_store_initial_migration_loop_sleep_duration")]
    pub cold_store_initial_migration_loop_sleep_duration: Duration,

    /// Sleep duration for the cold store loop. Defaults to 1 second.
    /// NOTE(review): presumably the pause between loop iterations — confirm.
    #[serde(default = "default_cold_store_loop_sleep_duration")]
    pub cold_store_loop_sleep_duration: Duration,
}

impl Default for SplitStorageConfig {
    /// Builds a config in which every field holds the value returned by its
    /// `default_*` helper, i.e. the same values serde fills in for fields
    /// that are missing from the config file.
    fn default() -> Self {
        let enable_split_storage_view_client = default_enable_split_storage_view_client();
        let cold_store_initial_migration_batch_size =
            default_cold_store_initial_migration_batch_size();
        let cold_store_initial_migration_loop_sleep_duration =
            default_cold_store_initial_migration_loop_sleep_duration();
        let cold_store_loop_sleep_duration = default_cold_store_loop_sleep_duration();

        Self {
            enable_split_storage_view_client,
            cold_store_initial_migration_batch_size,
            cold_store_initial_migration_loop_sleep_duration,
            cold_store_loop_sleep_duration,
        }
    }
}

impl Config {
/// load Config from config.json without panic. Do semantic validation on field values.
/// If config file issues occur, a ValidationError::ConfigFileError will be returned;
Expand Down
78 changes: 50 additions & 28 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::cold_storage::spawn_cold_store_loop;
pub use crate::config::{init_configs, load_config, load_test_config, NearConfig, NEAR_BASE};
pub use crate::runtime::NightshadeRuntime;
pub use crate::shard_tracker::TrackedConfig;
use crate::state_sync::{spawn_state_sync_dump, StateSyncDumpHandle};
use actix::{Actor, Addr};
use actix_rt::ArbiterHandle;
use actix_web;
Expand All @@ -13,16 +12,17 @@ use near_async::messaging::{IntoSender, LateBoundSender};
use near_chain::{Chain, ChainGenesis};
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::{start_client, start_view_client, ClientActor, ConfigUpdater, ViewClientActor};
use near_primitives::time;

use near_network::PeerManagerActor;
use near_primitives::block::GenesisId;
use near_primitives::time;
use near_store::{DBCol, Mode, NodeStorage, StoreOpenerError, Temperature};
use near_store::metadata::DbKind;
use near_store::{DBCol, Mode, NodeStorage, Store, StoreOpenerError};
use near_telemetry::TelemetryActor;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;

use tracing::{info, trace};
pub mod append_only_map;
mod cold_storage;
pub mod config;
Expand All @@ -33,7 +33,6 @@ mod metrics;
pub mod migrations;
mod runtime;
mod shard_tracker;
mod state_sync;

pub fn get_default_home() -> PathBuf {
if let Ok(near_home) = std::env::var("NEAR_HOME") {
Expand Down Expand Up @@ -154,6 +153,33 @@ fn open_storage(home_dir: &Path, near_config: &mut NearConfig) -> anyhow::Result
Ok(storage)
}

/// Returns the split store only when every precondition for using it holds.
///
/// `Ok(None)` is returned unless all of the following are true:
/// - the node is archival,
/// - a cold store is configured,
/// - `split_storage.enable_split_storage_view_client` is set,
/// - the migration to cold store is finished, which is signalled by the db
///   kind of the hot store having changed from Archive to Hot.
///
/// # Errors
///
/// Propagates the error from reading the db kind of the hot store.
fn get_split_store(config: &NearConfig, storage: &NodeStorage) -> anyhow::Result<Option<Store>> {
    let node_config = &config.config;

    // Purely configuration-based guards; none of these touch the database.
    let view_client_opted_in = node_config
        .split_storage
        .as_ref()
        .map_or(false, |split| split.enable_split_storage_view_client);
    let configured =
        node_config.archive && node_config.cold_store.is_some() && view_client_opted_in;
    if !configured {
        return Ok(None);
    }

    // The db kind is read only after the config guards pass, so a node that
    // is not set up for split storage never performs (or fails on) this read.
    let hot_db_kind = storage.get_hot_store().get_db_kind()?;
    if hot_db_kind != Some(DbKind::Hot) {
        return Ok(None);
    }

    Ok(storage.get_split_store())
}

pub struct NearNode {
pub client: Addr<ClientActor>,
pub view_client: Addr<ViewClientActor>,
Expand All @@ -162,8 +188,6 @@ pub struct NearNode {
/// The cold_store_loop_handle will only be set if the cold store is configured.
/// It's a handle to a background thread that copies data from the hot store to the cold store.
pub cold_store_loop_handle: Option<ColdStoreLoopHandle>,
/// Contains handles to background threads that may be dumping state to S3.
pub state_sync_dump_handle: Option<StateSyncDumpHandle>,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand All @@ -180,11 +204,17 @@ pub fn start_with_config_and_synchronization(
) -> anyhow::Result<NearNode> {
let store = open_storage(home_dir, &mut config)?;

let runtime = Arc::new(NightshadeRuntime::from_config(
home_dir,
store.get_store(Temperature::Hot),
&config,
));
let runtime =
Arc::new(NightshadeRuntime::from_config(home_dir, store.get_hot_store(), &config));

// Get the split store. If split store is some then create a new runtime for
// the view client. Otherwise just re-use the existing runtime.
let split_store = get_split_store(&config, &store)?;
let view_runtime = if let Some(split_store) = split_store {
Arc::new(NightshadeRuntime::from_config(home_dir, split_store, &config))
} else {
runtime.clone()
};

let cold_store_loop_handle = spawn_cold_store_loop(&config, &store, runtime.clone())?;

Expand All @@ -205,14 +235,14 @@ pub fn start_with_config_and_synchronization(
let view_client = start_view_client(
config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
chain_genesis.clone(),
runtime.clone(),
view_runtime,
network_adapter.clone().into(),
config.client_config.clone(),
adv.clone(),
);
let (client_actor, client_arbiter_handle) = start_client(
config.client_config.clone(),
chain_genesis.clone(),
chain_genesis,
runtime.clone(),
node_id,
network_adapter.clone().into(),
Expand All @@ -225,22 +255,15 @@ pub fn start_with_config_and_synchronization(
);
client_adapter_for_shards_manager.bind(client_actor.clone().with_auto_span_context());
let (shards_manager_actor, shards_manager_arbiter_handle) = start_shards_manager(
runtime.clone(),
runtime,
network_adapter.as_sender(),
client_adapter_for_shards_manager.as_sender(),
config.validator_signer.as_ref().map(|signer| signer.validator_id().clone()),
store.get_store(Temperature::Hot),
store.get_hot_store(),
config.client_config.chunk_request_retry_period,
);
shards_manager_adapter.bind(shards_manager_actor);

let state_sync_dump_handle = spawn_state_sync_dump(
&config,
&chain_genesis.clone(),
runtime.clone(),
config.network_config.node_id().public_key(),
)?;

#[allow(unused_mut)]
let mut rpc_servers = Vec::new();
let network_actor = PeerManagerActor::spawn(
Expand Down Expand Up @@ -281,15 +304,14 @@ pub fn start_with_config_and_synchronization(

rpc_servers.shrink_to_fit();

tracing::trace!(target: "diagnostic", key = "log", "Starting NEAR node with diagnostic activated");
trace!(target: "diagnostic", key="log", "Starting NEAR node with diagnostic activated");

Ok(NearNode {
client: client_actor,
view_client,
rpc_servers,
arbiters: vec![client_arbiter_handle, shards_manager_arbiter_handle],
cold_store_loop_handle,
state_sync_dump_handle,
})
}

Expand Down Expand Up @@ -334,7 +356,7 @@ pub fn recompress_storage(home_dir: &Path, opts: RecompressOpts) -> anyhow::Resu
"Recompressing database");

info!("Opening database at {}", src_path.display());
let src_store = src_opener.open_in_mode(Mode::ReadOnly)?.get_store(Temperature::Hot);
let src_store = src_opener.open_in_mode(Mode::ReadOnly)?.get_hot_store();

let final_head_height = if skip_columns.contains(&DBCol::PartialChunks) {
let tip: Option<near_primitives::block::Tip> =
Expand All @@ -351,7 +373,7 @@ pub fn recompress_storage(home_dir: &Path, opts: RecompressOpts) -> anyhow::Resu
};

info!("Creating database at {}", dst_path.display());
let dst_store = dst_opener.open_in_mode(Mode::Create)?.get_store(Temperature::Hot);
let dst_store = dst_opener.open_in_mode(Mode::Create)?.get_hot_store();

const BATCH_SIZE_BYTES: u64 = 150_000_000;

Expand Down
Loading

0 comments on commit f3ad60b

Please sign in to comment.