diff --git a/rust/nomad-base/src/agent.rs b/rust/nomad-base/src/agent.rs index d1cfa449..4217e690 100644 --- a/rust/nomad-base/src/agent.rs +++ b/rust/nomad-base/src/agent.rs @@ -12,7 +12,11 @@ use nomad_core::{db::DB, Common}; use tracing::instrument::Instrumented; use tracing::{error, info, info_span, Instrument}; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, SystemTime}, +}; use tokio::{task::JoinHandle, time::sleep}; /// Properties shared across all agents @@ -92,14 +96,22 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { /// Run the agent with the given home and replica fn run(channel: Self::Channel) -> Instrumented>>; - /// Run the Agent, and tag errors with the domain ID of the replica + /// Run the agent for a given channel. If the channel dies, exponentially + /// retry. If failures are more than 5 minutes apart, reset exponential + /// backoff (likely unrelated after that point). #[allow(clippy::unit_arg)] #[tracing::instrument] fn run_report_error(&self, replica: String) -> Instrumented>> { let channel = self.build_channel(&replica); + let channel_faults_gauge = self + .metrics() + .channel_specific_gauge(self.home().name(), &replica); tokio::spawn(async move { + let mut exponential = 0; loop { + let running_time = SystemTime::now(); + let handle = Self::run(channel.clone()).in_current_span(); let res = handle .await? @@ -112,7 +124,23 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef { "Channel for replica {} errored out! Error: {:?}", &replica, e ); - info!("Restarting channel to {}", &replica); + channel_faults_gauge.inc(); + + // If running time >= 5 minutes, current failure likely + // unrelated to previous + if running_time.elapsed().unwrap().as_secs() >= 300 { + exponential = 0; + } else { + exponential += 1; + } + + let sleep_time = 2u64.pow(exponential); + info!( + "Restarting channel to {} in {} seconds", + &replica, sleep_time + ); + + sleep(Duration::from_secs(sleep_time)).await; } } } diff --git a/rust/nomad-base/src/metrics.rs b/rust/nomad-base/src/metrics.rs index ff863db6..6d0e1281 100644 --- a/rust/nomad-base/src/metrics.rs +++ b/rust/nomad-base/src/metrics.rs @@ -2,7 +2,7 @@ use color_eyre::Result; use prometheus::{ - Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, + Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, }; use std::sync::Arc; use tokio::task::JoinHandle; @@ -13,6 +13,7 @@ pub struct CoreMetrics { agent_name: String, transactions: Box, wallet_balance: Box, + channel_faults: Box, rpc_latencies: Box, span_durations: Box, listen_port: Option, @@ -47,6 +48,15 @@ impl CoreMetrics { .const_label("VERSION", env!("CARGO_PKG_VERSION")), &["chain", "wallet", "agent"], )?), + channel_faults: Box::new(IntGaugeVec::new( + Opts::new( + "channel_faults", + "Number of per home <> replica channel faults (errors)", + ) + .namespace("nomad") + .const_label("VERSION", env!("CARGO_PKG_VERSION")), + &["home", "replica", "agent"], + )?), rpc_latencies: Box::new(HistogramVec::new( HistogramOpts::new( "rpc_duration_ms", @@ -149,6 +159,12 @@ impl CoreMetrics { .set(current_balance.as_u64() as i64) // XXX: truncated data } + /// Return single gauge for one home <> replica channel + pub fn channel_specific_gauge(&self, home: &str, replica: &str) -> IntGauge { + self.channel_faults + .with_label_values(&[home, replica, &self.agent_name]) + } + /// Call with RPC duration after it is complete pub fn rpc_complete(&self, chain: &str, method: &str, duration_ms: f64) { self.rpc_latencies