Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
feat(en): Switch EN to use node framework (matter-labs#2427)
Browse files Browse the repository at this point in the history
## What ❔

This PR ports remaining parts of the EN to node framework and makes
framework the default way to run it.
In more detail:

- Health check limits are now set from the config.
- EN and rust metrics are now exposed; the protocol version update task
now runs.
- ⚠️ Connection pool healthcheck was removed. It was controversial from the
start, its usefulness is unclear, it was supposed to be refactored
a year ago but never was, and it wasn't working well during testing. See
[linear
issue](https://linear.app/matterlabs/issue/PLA-255/revamp-db-connection-health-check)
for more context.
- Tests were reworked to use node framework; some refactoring was also
applied to reduce boilerplate.
- Additional tests were added to check for invalid EN configurations.
- ⚠️ Node framework was made the default way to run the EN. There is
also a hook to force EN to run the old way, so that we don't have to
rollback over small issues.

## Why ❔

- Part of switch to the node framework.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored and irnb committed Jul 12, 2024
1 parent 9d656c4 commit d149d83
Show file tree
Hide file tree
Showing 37 changed files with 982 additions and 528 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/bin/external_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ zksync_concurrency.workspace = true
zksync_consensus_roles.workspace = true
vise.workspace = true

async-trait.workspace = true
anyhow.workspace = true
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
Expand Down
25 changes: 12 additions & 13 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use zksync_config::configs::{api::MerkleTreeApiConfig, database::MerkleTreeMode}
use zksync_consistency_checker::ConsistencyChecker;
use zksync_core_leftovers::setup_sigint_handler;
use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core};
use zksync_db_connection::{
connection_pool::ConnectionPoolBuilder, healthcheck::ConnectionPoolHealthCheck,
};
use zksync_db_connection::connection_pool::ConnectionPoolBuilder;
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_metadata_calculator::{
api_server::{TreeApiClient, TreeApiHttpClient},
Expand Down Expand Up @@ -105,7 +103,6 @@ async fn build_state_keeper(
Box::new(main_node_client.for_component("external_io")),
chain_id,
)
.await
.context("Failed initializing I/O for external node state keeper")?;

Ok(ZkSyncStateKeeper::new(
Expand Down Expand Up @@ -725,10 +722,6 @@ struct Cli {
external_node_config_path: Option<std::path::PathBuf>,
/// Path to the yaml with consensus.
consensus_path: Option<std::path::PathBuf>,

/// Run the node using the node framework.
#[arg(long)]
use_node_framework: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
Expand Down Expand Up @@ -825,8 +818,11 @@ async fn main() -> anyhow::Result<()> {
.await
.context("failed fetching remote part of node config from main node")?;

// Can be used to force the old approach to the external node.
let force_old_approach = std::env::var("EXTERNAL_NODE_OLD_APPROACH").is_ok();

// If the node framework is used, run the node.
if opt.use_node_framework {
if !force_old_approach {
// We run the node from a different thread, since the current thread is in tokio context.
std::thread::spawn(move || {
let node =
Expand All @@ -840,6 +836,8 @@ async fn main() -> anyhow::Result<()> {
return Ok(());
}

tracing::info!("Running the external node in the old approach");

if let Some(threshold) = config.optional.slow_query_threshold() {
ConnectionPool::<Core>::global_config().set_slow_query_threshold(threshold)?;
}
Expand All @@ -848,7 +846,11 @@ async fn main() -> anyhow::Result<()> {
}

RUST_METRICS.initialize();
EN_METRICS.observe_config(&config);
EN_METRICS.observe_config(
config.required.l1_chain_id,
config.required.l2_chain_id,
config.postgres.max_connections,
);

let singleton_pool_builder = ConnectionPool::singleton(config.postgres.database_url());
let connection_pool = ConnectionPool::<Core>::builder(
Expand Down Expand Up @@ -911,9 +913,6 @@ async fn run_node(
app_health.insert_custom_component(Arc::new(MainNodeHealthCheck::from(
main_node_client.clone(),
)))?;
app_health.insert_custom_component(Arc::new(ConnectionPoolHealthCheck::new(
connection_pool.clone(),
)))?;

// Start the health check server early into the node lifecycle so that its health can be monitored from the very start.
let healthcheck_handle = HealthCheckHandle::spawn_server(
Expand Down
82 changes: 82 additions & 0 deletions core/bin/external_node/src/metrics/framework.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::time::Duration;

use zksync_dal::{ConnectionPool, Core, CoreDal as _};
use zksync_node_framework::{
implementations::resources::pools::{MasterPool, PoolResource},
FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer,
};
use zksync_shared_metrics::rustc::RUST_METRICS;
use zksync_types::{L1ChainId, L2ChainId};

use super::EN_METRICS;

/// Wiring layer for EN-specific metrics: reports static node configuration
/// (chain IDs, Postgres pool size) and spawns the protocol version metrics task.
#[derive(Debug)]
pub struct ExternalNodeMetricsLayer {
    /// L1 chain ID reported in the node info metric.
    pub l1_chain_id: L1ChainId,
    /// L2 chain ID reported in the node info metric.
    pub l2_chain_id: L2ChainId,
    /// Configured maximum number of connections in the Postgres pool.
    pub postgres_pool_size: u32,
}

/// Resources requested by this layer from the wiring context.
#[derive(Debug, FromContext)]
pub struct Input {
    /// Master connection pool; a singleton connection is taken from it
    /// for the protocol version polling task.
    pub master_pool: PoolResource<MasterPool>,
}

/// Tasks produced by this layer.
#[derive(Debug, IntoContext)]
pub struct Output {
    /// Background task keeping the protocol version metric up to date.
    #[context(task)]
    pub task: ProtocolVersionMetricsTask,
}

#[async_trait::async_trait]
impl WiringLayer for ExternalNodeMetricsLayer {
    type Input = Input;
    type Output = Output;

    fn layer_name(&self) -> &'static str {
        "external_node_metrics"
    }

    async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
        // One-shot metrics: rustc build info plus the static EN configuration.
        RUST_METRICS.initialize();
        EN_METRICS.observe_config(self.l1_chain_id, self.l2_chain_id, self.postgres_pool_size);

        // The protocol version gauge requires periodic DB polling, so it is
        // exported via a dedicated task rather than set here once.
        let pool = input.master_pool.get_singleton().await?;
        Ok(Output {
            task: ProtocolVersionMetricsTask { pool },
        })
    }
}

/// Task that periodically queries Postgres for the last used protocol version
/// and exports it as a metric.
#[derive(Debug)]
pub struct ProtocolVersionMetricsTask {
    // Singleton pool: the task only ever needs one connection at a time.
    pool: ConnectionPool<Core>,
}

#[async_trait::async_trait]
impl Task for ProtocolVersionMetricsTask {
    fn id(&self) -> TaskId {
        "en_protocol_version_metrics".into()
    }

    /// Polls the DB for the last used protocol version every `QUERY_INTERVAL`
    /// and updates the `protocol_version` gauge, until a stop signal arrives.
    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
        // How often the protocol version is refreshed from the database.
        const QUERY_INTERVAL: Duration = Duration::from_secs(10);

        // `borrow_and_update` marks the current value of the watch channel as
        // seen, so the `changed()` future below only wakes on a *new* signal.
        while !*stop_receiver.0.borrow_and_update() {
            let maybe_protocol_version = self
                .pool
                .connection()
                .await?
                .protocol_versions_dal()
                .last_used_version_id()
                .await;
            // The version may be absent (e.g. an uninitialized database);
            // in that case the gauge keeps its previous value.
            if let Some(version) = maybe_protocol_version {
                EN_METRICS.protocol_version.set(version as u64);
            }

            // Sleep for up to QUERY_INTERVAL, waking early if the stop signal
            // changes. Both timeout expiry and `changed()` errors (sender
            // dropped) are ignored; the loop condition handles shutdown.
            tokio::time::timeout(QUERY_INTERVAL, stop_receiver.0.changed())
                .await
                .ok();
        }
        Ok(())
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::time::Duration;
use tokio::sync::watch;
use vise::{EncodeLabelSet, Gauge, Info, Metrics};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_types::{L1ChainId, L2ChainId};

use crate::{config::ExternalNodeConfig, metadata::SERVER_VERSION};
use crate::metadata::SERVER_VERSION;

pub(crate) mod framework;

/// Immutable EN parameters that affect multiple components.
#[derive(Debug, Clone, Copy, EncodeLabelSet)]
Expand All @@ -26,12 +29,17 @@ pub(crate) struct ExternalNodeMetrics {
}

impl ExternalNodeMetrics {
pub(crate) fn observe_config(&self, config: &ExternalNodeConfig) {
pub(crate) fn observe_config(
&self,
l1_chain_id: L1ChainId,
l2_chain_id: L2ChainId,
postgres_pool_size: u32,
) {
let info = ExternalNodeInfo {
server_version: SERVER_VERSION,
l1_chain_id: config.required.l1_chain_id.0,
l2_chain_id: config.required.l2_chain_id.as_u64(),
postgres_pool_size: config.postgres.max_connections,
l1_chain_id: l1_chain_id.0,
l2_chain_id: l2_chain_id.as_u64(),
postgres_pool_size,
};
tracing::info!("Setting general node information: {info:?}");

Expand Down
50 changes: 40 additions & 10 deletions core/bin/external_node/src/node_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! as well as an interface to run the node with the specified components.
use anyhow::Context as _;
use zksync_block_reverter::NodeRole;
use zksync_config::{
configs::{
api::{HealthCheckConfig, MerkleTreeApiConfig},
Expand All @@ -15,6 +16,7 @@ use zksync_node_api_server::{tx_sender::ApiContracts, web3::Namespace};
use zksync_node_framework::{
implementations::layers::{
batch_status_updater::BatchStatusUpdaterLayer,
block_reverter::BlockReverterLayer,
commitment_generator::CommitmentGeneratorLayer,
consensus::ExternalNodeConsensusLayer,
consistency_checker::ConsistencyCheckerLayer,
Expand Down Expand Up @@ -55,13 +57,14 @@ use zksync_state::RocksdbStorageOptions;

use crate::{
config::{self, ExternalNodeConfig},
metrics::framework::ExternalNodeMetricsLayer,
Component,
};

/// Builder for the external node.
#[derive(Debug)]
pub(crate) struct ExternalNodeBuilder {
node: ZkStackServiceBuilder,
pub(crate) node: ZkStackServiceBuilder,
config: ExternalNodeConfig,
}

Expand Down Expand Up @@ -115,6 +118,15 @@ impl ExternalNodeBuilder {
Ok(self)
}

/// Wires the EN metrics layer, forwarding the chain IDs and the configured
/// Postgres pool size from the node config.
fn add_external_node_metrics_layer(mut self) -> anyhow::Result<Self> {
    let layer = ExternalNodeMetricsLayer {
        l1_chain_id: self.config.required.l1_chain_id,
        l2_chain_id: self.config.required.l2_chain_id,
        postgres_pool_size: self.config.postgres.max_connections,
    };
    self.node.add_layer(layer);
    Ok(self)
}

fn add_main_node_client_layer(mut self) -> anyhow::Result<Self> {
let layer = MainNodeClientLayer::new(
self.config.required.main_node_url.clone(),
Expand Down Expand Up @@ -431,6 +443,18 @@ impl ExternalNodeBuilder {
Ok(self)
}

/// Wires the block reverter, enabling rollback of Postgres, the Merkle tree,
/// and the state keeper cache.
fn add_block_reverter_layer(mut self) -> anyhow::Result<Self> {
    let mut layer = BlockReverterLayer::new(NodeRole::External);
    // Unlike on the main node, reverting executed batches is more-or-less
    // safe for external nodes.
    layer.allow_rolling_back_executed_batches();
    layer.enable_rolling_back_postgres();
    layer.enable_rolling_back_merkle_tree(self.config.required.merkle_tree_path.clone());
    layer.enable_rolling_back_state_keeper_cache(self.config.required.state_cache_path.clone());
    self.node.add_layer(layer);
    Ok(self)
}

/// This layer will make sure that the database is initialized correctly,
/// e.g.:
/// - genesis or snapshot recovery will be performed if it's required.
Expand Down Expand Up @@ -480,6 +504,21 @@ impl ExternalNodeBuilder {
.add_query_eth_client_layer()?
.add_reorg_detector_layer()?;

// Add layers that must run only on a single component.
if components.contains(&Component::Core) {
// Core is a singleton & mandatory component,
// so until we have a dedicated component for "auxiliary" tasks,
// it's responsible for things like metrics.
self = self
.add_postgres_metrics_layer()?
.add_external_node_metrics_layer()?;
// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self
.add_block_reverter_layer()?
.add_storage_initialization_layer(LayerKind::Task)?;
}

// Add preconditions for all the components.
self = self
.add_l1_batch_commitment_mode_validation_layer()?
Expand Down Expand Up @@ -536,11 +575,6 @@ impl ExternalNodeBuilder {
self = self.add_tree_data_fetcher_layer()?;
}
Component::Core => {
// Core is a singleton & mandatory component,
// so until we have a dedicated component for "auxiliary" tasks,
// it's responsible for things like metrics.
self = self.add_postgres_metrics_layer()?;

// Main tasks
self = self
.add_state_keeper_layer()?
Expand All @@ -549,10 +583,6 @@ impl ExternalNodeBuilder {
.add_consistency_checker_layer()?
.add_commitment_generator_layer()?
.add_batch_status_updater_layer()?;

// We assign the storage initialization to the core, as it's considered to be
// the "main" component.
self = self.add_storage_initialization_layer(LayerKind::Task)?;
}
}
}
Expand Down
Loading

0 comments on commit d149d83

Please sign in to comment.