Skip to content

Add more metrics to prometheus endpoint #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 220 additions & 58 deletions src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::net::SocketAddr;

use crate::config::Address;
use crate::pool::get_all_pools;
use crate::stats::get_address_stats;
use crate::stats::{get_address_stats, get_pool_stats, get_server_stats, ServerInformation};

struct MetricHelpType {
help: &'static str,
Expand All @@ -19,113 +19,141 @@ struct MetricHelpType {
// counters only increase
// gauges can arbitrarily increase or decrease
static METRIC_HELP_AND_TYPES_LOOKUP: phf::Map<&'static str, MetricHelpType> = phf_map! {
"total_query_count" => MetricHelpType {
"stats_total_query_count" => MetricHelpType {
help: "Number of queries sent by all clients",
ty: "counter",
},
"total_query_time" => MetricHelpType {
"stats_total_query_time" => MetricHelpType {
help: "Total amount of time for queries to execute",
ty: "counter",
},
"total_received" => MetricHelpType {
"stats_total_received" => MetricHelpType {
help: "Number of bytes received from the server",
ty: "counter",
},
"total_sent" => MetricHelpType {
"stats_total_sent" => MetricHelpType {
help: "Number of bytes sent to the server",
ty: "counter",
},
"total_xact_count" => MetricHelpType {
"stats_total_xact_count" => MetricHelpType {
help: "Total number of transactions started by the client",
ty: "counter",
},
"total_xact_time" => MetricHelpType {
"stats_total_xact_time" => MetricHelpType {
help: "Total amount of time for all transactions to execute",
ty: "counter",
},
"total_wait_time" => MetricHelpType {
"stats_total_wait_time" => MetricHelpType {
help: "Total time client waited for a server connection",
ty: "counter",
},
"avg_query_count" => MetricHelpType {
"stats_avg_query_count" => MetricHelpType {
help: "Average of total_query_count every 15 seconds",
ty: "gauge",
},
"avg_query_time" => MetricHelpType {
"stats_avg_query_time" => MetricHelpType {
help: "Average time taken for queries to execute every 15 seconds",
ty: "gauge",
},
"avg_recv" => MetricHelpType {
"stats_avg_recv" => MetricHelpType {
help: "Average of total_received bytes every 15 seconds",
ty: "gauge",
},
"avg_sent" => MetricHelpType {
"stats_avg_sent" => MetricHelpType {
help: "Average of total_sent bytes every 15 seconds",
ty: "gauge",
},
"avg_errors" => MetricHelpType {
"stats_avg_errors" => MetricHelpType {
help: "Average number of errors every 15 seconds",
ty: "gauge",
},
"avg_xact_count" => MetricHelpType {
"stats_avg_xact_count" => MetricHelpType {
help: "Average of total_xact_count every 15 seconds",
ty: "gauge",
},
"avg_xact_time" => MetricHelpType {
"stats_avg_xact_time" => MetricHelpType {
help: "Average of total_xact_time every 15 seconds",
ty: "gauge",
},
"avg_wait_time" => MetricHelpType {
"stats_avg_wait_time" => MetricHelpType {
help: "Average of total_wait_time every 15 seconds",
ty: "gauge",
},
"maxwait_us" => MetricHelpType {
"pools_maxwait_us" => MetricHelpType {
help: "The time a client waited for a server connection in microseconds",
ty: "gauge",
},
"maxwait" => MetricHelpType {
"pools_maxwait" => MetricHelpType {
help: "The time a client waited for a server connection in seconds",
ty: "gauge",
},
"cl_waiting" => MetricHelpType {
"pools_cl_waiting" => MetricHelpType {
help: "How many clients are waiting for a connection from the pool",
ty: "gauge",
},
"cl_active" => MetricHelpType {
"pools_cl_active" => MetricHelpType {
help: "How many clients are actively communicating with a server",
ty: "gauge",
},
"cl_idle" => MetricHelpType {
"pools_cl_idle" => MetricHelpType {
help: "How many clients are idle",
ty: "gauge",
},
"sv_idle" => MetricHelpType {
"pools_sv_idle" => MetricHelpType {
help: "How many server connections are idle",
ty: "gauge",
},
"sv_active" => MetricHelpType {
"pools_sv_active" => MetricHelpType {
help: "How many server connections are actively communicating with a client",
ty: "gauge",
},
"sv_login" => MetricHelpType {
"pools_sv_login" => MetricHelpType {
help: "How many server connections are currently being created",
ty: "gauge",
},
"sv_tested" => MetricHelpType {
"pools_sv_tested" => MetricHelpType {
help: "How many server connections are currently waiting on a health check to succeed",
ty: "gauge",
},
"servers_bytes_received" => MetricHelpType {
help: "Volume in bytes of network traffic received by server",
ty: "gauge",
},
"servers_bytes_sent" => MetricHelpType {
help: "Volume in bytes of network traffic sent by server",
ty: "gauge",
},
"servers_transaction_count" => MetricHelpType {
help: "Number of transactions executed by server",
ty: "gauge",
},
"servers_query_count" => MetricHelpType {
help: "Number of queries executed by server",
ty: "gauge",
},
"servers_error_count" => MetricHelpType {
help: "Number of errors",
ty: "gauge",
},
"databases_pool_size" => MetricHelpType {
help: "Maximum number of server connections",
ty: "gauge",
},
"databases_current_connections" => MetricHelpType {
help: "Current number of connections for this database",
ty: "gauge",
},
};

struct PrometheusMetric {
struct PrometheusMetric<Value: fmt::Display> {
name: String,
help: String,
ty: String,
labels: HashMap<&'static str, String>,
value: i64,
value: Value,
}

impl fmt::Display for PrometheusMetric {
impl<Value: fmt::Display> fmt::Display for PrometheusMetric<Value> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let formatted_labels = self
.labels
Expand All @@ -145,50 +173,81 @@ impl fmt::Display for PrometheusMetric {
}
}

impl PrometheusMetric {
fn new(address: &Address, name: &str, value: i64) -> Option<PrometheusMetric> {
let mut labels = HashMap::new();
labels.insert("host", address.host.clone());
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("database", address.database.to_string());

impl<Value: fmt::Display> PrometheusMetric<Value> {
fn from_name<V: fmt::Display>(
name: &str,
value: V,
labels: HashMap<&'static str, String>,
) -> Option<PrometheusMetric<V>> {
METRIC_HELP_AND_TYPES_LOOKUP
.get(name)
.map(|metric| PrometheusMetric {
.map(|metric| PrometheusMetric::<V> {
name: name.to_owned(),
help: metric.help.to_owned(),
ty: metric.ty.to_owned(),
labels,
value,
labels,
})
}

fn from_database_info(
address: &Address,
name: &str,
value: u32,
) -> Option<PrometheusMetric<u32>> {
let mut labels = HashMap::new();
labels.insert("host", address.host.clone());
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("database", address.database.to_string());

Self::from_name(&format!("databases_{}", name), value, labels)
}

fn from_server_info(
address: &Address,
name: &str,
value: u64,
) -> Option<PrometheusMetric<u64>> {
let mut labels = HashMap::new();
labels.insert("host", address.host.clone());
labels.insert("shard", address.shard.to_string());
labels.insert("role", address.role.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("database", address.database.to_string());

Self::from_name(&format!("servers_{}", name), value, labels)
}

fn from_address(address: &Address, name: &str, value: i64) -> Option<PrometheusMetric<i64>> {
let mut labels = HashMap::new();
labels.insert("host", address.host.clone());
labels.insert("shard", address.shard.to_string());
labels.insert("pool", address.pool_name.clone());
labels.insert("role", address.role.to_string());
labels.insert("database", address.database.to_string());

Self::from_name(&format!("stats_{}", name), value, labels)
}

fn from_pool(pool: &(String, String), name: &str, value: i64) -> Option<PrometheusMetric<i64>> {
let mut labels = HashMap::new();
labels.insert("pool", pool.0.clone());
labels.insert("user", pool.1.clone());

Self::from_name(&format!("pools_{}", name), value, labels)
}
}

async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
match (request.method(), request.uri().path()) {
(&Method::GET, "/metrics") => {
let stats: HashMap<usize, HashMap<String, i64>> = get_address_stats();

let mut lines = Vec::new();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(address_stats) = stats.get(&address.id) {
for (key, value) in address_stats.iter() {
if let Some(prometheus_metric) =
PrometheusMetric::new(address, key, *value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}
}
}
}
push_address_stats(&mut lines);
push_pool_stats(&mut lines);
push_server_stats(&mut lines);
push_database_stats(&mut lines);

Response::builder()
.header("content-type", "text/plain; version=0.0.4")
Expand All @@ -200,6 +259,109 @@ async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hype
}
}

// Adds metrics shown in a SHOW STATS admin command.
fn push_address_stats(lines: &mut Vec<String>) {
let address_stats: HashMap<usize, HashMap<String, i64>> = get_address_stats();
for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(address_stats) = address_stats.get(&address.id) {
for (key, value) in address_stats.iter() {
if let Some(prometheus_metric) =
PrometheusMetric::<i64>::from_address(address, key, *value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}
}
}
}
}

// Adds relevant metrics shown in a SHOW POOLS admin command.
fn push_pool_stats(lines: &mut Vec<String>) {
let pool_stats = get_pool_stats();
for (pool, stats) in pool_stats.iter() {
for (name, value) in stats.iter() {
if let Some(prometheus_metric) = PrometheusMetric::<i64>::from_pool(pool, name, *value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!(
"Metric {} not implemented for ({},{})",
name, pool.0, pool.1
);
}
}
}
}

// Adds relevant metrics shown in a SHOW DATABASES admin command.
fn push_database_stats(lines: &mut Vec<String>) {
for (_, pool) in get_all_pools() {
let pool_config = pool.settings.clone();
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let pool_state = pool.pool_state(shard, server);

let metrics = vec![
("pool_size", pool_config.user.pool_size),
("current_connections", pool_state.connections),
];
for (key, value) in metrics {
if let Some(prometheus_metric) =
PrometheusMetric::<u32>::from_database_info(address, key, value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}
}
}
}

// Adds relevant metrics shown in a SHOW SERVERS admin command.
fn push_server_stats(lines: &mut Vec<String>) {
let server_stats = get_server_stats();
let mut server_stats_by_addresses = HashMap::<String, ServerInformation>::new();
for (_, info) in server_stats {
server_stats_by_addresses.insert(info.address_name.clone(), info);
}

for (_, pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
if let Some(server_info) = server_stats_by_addresses.get(&address.name()) {
let metrics = [
("bytes_received", server_info.bytes_received),
("bytes_sent", server_info.bytes_sent),
("transaction_count", server_info.transaction_count),
("query_count", server_info.query_count),
("error_count", server_info.error_count),
];
for (key, value) in metrics {
if let Some(prometheus_metric) =
PrometheusMetric::<u64>::from_server_info(address, key, value)
{
lines.push(prometheus_metric.to_string());
} else {
warn!("Metric {} not implemented for {}", key, address.name());
}
}
}
}
}
}
}

pub async fn start_metric_server(http_addr: SocketAddr) {
let http_service_factory =
make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) });
Expand Down