Skip to content

Commit

Permalink
indexer-alt: factor out DB connection stats collector
Browse files Browse the repository at this point in the history
## Description

...and use it in the RPC implementation as well. A parameter has been
added to add a prefix to all metric names. This ensures that if an
Indexer and RPC service are sharing a metrics service, they won't stomp
on each others metrics.

## Test plan

Run the RPC service and check its metrics.
  • Loading branch information
amnn committed Jan 15, 2025
1 parent 43618c0 commit a44d664
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 189 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tracing.workspace = true
url.workspace = true

sui-field-count.workspace = true
sui-indexer-alt-metrics.workspace = true
sui-pg-db.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
Expand Down
8 changes: 6 additions & 2 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use diesel::{
};
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use ingestion::{client::IngestionClient, ClientArgs, IngestionConfig, IngestionService};
use metrics::{DbConnectionStatsCollector, IndexerMetrics};
use metrics::IndexerMetrics;
use pipeline::{
concurrent::{self, ConcurrentConfig},
sequential::{self, SequentialConfig},
Processor,
};
use prometheus::Registry;
use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
use sui_pg_db::{temp::TempDb, Db, DbArgs};
use task::graceful_shutdown;
use tempfile::tempdir;
Expand Down Expand Up @@ -142,7 +143,10 @@ impl Indexer {
.context("Failed to run pending migrations")?;

let metrics = IndexerMetrics::new(registry);
registry.register(Box::new(DbConnectionStatsCollector::new(db.clone())))?;
registry.register(Box::new(DbConnectionStatsCollector::new(
Some("indexer_db"),
db.clone(),
)))?;

let ingestion_service = IngestionService::new(
client_args,
Expand Down
186 changes: 0 additions & 186 deletions crates/sui-indexer-alt-framework/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@
use std::sync::{atomic::AtomicU64, Arc};

use prometheus::{
core::{Collector, Desc},
proto::{Counter, Gauge, LabelPair, Metric, MetricFamily, MetricType, Summary},
register_histogram_vec_with_registry, register_histogram_with_registry,
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use sui_pg_db::Db;
use tracing::warn;

use crate::{ingestion::error::Error, pipeline::Processor};
Expand Down Expand Up @@ -134,12 +131,6 @@ pub(crate) struct IndexerMetrics {
pub watermark_pruner_hi_in_db: IntGaugeVec,
}

/// Collects information about the database connection pool.
pub(crate) struct DbConnectionStatsCollector {
db: Db,
desc: Vec<(MetricType, Desc)>,
}

/// A helper struct to report metrics regarding the checkpoint lag at various points in the indexer.
pub(crate) struct CheckpointLagMetricReporter {
/// Metric to report the lag distribution of each checkpoint.
Expand Down Expand Up @@ -614,100 +605,6 @@ impl IndexerMetrics {
}
}

impl DbConnectionStatsCollector {
pub(crate) fn new(db: Db) -> Self {
let desc = vec![
(
MetricType::GAUGE,
desc(
"db_connections",
"Number of connections currently being managed by the pool",
),
),
(
MetricType::GAUGE,
desc(
"db_idle_connections",
"Number of idle connections in the pool",
),
),
(
MetricType::COUNTER,
desc("db_connect_direct", "Connections that did not have to wait"),
),
(
MetricType::SUMMARY,
desc("db_connect_waited", "Connections that had to wait"),
),
(
MetricType::COUNTER,
desc(
"db_connect_timed_out",
"Connections that timed out waiting for a connection",
),
),
(
MetricType::COUNTER,
desc(
"db_connections_created",
"Connections that have been created in the pool",
),
),
(
MetricType::COUNTER,
desc_with_labels(
"db_connections_closed",
"Total connections that were closed",
&["reason"],
),
),
];

Self { db, desc }
}
}

impl Collector for DbConnectionStatsCollector {
fn desc(&self) -> Vec<&Desc> {
self.desc.iter().map(|d| &d.1).collect()
}

fn collect(&self) -> Vec<MetricFamily> {
let state = self.db.state();
let stats = state.statistics;

vec![
gauge(&self.desc[0].1, state.connections as f64),
gauge(&self.desc[1].1, state.idle_connections as f64),
counter(&self.desc[2].1, stats.get_direct as f64),
summary(
&self.desc[3].1,
stats.get_wait_time.as_millis() as f64,
stats.get_waited + stats.get_timed_out,
),
counter(&self.desc[4].1, stats.get_timed_out as f64),
counter(&self.desc[5].1, stats.connections_created as f64),
counter_with_labels(
&self.desc[6].1,
&[
("reason", "broken", stats.connections_closed_broken as f64),
("reason", "invalid", stats.connections_closed_invalid as f64),
(
"reason",
"max_lifetime",
stats.connections_closed_max_lifetime as f64,
),
(
"reason",
"idle_timeout",
stats.connections_closed_idle_timeout as f64,
),
],
),
]
}
}

impl CheckpointLagMetricReporter {
pub fn new(
checkpoint_time_lag_histogram: Histogram,
Expand Down Expand Up @@ -750,89 +647,6 @@ impl CheckpointLagMetricReporter {
}
}

fn desc(name: &str, help: &str) -> Desc {
desc_with_labels(name, help, &[])
}

fn desc_with_labels(name: &str, help: &str, labels: &[&str]) -> Desc {
Desc::new(
name.to_string(),
help.to_string(),
labels.iter().map(|s| s.to_string()).collect(),
Default::default(),
)
.expect("Bad metric description")
}

fn gauge(desc: &Desc, value: f64) -> MetricFamily {
let mut g = Gauge::default();
let mut m = Metric::default();
let mut mf = MetricFamily::new();

g.set_value(value);
m.set_gauge(g);

mf.mut_metric().push(m);
mf.set_name(desc.fq_name.clone());
mf.set_help(desc.help.clone());
mf.set_field_type(MetricType::COUNTER);
mf
}

fn counter(desc: &Desc, value: f64) -> MetricFamily {
let mut c = Counter::default();
let mut m = Metric::default();
let mut mf = MetricFamily::new();

c.set_value(value);
m.set_counter(c);

mf.mut_metric().push(m);
mf.set_name(desc.fq_name.clone());
mf.set_help(desc.help.clone());
mf.set_field_type(MetricType::GAUGE);
mf
}

fn counter_with_labels(desc: &Desc, values: &[(&str, &str, f64)]) -> MetricFamily {
let mut mf = MetricFamily::new();

for (name, label, value) in values {
let mut c = Counter::default();
let mut l = LabelPair::default();
let mut m = Metric::default();

c.set_value(*value);
l.set_name(name.to_string());
l.set_value(label.to_string());

m.set_counter(c);
m.mut_label().push(l);
mf.mut_metric().push(m);
}

mf.set_name(desc.fq_name.clone());
mf.set_help(desc.help.clone());
mf.set_field_type(MetricType::COUNTER);
mf
}

fn summary(desc: &Desc, sum: f64, count: u64) -> MetricFamily {
let mut s = Summary::default();
let mut m = Metric::default();
let mut mf = MetricFamily::new();

s.set_sample_sum(sum);
s.set_sample_count(count);
m.set_summary(s);

mf.mut_metric().push(m);
mf.set_name(desc.fq_name.clone());
mf.set_help(desc.help.clone());
mf.set_field_type(MetricType::SUMMARY);
mf
}

#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-indexer-alt-jsonrpc/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use diesel::result::Error as DieselError;
use diesel_async::methods::LoadQuery;
use diesel_async::RunQueryDsl;
use jsonrpsee::types::{error::INTERNAL_ERROR_CODE, ErrorObject};
use prometheus::Registry;
use sui_indexer_alt_metrics::db::DbConnectionStatsCollector;
use sui_pg_db as db;
use tracing::debug;

Expand Down Expand Up @@ -48,9 +50,17 @@ impl Reader {
pub(crate) async fn new(
db_args: db::DbArgs,
metrics: Arc<RpcMetrics>,
registry: &Registry,
) -> Result<Self, DbError> {
let db = db::Db::for_read(db_args).await.map_err(DbError::Create)?;

registry
.register(Box::new(DbConnectionStatsCollector::new(
Some("rpc_db"),
db.clone(),
)))
.map_err(|e| DbError::Create(e.into()))?;

Ok(Self { db, metrics })
}

Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub async fn start_rpc(
let mut rpc =
RpcService::new(rpc_args, registry, cancel).context("Failed to create RPC service")?;

let reader = Reader::new(db_args, rpc.metrics()).await?;
let reader = Reader::new(db_args, rpc.metrics(), registry).await?;

rpc.add_module(Governance(reader.clone()))?;

Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ prometheus.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true

sui-pg-db.workspace = true
Loading

0 comments on commit a44d664

Please sign in to comment.