Skip to content

feat(fortuna): Better metrics tracking for alerting #2703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/fortuna/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
.collect(),
));
for (chain_id, chain_config) in config.chains.clone() {
keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
let keeper_metrics = keeper_metrics.clone();
let keeper_private_key_option = keeper_private_key_option.clone();
let chains = chains.clone();
Expand Down Expand Up @@ -168,7 +169,6 @@ async fn setup_chain_and_run_keeper(
rpc_metrics.clone(),
)
.await?;
keeper_metrics.add_chain(chain_id.clone(), state.provider_address);
chains.write().await.insert(
chain_id.clone(),
ApiBlockChainState::Initialized(state.clone()),
Expand Down
87 changes: 49 additions & 38 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,45 +178,56 @@ pub async fn run_keeper_threads(
};

loop {
// There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
// If rpc start fails all of these threads will just exit, instead of retrying.
// We are tracking rpc failures elsewhere, so it's fine.
spawn(
track_provider(
chain_id.clone(),
contract.clone(),
provider_address,
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_balance(
chain_id.clone(),
contract.client(),
keeper_address,
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_accrued_pyth_fees(
chain_id.clone(),
contract.clone(),
keeper_metrics.clone(),
)
.in_current_span(),
);
spawn(
track_block_timestamp_lag(
chain_id.clone(),
contract.client(),
keeper_metrics.clone(),
)
.in_current_span(),
);

time::sleep(TRACK_INTERVAL).await;

// Track provider info and balance sequentially. Note that the tracking is done sequentially with the
// timestamp last. If there is a persistent error in any of these methods, the timestamp will lag behind
// current time and trigger an alert.
if let Err(e) = track_provider(
chain_id.clone(),
contract.clone(),
provider_address,
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking provider: {:?}", e);
continue;
}

if let Err(e) = track_balance(
chain_id.clone(),
contract.client(),
keeper_address,
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking balance: {:?}", e);
continue;
}

if let Err(e) = track_accrued_pyth_fees(
chain_id.clone(),
contract.clone(),
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking accrued pyth fees: {:?}", e);
continue;
}

if let Err(e) = track_block_timestamp_lag(
chain_id.clone(),
contract.client(),
keeper_metrics.clone(),
)
.await
{
tracing::error!("Error tracking block timestamp lag: {:?}", e);
continue;
}
}
}
.in_current_span(),
Expand Down
37 changes: 35 additions & 2 deletions apps/fortuna/src/keeper/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ use {
api::{self, BlockchainState},
chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
eth_utils::utils::EscalationPolicy,
keeper::keeper_metrics::KeeperMetrics,
keeper::keeper_metrics::{ChainIdLabel, KeeperMetrics},
keeper::process_event::process_event_with_backoff,
},
anyhow::Result,
ethers::types::U256,
std::{collections::HashSet, sync::Arc},
std::{
collections::HashSet,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
},
tokio::{
spawn,
sync::{mpsc, RwLock},
Expand Down Expand Up @@ -115,6 +119,10 @@ pub async fn process_single_block_batch(
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
) {
let label = ChainIdLabel {
chain_id: chain_state.id.clone(),
};

loop {
let events_res = chain_state
.contract
Expand All @@ -125,6 +133,31 @@ pub async fn process_single_block_batch(
)
.await;

// Only update metrics if we successfully retrieved events.
if events_res.is_ok() {
// Track the last time blocks were processed. If anything happens to the processing thread, the
// timestamp will lag, which will trigger an alert.
let server_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs() as i64)
.unwrap_or(0);
metrics
.process_event_timestamp
.get_or_create(&label)
.set(server_timestamp);

let current_block = metrics
.process_event_block_number
.get_or_create(&label)
.get();
if block_range.to > current_block as u64 {
metrics
.process_event_block_number
.get_or_create(&label)
.set(block_range.to as i64);
}
}

match events_res {
Ok(events) => {
tracing::info!(num_of_events = &events.len(), "Processing",);
Expand Down
38 changes: 38 additions & 0 deletions apps/fortuna/src/keeper/keeper_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub struct KeeperMetrics {
pub gas_price_estimate: Family<AccountLabel, Gauge<f64, AtomicU64>>,
pub accrued_pyth_fees: Family<ChainIdLabel, Gauge<f64, AtomicU64>>,
pub block_timestamp_lag: Family<ChainIdLabel, Gauge>,
pub latest_block_timestamp: Family<ChainIdLabel, Gauge>,
pub process_event_timestamp: Family<ChainIdLabel, Gauge>,
pub latest_block_number: Family<ChainIdLabel, Gauge>,
pub process_event_block_number: Family<ChainIdLabel, Gauge>,
}

impl Default for KeeperMetrics {
Expand Down Expand Up @@ -87,6 +91,10 @@ impl Default for KeeperMetrics {
gas_price_estimate: Family::default(),
accrued_pyth_fees: Family::default(),
block_timestamp_lag: Family::default(),
latest_block_timestamp: Family::default(),
process_event_timestamp: Family::default(),
latest_block_number: Family::default(),
process_event_block_number: Family::default(),
}
}
}
Expand Down Expand Up @@ -228,6 +236,30 @@ impl KeeperMetrics {
keeper_metrics.block_timestamp_lag.clone(),
);

writable_registry.register(
"latest_block_timestamp",
"The current block timestamp",
keeper_metrics.latest_block_timestamp.clone(),
);

writable_registry.register(
"process_event_timestamp",
"Timestamp of the last time the keeper updated the events",
keeper_metrics.process_event_timestamp.clone(),
);

writable_registry.register(
"latest_block_number",
"The current block number",
keeper_metrics.latest_block_number.clone(),
);

writable_registry.register(
"process_event_block_number",
"The highest block number for which events have been successfully retrieved and processed",
keeper_metrics.process_event_block_number.clone(),
);

// *Important*: When adding a new metric:
// 1. Register it above using `writable_registry.register(...)`
// 2. Add a get_or_create call in the add_chain function below to initialize it for each chain/provider pair
Expand All @@ -241,6 +273,12 @@ impl KeeperMetrics {
};
let _ = self.accrued_pyth_fees.get_or_create(&chain_id_label);
let _ = self.block_timestamp_lag.get_or_create(&chain_id_label);
let _ = self.latest_block_timestamp.get_or_create(&chain_id_label);
let _ = self.process_event_timestamp.get_or_create(&chain_id_label);
let _ = self.latest_block_number.get_or_create(&chain_id_label);
let _ = self
.process_event_block_number
.get_or_create(&chain_id_label);

let account_label = AccountLabel {
chain_id,
Expand Down
Loading
Loading