Skip to content

Commit

Permalink
feat(dynamic config): Refactor and make several config fields reloada…
Browse files Browse the repository at this point in the history
…ble at runtime (near#8240)

Introduce mutable fields in ClientConfig.
Intoruce the infrastructure to reload `config.json` and notify `Client` that certain fields were updated.
Refactoring to inline `dyn_config.json` into `config.json`.
  • Loading branch information
nikurt committed Jan 30, 2023
1 parent c92e6c1 commit 7c1109e
Show file tree
Hide file tree
Showing 30 changed files with 549 additions and 219 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use near_chain::{
BlockProcessingArtifact, BlockStatus, Chain, ChainGenesis, ChainStoreAccess,
DoneApplyChunkCallback, Doomslug, DoomslugThresholdMode, Provenance, RuntimeAdapter,
};
use near_chain_configs::ClientConfig;
use near_chain_configs::{ClientConfig, UpdateableClientConfig};
use near_chunks::ShardsManager;
use near_network::types::{
HighestHeightPeerInfo, NetworkRequests, PeerManagerAdapter, ReasonForBan,
Expand Down Expand Up @@ -149,6 +149,12 @@ pub struct Client {
flat_storage_creator: Option<FlatStorageCreator>,
}

impl Client {
pub(crate) fn update_client_config(&self, update_client_config: UpdateableClientConfig) {
self.config.expected_shutdown.update(update_client_config.expected_shutdown);
}
}

// Debug information about the upcoming block.
#[derive(Default)]
pub struct BlockDebugStatus {
Expand Down
44 changes: 31 additions & 13 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::adapter::{
RecvPartialEncodedChunkRequest, RecvPartialEncodedChunkResponse, SetNetworkInfo, StateResponse,
};
use crate::client::{Client, EPOCH_START_INFO_BLOCKS};
use crate::config_updater::ConfigUpdater;
use crate::debug::new_network_info_view;
use crate::info::{display_sync_status, InfoHelper};
use crate::metrics::PARTIAL_ENCODED_CHUNK_RESPONSE_DELAY;
Expand Down Expand Up @@ -38,7 +39,6 @@ use near_chunks::logic::cares_about_shard_this_or_next_epoch;
use near_client_primitives::types::{
Error, GetNetworkInfo, NetworkInfoResponse, Status, StatusError, StatusSyncInfo, SyncStatus,
};
use near_dyn_configs::EXPECTED_SHUTDOWN_AT;
#[cfg(feature = "test_features")]
use near_network::types::NetworkAdversarialMessage;
use near_network::types::ReasonForBan;
Expand Down Expand Up @@ -69,7 +69,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;
use tokio::sync::broadcast;
use tracing::{debug, error, info, trace, warn};

/// Multiplier on `max_block_time` to wait until deciding that chain stalled.
Expand Down Expand Up @@ -116,7 +116,10 @@ pub struct ClientActor {

/// Synchronization measure to allow graceful shutdown.
/// Informs the system when a ClientActor gets dropped.
shutdown_signal: Option<oneshot::Sender<()>>,
shutdown_signal: Option<broadcast::Sender<()>>,

/// Manages updating the config.
config_updater: Option<ConfigUpdater>,
}

/// Blocks the program until given genesis time arrives.
Expand Down Expand Up @@ -152,8 +155,9 @@ impl ClientActor {
enable_doomslug: bool,
rng_seed: RngSeed,
ctx: &Context<ClientActor>,
shutdown_signal: Option<oneshot::Sender<()>>,
shutdown_signal: Option<broadcast::Sender<()>>,
adv: crate::adversarial::Controls,
config_updater: Option<ConfigUpdater>,
) -> Result<Self, Error> {
let state_parts_arbiter = Arbiter::new();
let self_addr = ctx.address();
Expand Down Expand Up @@ -222,7 +226,8 @@ impl ClientActor {

#[cfg(feature = "sandbox")]
fastforward_delta: 0,
shutdown_signal: shutdown_signal,
shutdown_signal,
config_updater,
})
}
}
Expand Down Expand Up @@ -1129,14 +1134,20 @@ impl ClientActor {
/// Returns the delay before the next time `check_triggers` should be called, which is
/// min(time until the closest trigger, 1 second).
fn check_triggers(&mut self, ctx: &mut Context<ClientActor>) -> Duration {
if let Some(config_updater) = &mut self.config_updater {
config_updater.try_update(&|updateable_client_config| {
self.client.update_client_config(updateable_client_config)
});
}

// Check block height to trigger expected shutdown
if let Ok(head) = self.client.chain.head() {
let block_height_to_shutdown =
EXPECTED_SHUTDOWN_AT.load(std::sync::atomic::Ordering::Relaxed);
if block_height_to_shutdown > 0 && head.height >= block_height_to_shutdown {
info!(target: "client", "Expected shutdown triggered: head block({}) >= ({})", head.height, block_height_to_shutdown);
if let Some(tx) = self.shutdown_signal.take() {
let _ = tx.send(()); // Ignore send signal fail, it will send again in next trigger
if let Some(block_height_to_shutdown) = self.client.config.expected_shutdown.get() {
if head.height >= block_height_to_shutdown {
info!(target: "client", "Expected shutdown triggered: head block({}) >= ({:?})", head.height, block_height_to_shutdown);
if let Some(tx) = self.shutdown_signal.take() {
let _ = tx.send(()); // Ignore send signal fail, it will send again in next trigger
}
}
}
}
Expand Down Expand Up @@ -1755,7 +1766,12 @@ impl ClientActor {
fn log_summary(&mut self) {
let _span = tracing::debug_span!(target: "client", "log_summary").entered();
let _d = delay_detector::DelayDetector::new(|| "client log summary".into());
self.info_helper.log_summary(&self.client, &self.node_id, &self.network_info)
self.info_helper.log_summary(
&self.client,
&self.node_id,
&self.network_info,
&self.config_updater,
)
}
}

Expand Down Expand Up @@ -1970,8 +1986,9 @@ pub fn start_client(
network_adapter: Arc<dyn PeerManagerAdapter>,
validator_signer: Option<Arc<dyn ValidatorSigner>>,
telemetry_actor: Addr<TelemetryActor>,
sender: Option<oneshot::Sender<()>>,
sender: Option<broadcast::Sender<()>>,
adv: crate::adversarial::Controls,
config_updater: Option<ConfigUpdater>,
) -> (Addr<ClientActor>, ArbiterHandle) {
let client_arbiter = Arbiter::new();
let client_arbiter_handle = client_arbiter.handle();
Expand All @@ -1990,6 +2007,7 @@ pub fn start_client(
ctx,
sender,
adv,
config_updater,
)
.unwrap()
});
Expand Down
53 changes: 53 additions & 0 deletions chain/client/src/config_updater.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use near_chain_configs::UpdateableClientConfig;
use near_dyn_configs::{UpdateableConfigLoaderError, UpdateableConfigs};
use std::sync::Arc;
use tokio::sync::broadcast::Receiver;

#[derive(Debug)]
pub enum ClientConfigUpdateError {}

/// Manages updating the config encapsulating.
pub struct ConfigUpdater {
/// Receives config updates while the node is running.
rx_config_update: Receiver<Result<UpdateableConfigs, Arc<UpdateableConfigLoaderError>>>,

/// Represents the latest Error of reading the dynamically reloadable configs.
updateable_configs_error: Option<Arc<UpdateableConfigLoaderError>>,
}

impl ConfigUpdater {
pub fn new(
rx_config_update: Receiver<Result<UpdateableConfigs, Arc<UpdateableConfigLoaderError>>>,
) -> Self {
Self { rx_config_update, updateable_configs_error: None }
}

/// Check if any of the configs were updated.
/// If they did, the receiver (rx_config_update) will contain a clone of the new configs.
pub fn try_update(&mut self, update_client_config_fn: &dyn Fn(UpdateableClientConfig)) {
while let Ok(maybe_updateable_configs) = self.rx_config_update.try_recv() {
match maybe_updateable_configs {
Ok(updateable_configs) => {
if let Some(client_config) = updateable_configs.client_config {
update_client_config_fn(client_config);
tracing::info!(target: "config", "Updated ClientConfig");
}
self.updateable_configs_error = None;
}
Err(err) => {
self.updateable_configs_error = Some(err.clone());
}
}
}
}

/// Prints an error if it's present.
pub fn report_status(&self) {
if let Some(updateable_configs_error) = &self.updateable_configs_error {
tracing::warn!(
target: "stats",
"Dynamically updateable configs are not valid. Please fix this ASAP otherwise the node will probably crash after restart: {}",
*updateable_configs_error);
}
}
}
10 changes: 8 additions & 2 deletions chain/client/src/info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config_updater::ConfigUpdater;
use crate::{metrics, rocksdb_metrics, SyncStatus};
use actix::Addr;
use itertools::Itertools;
Expand Down Expand Up @@ -125,6 +126,7 @@ impl InfoHelper {
client: &crate::client::Client,
node_id: &PeerId,
network_info: &NetworkInfo,
config_updater: &Option<ConfigUpdater>,
) {
let is_syncing = client.sync_status.is_syncing();
let head = unwrap_or_return!(client.chain.head());
Expand Down Expand Up @@ -190,6 +192,7 @@ impl InfoHelper {
.unwrap_or(0),
statistics,
&client.config,
config_updater,
);
self.log_chain_processing_info(client, &head.epoch_id);
}
Expand All @@ -206,6 +209,7 @@ impl InfoHelper {
protocol_upgrade_block_height: BlockHeight,
statistics: Option<StoreStatistics>,
client_config: &ClientConfig,
config_updater: &Option<ConfigUpdater>,
) {
let use_colour = matches!(self.log_summary_style, LogSummaryStyle::Colored);
let paint = |colour: ansi_term::Colour, text: Option<String>| match text {
Expand Down Expand Up @@ -268,12 +272,14 @@ impl InfoHelper {
paint(ansi_term::Colour::Blue, machine_info_log),
);
if catchup_status_log != "" {
info!(target:"stats", "Catchups\n{}", catchup_status_log);
info!(target: "stats", "Catchups\n{}", catchup_status_log);
}
if let Some(statistics) = statistics {
rocksdb_metrics::export_stats_as_metrics(statistics);
}

if let Some(config_updater) = &config_updater {
config_updater.report_status();
}
let (cpu_usage, memory_usage) = proc_info.unwrap_or_default();
let is_validator = validator_info.map(|v| v.is_validator).unwrap_or_default();
(metrics::IS_VALIDATOR.set(is_validator as i64));
Expand Down
2 changes: 2 additions & 0 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ pub use crate::adapter::{
};
pub use crate::client::Client;
pub use crate::client_actor::{start_client, ClientActor};
pub use crate::config_updater::ConfigUpdater;
pub use crate::view_client::{start_view_client, ViewClientActor};

pub mod adapter;
pub mod adversarial;
mod client;
mod client_actor;
mod config_updater;
pub mod debug;
mod info;
mod metrics;
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ pub fn setup(
ctx,
None,
adv,
None,
)
.unwrap();
(genesis_block, client, view_client_addr)
Expand Down
2 changes: 2 additions & 0 deletions core/chain-configs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ anyhow.workspace = true
chrono.workspace = true
derive_more.workspace = true
num-rational.workspace = true
once_cell.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2.workspace = true
smart-default.workspace = true
tracing.workspace = true

near-crypto = { path = "../crypto" }
near-o11y = { path = "../o11y" }
near-primitives = { path = "../primitives" }

[features]
Expand Down
13 changes: 10 additions & 3 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::time::Duration;

use serde::{Deserialize, Serialize};

use near_primitives::types::{AccountId, BlockHeightDelta, Gas, NumBlocks, NumSeats, ShardId};
use crate::MutableConfigValue;
use near_primitives::types::{
AccountId, BlockHeight, BlockHeightDelta, Gas, NumBlocks, NumSeats, ShardId,
};
use near_primitives::version::Version;

pub const TEST_STATE_SYNC_TIMEOUT: u64 = 5;
Expand Down Expand Up @@ -70,14 +73,17 @@ impl GCConfig {
}
}

#[derive(Clone, Serialize, Deserialize)]
/// ClientConfig where some fields can be updated at runtime.
#[derive(Clone)]
pub struct ClientConfig {
/// Version of the binary.
pub version: Version,
/// Chain id for status.
pub chain_id: String,
/// Listening rpc port for status.
pub rpc_addr: Option<String>,
/// Graceful shutdown at expected block height.
pub expected_shutdown: MutableConfigValue<Option<BlockHeight>>,
/// Duration to check for producing / skipping block.
pub block_production_tracking_delay: Duration,
/// Minimum duration before producing block.
Expand Down Expand Up @@ -182,10 +188,11 @@ impl ClientConfig {
because non-archival nodes must save trie changes in order to do do garbage collection."
);

ClientConfig {
Self {
version: Default::default(),
chain_id: "unittest".to_string(),
rpc_addr: Some("0.0.0.0:3030".to_string()),
expected_shutdown: MutableConfigValue::new(None, "expected_shutdown"),
block_production_tracking_delay: Duration::from_millis(std::cmp::max(
10,
min_block_prod_time / 5,
Expand Down
3 changes: 3 additions & 0 deletions core/chain-configs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod client_config;
mod genesis_config;
pub mod genesis_validate;
mod metrics;
mod updateable_config;

pub use client_config::{
ClientConfig, GCConfig, LogSummaryStyle, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
Expand All @@ -10,3 +12,4 @@ pub use genesis_config::{
get_initial_supply, stream_records_from_file, Genesis, GenesisChangeConfig, GenesisConfig,
GenesisRecords, GenesisValidationMode, ProtocolConfig, ProtocolConfigView,
};
pub use updateable_config::{MutableConfigValue, UpdateableClientConfig};
11 changes: 11 additions & 0 deletions core/chain-configs/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use near_o11y::metrics::{try_create_int_gauge_vec, IntGaugeVec};
use once_cell::sync::Lazy;

pub static CONFIG_MUTABLE_FIELD: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_config_mutable_field",
"Timestamp and value of a mutable config field",
&["field_name", "timestamp", "value"],
)
.unwrap()
});
Loading

0 comments on commit 7c1109e

Please sign in to comment.