Skip to content

Add SHOW CLIENTS / SHOW SERVERS + Stats refactor and tests #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 155 additions & 42 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
use bytes::{Buf, BufMut, BytesMut};
use log::{info, trace};
use std::collections::HashMap;
use tokio::time::Instant;

use crate::config::{get_config, reload_config, VERSION};
use crate::errors::Error;
use crate::messages::*;
use crate::pool::get_all_pools;
use crate::stats::get_stats;
use crate::stats::{
get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState,
};
use crate::ClientServerMap;

pub fn generate_server_info_for_admin() -> BytesMut {
Expand Down Expand Up @@ -72,6 +75,14 @@ where
trace!("SHOW POOLS");
show_pools(stream).await
}
"CLIENTS" => {
trace!("SHOW CLIENTS");
show_clients(stream).await
}
"SERVERS" => {
trace!("SHOW SERVERS");
show_servers(stream).await
}
"STATS" => {
trace!("SHOW STATS");
show_stats(stream).await
Expand All @@ -91,7 +102,8 @@ async fn show_lists<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let stats = get_stats();
let client_stats = get_client_stats();
let server_stats = get_server_stats();

let columns = vec![("list", DataType::Text), ("items", DataType::Int4)];

Expand All @@ -111,18 +123,18 @@ where
res.put(data_row(&vec!["pools".to_string(), databases.to_string()]));
res.put(data_row(&vec![
"free_clients".to_string(),
stats
client_stats
.keys()
.map(|address_id| stats[&address_id]["cl_idle"])
.sum::<i64>()
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle)
.count()
.to_string(),
]));
res.put(data_row(&vec![
"used_clients".to_string(),
stats
client_stats
.keys()
.map(|address_id| stats[&address_id]["cl_active"])
.sum::<i64>()
.filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active)
.count()
.to_string(),
]));
res.put(data_row(&vec![
Expand All @@ -131,18 +143,18 @@ where
]));
res.put(data_row(&vec![
"free_servers".to_string(),
stats
server_stats
.keys()
.map(|address_id| stats[&address_id]["sv_idle"])
.sum::<i64>()
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle)
.count()
.to_string(),
]));
res.put(data_row(&vec![
"used_servers".to_string(),
stats
server_stats
.keys()
.map(|address_id| stats[&address_id]["sv_active"])
.sum::<i64>()
.filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active)
.count()
.to_string(),
]));
res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()]));
Expand Down Expand Up @@ -182,11 +194,12 @@ async fn show_pools<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let stats = get_stats();
let all_pool_stats = get_pool_stats();

let columns = vec![
("database", DataType::Text),
("user", DataType::Text),
("pool_mode", DataType::Text),
("cl_idle", DataType::Numeric),
("cl_active", DataType::Numeric),
("cl_waiting", DataType::Numeric),
Expand All @@ -198,32 +211,27 @@ where
("sv_login", DataType::Numeric),
("maxwait", DataType::Numeric),
("maxwait_us", DataType::Numeric),
("pool_mode", DataType::Text),
];

let mut res = BytesMut::new();
res.put(row_description(&columns));
for (_, pool) in get_all_pools() {
let pool_config = &pool.settings;
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let stats = match stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};
for ((pool_name, username), pool) in get_all_pools() {
let def = HashMap::default();
let pool_stats = all_pool_stats
.get(&(pool_name.clone(), username.clone()))
.unwrap_or(&def);

let mut row = vec![address.name(), pool_config.user.username.clone()];

for column in &columns[2..columns.len() - 1] {
let value = stats.get(column.0).unwrap_or(&0).to_string();
row.push(value);
}

row.push(pool_config.pool_mode.to_string());
res.put(data_row(&row));
}
let pool_config = &pool.settings;
let mut row = vec![
pool_name.clone(),
username.clone(),
pool_config.pool_mode.to_string(),
];
for column in &columns[3..columns.len()] {
let value = pool_stats.get(column.0).unwrap_or(&0).to_string();
row.push(value);
}
res.put(data_row(&row));
}

