diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs
index 3590c8e6d0..6c35d03e8c 100644
--- a/apps/fortuna/src/command/run.rs
+++ b/apps/fortuna/src/command/run.rs
@@ -99,6 +99,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
             .collect(),
     ));
     for (chain_id, chain_config) in config.chains.clone() {
+        keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
         let keeper_metrics = keeper_metrics.clone();
         let keeper_private_key_option = keeper_private_key_option.clone();
         let chains = chains.clone();
@@ -168,7 +169,6 @@ async fn setup_chain_and_run_keeper(
         rpc_metrics.clone(),
     )
     .await?;
-    keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
     chains.write().await.insert(
         chain_id.clone(),
         ApiBlockChainState::Initialized(state.clone()),
diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs
index c585ff3908..9fdda1c5fd 100644
--- a/apps/fortuna/src/keeper.rs
+++ b/apps/fortuna/src/keeper.rs
@@ -178,45 +178,56 @@ pub async fn run_keeper_threads(
             };
 
             loop {
-                // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
-                // If rpc start fails all of these threads will just exit, instead of retrying.
-                // We are tracking rpc failures elsewhere, so it's fine.
-                spawn(
-                    track_provider(
-                        chain_id.clone(),
-                        contract.clone(),
-                        provider_address,
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                spawn(
-                    track_balance(
-                        chain_id.clone(),
-                        contract.client(),
-                        keeper_address,
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                spawn(
-                    track_accrued_pyth_fees(
-                        chain_id.clone(),
-                        contract.clone(),
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                spawn(
-                    track_block_timestamp_lag(
-                        chain_id.clone(),
-                        contract.client(),
-                        keeper_metrics.clone(),
-                    )
-                    .in_current_span(),
-                );
-                time::sleep(TRACK_INTERVAL).await;
+
+                // Track the provider info, keeper balance, accrued fees, and block timestamp lag sequentially,
+                // with the timestamp last. If there is a persistent error in any of these trackers, the
+                // timestamp will lag behind the current time and trigger an alert.
+                if let Err(e) = track_provider(
+                    chain_id.clone(),
+                    contract.clone(),
+                    provider_address,
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking provider: {:?}", e);
+                    continue;
+                }
+
+                if let Err(e) = track_balance(
+                    chain_id.clone(),
+                    contract.client(),
+                    keeper_address,
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking balance: {:?}", e);
+                    continue;
+                }
+
+                if let Err(e) = track_accrued_pyth_fees(
+                    chain_id.clone(),
+                    contract.clone(),
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking accrued pyth fees: {:?}", e);
+                    continue;
+                }
+
+                if let Err(e) = track_block_timestamp_lag(
+                    chain_id.clone(),
+                    contract.client(),
+                    keeper_metrics.clone(),
+                )
+                .await
+                {
+                    tracing::error!("Error tracking block timestamp lag: {:?}", e);
+                    continue;
+                }
             }
         }
         .in_current_span(),
diff --git a/apps/fortuna/src/keeper/block.rs b/apps/fortuna/src/keeper/block.rs
index c73719039b..cb1dfa4e9e 100644
--- a/apps/fortuna/src/keeper/block.rs
+++ b/apps/fortuna/src/keeper/block.rs
@@ -3,12 +3,16 @@ use {
         api::{self, BlockchainState},
         chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
         eth_utils::utils::EscalationPolicy,
-        keeper::keeper_metrics::KeeperMetrics,
+        keeper::keeper_metrics::{ChainIdLabel, KeeperMetrics},
         keeper::process_event::process_event_with_backoff,
     },
     anyhow::Result,
     ethers::types::U256,
-    std::{collections::HashSet, sync::Arc},
+    std::{
+        collections::HashSet,
+        sync::Arc,
+        time::{SystemTime, UNIX_EPOCH},
+    },
     tokio::{
         spawn,
         sync::{mpsc, RwLock},
@@ -115,6 +119,10 @@ pub async fn process_single_block_batch(
     metrics: Arc<KeeperMetrics>,
     fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
 ) {
+    let label = ChainIdLabel {
+        chain_id: chain_state.id.clone(),
+    };
+
     loop {
         let events_res = chain_state
             .contract
@@ -125,6 +133,31 @@ pub async fn process_single_block_batch(
             )
             .await;
 
+        // Only update metrics if we successfully retrieved events.
+        if events_res.is_ok() {
+            // Track the last time blocks were processed. If the processing thread stalls or dies, this
+            // timestamp will lag and trigger an alert.
+            let server_timestamp = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .map(|duration| duration.as_secs() as i64)
+                .unwrap_or(0);
+            metrics
+                .process_event_timestamp
+                .get_or_create(&label)
+                .set(server_timestamp);
+
+            let current_block = metrics
+                .process_event_block_number
+                .get_or_create(&label)
+                .get();
+            if block_range.to > current_block as u64 {
+                metrics
+                    .process_event_block_number
+                    .get_or_create(&label)
+                    .set(block_range.to as i64);
+            }
+        }
+
         match events_res {
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
diff --git a/apps/fortuna/src/keeper/keeper_metrics.rs b/apps/fortuna/src/keeper/keeper_metrics.rs
index abccb8abba..5051f4a1a1 100644
--- a/apps/fortuna/src/keeper/keeper_metrics.rs
+++ b/apps/fortuna/src/keeper/keeper_metrics.rs
@@ -44,6 +44,10 @@ pub struct KeeperMetrics {
     pub gas_price_estimate: Family<AccountLabel, Gauge<f64, AtomicU64>>,
     pub accrued_pyth_fees: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
     pub block_timestamp_lag: Family<ChainIdLabel, Gauge>,
+    pub latest_block_timestamp: Family<ChainIdLabel, Gauge>,
+    pub process_event_timestamp: Family<ChainIdLabel, Gauge>,
+    pub latest_block_number: Family<ChainIdLabel, Gauge>,
+    pub process_event_block_number: Family<ChainIdLabel, Gauge>,
 }
 
 impl Default for KeeperMetrics {
@@ -87,6 +91,10 @@ impl Default for KeeperMetrics {
             gas_price_estimate: Family::default(),
             accrued_pyth_fees: Family::default(),
             block_timestamp_lag: Family::default(),
+            latest_block_timestamp: Family::default(),
+            process_event_timestamp: Family::default(),
+            latest_block_number: Family::default(),
+            process_event_block_number: Family::default(),
         }
     }
 }
@@ -228,6 +236,30 @@ impl KeeperMetrics {
             keeper_metrics.block_timestamp_lag.clone(),
         );
 
+        writable_registry.register(
+            "latest_block_timestamp",
+            "The current block timestamp",
+            keeper_metrics.latest_block_timestamp.clone(),
+        );
+
+        writable_registry.register(
+            "process_event_timestamp",
+            "Timestamp of the last time the keeper updated the events",
+            keeper_metrics.process_event_timestamp.clone(),
+        );
+
+        writable_registry.register(
+            "latest_block_number",
+            "The current block number",
+            keeper_metrics.latest_block_number.clone(),
+        );
+
+        writable_registry.register(
+            "process_event_block_number",
+            "The highest block number for which events have been successfully retrieved and processed",
+            keeper_metrics.process_event_block_number.clone(),
+        );
+
         // *Important*: When adding a new metric:
         // 1. Register it above using `writable_registry.register(...)`
         // 2. Add a get_or_create call in the add_chain function below to initialize it for each chain/provider pair
@@ -241,6 +273,12 @@ impl KeeperMetrics {
         };
         let _ = self.accrued_pyth_fees.get_or_create(&chain_id_label);
         let _ = self.block_timestamp_lag.get_or_create(&chain_id_label);
+        let _ = self.latest_block_timestamp.get_or_create(&chain_id_label);
+        let _ = self.process_event_timestamp.get_or_create(&chain_id_label);
+        let _ = self.latest_block_number.get_or_create(&chain_id_label);
+        let _ = self
+            .process_event_block_number
+            .get_or_create(&chain_id_label);
 
         let account_label = AccountLabel {
             chain_id,
diff --git a/apps/fortuna/src/keeper/track.rs b/apps/fortuna/src/keeper/track.rs
index c5e06981da..0854e83bac 100644
--- a/apps/fortuna/src/keeper/track.rs
+++ b/apps/fortuna/src/keeper/track.rs
@@ -4,6 +4,7 @@ use {
         api::ChainId, chain::ethereum::InstrumentedPythContract,
         eth_utils::traced_client::TracedClient,
     },
+    anyhow::{anyhow, Result},
     ethers::middleware::Middleware,
     ethers::{prelude::BlockNumber, providers::Provider, types::Address},
     std::{
@@ -14,23 +15,17 @@ use {
 };
 
 /// tracks the balance of the given address on the given chain
-/// if there was an error, the function will just return
 #[tracing::instrument(skip_all)]
 pub async fn track_balance(
     chain_id: String,
     provider: Arc<Provider<TracedClient>>,
     address: Address,
     metrics: Arc<KeeperMetrics>,
-) {
-    let balance = match provider.get_balance(address, None).await {
-        // This conversion to u128 is fine as the total balance will never cross the limits
-        // of u128 practically.
-        Ok(r) => r.as_u128(),
-        Err(e) => {
-            tracing::error!("Error while getting balance. error: {:?}", e);
-            return;
-        }
-    };
+) -> Result<()> {
+    let balance = provider.get_balance(address, None).await?;
+    // This conversion to u128 is fine as the total balance will never cross the limits
+    // of u128 practically.
+    let balance = balance.as_u128();
     // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
     // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
     let balance = balance as f64 / 1e18;
@@ -42,6 +37,8 @@ pub async fn track_balance(
             address: address.to_string(),
         })
         .set(balance);
+
+    Ok(())
 }
 
 /// Tracks the difference between the server timestamp and the latest block timestamp for each chain
@@ -50,53 +47,47 @@ pub async fn track_block_timestamp_lag(
     chain_id: String,
     provider: Arc<Provider<TracedClient>>,
     metrics: Arc<KeeperMetrics>,
-) {
-    const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
-    let lag = match provider.get_block(BlockNumber::Latest).await {
-        Ok(block) => match block {
-            Some(block) => {
-                let block_timestamp = block.timestamp;
-                let server_timestamp = SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .unwrap()
-                    .as_secs();
-                let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
-                lag
-            }
-            None => {
-                tracing::error!("Block is None");
-                INF_LAG
-            }
-        },
-        Err(e) => {
-            tracing::error!("Failed to get block - {:?}", e);
-            INF_LAG
-        }
+) -> Result<()> {
+    let label = ChainIdLabel {
+        chain_id: chain_id.clone(),
     };
+
+    let block = provider.get_block(BlockNumber::Latest).await?;
+    let block = block.ok_or(anyhow!("block was none"))?;
+    let block_timestamp = block.timestamp.as_u64();
+    let block_timestamp = i64::try_from(block_timestamp)?;
+    let block_number = block
+        .number
+        .ok_or(anyhow!("block number was none"))?
+        .as_u64();
+
     metrics
-        .block_timestamp_lag
-        .get_or_create(&ChainIdLabel {
-            chain_id: chain_id.clone(),
-        })
-        .set(lag);
+        .latest_block_timestamp
+        .get_or_create(&label)
+        .set(block_timestamp);
+
+    metrics
+        .latest_block_number
+        .get_or_create(&label)
+        .set(block_number as i64);
+
+    let server_timestamp = i64::try_from(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())?;
+
+    let lag = server_timestamp - block_timestamp;
+    metrics.block_timestamp_lag.get_or_create(&label).set(lag);
+
+    Ok(())
 }
 
 /// tracks the collected fees and the hashchain data of the given provider address on the given chain
-/// if there is a error the function will just return
 #[tracing::instrument(skip_all)]
 pub async fn track_provider(
     chain_id: ChainId,
     contract: InstrumentedPythContract,
     provider_address: Address,
     metrics: Arc<KeeperMetrics>,
-) {
-    let provider_info = match contract.get_provider_info(provider_address).call().await {
-        Ok(info) => info,
-        Err(e) => {
-            tracing::error!("Error while getting provider info. error: {:?}", e);
-            return;
-        }
-    };
+) -> Result<()> {
+    let provider_info = contract.get_provider_info(provider_address).call().await?;
 
     // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
     // The fee is in wei, so we divide by 1e18 to convert it to eth.
@@ -150,23 +141,18 @@ pub async fn track_provider(
             address: provider_address.to_string(),
         })
         .set(end_sequence_number as i64);
+
+    Ok(())
 }
 
 /// tracks the accrued pyth fees on the given chain
-/// if there is an error the function will just return
 #[tracing::instrument(skip_all)]
 pub async fn track_accrued_pyth_fees(
     chain_id: ChainId,
     contract: InstrumentedPythContract,
     metrics: Arc<KeeperMetrics>,
-) {
-    let accrued_pyth_fees = match contract.get_accrued_pyth_fees().call().await {
-        Ok(fees) => fees,
-        Err(e) => {
-            tracing::error!("Error while getting accrued pyth fees. error: {:?}", e);
-            return;
-        }
-    };
+) -> Result<()> {
+    let accrued_pyth_fees = contract.get_accrued_pyth_fees().call().await?;
 
     // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
     // The fee is in wei, so we divide by 1e18 to convert it to eth.
@@ -178,4 +164,6 @@ pub async fn track_accrued_pyth_fees(
             chain_id: chain_id.clone(),
         })
         .set(accrued_pyth_fees);
+
+    Ok(())
 }
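
A note on the keeper.rs hunk: the old code spawned fire-and-forget tracking tasks and slept for TRACK_INTERVAL between passes, while the new loop runs the Result-returning trackers sequentially and `continue`s on the first failure. The hunk does not show where the new pass is paced, so the following is an illustrative sketch only, not part of the patch: it assumes a sleep at the top of the loop (an arbitrary TRACK_INTERVAL value and hypothetical tracker functions) so the `continue` short-circuit cannot busy-loop against a failing RPC, while still leaving the timestamp-bearing tracker for last.

    use std::time::Duration;

    const TRACK_INTERVAL: Duration = Duration::from_secs(10); // assumed value, not taken from the patch

    // Hypothetical stand-ins with the same shape as the trackers in keeper/track.rs.
    async fn track_provider_info() -> anyhow::Result<()> {
        Ok(())
    }

    async fn track_timestamp_lag() -> anyhow::Result<()> {
        Ok(())
    }

    async fn tracking_loop() {
        loop {
            // Sleeping first keeps the `continue` short-circuit below from hammering a failing RPC.
            tokio::time::sleep(TRACK_INTERVAL).await;

            if let Err(e) = track_provider_info().await {
                tracing::error!("Error tracking provider: {:?}", e);
                continue; // skip the remaining trackers for this pass
            }

            // The timestamp-bearing tracker runs last: if anything above keeps failing,
            // this metric stops advancing and a lag-based alert fires.
            if let Err(e) = track_timestamp_lag().await {
                tracing::error!("Error tracking block timestamp lag: {:?}", e);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        tracking_loop().await;
    }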
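
The keeper_metrics.rs and keeper/block.rs hunks add four per-chain gauges keyed by ChainIdLabel, and process_event_block_number is only ever moved forward, so reprocessing an older block range cannot drag the metric backwards. Below is a minimal stand-alone sketch of that high-water-mark pattern, assuming the prometheus-client crate API; the chain id and block numbers are made up for illustration.

    use prometheus_client::{
        encoding::EncodeLabelSet,
        metrics::{family::Family, gauge::Gauge},
        registry::Registry,
    };

    #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
    struct ChainIdLabel {
        chain_id: String,
    }

    fn main() {
        let mut registry = Registry::default();

        let process_event_block_number: Family<ChainIdLabel, Gauge> = Family::default();
        registry.register(
            "process_event_block_number",
            "The highest block number for which events have been successfully retrieved and processed",
            process_event_block_number.clone(),
        );

        let label = ChainIdLabel {
            chain_id: "example-chain".to_string(), // placeholder chain id
        };

        // High-water-mark update, mirroring the check in keeper/block.rs: only advance the gauge.
        for processed_to in [100u64, 250, 200] {
            let current = process_event_block_number.get_or_create(&label).get();
            if processed_to > current as u64 {
                process_event_block_number
                    .get_or_create(&label)
                    .set(processed_to as i64);
            }
        }

        // The gauge reads 250, not 200, even though an older range was processed last.
        assert_eq!(process_event_block_number.get_or_create(&label).get(), 250);
    }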
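
The run.rs hunk moves keeper_metrics.add_chain ahead of chain setup, and add_chain pre-creates every per-chain series via get_or_create. A small sketch of why that ordering matters, again assuming the prometheus-client crate and a placeholder chain id: a pre-created series is exported with value 0 immediately, so a chain whose RPC never initializes still appears in the scrape instead of being silently absent.

    use prometheus_client::{
        encoding::{text::encode, EncodeLabelSet},
        metrics::{family::Family, gauge::Gauge},
        registry::Registry,
    };

    #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
    struct ChainIdLabel {
        chain_id: String,
    }

    fn main() {
        let mut registry = Registry::default();
        let process_event_timestamp: Family<ChainIdLabel, Gauge> = Family::default();
        registry.register(
            "process_event_timestamp",
            "Timestamp of the last time the keeper updated the events",
            process_event_timestamp.clone(),
        );

        // Pre-create the per-chain series up front, as add_chain now does before chain setup.
        let _ = process_event_timestamp.get_or_create(&ChainIdLabel {
            chain_id: "example-chain".to_string(), // placeholder chain id
        });

        // The series is exported right away with value 0, so dashboards and alerts can
        // distinguish "keeper never initialized this chain" from "metric does not exist".
        let mut output = String::new();
        encode(&mut output, &registry).expect("encoding to a String cannot fail");
        println!("{output}");
    }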