Skip to content
This repository has been archived by the owner on May 28, 2022. It is now read-only.

Commit

Permalink
feature: add exponential retry for channel faults and metrics to trac…
Browse files Browse the repository at this point in the history
…k number of faults
  • Loading branch information
luketchang committed Feb 22, 2022
1 parent 48c6e18 commit 2a2d124
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
34 changes: 31 additions & 3 deletions rust/nomad-base/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ use nomad_core::{db::DB, Common};
use tracing::instrument::Instrumented;
use tracing::{error, info, info_span, Instrument};

use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{task::JoinHandle, time::sleep};

/// Properties shared across all agents
Expand Down Expand Up @@ -92,14 +96,22 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
/// Run the agent with the given home and replica
fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>>;

/// Run the Agent, and tag errors with the domain ID of the replica
/// Run the agent for a given channel. If the channel dies, exponentially
/// retry. If failures are more than 5 minutes apart, reset exponential
/// backoff (likely unrelated after that point).
#[allow(clippy::unit_arg)]
#[tracing::instrument]
fn run_report_error(&self, replica: String) -> Instrumented<JoinHandle<Result<()>>> {
let channel = self.build_channel(&replica);
let channel_faults_gauge = self
.metrics()
.channel_specific_gauge(self.home().name(), &replica);

tokio::spawn(async move {
let mut exponential = 0;
loop {
let running_time = SystemTime::now();

let handle = Self::run(channel.clone()).in_current_span();
let res = handle
.await?
Expand All @@ -112,7 +124,23 @@ pub trait NomadAgent: Send + Sync + Sized + std::fmt::Debug + AsRef<AgentCore> {
"Channel for replica {} errored out! Error: {:?}",
&replica, e
);
info!("Restarting channel to {}", &replica);
channel_faults_gauge.inc();

// If running time >= 5 minutes, current failure likely
// unrelated to previous
if running_time.elapsed().unwrap().as_secs() >= 300 {
exponential = 0;
} else {
exponential += 1;
}

let sleep_time = 2u64.pow(exponential);
info!(
"Restarting channel to {} in {} seconds",
&replica, sleep_time
);

sleep(Duration::from_secs(sleep_time)).await;
}
}
}
Expand Down
18 changes: 17 additions & 1 deletion rust/nomad-base/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use color_eyre::Result;
use prometheus::{
Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry,
Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry,
};
use std::sync::Arc;
use tokio::task::JoinHandle;
Expand All @@ -13,6 +13,7 @@ pub struct CoreMetrics {
agent_name: String,
transactions: Box<IntGaugeVec>,
wallet_balance: Box<IntGaugeVec>,
channel_faults: Box<IntGaugeVec>,
rpc_latencies: Box<HistogramVec>,
span_durations: Box<HistogramVec>,
listen_port: Option<u16>,
Expand Down Expand Up @@ -47,6 +48,15 @@ impl CoreMetrics {
.const_label("VERSION", env!("CARGO_PKG_VERSION")),
&["chain", "wallet", "agent"],
)?),
channel_faults: Box::new(IntGaugeVec::new(
Opts::new(
"channel_faults",
"Number of per home <> replica channel faults (errors)",
)
.namespace("nomad")
.const_label("VERSION", env!("CARGO_PKG_VERSION")),
&["home", "replica", "agent"],
)?),
rpc_latencies: Box::new(HistogramVec::new(
HistogramOpts::new(
"rpc_duration_ms",
Expand Down Expand Up @@ -149,6 +159,12 @@ impl CoreMetrics {
.set(current_balance.as_u64() as i64) // XXX: truncated data
}

/// Return single gauge for one home <> replica channel
pub fn channel_specific_gauge(&self, home: &str, replica: &str) -> IntGauge {
self.channel_faults
.with_label_values(&[home, replica, &self.agent_name])
}

/// Call with RPC duration after it is complete
pub fn rpc_complete(&self, chain: &str, method: &str, duration_ms: f64) {
self.rpc_latencies
Expand Down

0 comments on commit 2a2d124

Please sign in to comment.