diff --git a/core/bin/external_node/src/helpers.rs b/core/bin/external_node/src/helpers.rs
new file mode 100644
index 000000000000..68b1cdfa461b
--- /dev/null
+++ b/core/bin/external_node/src/helpers.rs
@@ -0,0 +1,32 @@
+//! Miscellaneous helpers for the EN.
+
+use zksync_health_check::{async_trait, CheckHealth, Health, HealthStatus};
+use zksync_web3_decl::{jsonrpsee::http_client::HttpClient, namespaces::EthNamespaceClient};
+
+/// Main node health check.
+#[derive(Debug)]
+pub(crate) struct MainNodeHealthCheck(HttpClient);
+
+impl From<HttpClient> for MainNodeHealthCheck {
+    fn from(client: HttpClient) -> Self {
+        Self(client)
+    }
+}
+
+#[async_trait]
+impl CheckHealth for MainNodeHealthCheck {
+    fn name(&self) -> &'static str {
+        "main_node_http_rpc"
+    }
+
+    async fn check_health(&self) -> Health {
+        if let Err(err) = self.0.get_block_number().await {
+            tracing::warn!("Health-check call to main node HTTP RPC failed: {err}");
+            let details = serde_json::json!({
+                "error": err.to_string(),
+            });
+            return Health::from(HealthStatus::NotReady).with_details(details);
+        }
+        HealthStatus::Ready.into()
+    }
+}
diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 9440399a03a7..749d85d6e53c 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -34,14 +34,17 @@ use zksync_core::{
     },
 };
 use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
-use zksync_health_check::CheckHealth;
+use zksync_health_check::{CheckHealth, HealthStatus, ReactiveHealthCheck};
 use zksync_state::PostgresStorageCaches;
 use zksync_storage::RocksDB;
 use zksync_utils::wait_for_tasks::wait_for_tasks;
 
-use crate::{config::ExternalNodeConfig, init::ensure_storage_initialized};
+use crate::{
+    config::ExternalNodeConfig, helpers::MainNodeHealthCheck, init::ensure_storage_initialized,
+};
 
 mod config;
+mod helpers;
 mod init;
 mod metrics;
 
@@ -104,13 +107,12 @@ async fn build_state_keeper(
 }
 
 async fn init_tasks(
-    config: ExternalNodeConfig,
+    config: &ExternalNodeConfig,
     connection_pool: ConnectionPool,
+    stop_receiver: watch::Receiver<bool>,
 ) -> anyhow::Result<(
     Vec<task::JoinHandle<anyhow::Result<()>>>,
-    watch::Sender<bool>,
-    HealthCheckHandle,
-    watch::Receiver<bool>,
+    Vec<Box<dyn CheckHealth>>,
 )> {
     let release_manifest: serde_json::Value = serde_json::from_str(RELEASE_MANIFEST)
         .expect("release manifest is a valid json document; qed");
@@ -124,12 +126,12 @@ async fn init_tasks(
         .required
         .main_node_url()
         .expect("Main node URL is incorrect");
-    let (stop_sender, stop_receiver) = watch::channel(false);
 
     let mut healthchecks: Vec<Box<dyn CheckHealth>> = Vec::new();
     // Create components.
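For reference, a minimal sketch (not part of the patch) of how the new `MainNodeHealthCheck` behaves once constructed from an `HttpClient`. It assumes the code lives inside the external node crate (the type is `pub(crate)`) and that `HttpClientBuilder` is exposed from the same `jsonrpsee` re-export as `HttpClient`; in the node itself the check is simply pushed into the `healthchecks` vector below and polled by the `/health` endpoint, which answers 503 whenever the aggregated status is not healthy.

    use zksync_health_check::CheckHealth;
    use zksync_web3_decl::jsonrpsee::http_client::HttpClientBuilder;

    use crate::helpers::MainNodeHealthCheck;

    async fn probe_main_node(main_node_url: &str) -> anyhow::Result<()> {
        let client = HttpClientBuilder::default().build(main_node_url)?;
        let check = MainNodeHealthCheck::from(client);
        // `check_health` issues a lightweight `eth_blockNumber` call; any RPC failure is
        // reported as `HealthStatus::NotReady` with the error message attached as JSON details.
        let health = check.check_health().await;
        println!("{}: healthy = {}", check.name(), health.status().is_healthy());
        Ok(())
    }
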
let fee_params_fetcher = Arc::new(MainNodeFeeParamsFetcher::new(&main_node_url)); - let sync_state = SyncState::new(); + let sync_state = SyncState::default(); + healthchecks.push(Box::new(sync_state.clone())); let (action_queue_sender, action_queue) = ActionQueue::new(); let mut task_handles = vec![]; @@ -159,7 +161,7 @@ async fn init_tasks( let state_keeper = build_state_keeper( action_queue, config.required.state_cache_path.clone(), - &config, + config, connection_pool.clone(), sync_state.clone(), config.remote.l2_erc20_bridge_addr, @@ -171,6 +173,9 @@ async fn init_tasks( let main_node_client = ::json_rpc(&main_node_url) .context("Failed creating JSON-RPC client for main node")?; + healthchecks.push(Box::new(MainNodeHealthCheck::from( + main_node_client.clone(), + ))); let singleton_pool_builder = ConnectionPool::singleton(&config.postgres.database_url); let fetcher_handle = if let Some(cfg) = config.consensus.clone() { @@ -245,6 +250,8 @@ async fn init_tasks( .await .context("failed to build connection pool for ConsistencyChecker")?, ); + healthchecks.push(Box::new(consistency_checker.health_check().clone())); + let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone())); let batch_status_updater = BatchStatusUpdater::new( &main_node_url, @@ -254,6 +261,7 @@ async fn init_tasks( .context("failed to build a connection pool for BatchStatusUpdater")?, ) .context("failed initializing batch status updater")?; + healthchecks.push(Box::new(batch_status_updater.health_check())); // Run the components. let tree_stop_receiver = stop_receiver.clone(); @@ -267,11 +275,9 @@ async fn init_tasks( .build() .await .context("failed to build a commitment_generator_pool")?; - let commitment_generator = - CommitmentGenerator::new(commitment_generator_pool, stop_receiver.clone()); - let commitment_generator_handle = tokio::spawn(commitment_generator.run()); - - let consistency_checker_handle = tokio::spawn(consistency_checker.run(stop_receiver.clone())); + let commitment_generator = CommitmentGenerator::new(commitment_generator_pool); + healthchecks.push(Box::new(commitment_generator.health_check())); + let commitment_generator_handle = tokio::spawn(commitment_generator.run(stop_receiver.clone())); let updater_handle = task::spawn(batch_status_updater.run(stop_receiver.clone())); let fee_address_migration_handle = @@ -347,15 +353,21 @@ async fn init_tasks( healthchecks.push(Box::new(ws_server_handles.health_check)); healthchecks.push(Box::new(http_server_handles.health_check)); healthchecks.push(Box::new(ConnectionPoolHealthCheck::new(connection_pool))); - let healthcheck_handle = HealthCheckHandle::spawn_server( - ([0, 0, 0, 0], config.required.healthcheck_port).into(), - healthchecks, - ); if let Some(port) = config.optional.prometheus_port { - let prometheus_task = PrometheusExporterConfig::pull(port).run(stop_receiver.clone()); - task_handles.push(tokio::spawn(prometheus_task)); + let (prometheus_health_check, prometheus_health_updater) = + ReactiveHealthCheck::new("prometheus_exporter"); + healthchecks.push(Box::new(prometheus_health_check)); + task_handles.push(tokio::spawn(async move { + prometheus_health_updater.update(HealthStatus::Ready.into()); + let result = PrometheusExporterConfig::pull(port) + .run(stop_receiver) + .await; + drop(prometheus_health_updater); + result + })); } + task_handles.extend(http_server_handles.tasks); task_handles.extend(ws_server_handles.tasks); task_handles.extend(cache_update_handle); @@ -370,7 +382,7 @@ async fn init_tasks( 
commitment_generator_handle, ]); - Ok((task_handles, stop_sender, healthcheck_handle, stop_receiver)) + Ok((task_handles, healthchecks)) } async fn shutdown_components( @@ -515,13 +527,19 @@ async fn main() -> anyhow::Result<()> { ) .await?; - let (task_handles, stop_sender, health_check_handle, stop_receiver) = - init_tasks(config.clone(), connection_pool.clone()) + let (stop_sender, stop_receiver) = watch::channel(false); + let (task_handles, mut healthchecks) = + init_tasks(&config, connection_pool.clone(), stop_receiver.clone()) .await .context("init_tasks")?; - let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone(), stop_receiver); - let mut reorg_detector_handle = tokio::spawn(reorg_detector.run()).fuse(); + let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone()); + healthchecks.push(Box::new(reorg_detector.health_check().clone())); + let healthcheck_handle = HealthCheckHandle::spawn_server( + ([0, 0, 0, 0], config.required.healthcheck_port).into(), + healthchecks, + ); + let mut reorg_detector_handle = tokio::spawn(reorg_detector.run(stop_receiver)).fuse(); let mut reorg_detector_result = None; let particular_crypto_alerts = None; @@ -541,7 +559,7 @@ async fn main() -> anyhow::Result<()> { // Reaching this point means that either some actor exited unexpectedly or we received a stop signal. // Broadcast the stop signal to all actors and exit. - shutdown_components(stop_sender, health_check_handle).await; + shutdown_components(stop_sender, healthcheck_handle).await; if !reorg_detector_handle.is_terminated() { reorg_detector_result = Some(reorg_detector_handle.await); diff --git a/core/lib/health_check/Cargo.toml b/core/lib/health_check/Cargo.toml index 43c2491c1682..9a005762de2d 100644 --- a/core/lib/health_check/Cargo.toml +++ b/core/lib/health_check/Cargo.toml @@ -14,7 +14,7 @@ async-trait = "0.1" futures = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "time"] } tracing = "0.1" [dev-dependencies] diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index e05c06481405..d5a66916a269 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -15,6 +15,8 @@ pub enum HealthStatus { NotReady, /// Component is ready for operations. Ready, + /// Component is affected by some non-fatal issue. The component is still considered healthy. + Affected, /// Component is shut down. ShutDown, /// Component has been abnormally interrupted by a panic. @@ -22,17 +24,18 @@ pub enum HealthStatus { } impl HealthStatus { - /// Checks whether a component is ready according to this status. - pub fn is_ready(self) -> bool { - matches!(self, Self::Ready) + /// Checks whether a component is healthy according to this status. + pub fn is_healthy(self) -> bool { + matches!(self, Self::Ready | Self::Affected) } fn priority_for_aggregation(self) -> usize { match self { Self::Ready => 0, - Self::ShutDown => 1, - Self::NotReady => 2, - Self::Panicked => 3, + Self::Affected => 1, + Self::ShutDown => 2, + Self::NotReady => 3, + Self::Panicked => 4, } } } @@ -94,7 +97,7 @@ impl AppHealth { let inner = aggregated_status.into(); let this = Self { inner, components }; - if !this.inner.status.is_ready() { + if !this.inner.status.is_healthy() { // Only log non-ready application health so that logs are not spammed without a reason. 
tracing::debug!("Aggregated application health: {this:?}"); } @@ -129,8 +132,8 @@ impl AppHealth { } } - pub fn is_ready(&self) -> bool { - self.inner.status.is_ready() + pub fn is_healthy(&self) -> bool { + self.inner.status.is_healthy() } } @@ -144,7 +147,7 @@ pub trait CheckHealth: Send + Sync + 'static { } /// Basic implementation of [`CheckHealth`] trait that can be updated using a matching [`HealthUpdater`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ReactiveHealthCheck { name: &'static str, health_receiver: watch::Receiver, @@ -293,4 +296,59 @@ mod tests { let updated = health_updater.update(health); assert!(updated); } + + #[tokio::test] + async fn aggregating_health_checks() { + let (first_check, first_updater) = ReactiveHealthCheck::new("first"); + let (second_check, second_updater) = ReactiveHealthCheck::new("second"); + let checks: Vec> = vec![Box::new(first_check), Box::new(second_check)]; + + let app_health = AppHealth::new(&checks).await; + assert!(!app_health.is_healthy()); + assert_matches!(app_health.inner.status(), HealthStatus::NotReady); + assert_matches!( + app_health.components["first"].status, + HealthStatus::NotReady + ); + assert_matches!( + app_health.components["second"].status, + HealthStatus::NotReady + ); + + first_updater.update(HealthStatus::Ready.into()); + + let app_health = AppHealth::new(&checks).await; + assert!(!app_health.is_healthy()); + assert_matches!(app_health.inner.status(), HealthStatus::NotReady); + assert_matches!(app_health.components["first"].status, HealthStatus::Ready); + assert_matches!( + app_health.components["second"].status, + HealthStatus::NotReady + ); + + second_updater.update(HealthStatus::Affected.into()); + + let app_health = AppHealth::new(&checks).await; + assert!(app_health.is_healthy()); + assert_matches!(app_health.inner.status(), HealthStatus::Affected); + assert_matches!(app_health.components["first"].status, HealthStatus::Ready); + assert_matches!( + app_health.components["second"].status, + HealthStatus::Affected + ); + + drop(first_updater); + + let app_health = AppHealth::new(&checks).await; + assert!(!app_health.is_healthy()); + assert_matches!(app_health.inner.status(), HealthStatus::ShutDown); + assert_matches!( + app_health.components["first"].status, + HealthStatus::ShutDown + ); + assert_matches!( + app_health.components["second"].status, + HealthStatus::Affected + ); + } } diff --git a/core/lib/zksync_core/src/api_server/healthcheck.rs b/core/lib/zksync_core/src/api_server/healthcheck.rs index cbf8c9d6faf4..8398005f5917 100644 --- a/core/lib/zksync_core/src/api_server/healthcheck.rs +++ b/core/lib/zksync_core/src/api_server/healthcheck.rs @@ -8,7 +8,7 @@ async fn check_health>( health_checks: State>, ) -> (StatusCode, Json) { let response = AppHealth::new(&health_checks).await; - let response_code = if response.is_ready() { + let response_code = if response.is_healthy() { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index a2e8998482bb..ca39d7693319 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -65,7 +65,7 @@ impl ApiServerHandles { "Timed out waiting for API server" ); let health = self.health_check.check_health().await; - if health.status().is_ready() { + if health.status().is_healthy() { break; } tokio::time::sleep(POLL_INTERVAL).await; diff --git 
a/core/lib/zksync_core/src/commitment_generator/mod.rs b/core/lib/zksync_core/src/commitment_generator/mod.rs index 7488b9ab1cd0..3e868000317f 100644 --- a/core/lib/zksync_core/src/commitment_generator/mod.rs +++ b/core/lib/zksync_core/src/commitment_generator/mod.rs @@ -7,6 +7,7 @@ use multivm::zk_evm_latest::ethereum_types::U256; use tokio::{sync::watch, task::JoinHandle}; use zksync_commitment_utils::{bootloader_initial_content_commitment, events_queue_commitment}; use zksync_dal::ConnectionPool; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_types::{ commitment::{AuxCommitments, CommitmentCommonInput, CommitmentInput, L1BatchCommitment}, writes::{InitialStorageWrite, RepeatedStorageWrite, StateDiffRecord}, @@ -21,17 +22,21 @@ const SLEEP_INTERVAL: Duration = Duration::from_millis(100); #[derive(Debug)] pub struct CommitmentGenerator { connection_pool: ConnectionPool, - stop_receiver: watch::Receiver, + health_updater: HealthUpdater, } impl CommitmentGenerator { - pub fn new(connection_pool: ConnectionPool, stop_receiver: watch::Receiver) -> Self { + pub fn new(connection_pool: ConnectionPool) -> Self { Self { connection_pool, - stop_receiver, + health_updater: ReactiveHealthCheck::new("commitment_generator").1, } } + pub fn health_check(&self) -> ReactiveHealthCheck { + self.health_updater.subscribe() + } + async fn calculate_aux_commitments( &self, l1_batch_number: L1BatchNumber, @@ -220,13 +225,17 @@ impl CommitmentGenerator { let latency = METRICS.generate_commitment_latency_stage[&CommitmentStage::PrepareInput].start(); let input = self.prepare_input(l1_batch_number).await?; - latency.observe(); + let latency = latency.observe(); + tracing::debug!("Prepared commitment input for L1 batch #{l1_batch_number} in {latency:?}"); let latency = METRICS.generate_commitment_latency_stage[&CommitmentStage::Calculate].start(); let commitment = L1BatchCommitment::new(input); let artifacts = commitment.artifacts(); - latency.observe(); + let latency = latency.observe(); + tracing::debug!( + "Generated commitment artifacts for L1 batch #{l1_batch_number} in {latency:?}" + ); let latency = METRICS.generate_commitment_latency_stage[&CommitmentStage::SaveResults].start(); @@ -236,17 +245,27 @@ impl CommitmentGenerator { .blocks_dal() .save_l1_batch_commitment_artifacts(l1_batch_number, &artifacts) .await?; - latency.observe(); + let latency = latency.observe(); + tracing::debug!( + "Stored commitment artifacts for L1 batch #{l1_batch_number} in {latency:?}" + ); + let health_details = serde_json::json!({ + "l1_batch_number": l1_batch_number, + }); + self.health_updater + .update(Health::from(HealthStatus::Ready).with_details(health_details)); Ok(()) } - pub async fn run(self) -> anyhow::Result<()> { + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + self.health_updater.update(HealthStatus::Ready.into()); loop { - if *self.stop_receiver.borrow() { - tracing::info!("Stop signal received, CommitmentGenerator is shutting down"); + if *stop_receiver.borrow() { + tracing::info!("Stop signal received, commitment generator is shutting down"); break; } + let Some(l1_batch_number) = self .connection_pool .access_storage_tagged("commitment_generator") diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs index 9d55cfa60735..8af73f13444b 100644 --- a/core/lib/zksync_core/src/consensus/testonly.rs +++ b/core/lib/zksync_core/src/consensus/testonly.rs @@ -431,7 +431,7 @@ impl 
StateKeeperRunner { miniblock_sealer_handle, self.pool.clone(), self.actions_queue, - SyncState::new(), + SyncState::default(), Box::::default(), Address::repeat_byte(11), u32::MAX, diff --git a/core/lib/zksync_core/src/consistency_checker/mod.rs b/core/lib/zksync_core/src/consistency_checker/mod.rs index 8441f927f8f5..fe49c42b9d3d 100644 --- a/core/lib/zksync_core/src/consistency_checker/mod.rs +++ b/core/lib/zksync_core/src/consistency_checker/mod.rs @@ -1,10 +1,12 @@ use std::{fmt, time::Duration}; use anyhow::Context as _; +use serde::Serialize; use tokio::sync::watch; use zksync_contracts::PRE_BOOJUM_COMMIT_FUNCTION; use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_eth_client::{clients::QueryClient, Error as L1ClientError, EthInterface}; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_l1_contract_interface::{i_executor::structures::CommitBatchInfo, Tokenizable}; use zksync_types::{web3::ethabi, L1BatchNumber, H256}; @@ -30,15 +32,79 @@ impl From for CheckError { } } -trait UpdateCheckedBatch: fmt::Debug + Send + Sync { +/// Handler of life cycle events emitted by [`ConsistencyChecker`]. +trait HandleConsistencyCheckerEvent: fmt::Debug + Send + Sync { + fn initialize(&mut self); + + fn set_first_batch_to_check(&mut self, first_batch_to_check: L1BatchNumber); + fn update_checked_batch(&mut self, last_checked_batch: L1BatchNumber); + + fn report_inconsistent_batch(&mut self, number: L1BatchNumber); +} + +/// Health details reported by [`ConsistencyChecker`]. +#[derive(Debug, Default, Serialize)] +struct ConsistencyCheckerDetails { + #[serde(skip_serializing_if = "Option::is_none")] + first_checked_batch: Option, + #[serde(skip_serializing_if = "Option::is_none")] + last_checked_batch: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + inconsistent_batches: Vec, +} + +impl ConsistencyCheckerDetails { + fn health(&self) -> Health { + let status = if self.inconsistent_batches.is_empty() { + HealthStatus::Ready + } else { + HealthStatus::Affected + }; + Health::from(status).with_details(self) + } +} + +/// Default [`HandleConsistencyCheckerEvent`] implementation that reports the batch number as a metric and via health check details. +#[derive(Debug)] +struct ConsistencyCheckerHealthUpdater { + inner: HealthUpdater, + current_details: ConsistencyCheckerDetails, +} + +impl ConsistencyCheckerHealthUpdater { + fn new() -> (ReactiveHealthCheck, Self) { + let (health_check, health_updater) = ReactiveHealthCheck::new("consistency_checker"); + let this = Self { + inner: health_updater, + current_details: ConsistencyCheckerDetails::default(), + }; + (health_check, this) + } } -/// Default [`UpdateCheckedBatch`] implementation that reports the batch number as a metric. 
-impl UpdateCheckedBatch for () { +impl HandleConsistencyCheckerEvent for ConsistencyCheckerHealthUpdater { + fn initialize(&mut self) { + self.inner.update(self.current_details.health()); + } + + fn set_first_batch_to_check(&mut self, first_batch_to_check: L1BatchNumber) { + self.current_details.first_checked_batch = Some(first_batch_to_check); + self.inner.update(self.current_details.health()); + } + fn update_checked_batch(&mut self, last_checked_batch: L1BatchNumber) { + tracing::info!("L1 batch #{last_checked_batch} is consistent with L1"); EN_METRICS.last_correct_batch[&CheckerComponent::ConsistencyChecker] .set(last_checked_batch.0.into()); + self.current_details.last_checked_batch = Some(last_checked_batch); + self.inner.update(self.current_details.health()); + } + + fn report_inconsistent_batch(&mut self, number: L1BatchNumber) { + tracing::warn!("L1 batch #{number} is inconsistent with L1"); + self.current_details.inconsistent_batches.push(number); + self.inner.update(self.current_details.health()); } } @@ -128,9 +194,10 @@ pub struct ConsistencyChecker { max_batches_to_recheck: u32, sleep_interval: Duration, l1_client: Box, - l1_batch_updater: Box, + event_handler: Box, l1_data_mismatch_behavior: L1DataMismatchBehavior, pool: ConnectionPool, + health_check: ReactiveHealthCheck, } impl ConsistencyChecker { @@ -138,17 +205,24 @@ impl ConsistencyChecker { pub fn new(web3_url: &str, max_batches_to_recheck: u32, pool: ConnectionPool) -> Self { let web3 = QueryClient::new(web3_url).unwrap(); + let (health_check, health_updater) = ConsistencyCheckerHealthUpdater::new(); Self { contract: zksync_contracts::zksync_contract(), max_batches_to_recheck, sleep_interval: Self::DEFAULT_SLEEP_INTERVAL, l1_client: Box::new(web3), - l1_batch_updater: Box::new(()), + event_handler: Box::new(health_updater), l1_data_mismatch_behavior: L1DataMismatchBehavior::Log, pool, + health_check, } } + /// Returns health check associated with this checker. + pub fn health_check(&self) -> &ReactiveHealthCheck { + &self.health_check + } + async fn check_commitments( &self, batch_number: L1BatchNumber, @@ -249,6 +323,8 @@ impl ConsistencyChecker { } pub async fn run(mut self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { + self.event_handler.initialize(); + // It doesn't make sense to start the checker until we have at least one L1 batch with metadata. 
let earliest_l1_batch_number = wait_for_l1_batch_with_metadata(&self.pool, self.sleep_interval, &mut stop_receiver) @@ -274,6 +350,8 @@ impl ConsistencyChecker { tracing::info!( "Last committed L1 batch is #{last_committed_batch}; starting checks from L1 batch #{first_batch_to_check}" ); + self.event_handler + .set_first_batch_to_check(first_batch_to_check); let mut batch_number = first_batch_to_check; loop { @@ -294,20 +372,21 @@ impl ConsistencyChecker { match self.check_commitments(batch_number, &local).await { Ok(true) => { - tracing::info!("L1 batch #{batch_number} is consistent with L1"); - self.l1_batch_updater.update_checked_batch(batch_number); + self.event_handler.update_checked_batch(batch_number); batch_number += 1; } - Ok(false) => match &self.l1_data_mismatch_behavior { - #[cfg(test)] - L1DataMismatchBehavior::Bail => { - anyhow::bail!("L1 Batch #{batch_number} is inconsistent with L1"); - } - L1DataMismatchBehavior::Log => { - tracing::warn!("L1 Batch #{batch_number} is inconsistent with L1"); - batch_number += 1; // We don't want to infinitely loop failing the check on the same batch + Ok(false) => { + self.event_handler.report_inconsistent_batch(batch_number); + match &self.l1_data_mismatch_behavior { + #[cfg(test)] + L1DataMismatchBehavior::Bail => { + anyhow::bail!("L1 batch #{batch_number} is inconsistent with L1"); + } + L1DataMismatchBehavior::Log => { + batch_number += 1; // We don't want to infinitely loop failing the check on the same batch + } } - }, + } Err(CheckError::Web3(err)) => { tracing::warn!("Error accessing L1; will retry after a delay: {err}"); tokio::time::sleep(self.sleep_interval).await; diff --git a/core/lib/zksync_core/src/consistency_checker/tests/mod.rs b/core/lib/zksync_core/src/consistency_checker/tests/mod.rs index 7ae745e8e583..19994f9aab30 100644 --- a/core/lib/zksync_core/src/consistency_checker/tests/mod.rs +++ b/core/lib/zksync_core/src/consistency_checker/tests/mod.rs @@ -62,21 +62,35 @@ fn build_commit_tx_input_data(batches: &[L1BatchWithMetadata]) -> Vec { } fn create_mock_checker(client: MockEthereum, pool: ConnectionPool) -> ConsistencyChecker { + let (health_check, health_updater) = ConsistencyCheckerHealthUpdater::new(); ConsistencyChecker { contract: zksync_contracts::zksync_contract(), max_batches_to_recheck: 100, sleep_interval: Duration::from_millis(10), l1_client: Box::new(client), - l1_batch_updater: Box::new(()), + event_handler: Box::new(health_updater), l1_data_mismatch_behavior: L1DataMismatchBehavior::Bail, pool, + health_check, } } -impl UpdateCheckedBatch for mpsc::UnboundedSender { +impl HandleConsistencyCheckerEvent for mpsc::UnboundedSender { + fn initialize(&mut self) { + // Do nothing + } + + fn set_first_batch_to_check(&mut self, _first_batch_to_check: L1BatchNumber) { + // Do nothing + } + fn update_checked_batch(&mut self, last_checked_batch: L1BatchNumber) { self.send(last_checked_batch).ok(); } + + fn report_inconsistent_batch(&mut self, _number: L1BatchNumber) { + // Do nothing + } } #[test] @@ -327,7 +341,7 @@ async fn normal_checker_function( let (l1_batch_updates_sender, mut l1_batch_updates_receiver) = mpsc::unbounded_channel(); let checker = ConsistencyChecker { - l1_batch_updater: Box::new(l1_batch_updates_sender), + event_handler: Box::new(l1_batch_updates_sender), ..create_mock_checker(client, pool.clone()) }; @@ -401,7 +415,7 @@ async fn checker_processes_pre_boojum_batches( let (l1_batch_updates_sender, mut l1_batch_updates_receiver) = mpsc::unbounded_channel(); let checker = ConsistencyChecker { - 
l1_batch_updater: Box::new(l1_batch_updates_sender), + event_handler: Box::new(l1_batch_updates_sender), ..create_mock_checker(client, pool.clone()) }; @@ -472,7 +486,7 @@ async fn checker_functions_after_snapshot_recovery(delay_batch_insertion: bool) let (l1_batch_updates_sender, mut l1_batch_updates_receiver) = mpsc::unbounded_channel(); let checker = ConsistencyChecker { - l1_batch_updater: Box::new(l1_batch_updates_sender), + event_handler: Box::new(l1_batch_updates_sender), ..create_mock_checker(client, pool.clone()) }; let (stop_sender, stop_receiver) = watch::channel(false); diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 038c64f88404..5575712adda0 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -717,9 +717,11 @@ pub async fn initialize_components( .build() .await .context("failed to build commitment_generator_pool")?; - let commitment_generator = - CommitmentGenerator::new(commitment_generator_pool, stop_receiver.clone()); - task_futures.push(tokio::spawn(commitment_generator.run())); + let commitment_generator = CommitmentGenerator::new(commitment_generator_pool); + healthchecks.push(Box::new(commitment_generator.health_check())); + task_futures.push(tokio::spawn( + commitment_generator.run(stop_receiver.clone()), + )); } // Run healthcheck server for all components. diff --git a/core/lib/zksync_core/src/reorg_detector/mod.rs b/core/lib/zksync_core/src/reorg_detector/mod.rs index 96a6c19890ec..691b58361bf2 100644 --- a/core/lib/zksync_core/src/reorg_detector/mod.rs +++ b/core/lib/zksync_core/src/reorg_detector/mod.rs @@ -4,6 +4,7 @@ use anyhow::Context as _; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::ConnectionPool; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_types::{L1BatchNumber, MiniblockNumber, H256}; use zksync_web3_decl::{ error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult}, @@ -115,16 +116,24 @@ impl MainNodeClient for HttpClient { } } -trait UpdateCorrectBlock: fmt::Debug + Send + Sync { +trait HandleReorgDetectorEvent: fmt::Debug + Send + Sync { + fn initialize(&mut self); + fn update_correct_block( &mut self, last_correct_miniblock: MiniblockNumber, last_correct_l1_batch: L1BatchNumber, ); + + fn report_divergence(&mut self, diverged_l1_batch: L1BatchNumber); } -/// Default implementation of [`UpdateCorrectBlock`] that reports values as metrics. -impl UpdateCorrectBlock for () { +/// Default implementation of [`HandleReorgDetectorEvent`] that reports values as metrics. 
+impl HandleReorgDetectorEvent for HealthUpdater { + fn initialize(&mut self) { + self.update(Health::from(HealthStatus::Ready)); + } + fn update_correct_block( &mut self, last_correct_miniblock: MiniblockNumber, @@ -144,6 +153,19 @@ impl UpdateCorrectBlock for () { if prev_checked_l1_batch != last_correct_l1_batch { tracing::debug!("No reorg at L1 batch #{last_correct_l1_batch}"); } + + let health_details = serde_json::json!({ + "last_correct_miniblock": last_correct_miniblock, + "last_correct_l1_batch": last_correct_l1_batch, + }); + self.update(Health::from(HealthStatus::Ready).with_details(health_details)); + } + + fn report_divergence(&mut self, diverged_l1_batch: L1BatchNumber) { + let health_details = serde_json::json!({ + "diverged_l1_batch": diverged_l1_batch, + }); + self.update(Health::from(HealthStatus::Affected).with_details(health_details)); } } @@ -182,28 +204,33 @@ impl MatchOutput { #[derive(Debug)] pub struct ReorgDetector { client: Box, - block_updater: Box, + event_handler: Box, pool: ConnectionPool, - stop_receiver: watch::Receiver, sleep_interval: Duration, + health_check: ReactiveHealthCheck, } impl ReorgDetector { const DEFAULT_SLEEP_INTERVAL: Duration = Duration::from_secs(5); - pub fn new(url: &str, pool: ConnectionPool, stop_receiver: watch::Receiver) -> Self { + pub fn new(url: &str, pool: ConnectionPool) -> Self { let client = HttpClientBuilder::default() .build(url) .expect("Failed to create HTTP client"); + let (health_check, health_updater) = ReactiveHealthCheck::new("reorg_detector"); Self { client: Box::new(client), - block_updater: Box::new(()), + event_handler: Box::new(health_updater), pool, - stop_receiver, sleep_interval: Self::DEFAULT_SLEEP_INTERVAL, + health_check, } } + pub fn health_check(&self) -> &ReactiveHealthCheck { + &self.health_check + } + /// Compares hashes of the given local miniblock and the same miniblock from main node. async fn miniblock_hashes_match( &self, @@ -344,9 +371,13 @@ impl ReorgDetector { .map(L1BatchNumber) } - pub async fn run(mut self) -> anyhow::Result> { + pub async fn run( + mut self, + mut stop_receiver: watch::Receiver, + ) -> anyhow::Result> { + self.event_handler.initialize(); loop { - match self.run_inner().await { + match self.run_inner(&mut stop_receiver).await { Ok(l1_batch_number) => return Ok(l1_batch_number), Err(HashMatchError::Rpc(err)) if is_transient_err(&err) => { tracing::warn!("Following transport error occurred: {err}"); @@ -359,13 +390,12 @@ impl ReorgDetector { } } - async fn run_inner(&mut self) -> Result, HashMatchError> { - let earliest_l1_batch_number = wait_for_l1_batch_with_metadata( - &self.pool, - self.sleep_interval, - &mut self.stop_receiver, - ) - .await?; + async fn run_inner( + &mut self, + stop_receiver: &mut watch::Receiver, + ) -> Result, HashMatchError> { + let earliest_l1_batch_number = + wait_for_l1_batch_with_metadata(&self.pool, self.sleep_interval, stop_receiver).await?; let Some(earliest_l1_batch_number) = earliest_l1_batch_number else { return Ok(None); // Stop signal received @@ -388,7 +418,7 @@ impl ReorgDetector { } loop { - let should_stop = *self.stop_receiver.borrow(); + let should_stop = *stop_receiver.borrow(); // At this point, we are guaranteed to have L1 batches and miniblocks in the storage. let mut storage = self.pool.access_storage().await?; @@ -423,7 +453,7 @@ impl ReorgDetector { // the nodes needs to do catching up; however, it is not certain that there is actually // a re-org taking place. 
if root_hashes_match && miniblock_hashes_match { - self.block_updater + self.event_handler .update_correct_block(checked_miniblock_number, checked_l1_batch_number); } else { let diverged_l1_batch_number = if root_hashes_match { @@ -431,6 +461,8 @@ impl ReorgDetector { } else { checked_l1_batch_number }; + self.event_handler + .report_divergence(diverged_l1_batch_number); tracing::info!("Searching for the first diverged L1 batch"); let last_correct_l1_batch = self diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs index 53d07d3cae88..9907a4c9981e 100644 --- a/core/lib/zksync_core/src/reorg_detector/tests.rs +++ b/core/lib/zksync_core/src/reorg_detector/tests.rs @@ -145,7 +145,11 @@ impl MainNodeClient for MockMainNodeClient { } } -impl UpdateCorrectBlock for mpsc::UnboundedSender<(MiniblockNumber, L1BatchNumber)> { +impl HandleReorgDetectorEvent for mpsc::UnboundedSender<(MiniblockNumber, L1BatchNumber)> { + fn initialize(&mut self) { + // Do nothing + } + fn update_correct_block( &mut self, last_correct_miniblock: MiniblockNumber, @@ -154,6 +158,21 @@ impl UpdateCorrectBlock for mpsc::UnboundedSender<(MiniblockNumber, L1BatchNumbe self.send((last_correct_miniblock, last_correct_l1_batch)) .ok(); } + + fn report_divergence(&mut self, _diverged_l1_batch: L1BatchNumber) { + // Do nothing + } +} + +fn create_mock_detector(client: MockMainNodeClient, pool: ConnectionPool) -> ReorgDetector { + let (health_check, health_updater) = ReactiveHealthCheck::new("reorg_detector"); + ReorgDetector { + client: Box::new(client), + event_handler: Box::new(health_updater), + pool, + sleep_interval: Duration::from_millis(10), + health_check, + } } #[test_casing(4, Product(([false, true], [false, true])))] @@ -215,13 +234,10 @@ async fn normal_reorg_function(snapshot_recovery: bool, with_transient_errors: b let (block_update_sender, mut block_update_receiver) = mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>(); let detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(block_update_sender), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), + event_handler: Box::new(block_update_sender), + ..create_mock_detector(client, pool.clone()) }; - let detector_task = tokio::spawn(detector.run()); + let detector_task = tokio::spawn(detector.run(stop_receiver)); for (number, miniblock_hash, l1_batch_hash) in miniblock_and_l1_batch_hashes { store_miniblock(&mut storage, number, miniblock_hash).await; @@ -256,15 +272,9 @@ async fn detector_stops_on_fatal_rpc_error() { *client.error_kind.lock().unwrap() = Some(RpcErrorKind::Fatal); let (_stop_sender, stop_receiver) = watch::channel(false); - let detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(()), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; + let detector = create_mock_detector(client, pool.clone()); // Check that the detector stops when a fatal RPC error is encountered. 
- detector.run().await.unwrap_err(); + detector.run(stop_receiver).await.unwrap_err(); } #[tokio::test] @@ -299,14 +309,8 @@ async fn reorg_is_detected_on_batch_hash_mismatch() { .l1_batch_root_hash_responses .insert(L1BatchNumber(2), H256::repeat_byte(2)); - let detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(()), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; - let detector_task = tokio::spawn(detector.run()); + let detector = create_mock_detector(client, pool.clone()); + let detector_task = tokio::spawn(detector.run(stop_receiver)); store_miniblock(&mut storage, 1, miniblock_hash).await; seal_l1_batch(&mut storage, 1, H256::repeat_byte(1)).await; @@ -351,14 +355,8 @@ async fn reorg_is_detected_on_miniblock_hash_mismatch() { .miniblock_hash_responses .insert(MiniblockNumber(3), miniblock_hash); - let detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(()), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; - let detector_task = tokio::spawn(detector.run()); + let detector = create_mock_detector(client, pool.clone()); + let detector_task = tokio::spawn(detector.run(stop_receiver)); store_miniblock(&mut storage, 1, miniblock_hash).await; seal_l1_batch(&mut storage, 1, H256::repeat_byte(1)).await; @@ -445,13 +443,10 @@ async fn reorg_is_detected_on_historic_batch_hash_mismatch( let (block_update_sender, mut block_update_receiver) = mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>(); let detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(block_update_sender), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), + event_handler: Box::new(block_update_sender), + ..create_mock_detector(client, pool.clone()) }; - let detector_task = tokio::spawn(detector.run()); + let detector_task = tokio::spawn(detector.run(stop_receiver)); if matches!(storage_update_strategy, StorageUpdateStrategy::Sequential) { let mut last_number = earliest_l1_batch_number; @@ -483,14 +478,8 @@ async fn stopping_reorg_detector_while_waiting_for_l1_batch() { drop(storage); let (stop_sender, stop_receiver) = watch::channel(false); - let detector = ReorgDetector { - client: Box::::default(), - block_updater: Box::new(()), - pool, - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; - let detector_task = tokio::spawn(detector.run()); + let detector = create_mock_detector(MockMainNodeClient::default(), pool); + let detector_task = tokio::spawn(detector.run(stop_receiver)); stop_sender.send_replace(true); @@ -514,16 +503,10 @@ async fn detector_errors_on_earliest_batch_hash_mismatch() { .l1_batch_root_hash_responses .insert(L1BatchNumber(0), H256::zero()); - let (_stop_sender, stop_receiver) = watch::channel(false); - let mut detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(()), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; + let (_stop_sender, mut stop_receiver) = watch::channel(false); + let mut detector = create_mock_detector(client, pool.clone()); - let err = detector.run_inner().await.unwrap_err(); + let err = detector.run_inner(&mut stop_receiver).await.unwrap_err(); assert_matches!(err, HashMatchError::EarliestHashMismatch(L1BatchNumber(0))); } @@ -535,14 +518,8 @@ async fn detector_errors_on_earliest_batch_hash_mismatch_with_snapshot_recovery( .l1_batch_root_hash_responses .insert(L1BatchNumber(3), H256::zero()); - let 
(_stop_sender, stop_receiver) = watch::channel(false); - let mut detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(()), - pool: pool.clone(), - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; + let (_stop_sender, mut stop_receiver) = watch::channel(false); + let mut detector = create_mock_detector(client, pool.clone()); tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(20)).await; @@ -555,7 +532,7 @@ async fn detector_errors_on_earliest_batch_hash_mismatch_with_snapshot_recovery( seal_l1_batch(&mut storage, 3, H256::from_low_u64_be(3)).await; }); - let err = detector.run_inner().await.unwrap_err(); + let err = detector.run_inner(&mut stop_receiver).await.unwrap_err(); assert_matches!(err, HashMatchError::EarliestHashMismatch(L1BatchNumber(3))); } @@ -596,14 +573,8 @@ async fn reorg_is_detected_without_waiting_for_main_node_to_catch_up() { client.latest_miniblock_response = Some(MiniblockNumber(3)); let (_stop_sender, stop_receiver) = watch::channel(false); - let detector = ReorgDetector { - client: Box::new(client), - block_updater: Box::new(()), - pool, - stop_receiver, - sleep_interval: Duration::from_millis(10), - }; - let detector_task = tokio::spawn(detector.run()); + let detector = create_mock_detector(client, pool); + let detector_task = tokio::spawn(detector.run(stop_receiver)); let task_result = detector_task.await.unwrap(); let last_correct_l1_batch = task_result.unwrap(); diff --git a/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs b/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs index 7d059f6443e7..8c14f937dd83 100644 --- a/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs +++ b/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs @@ -5,10 +5,12 @@ use std::{fmt, time::Duration}; use anyhow::Context as _; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use serde::Serialize; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::watch; use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_types::{ aggregated_operations::AggregatedActionType, api, L1BatchNumber, MiniblockNumber, H256, }; @@ -116,7 +118,7 @@ impl MainNodeClient for HttpClient { } /// Cursors for the last executed / proven / committed L1 batch numbers. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize)] struct UpdaterCursor { last_executed_l1_batch: L1BatchNumber, last_proven_l1_batch: L1BatchNumber, @@ -242,6 +244,7 @@ impl UpdaterCursor { pub struct BatchStatusUpdater { client: Box, pool: ConnectionPool, + health_updater: HealthUpdater, sleep_interval: Duration, /// Test-only sender of status changes each time they are produced and applied to the storage. 
#[cfg(test)] @@ -270,17 +273,24 @@ impl BatchStatusUpdater { Self { client, pool, + health_updater: ReactiveHealthCheck::new("batch_status_updater").1, sleep_interval, #[cfg(test)] changes_sender: mpsc::unbounded_channel().0, } } + pub fn health_check(&self) -> ReactiveHealthCheck { + self.health_updater.subscribe() + } + pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { let mut storage = self.pool.access_storage_tagged("sync_layer").await?; let mut cursor = UpdaterCursor::new(&mut storage).await?; drop(storage); tracing::info!("Initialized batch status updater cursor: {cursor:?}"); + self.health_updater + .update(Health::from(HealthStatus::Ready).with_details(cursor)); loop { if *stop_receiver.borrow() { @@ -305,6 +315,8 @@ impl BatchStatusUpdater { } else { self.apply_status_changes(&mut cursor, status_changes) .await?; + self.health_updater + .update(Health::from(HealthStatus::Ready).with_details(cursor)); } } } diff --git a/core/lib/zksync_core/src/sync_layer/sync_state.rs b/core/lib/zksync_core/src/sync_layer/sync_state.rs index 32a5c29c1988..fb4834b4d5a9 100644 --- a/core/lib/zksync_core/src/sync_layer/sync_state.rs +++ b/core/lib/zksync_core/src/sync_layer/sync_state.rs @@ -1,5 +1,8 @@ use std::sync::{Arc, RwLock}; +use async_trait::async_trait; +use serde::Serialize; +use zksync_health_check::{CheckHealth, Health, HealthStatus}; use zksync_types::MiniblockNumber; use crate::metrics::EN_METRICS; @@ -20,10 +23,6 @@ pub struct SyncState { const SYNC_MINIBLOCK_DELTA: u32 = 10; impl SyncState { - pub fn new() -> Self { - Self::default() - } - pub(crate) fn get_main_node_block(&self) -> MiniblockNumber { self.inner .read() @@ -49,7 +48,7 @@ impl SyncState { } } inner.main_node_block = Some(block); - self.update_sync_metric(&inner); + inner.update_sync_metric(); } pub(super) fn set_local_block(&self, block: MiniblockNumber) { @@ -65,25 +64,35 @@ impl SyncState { } } inner.local_block = Some(block); - self.update_sync_metric(&inner); + inner.update_sync_metric(); } pub(crate) fn is_synced(&self) -> bool { let inner = self.inner.read().unwrap(); - self.is_synced_inner(&inner).0 + inner.is_synced().0 } +} - fn update_sync_metric(&self, inner: &SyncStateInner) { - let (is_synced, lag) = self.is_synced_inner(inner); - EN_METRICS.synced.set(is_synced.into()); - if let Some(lag) = lag { - EN_METRICS.sync_lag.set(lag.into()); - } +#[async_trait] +impl CheckHealth for SyncState { + fn name(&self) -> &'static str { + "sync_state" + } + + async fn check_health(&self) -> Health { + Health::from(&*self.inner.read().unwrap()) } +} - fn is_synced_inner(&self, inner: &SyncStateInner) -> (bool, Option) { - if let (Some(main_node_block), Some(local_block)) = - (inner.main_node_block, inner.local_block) +#[derive(Debug, Default)] +struct SyncStateInner { + main_node_block: Option, + local_block: Option, +} + +impl SyncStateInner { + fn is_synced(&self) -> (bool, Option) { + if let (Some(main_node_block), Some(local_block)) = (self.main_node_block, self.local_block) { let Some(block_diff) = main_node_block.0.checked_sub(local_block.0) else { // We're ahead of the main node, this situation is handled by the re-org detector. 
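The batch status updater above follows the same updater/subscriber pattern as the commitment generator and consistency checker: the component owns a `HealthUpdater`, hands out a `ReactiveHealthCheck` via `subscribe()`, and pushes a status plus JSON details as it makes progress (dropping the updater makes the check report `ShutDown`). A minimal sketch of that pattern with a hypothetical component, using only the `zksync_health_check` API visible in this diff; the struct, method, and field names are illustrative, not part of the PR:

    use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};

    struct ExampleComponent {
        health_updater: HealthUpdater,
    }

    impl ExampleComponent {
        fn new() -> Self {
            // `ReactiveHealthCheck::new` returns a `(check, updater)` pair; the component keeps the updater.
            Self {
                health_updater: ReactiveHealthCheck::new("example_component").1,
            }
        }

        /// The subscribed check is what gets pushed into the node's `healthchecks` vector.
        fn health_check(&self) -> ReactiveHealthCheck {
            self.health_updater.subscribe()
        }

        fn on_progress(&self, processed_batches: u64) {
            // Details are arbitrary JSON, mirroring how `BatchStatusUpdater` reports its cursor.
            let details = serde_json::json!({ "processed_batches": processed_batches });
            self.health_updater
                .update(Health::from(HealthStatus::Ready).with_details(details));
        }
    }
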
@@ -94,34 +103,74 @@ impl SyncState {
             (false, None)
         }
     }
+
+    fn update_sync_metric(&self) {
+        let (is_synced, lag) = self.is_synced();
+        EN_METRICS.synced.set(is_synced.into());
+        if let Some(lag) = lag {
+            EN_METRICS.sync_lag.set(lag.into());
+        }
+    }
 }
 
-#[derive(Debug, Default)]
-struct SyncStateInner {
-    main_node_block: Option<MiniblockNumber>,
-    local_block: Option<MiniblockNumber>,
+impl From<&SyncStateInner> for Health {
+    fn from(state: &SyncStateInner) -> Health {
+        #[derive(Debug, Serialize)]
+        struct SyncStateHealthDetails {
+            is_synced: bool,
+            #[serde(skip_serializing_if = "Option::is_none")]
+            main_node_block: Option<MiniblockNumber>,
+            #[serde(skip_serializing_if = "Option::is_none")]
+            local_block: Option<MiniblockNumber>,
+        }
+
+        let (is_synced, block_diff) = state.is_synced();
+        let status = if is_synced {
+            HealthStatus::Ready
+        } else if block_diff.is_some() {
+            HealthStatus::Affected
+        } else {
+            return HealthStatus::NotReady.into(); // `state` isn't initialized yet
+        };
+        Health::from(status).with_details(SyncStateHealthDetails {
+            is_synced,
+            main_node_block: state.main_node_block,
+            local_block: state.local_block,
+        })
+    }
 }
 
 #[cfg(test)]
 mod tests {
+    use assert_matches::assert_matches;
+
     use super::*;
 
-    #[test]
-    fn test_sync_state() {
-        let sync_state = SyncState::new();
+    #[tokio::test]
+    async fn test_sync_state() {
+        let sync_state = SyncState::default();
 
         // The node is not synced if there is no data.
         assert!(!sync_state.is_synced());
 
+        let health = sync_state.check_health().await;
+        assert_matches!(health.status(), HealthStatus::NotReady);
+
         // The gap is too big, still not synced.
         sync_state.set_local_block(MiniblockNumber(0));
         sync_state.set_main_node_block(MiniblockNumber(SYNC_MINIBLOCK_DELTA + 1));
         assert!(!sync_state.is_synced());
 
+        let health = sync_state.check_health().await;
+        assert_matches!(health.status(), HealthStatus::Affected);
+
         // Within the threshold, the node is synced.
         sync_state.set_local_block(MiniblockNumber(1));
         assert!(sync_state.is_synced());
 
+        let health = sync_state.check_health().await;
+        assert_matches!(health.status(), HealthStatus::Ready);
+
         // Can reach the main node last block.
         sync_state.set_local_block(MiniblockNumber(SYNC_MINIBLOCK_DELTA + 1));
         assert!(sync_state.is_synced());
@@ -133,7 +182,7 @@ mod tests {
 
     #[test]
     fn test_sync_state_doesnt_panic_on_local_block() {
-        let sync_state = SyncState::new();
+        let sync_state = SyncState::default();
 
         sync_state.set_main_node_block(MiniblockNumber(1));
         sync_state.set_local_block(MiniblockNumber(2));
@@ -145,7 +194,7 @@ mod tests {
 
     #[test]
     fn test_sync_state_doesnt_panic_on_main_node_block() {
-        let sync_state = SyncState::new();
+        let sync_state = SyncState::default();
 
         sync_state.set_local_block(MiniblockNumber(2));
         sync_state.set_main_node_block(MiniblockNumber(1));
diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs
index b4d3ef7349bb..f78a3de84377 100644
--- a/core/lib/zksync_core/src/sync_layer/tests.rs
+++ b/core/lib/zksync_core/src/sync_layer/tests.rs
@@ -68,7 +68,7 @@ impl StateKeeperHandles {
         assert!(!tx_hashes.is_empty());
         assert!(tx_hashes.iter().all(|tx_hashes| !tx_hashes.is_empty()));
 
-        let sync_state = SyncState::new();
+        let sync_state = SyncState::default();
         let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(pool.clone(), 5);
         tokio::spawn(miniblock_sealer.run());
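Taken together, the new `Affected` status is what lets a degraded component (a lagging `SyncState`, an inconsistent batch reported by the consistency checker, a detected reorg) surface in `/health` details without flipping the endpoint to 503: aggregation and the HTTP handler now key off `is_healthy()` (`Ready` or `Affected`) instead of the old `is_ready()`. A short sketch of that aggregation, reusing the `AppHealth` API exercised by the `aggregating_health_checks` test; the component names are illustrative:

    use zksync_health_check::{AppHealth, CheckHealth, HealthStatus, ReactiveHealthCheck};

    async fn aggregation_example() {
        let (sync_check, sync_updater) = ReactiveHealthCheck::new("sync_state");
        let (pool_check, pool_updater) = ReactiveHealthCheck::new("connection_pool");
        let checks: Vec<Box<dyn CheckHealth>> = vec![Box::new(sync_check), Box::new(pool_check)];

        pool_updater.update(HealthStatus::Ready.into());
        sync_updater.update(HealthStatus::Affected.into());

        // `Affected` aggregates above `Ready`, so the application-level status is `Affected`,
        // but `is_healthy()` is still true and the healthcheck server would answer 200 OK.
        let app_health = AppHealth::new(&checks).await;
        assert!(app_health.is_healthy());
    }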