res.put(command_complete("SHOW"));
Expand Down Expand Up @@ -387,6 +395,7 @@ where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let columns = vec![
("instance", DataType::Text),
("database", DataType::Text),
("user", DataType::Text),
("total_xact_count", DataType::Numeric),
Expand All @@ -396,32 +405,32 @@ where
("total_xact_time", DataType::Numeric),
("total_query_time", DataType::Numeric),
("total_wait_time", DataType::Numeric),
("total_errors", DataType::Numeric),
("avg_xact_count", DataType::Numeric),
("avg_query_count", DataType::Numeric),
("avg_recv", DataType::Numeric),
("avg_sent", DataType::Numeric),
("avg_errors", DataType::Numeric),
("avg_xact_time", DataType::Numeric),
("avg_query_time", DataType::Numeric),
("avg_wait_time", DataType::Numeric),
];

let stats = get_stats();
let all_stats = get_address_stats();
let mut res = BytesMut::new();
res.put(row_description(&columns));

for ((_db_name, username), pool) in get_all_pools() {
for ((db, username), pool) in get_all_pools() {
for shard in 0..pool.shards() {
for server in 0..pool.servers(shard) {
let address = pool.address(shard, server);
let stats = match stats.get(&address.id) {
let stats = match all_stats.get(&address.id) {
Some(stats) => stats.clone(),
None => HashMap::new(),
};

let mut row = vec![address.name()];
row.push(username.clone());

for column in &columns[2..] {
let mut row = vec![address.name(), db.clone(), username.clone()];
for column in &columns[3..] {
row.push(stats.get(column.0).unwrap_or(&0).to_string());
}

Expand All @@ -439,3 +448,107 @@ where

write_all_half(stream, res).await
}

/// Show currently connected clients
async fn show_clients<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let columns = vec![
("client_id", DataType::Text),
("database", DataType::Text),
("user", DataType::Text),
("application_name", DataType::Text),
("state", DataType::Text),
("transaction_count", DataType::Numeric),
("query_count", DataType::Numeric),
("error_count", DataType::Numeric),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Errors are pooler errors encountered by the client. In this PR I introduce two kinds

  • Checkout Failure
  • Server Connection Banned while client is talking to it

("age_seconds", DataType::Numeric),
];

let new_map = get_client_stats();
let mut res = BytesMut::new();
res.put(row_description(&columns));

for (_, client) in new_map {
let row = vec![
format!("{:#010X}", client.client_id),
client.pool_name,
client.username,
client.application_name.clone(),
client.state.to_string(),
client.transaction_count.to_string(),
client.query_count.to_string(),
client.error_count.to_string(),
Instant::now()
.duration_since(client.connect_time)
.as_secs()
.to_string(),
];

res.put(data_row(&row));
}

res.put(command_complete("SHOW"));

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

write_all_half(stream, res).await
}

/// Show currently connected servers
async fn show_servers<T>(stream: &mut T) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let columns = vec![
("server_id", DataType::Text),
("database_name", DataType::Text),
("user", DataType::Text),
("address_id", DataType::Text),
("application_name", DataType::Text),
("state", DataType::Text),
("transaction_count", DataType::Numeric),
("query_count", DataType::Numeric),
("bytes_sent", DataType::Numeric),
("bytes_received", DataType::Numeric),
("age_seconds", DataType::Numeric),
];

let new_map = get_server_stats();
let mut res = BytesMut::new();
res.put(row_description(&columns));

for (_, server) in new_map {
let row = vec![
format!("{:#010X}", server.server_id),
server.pool_name,
server.username,
server.address_name,
server.application_name,
server.state.to_string(),
server.transaction_count.to_string(),
server.query_count.to_string(),
server.bytes_sent.to_string(),
server.bytes_received.to_string(),
Instant::now()
.duration_since(server.connect_time)
.as_secs()
.to_string(),
];

res.put(data_row(&row));
}

res.put(command_complete("SHOW"));

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

write_all_half(stream, res).await
}
Loading