From 0561cac09b2ac8dab7f49487b4814736ee11dbf9 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 17:59:47 -0500 Subject: [PATCH 01/25] wip --- src/admin.rs | 56 +++++++++++++++++++- src/client.rs | 23 +++++--- src/main.rs | 4 +- src/stats.rs | 141 ++++++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 204 insertions(+), 20 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 4576d168..2c720324 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -7,7 +7,7 @@ use crate::config::{get_config, reload_config, VERSION}; use crate::errors::Error; use crate::messages::*; use crate::pool::get_all_pools; -use crate::stats::get_stats; +use crate::stats::{get_stats, CLIENT_STATES}; use crate::ClientServerMap; pub fn generate_server_info_for_admin() -> BytesMut { @@ -72,6 +72,10 @@ where trace!("SHOW POOLS"); show_pools(stream).await } + "CLIENTS" => { + trace!("SHOW CLIENTS"); + show_clients(stream).await + } "STATS" => { trace!("SHOW STATS"); show_stats(stream).await @@ -439,3 +443,53 @@ where write_all_half(stream, res).await } + +/// Show shard and replicas statistics. +async fn show_clients(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ + let columns = vec![ + ("database", DataType::Text), + ("user", DataType::Text), + ("application_name", DataType::Text), + ("bytes_sent", DataType::Numeric), + ("bytes_received", DataType::Numeric), + ("transaction_count", DataType::Numeric), + ("query_count", DataType::Numeric), + ("error_count", DataType::Numeric), + ]; + + let new_map = { + let guard = CLIENT_STATES.read(); + let x = guard.clone(); + x + }; + + let mut res = BytesMut::new(); + res.put(row_description(&columns)); + + for (_, client) in new_map { + let row = vec![ + client.pool_name, + client.username, + client.application_name, + client.bytes_sent.to_string(), + client.bytes_received.to_string(), + client.transaction_count.to_string(), + client.query_count.to_string(), + client.error_count.to_string(), + ]; + + res.put(data_row(&row)); + } + + res.put(command_complete("SHOW")); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} diff --git a/src/client.rs b/src/client.rs index 3ce4afb9..2723766c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -577,7 +577,14 @@ where // The query router determines where the query is going to go, // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); - + if !self.admin { + self.stats.register_client( + self.process_id, + self.pool_name.clone(), + self.username.clone(), + self.application_name.clone(), + ); + } // Our custom protocol loop. // We expect the client to either start a transaction with regular queries // or issue commands for our sharding and server selection protocol. @@ -611,13 +618,6 @@ where message_result = read_message(&mut self.read) => message_result? }; - // Handle admin database queries. - if self.admin { - debug!("Handling admin command"); - handle_admin(&mut self.write, message, self.client_server_map.clone()).await?; - continue; - } - match message[0] as char { // Buffer extended protocol messages even if we do not have // a server connection yet. Hopefully, when we get the S message @@ -637,6 +637,13 @@ where _ => (), } + // Handle admin database queries. + if self.admin { + debug!("Handling admin command"); + handle_admin(&mut self.write, message, self.client_server_map.clone()).await?; + continue; + } + // Get a pool instance referenced by the most up-to-date // pointer. This ensures we always read the latest config // when starting a query. diff --git a/src/main.rs b/src/main.rs index a0c1d7cf..906ddcd4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -184,7 +184,7 @@ async fn main() { info!("Waiting for clients"); let mut admin_only = false; - let mut total_clients = 0; + let mut total_clients = 0 as i16; loop { tokio::select! { @@ -286,7 +286,7 @@ async fn main() { client_ping = drain_rx.recv() => { let client_ping = client_ping.unwrap(); - total_clients += client_ping; + total_clients += client_ping as i16; if total_clients == 0 && admin_only { let _ = exit_tx.send(()).await; diff --git a/src/stats.rs b/src/stats.rs index fde4071b..787465a7 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,14 +1,44 @@ use arc_swap::ArcSwap; /// Statistics and reporting. -use log::{error, info, trace}; +use log::{debug, error, info, trace, warn}; use once_cell::sync::Lazy; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use std::collections::HashMap; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::pool::get_number_of_addresses; +pub static CLIENT_STATES: Lazy>> = + Lazy::new(|| RwLock::new(HashMap::new())); + +pub static SERVER_STATES: Lazy>>> = + Lazy::new(|| RwLock::new(HashMap::new())); + +#[derive(Debug, Clone, Copy)] +pub enum ClientState { + ClientWaiting, + ClientActive, + ClientIdle, +} + +#[derive(Debug, Clone)] +pub struct ClientInformation { + /// The name of the event being reported. + pub state: ClientState, + pub process_id: i32, + + pub application_name: String, + pub username: String, + pub pool_name: String, + + pub bytes_sent: u64, + pub bytes_received: u64, + pub transaction_count: u64, + pub query_count: u64, + pub error_count: u64, +} + pub static REPORTER: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); @@ -22,13 +52,14 @@ static STAT_PERIOD: u64 = 15000; /// The names for the events reported /// to the statistics collector. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] enum EventName { CheckoutTime, Query, Transaction, DataSent, DataReceived, + ClientRegistered(String, String, String), ClientWaiting, ClientActive, ClientIdle, @@ -82,7 +113,7 @@ impl Reporter { /// Send statistics to the task keeping track of stats. fn send(&self, event: Event) { - let name = event.name; + let name = event.name.clone(); let result = self.tx.try_send(event); match result { @@ -99,6 +130,27 @@ impl Reporter { }; } + pub fn register_client( + &self, + process_id: i32, + pool_name: String, + username: String, + app_name: String, + ) { + let event = Event { + name: EventName::ClientRegistered( + pool_name.clone(), + username.clone(), + app_name.clone(), + ), + value: 1, + process_id: process_id, + address_id: 0, + }; + + self.send(event); + } + /// Report a query executed by a client against /// a server identified by the `address_id`. pub fn query(&self, process_id: i32, address_id: usize) { @@ -403,21 +455,41 @@ impl Collector { // Some are counters, some are gauges... match stat.name { EventName::Query => { + let mut guard = CLIENT_STATES.write(); + match guard.get_mut(&stat.process_id) { + Some(client) => client.query_count += stat.value as u64, + None => (), + } let counter = stats.entry("total_query_count").or_insert(0); *counter += stat.value; } EventName::Transaction => { + let mut guard = CLIENT_STATES.write(); + match guard.get_mut(&stat.process_id) { + Some(client) => client.transaction_count += stat.value as u64, + None => (), + } let counter = stats.entry("total_xact_count").or_insert(0); *counter += stat.value; } EventName::DataSent => { + let mut guard = CLIENT_STATES.write(); + match guard.get_mut(&stat.process_id) { + Some(client) => client.bytes_sent += stat.value as u64, + None => (), + } let counter = stats.entry("total_sent").or_insert(0); *counter += stat.value; } EventName::DataReceived => { + let mut guard = CLIENT_STATES.write(); + match guard.get_mut(&stat.process_id) { + Some(client) => client.bytes_received += stat.value as u64, + None => (), + } let counter = stats.entry("total_received").or_insert(0); *counter += stat.value; } @@ -442,17 +514,68 @@ impl Collector { } } - EventName::ClientActive - | EventName::ClientWaiting - | EventName::ClientIdle - | EventName::ServerActive + EventName::ClientRegistered(pool_name, username, app_name) => { + let mut guard = CLIENT_STATES.write(); + match guard.get_mut(&stat.process_id) { + Some(client_state) => { + warn!("Client double registered!"); + } + + None => { + guard.insert( + stat.process_id, + ClientInformation { + state: ClientState::ClientIdle, + process_id: stat.process_id, + pool_name: pool_name.clone(), + username: username.clone(), + application_name: app_name.clone(), + bytes_sent: 0, + bytes_received: 0, + transaction_count: 0, + query_count: 0, + error_count: 0, + }, + ); + } + }; + } + + EventName::ClientActive | EventName::ClientWaiting | EventName::ClientIdle => { + client_server_states.insert(stat.process_id, stat.name.clone()); + let new_state = match stat.name { + EventName::ClientActive => ClientState::ClientActive, + EventName::ClientWaiting => ClientState::ClientWaiting, + EventName::ClientIdle => ClientState::ClientIdle, + _ => unreachable!(), + }; + + let mut guard = CLIENT_STATES.write(); + match guard.get_mut(&stat.process_id) { + Some(client_state) => { + client_state.state = new_state; + } + + None => { + warn!("Stats on unregistered client!"); + } + }; + } + + EventName::ClientDisconnecting => { + client_server_states.remove(&stat.process_id); + let mut guard = CLIENT_STATES.write(); + guard.remove(&stat.process_id); + } + + EventName::ServerActive | EventName::ServerIdle | EventName::ServerTested | EventName::ServerLogin => { client_server_states.insert(stat.process_id, stat.name); } - EventName::ClientDisconnecting | EventName::ServerDisconnecting => { + EventName::ServerDisconnecting => { client_server_states.remove(&stat.process_id); } From d512e1d664f2cd5e72d0fa35e2c5462f9c58d01c Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 20:10:25 -0500 Subject: [PATCH 02/25] Main Thread Panic when swarmed with clients --- .circleci/run_tests.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 6ffef8ba..c036429f 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -37,6 +37,9 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" +# Test number of clients +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 16000 + # Check that prometheus is running curl --fail localhost:9930/metrics From 62e486b7efd83cb274988a99678976eb80b20be3 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 20:15:49 -0500 Subject: [PATCH 03/25] fix --- src/client.rs | 2 +- src/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 3ce4afb9..d082b756 100644 --- a/src/client.rs +++ b/src/client.rs @@ -95,7 +95,7 @@ pub async fn client_entrypoint( mut stream: TcpStream, client_server_map: ClientServerMap, shutdown: Receiver<()>, - drain: Sender, + drain: Sender, admin_only: bool, ) -> Result<(), Error> { // Figure out if the client wants TLS or not. diff --git a/src/main.rs b/src/main.rs index a0c1d7cf..7eb2dddf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -178,7 +178,7 @@ async fn main() { let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap(); let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap(); let (shutdown_tx, _) = broadcast::channel::<()>(1); - let (drain_tx, mut drain_rx) = mpsc::channel::(2048); + let (drain_tx, mut drain_rx) = mpsc::channel::(2048); let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1); info!("Waiting for clients"); From 1de08e02cc36c2daac67dd6814a07bdb14b959e8 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 20:19:47 -0500 Subject: [PATCH 04/25] fix --- .circleci/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index c036429f..2b526ace 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -38,7 +38,7 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" # Test number of clients -PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 16000 +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 4096 # Check that prometheus is running curl --fail localhost:9930/metrics From 9fbdd61ed12d6ca17c765c140e9c3a0118380b48 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 20:24:46 -0500 Subject: [PATCH 05/25] 1024 --- .circleci/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 2b526ace..308f1794 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -38,7 +38,7 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" # Test number of clients -PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 4096 +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 1024 # Check that prometheus is running curl --fail localhost:9930/metrics From 75c9cff017871fd72acc2994a26bde78bfda1a87 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 20:40:12 -0500 Subject: [PATCH 06/25] fix --- .circleci/run_tests.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 308f1794..8d26a0d6 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -38,7 +38,7 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" # Test number of clients -PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 1024 +PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 256 # Check that prometheus is running curl --fail localhost:9930/metrics From fd69c0b0101017f2f4fe838c728076de8ee97bd3 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 20:43:51 -0500 Subject: [PATCH 07/25] remove test --- .circleci/run_tests.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 8d26a0d6..6ffef8ba 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -37,9 +37,6 @@ toxiproxy-cli create -l 127.0.0.1:5433 -u 127.0.0.1:5432 postgres_replica start_pgcat "info" -# Test number of clients -PGPASSWORD=sharding_user pgbench -h 127.0.0.1 -p 6432 -U sharding_user sharded_db -c 256 - # Check that prometheus is running curl --fail localhost:9930/metrics From fa6b0bc10c6a8d546b8afebf1bbea44c141c1303 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 22:29:34 -0500 Subject: [PATCH 08/25] Add SHOW CLIENTS --- src/admin.rs | 7 ++++--- src/stats.rs | 29 ++++++++++++++--------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 2c720324..31e49f01 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -444,12 +444,13 @@ where write_all_half(stream, res).await } -/// Show shard and replicas statistics. +/// Show currently connected clients async fn show_clients(stream: &mut T) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { let columns = vec![ + ("client_id", DataType::Text), ("database", DataType::Text), ("user", DataType::Text), ("application_name", DataType::Text), @@ -462,8 +463,7 @@ where let new_map = { let guard = CLIENT_STATES.read(); - let x = guard.clone(); - x + guard.clone() }; let mut res = BytesMut::new(); @@ -471,6 +471,7 @@ where for (_, client) in new_map { let row = vec![ + format!("{:#08X}", client.process_id), client.pool_name, client.username, client.application_name, diff --git a/src/stats.rs b/src/stats.rs index 787465a7..c67e676e 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -12,9 +12,18 @@ use crate::pool::get_number_of_addresses; pub static CLIENT_STATES: Lazy>> = Lazy::new(|| RwLock::new(HashMap::new())); -pub static SERVER_STATES: Lazy>>> = - Lazy::new(|| RwLock::new(HashMap::new())); +pub static REPORTER: Lazy> = + Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); + +/// Latest stats updated every second; used in SHOW STATS and other admin commands. +static LATEST_STATS: Lazy>>> = + Lazy::new(|| Mutex::new(HashMap::new())); +/// Statistics period used for average calculations. +/// 15 seconds. +static STAT_PERIOD: u64 = 15000; + +/// The various states that a client can be in #[derive(Debug, Clone, Copy)] pub enum ClientState { ClientWaiting, @@ -22,9 +31,10 @@ pub enum ClientState { ClientIdle, } + +/// Information we keep track off which can be queried by SHOW CLIENTS #[derive(Debug, Clone)] pub struct ClientInformation { - /// The name of the event being reported. pub state: ClientState, pub process_id: i32, @@ -39,17 +49,6 @@ pub struct ClientInformation { pub error_count: u64, } -pub static REPORTER: Lazy> = - Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); - -/// Latest stats updated every second; used in SHOW STATS and other admin commands. -static LATEST_STATS: Lazy>>> = - Lazy::new(|| Mutex::new(HashMap::new())); - -/// Statistics period used for average calculations. -/// 15 seconds. -static STAT_PERIOD: u64 = 15000; - /// The names for the events reported /// to the statistics collector. #[derive(Debug, Clone)] @@ -517,7 +516,7 @@ impl Collector { EventName::ClientRegistered(pool_name, username, app_name) => { let mut guard = CLIENT_STATES.write(); match guard.get_mut(&stat.process_id) { - Some(client_state) => { + Some(_) => { warn!("Client double registered!"); } From 3771ea753f4b51226d043965e8a2f28ff782b35f Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 22:33:42 -0500 Subject: [PATCH 09/25] revert --- src/client.rs | 2 +- src/main.rs | 6 +++--- src/stats.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7064aeb8..2723766c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -95,7 +95,7 @@ pub async fn client_entrypoint( mut stream: TcpStream, client_server_map: ClientServerMap, shutdown: Receiver<()>, - drain: Sender, + drain: Sender, admin_only: bool, ) -> Result<(), Error> { // Figure out if the client wants TLS or not. diff --git a/src/main.rs b/src/main.rs index bf7d6758..a0c1d7cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -178,13 +178,13 @@ async fn main() { let mut interrupt_signal = unix_signal(SignalKind::interrupt()).unwrap(); let mut sighup_signal = unix_signal(SignalKind::hangup()).unwrap(); let (shutdown_tx, _) = broadcast::channel::<()>(1); - let (drain_tx, mut drain_rx) = mpsc::channel::(2048); + let (drain_tx, mut drain_rx) = mpsc::channel::(2048); let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1); info!("Waiting for clients"); let mut admin_only = false; - let mut total_clients = 0 as i16; + let mut total_clients = 0; loop { tokio::select! { @@ -286,7 +286,7 @@ async fn main() { client_ping = drain_rx.recv() => { let client_ping = client_ping.unwrap(); - total_clients += client_ping as i16; + total_clients += client_ping; if total_clients == 0 && admin_only { let _ = exit_tx.send(()).await; diff --git a/src/stats.rs b/src/stats.rs index c67e676e..c632fa59 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,6 +1,6 @@ use arc_swap::ArcSwap; /// Statistics and reporting. -use log::{debug, error, info, trace, warn}; +use log::{error, info, trace, warn}; use once_cell::sync::Lazy; use parking_lot::{Mutex, RwLock}; use std::collections::HashMap; From e7d5bc3f6edf6cee4f643946771a66f2685648e5 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 4 Sep 2022 22:35:27 -0500 Subject: [PATCH 10/25] fmt --- src/admin.rs | 2 +- src/stats.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 31e49f01..bba75234 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -450,7 +450,7 @@ where T: tokio::io::AsyncWrite + std::marker::Unpin, { let columns = vec![ - ("client_id", DataType::Text), + ("client_id", DataType::Text), ("database", DataType::Text), ("user", DataType::Text), ("application_name", DataType::Text), diff --git a/src/stats.rs b/src/stats.rs index c632fa59..a88781c9 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -31,7 +31,6 @@ pub enum ClientState { ClientIdle, } - /// Information we keep track off which can be queried by SHOW CLIENTS #[derive(Debug, Clone)] pub struct ClientInformation { From a45ba131f57237e2827dca607a856deb72998d12 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 9 Sep 2022 03:40:19 -0500 Subject: [PATCH 11/25] Refactor + tests --- src/admin.rs | 14 ++----- src/client.rs | 32 ++++++--------- src/stats.rs | 61 ++++++++++++++--------------- tests/ruby/helpers/pgcat_process.rb | 2 +- 4 files changed, 44 insertions(+), 65 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index bba75234..ab0d50f1 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -7,7 +7,7 @@ use crate::config::{get_config, reload_config, VERSION}; use crate::errors::Error; use crate::messages::*; use crate::pool::get_all_pools; -use crate::stats::{get_stats, CLIENT_STATES}; +use crate::stats::{get_stats, get_client_stats}; use crate::ClientServerMap; pub fn generate_server_info_for_admin() -> BytesMut { @@ -454,18 +454,12 @@ where ("database", DataType::Text), ("user", DataType::Text), ("application_name", DataType::Text), - ("bytes_sent", DataType::Numeric), - ("bytes_received", DataType::Numeric), ("transaction_count", DataType::Numeric), ("query_count", DataType::Numeric), ("error_count", DataType::Numeric), ]; - let new_map = { - let guard = CLIENT_STATES.read(); - guard.clone() - }; - + let new_map = get_client_stats(); let mut res = BytesMut::new(); res.put(row_description(&columns)); @@ -474,9 +468,7 @@ where format!("{:#08X}", client.process_id), client.pool_name, client.username, - client.application_name, - client.bytes_sent.to_string(), - client.bytes_received.to_string(), + client.application_name.clone(), client.transaction_count.to_string(), client.query_count.to_string(), client.error_count.to_string(), diff --git a/src/client.rs b/src/client.rs index 2723766c..48a282dc 100644 --- a/src/client.rs +++ b/src/client.rs @@ -577,14 +577,13 @@ where // The query router determines where the query is going to go, // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); - if !self.admin { - self.stats.register_client( - self.process_id, - self.pool_name.clone(), - self.username.clone(), - self.application_name.clone(), - ); - } + self.stats.register_client( + self.process_id, + self.pool_name.clone(), + self.username.clone(), + self.application_name.clone(), + ); + // Our custom protocol loop. // We expect the client to either start a transaction with regular queries // or issue commands for our sharding and server selection protocol. @@ -769,11 +768,7 @@ where server.claim(self.process_id, self.secret_key); self.connected_to_server = true; - // Update statistics. - if let Some(last_address_id) = self.last_address_id { - self.stats - .client_disconnecting(self.process_id, last_address_id); - } + // Update statistics self.stats.client_active(self.process_id, address.id); self.last_address_id = Some(address.id); @@ -1079,14 +1074,9 @@ impl Drop for Client { // Dirty shutdown // TODO: refactor, this is not the best way to handle state management. - if let Some(address_id) = self.last_address_id { - self.stats.client_disconnecting(self.process_id, address_id); - - if self.connected_to_server { - if let Some(process_id) = self.last_server_id { - self.stats.server_idle(process_id, address_id); - } - } + self.stats.client_disconnecting(self.process_id, 0); + if self.connected_to_server && self.last_server_id.is_some() && self.last_address_id.is_some() { + self.stats.server_idle(self.last_server_id.unwrap(), self.last_address_id.unwrap()); } } } diff --git a/src/stats.rs b/src/stats.rs index a88781c9..5dc4837c 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -2,15 +2,19 @@ use arc_swap::ArcSwap; /// Statistics and reporting. use log::{error, info, trace, warn}; use once_cell::sync::Lazy; -use parking_lot::{Mutex, RwLock}; +use parking_lot::{Mutex}; use std::collections::HashMap; +use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{channel, Receiver, Sender}; use crate::pool::get_number_of_addresses; -pub static CLIENT_STATES: Lazy>> = - Lazy::new(|| RwLock::new(HashMap::new())); +type ClientInformationLookup = HashMap; + +/// Latest client stats updated every second; used in SHOW CLIENTS. +static LATEST_CLIENT_STATS: Lazy> = + Lazy::new(|| ArcSwap::from_pointee(ClientInformationLookup::default())); pub static REPORTER: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); @@ -41,8 +45,6 @@ pub struct ClientInformation { pub username: String, pub pool_name: String, - pub bytes_sent: u64, - pub bytes_received: u64, pub transaction_count: u64, pub query_count: u64, pub error_count: u64, @@ -112,13 +114,14 @@ impl Reporter { /// Send statistics to the task keeping track of stats. fn send(&self, event: Event) { let name = event.name.clone(); - let result = self.tx.try_send(event); + let result = self.tx.try_send(event.clone()); match result { Ok(_) => trace!( - "{:?} event reported successfully, capacity: {}", + "{:?} event reported successfully, capacity: {} {:?}", name, - self.tx.capacity() + self.tx.capacity(), + event ), Err(err) => match err { @@ -261,7 +264,7 @@ impl Reporter { name: EventName::ClientDisconnecting, value: 1, process_id: process_id, - address_id: address_id, + address_id: address_id, // No used }; self.send(event) @@ -395,6 +398,9 @@ impl Collector { // Track which state the client and server are at any given time. let mut client_server_states: HashMap> = HashMap::new(); + let mut client_states= ClientInformationLookup::default(); + + // Flush stats to StatsD and calculate averages every 15 seconds. let tx = self.tx.clone(); tokio::task::spawn(async move { @@ -453,8 +459,7 @@ impl Collector { // Some are counters, some are gauges... match stat.name { EventName::Query => { - let mut guard = CLIENT_STATES.write(); - match guard.get_mut(&stat.process_id) { + match client_states.get_mut(&stat.process_id) { Some(client) => client.query_count += stat.value as u64, None => (), } @@ -463,8 +468,7 @@ impl Collector { } EventName::Transaction => { - let mut guard = CLIENT_STATES.write(); - match guard.get_mut(&stat.process_id) { + match client_states.get_mut(&stat.process_id) { Some(client) => client.transaction_count += stat.value as u64, None => (), } @@ -473,21 +477,11 @@ impl Collector { } EventName::DataSent => { - let mut guard = CLIENT_STATES.write(); - match guard.get_mut(&stat.process_id) { - Some(client) => client.bytes_sent += stat.value as u64, - None => (), - } let counter = stats.entry("total_sent").or_insert(0); *counter += stat.value; } EventName::DataReceived => { - let mut guard = CLIENT_STATES.write(); - match guard.get_mut(&stat.process_id) { - Some(client) => client.bytes_received += stat.value as u64, - None => (), - } let counter = stats.entry("total_received").or_insert(0); *counter += stat.value; } @@ -513,14 +507,13 @@ impl Collector { } EventName::ClientRegistered(pool_name, username, app_name) => { - let mut guard = CLIENT_STATES.write(); - match guard.get_mut(&stat.process_id) { + match client_states.get_mut(&stat.process_id) { Some(_) => { warn!("Client double registered!"); } None => { - guard.insert( + client_states.insert( stat.process_id, ClientInformation { state: ClientState::ClientIdle, @@ -528,8 +521,6 @@ impl Collector { pool_name: pool_name.clone(), username: username.clone(), application_name: app_name.clone(), - bytes_sent: 0, - bytes_received: 0, transaction_count: 0, query_count: 0, error_count: 0, @@ -548,8 +539,7 @@ impl Collector { _ => unreachable!(), }; - let mut guard = CLIENT_STATES.write(); - match guard.get_mut(&stat.process_id) { + match client_states.get_mut(&stat.process_id) { Some(client_state) => { client_state.state = new_state; } @@ -562,8 +552,7 @@ impl Collector { EventName::ClientDisconnecting => { client_server_states.remove(&stat.process_id); - let mut guard = CLIENT_STATES.write(); - guard.remove(&stat.process_id); + client_states.remove(&stat.process_id); } EventName::ServerActive @@ -627,6 +616,8 @@ impl Collector { entry.insert(key.to_string(), value.clone()); } + LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); + // These are re-calculated every iteration of the loop, so we don't want to add values // from the last iteration. for stat in &[ @@ -679,6 +670,12 @@ pub fn get_stats() -> HashMap> { LATEST_STATS.lock().clone() } +/// Get a snapshot of client statistics. Updated once a second +/// by the `Collector`. +pub fn get_client_stats() -> ClientInformationLookup { + (*(*LATEST_CLIENT_STATS.load())).clone() +} + /// Get the statistics reporter used to update stats across the pools/clients. pub fn get_reporter() -> Reporter { (*(*REPORTER.load())).clone() diff --git a/tests/ruby/helpers/pgcat_process.rb b/tests/ruby/helpers/pgcat_process.rb index a5a6d3d3..b6e798ae 100644 --- a/tests/ruby/helpers/pgcat_process.rb +++ b/tests/ruby/helpers/pgcat_process.rb @@ -111,6 +111,6 @@ def example_connection_string username = cfg["pools"][first_pool_name]["users"]["0"]["username"] password = cfg["pools"][first_pool_name]["users"]["0"]["password"] - "postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}" + "postgresql://#{username}:#{password}@0.0.0.0:#{@port}/#{db_name}?application_name=example_app" end end From 6df4a5e19bab40961d06dc7d7aa7d41ccf4ef27b Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 9 Sep 2022 03:40:38 -0500 Subject: [PATCH 12/25] fmt --- src/admin.rs | 2 +- src/client.rs | 8 ++++++-- src/stats.rs | 5 ++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index ab0d50f1..7aace675 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -7,7 +7,7 @@ use crate::config::{get_config, reload_config, VERSION}; use crate::errors::Error; use crate::messages::*; use crate::pool::get_all_pools; -use crate::stats::{get_stats, get_client_stats}; +use crate::stats::{get_client_stats, get_stats}; use crate::ClientServerMap; pub fn generate_server_info_for_admin() -> BytesMut { diff --git a/src/client.rs b/src/client.rs index 48a282dc..0ef62fc9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1075,8 +1075,12 @@ impl Drop for Client { // Dirty shutdown // TODO: refactor, this is not the best way to handle state management. self.stats.client_disconnecting(self.process_id, 0); - if self.connected_to_server && self.last_server_id.is_some() && self.last_address_id.is_some() { - self.stats.server_idle(self.last_server_id.unwrap(), self.last_address_id.unwrap()); + if self.connected_to_server + && self.last_server_id.is_some() + && self.last_address_id.is_some() + { + self.stats + .server_idle(self.last_server_id.unwrap(), self.last_address_id.unwrap()); } } } diff --git a/src/stats.rs b/src/stats.rs index 5dc4837c..efc7dc40 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -2,7 +2,7 @@ use arc_swap::ArcSwap; /// Statistics and reporting. use log::{error, info, trace, warn}; use once_cell::sync::Lazy; -use parking_lot::{Mutex}; +use parking_lot::Mutex; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError; @@ -398,8 +398,7 @@ impl Collector { // Track which state the client and server are at any given time. let mut client_server_states: HashMap> = HashMap::new(); - let mut client_states= ClientInformationLookup::default(); - + let mut client_states = ClientInformationLookup::default(); // Flush stats to StatsD and calculate averages every 15 seconds. let tx = self.tx.clone(); From 2dff989671e001f61228e9cbb31f6759eacccca5 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 9 Sep 2022 08:27:00 -0500 Subject: [PATCH 13/25] add test --- tests/ruby/admin_spec.rb | 66 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/ruby/admin_spec.rb diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb new file mode 100644 index 00000000..dbdda7f1 --- /dev/null +++ b/tests/ruby/admin_spec.rb @@ -0,0 +1,66 @@ +# frozen_string_literal: true +require_relative 'spec_helper' + +describe "Admin" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) } + + after do + processes.all_databases.map(&:reset) + processes.pgcat.shutdown + end + + describe "SHOW CLIENTS" do + it "reports correct number and application names" do + conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") + connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") } + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1) # Wait for stats to be updated + + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(21) # count admin clients + expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8) + expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1) + + connections[0..5].map(&:close) + sleep(1) # Wait for stats to be updated + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(15) + + connections[6..].map(&:close) + sleep(1) # Wait for stats to be updated + expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1) + admin_conn.close + end + + it "reports correct number of queries and transactions" do + conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") + + connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") } + connections.each do |c| + c.async_exec("SELECT 1") + c.async_exec("SELECT 2") + c.async_exec("SELECT 3") + c.async_exec("BEGIN") + c.async_exec("COMMIT") + end + + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1) # Wait for stats to be updated + + results = admin_conn.async_exec("SHOW CLIENTS") + expect(results.count).to eq(3) + normal_client_results = results.reject { |r| r["database"] == "pgcat" } + expect(normal_client_results[0]["transaction_count"]).to eq("4") + expect(normal_client_results[1]["transaction_count"]).to eq("4") # Obviously wrong, will fix in a follow up PR + expect(normal_client_results[0]["query_count"]).to eq("5") + expect(normal_client_results[1]["query_count"]).to eq("5") + + # puts processes.pgcat.logs + + admin_conn.close + connections.map(&:close) + end + end + +end From 7b83c69edeb026807346ec4078163d3ad8aebd42 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 01:53:04 -0500 Subject: [PATCH 14/25] Add SHOW SERVERS + Make PR unreviewable --- src/admin.rs | 146 +++-- src/client.rs | 29 +- src/pool.rs | 52 +- src/prometheus.rs | 3 +- src/server.rs | 17 +- src/stats.rs | 871 +++++++++++++++++++---------- tests/ruby/admin_spec.rb | 126 ++++- tests/ruby/helpers/pgcat_helper.rb | 44 ++ 8 files changed, 893 insertions(+), 395 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 7aace675..6e02bd1b 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -2,12 +2,15 @@ use bytes::{Buf, BufMut, BytesMut}; use log::{info, trace}; use std::collections::HashMap; +use tokio::time::Instant; use crate::config::{get_config, reload_config, VERSION}; use crate::errors::Error; use crate::messages::*; use crate::pool::get_all_pools; -use crate::stats::{get_client_stats, get_stats}; +use crate::stats::{ + get_address_stats, get_client_stats, get_pool_stats, get_server_stats, ClientState, ServerState, +}; use crate::ClientServerMap; pub fn generate_server_info_for_admin() -> BytesMut { @@ -76,6 +79,10 @@ where trace!("SHOW CLIENTS"); show_clients(stream).await } + "SERVERS" => { + trace!("SHOW SERVERS"); + show_servers(stream).await + } "STATS" => { trace!("SHOW STATS"); show_stats(stream).await @@ -95,7 +102,8 @@ async fn show_lists(stream: &mut T) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { - let stats = get_stats(); + let client_stats = get_client_stats(); + let server_stats = get_server_stats(); let columns = vec![("list", DataType::Text), ("items", DataType::Int4)]; @@ -115,18 +123,18 @@ where res.put(data_row(&vec!["pools".to_string(), databases.to_string()])); res.put(data_row(&vec![ "free_clients".to_string(), - stats + client_stats .keys() - .map(|address_id| stats[&address_id]["cl_idle"]) - .sum::() + .filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Idle) + .count() .to_string(), ])); res.put(data_row(&vec![ "used_clients".to_string(), - stats + client_stats .keys() - .map(|address_id| stats[&address_id]["cl_active"]) - .sum::() + .filter(|client_id| client_stats.get(client_id).unwrap().state == ClientState::Active) + .count() .to_string(), ])); res.put(data_row(&vec![ @@ -135,18 +143,18 @@ where ])); res.put(data_row(&vec![ "free_servers".to_string(), - stats + server_stats .keys() - .map(|address_id| stats[&address_id]["sv_idle"]) - .sum::() + .filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Idle) + .count() .to_string(), ])); res.put(data_row(&vec![ "used_servers".to_string(), - stats + server_stats .keys() - .map(|address_id| stats[&address_id]["sv_active"]) - .sum::() + .filter(|server_id| server_stats.get(server_id).unwrap().state == ServerState::Active) + .count() .to_string(), ])); res.put(data_row(&vec!["dns_names".to_string(), "0".to_string()])); @@ -186,11 +194,12 @@ async fn show_pools(stream: &mut T) -> Result<(), Error> where T: tokio::io::AsyncWrite + std::marker::Unpin, { - let stats = get_stats(); + let all_pool_stats = get_pool_stats(); let columns = vec![ ("database", DataType::Text), ("user", DataType::Text), + ("pool_mode", DataType::Text), ("cl_idle", DataType::Numeric), ("cl_active", DataType::Numeric), ("cl_waiting", DataType::Numeric), @@ -202,32 +211,27 @@ where ("sv_login", DataType::Numeric), ("maxwait", DataType::Numeric), ("maxwait_us", DataType::Numeric), - ("pool_mode", DataType::Text), ]; let mut res = BytesMut::new(); res.put(row_description(&columns)); - for (_, pool) in get_all_pools() { - let pool_config = &pool.settings; - for shard in 0..pool.shards() { - for server in 0..pool.servers(shard) { - let address = pool.address(shard, server); - let stats = match stats.get(&address.id) { - Some(stats) => stats.clone(), - None => HashMap::new(), - }; - - let mut row = vec![address.name(), pool_config.user.username.clone()]; + for ((pool_name, username), pool) in get_all_pools() { + let def = HashMap::default(); + let pool_stats = all_pool_stats + .get(&(pool_name.clone(), username.clone())) + .unwrap_or(&def); - for column in &columns[2..columns.len() - 1] { - let value = stats.get(column.0).unwrap_or(&0).to_string(); - row.push(value); - } - - row.push(pool_config.pool_mode.to_string()); - res.put(data_row(&row)); - } + let pool_config = &pool.settings; + let mut row = vec![ + pool_name.clone(), + username.clone(), + pool_config.pool_mode.to_string(), + ]; + for column in &columns[3..columns.len()] { + let value = pool_stats.get(column.0).unwrap_or(&0).to_string(); + row.push(value); } + res.put(data_row(&row)); } res.put(command_complete("SHOW")); @@ -391,6 +395,7 @@ where T: tokio::io::AsyncWrite + std::marker::Unpin, { let columns = vec![ + ("instance", DataType::Text), ("database", DataType::Text), ("user", DataType::Text), ("total_xact_count", DataType::Numeric), @@ -400,32 +405,32 @@ where ("total_xact_time", DataType::Numeric), ("total_query_time", DataType::Numeric), ("total_wait_time", DataType::Numeric), + ("total_errors", DataType::Numeric), ("avg_xact_count", DataType::Numeric), ("avg_query_count", DataType::Numeric), ("avg_recv", DataType::Numeric), ("avg_sent", DataType::Numeric), + ("avg_errors", DataType::Numeric), ("avg_xact_time", DataType::Numeric), ("avg_query_time", DataType::Numeric), ("avg_wait_time", DataType::Numeric), ]; - let stats = get_stats(); + let all_stats = get_address_stats(); let mut res = BytesMut::new(); res.put(row_description(&columns)); - for ((_db_name, username), pool) in get_all_pools() { + for ((db, username), pool) in get_all_pools() { for shard in 0..pool.shards() { for server in 0..pool.servers(shard) { let address = pool.address(shard, server); - let stats = match stats.get(&address.id) { + let stats = match all_stats.get(&address.id) { Some(stats) => stats.clone(), None => HashMap::new(), }; - let mut row = vec![address.name()]; - row.push(username.clone()); - - for column in &columns[2..] { + let mut row = vec![address.name(), db.clone(), username.clone()]; + for column in &columns[3..] { row.push(stats.get(column.0).unwrap_or(&0).to_string()); } @@ -457,6 +462,7 @@ where ("transaction_count", DataType::Numeric), ("query_count", DataType::Numeric), ("error_count", DataType::Numeric), + ("age_seconds", DataType::Numeric), ]; let new_map = get_client_stats(); @@ -465,13 +471,67 @@ where for (_, client) in new_map { let row = vec![ - format!("{:#08X}", client.process_id), + format!("{:#08X}", client.client_id), client.pool_name, client.username, client.application_name.clone(), client.transaction_count.to_string(), client.query_count.to_string(), client.error_count.to_string(), + Instant::now() + .duration_since(client.connect_time) + .as_secs() + .to_string(), + ]; + + res.put(data_row(&row)); + } + + res.put(command_complete("SHOW")); + + // ReadyForQuery + res.put_u8(b'Z'); + res.put_i32(5); + res.put_u8(b'I'); + + write_all_half(stream, res).await +} + +/// Show currently connected servers +async fn show_servers(stream: &mut T) -> Result<(), Error> +where + T: tokio::io::AsyncWrite + std::marker::Unpin, +{ + let columns = vec![ + ("server_id", DataType::Text), + ("database_name", DataType::Text), + ("user", DataType::Text), + ("address_id", DataType::Text), + ("transaction_count", DataType::Numeric), + ("query_count", DataType::Numeric), + ("bytes_sent", DataType::Numeric), + ("bytes_received", DataType::Numeric), + ("age_seconds", DataType::Numeric), + ]; + + let new_map = get_server_stats(); + let mut res = BytesMut::new(); + res.put(row_description(&columns)); + + for (_, server) in new_map { + let row = vec![ + format!("{:#08X}", server.server_id), + server.pool_name, + server.username, + server.address_name, + server.transaction_count.to_string(), + server.query_count.to_string(), + server.bytes_sent.to_string(), + server.bytes_received.to_string(), + Instant::now() + .duration_since(server.connect_time) + .as_secs() + .to_string(), ]; res.put(data_row(&row)); diff --git a/src/client.rs b/src/client.rs index 45d96544..c07a21f0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -577,7 +577,7 @@ where // The query router determines where the query is going to go, // e.g. primary, replica, which shard. let mut query_router = QueryRouter::new(); - self.stats.register_client( + self.stats.client_register( self.process_id, self.pool_name.clone(), self.username.clone(), @@ -771,10 +771,11 @@ where self.connected_to_server = true; // Update statistics - self.stats.client_active(self.process_id, address.id); + self.stats + .client_active(self.process_id, server.server_id()); self.last_address_id = Some(address.id); - self.last_server_id = Some(server.process_id()); + self.last_server_id = Some(server.server_id()); debug!( "Client {:?} talking to server {:?}", @@ -832,7 +833,7 @@ where if !server.in_transaction() { // Report transaction executed statistics. - self.stats.transaction(self.process_id, address.id); + self.stats.transaction(self.process_id, server.server_id()); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -910,7 +911,7 @@ where self.buffer.clear(); if !server.in_transaction() { - self.stats.transaction(self.process_id, address.id); + self.stats.transaction(self.process_id, server.server_id()); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -945,7 +946,7 @@ where }; if !server.in_transaction() { - self.stats.transaction(self.process_id, address.id); + self.stats.transaction(self.process_id, server.server_id()); // Release server back to the pool if we are in transaction mode. // If we are in session mode, we keep the server until the client disconnects. @@ -966,11 +967,11 @@ where // The server is no longer bound to us, we can't cancel it's queries anymore. debug!("Releasing server back into the pool"); server.checkin_cleanup().await?; - self.stats.server_idle(server.process_id(), address.id); + self.stats.server_idle(server.server_id()); self.connected_to_server = false; self.release(); - self.stats.client_idle(self.process_id, address.id); + self.stats.client_idle(self.process_id); } } @@ -1012,7 +1013,7 @@ where } // Report query executed statistics. - self.stats.query(self.process_id, address.id); + self.stats.query(self.process_id, server.server_id()); Ok(()) } @@ -1093,13 +1094,9 @@ impl Drop for Client { // Dirty shutdown // TODO: refactor, this is not the best way to handle state management. - self.stats.client_disconnecting(self.process_id, 0); - if self.connected_to_server - && self.last_server_id.is_some() - && self.last_address_id.is_some() - { - self.stats - .server_idle(self.last_server_id.unwrap(), self.last_address_id.unwrap()); + self.stats.client_disconnecting(self.process_id); + if self.connected_to_server && self.last_server_id.is_some() { + self.stats.server_idle(self.last_server_id.unwrap()); } } } diff --git a/src/pool.rs b/src/pool.rs index dea29ad2..99d13f5b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -332,7 +332,7 @@ impl ConnectionPool { } // Indicate we're waiting on a server connection from a pool. - self.stats.client_waiting(process_id, address.id); + self.stats.client_waiting(process_id); // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] @@ -343,8 +343,7 @@ impl ConnectionPool { Err(err) => { error!("Banning instance {:?}, error: {:?}", address, err); self.ban(&address, process_id); - self.stats - .checkout_time(now.elapsed().as_micros(), process_id, address.id); + self.stats.client_checkout_error(process_id, address.id); continue; } }; @@ -361,14 +360,14 @@ impl ConnectionPool { // Health checks are pretty expensive. if !require_healthcheck { self.stats - .checkout_time(now.elapsed().as_micros(), process_id, address.id); - self.stats.server_active(conn.process_id(), address.id); + .checkout_time(now.elapsed().as_micros(), process_id, server.server_id()); + self.stats.server_active(process_id, server.server_id()); return Ok((conn, address.clone())); } debug!("Running health check on server {:?}", address); - self.stats.server_tested(server.process_id(), address.id); + self.stats.server_tested(server.server_id()); match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), @@ -379,9 +378,12 @@ impl ConnectionPool { // Check if health check succeeded. Ok(res) => match res { Ok(_) => { - self.stats - .checkout_time(now.elapsed().as_micros(), process_id, address.id); - self.stats.server_active(conn.process_id(), address.id); + self.stats.checkout_time( + now.elapsed().as_micros(), + process_id, + conn.server_id(), + ); + self.stats.server_active(process_id, conn.server_id()); return Ok((conn, address.clone())); } @@ -421,10 +423,9 @@ impl ConnectionPool { /// Ban an address (i.e. replica). It no longer will serve /// traffic for any new transactions. Existing transactions on that replica /// will finish successfully or error out to the clients. - pub fn ban(&self, address: &Address, process_id: i32) { - self.stats.client_disconnecting(process_id, address.id); - + pub fn ban(&self, address: &Address, client_id: i32) { error!("Banning {:?}", address); + self.stats.client_ban_error(client_id, address.id); let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.write(); @@ -560,14 +561,22 @@ impl ManageConnection for ServerPool { /// Attempts to create a new connection. async fn connect(&self) -> Result { info!("Creating a new server connection {:?}", self.address); + let server_id = rand::random::(); // Put a temporary process_id into the stats // for server login. - let process_id = rand::random::(); - self.stats.server_login(process_id, self.address.id); + self.stats.server_register( + server_id, + self.address.id, + self.address.name(), + self.address.pool_name.clone(), + self.address.username.clone(), + ); + self.stats.server_login(server_id); // Connect to the PostgreSQL server. match Server::startup( + server_id, &self.address, &self.user, &self.database, @@ -577,13 +586,12 @@ impl ManageConnection for ServerPool { .await { Ok(conn) => { - // Remove the temporary process_id from the stats. - self.stats.server_disconnecting(process_id, self.address.id); + self.stats.server_idle(server_id); Ok(conn) } Err(err) => { // Remove the temporary process_id from the stats. - self.stats.server_disconnecting(process_id, self.address.id); + self.stats.server_disconnecting(server_id); Err(err) } } @@ -608,6 +616,11 @@ pub fn get_pool(db: String, user: String) -> Option { } } +/// Get a pointer to all configured pools. +pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> { + return (*(*POOLS.load())).clone(); +} + /// How many total servers we have in the config. pub fn get_number_of_addresses() -> usize { get_all_pools() @@ -615,8 +628,3 @@ pub fn get_number_of_addresses() -> usize { .map(|(_, pool)| pool.databases()) .sum() } - -/// Get a pointer to all configured pools. -pub fn get_all_pools() -> HashMap<(String, String), ConnectionPool> { - return (*(*POOLS.load())).clone(); -} diff --git a/src/prometheus.rs b/src/prometheus.rs index 4e3dc3aa..44acd276 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -8,7 +8,6 @@ use std::net::SocketAddr; use crate::config::Address; use crate::pool::get_all_pools; -use crate::stats::get_stats; struct MetricHelpType { help: &'static str, @@ -164,7 +163,7 @@ impl PrometheusMetric { async fn prometheus_stats(request: Request) -> Result, hyper::http::Error> { match (request.method(), request.uri().path()) { (&Method::GET, "/metrics") => { - let stats = get_stats(); + let stats: HashMap> = HashMap::default(); //get_stats(); let mut lines = Vec::new(); for (_, pool) in get_all_pools() { diff --git a/src/server.rs b/src/server.rs index af3d680e..3204fa94 100644 --- a/src/server.rs +++ b/src/server.rs @@ -20,6 +20,8 @@ use crate::ClientServerMap; /// Server state. pub struct Server { + server_id: i32, + /// Server host, e.g. localhost, /// port, e.g. 5432, and role, e.g. primary or replica. address: Address, @@ -72,6 +74,7 @@ impl Server { /// Pretend to be the Postgres client and connect to the server given host, port and credentials. /// Perform the authentication and return the server in a ready for query state. pub async fn startup( + server_id: i32, address: &Address, user: &User, database: &str, @@ -315,6 +318,7 @@ impl Server { write: write, buffer: BytesMut::with_capacity(8196), server_info: server_info, + server_id: server_id, process_id: process_id, secret_key: secret_key, in_transaction: false, @@ -372,8 +376,7 @@ impl Server { /// Send messages to the server from the client. pub async fn send(&mut self, messages: BytesMut) -> Result<(), Error> { - self.stats - .data_sent(messages.len(), self.process_id, self.address.id); + self.stats.data_sent(messages.len(), self.server_id); match write_all_half(&mut self.write, messages).await { Ok(_) => { @@ -505,8 +508,7 @@ impl Server { let bytes = self.buffer.clone(); // Keep track of how much data we got from the server for stats. - self.stats - .data_received(bytes.len(), self.process_id, self.address.id); + self.stats.data_received(bytes.len(), self.server_id); // Clear the buffer for next query. self.buffer.clear(); @@ -630,8 +632,8 @@ impl Server { } /// Get the server's unique identifier. - pub fn process_id(&self) -> i32 { - self.process_id + pub fn server_id(&self) -> i32 { + self.server_id } // Get server's latest response timestamp @@ -650,8 +652,7 @@ impl Drop for Server { /// the socket is in non-blocking mode, so it may not be ready /// for a write. fn drop(&mut self) { - self.stats - .server_disconnecting(self.process_id(), self.address.id); + self.stats.server_disconnecting(self.server_id); let mut bytes = BytesMut::with_capacity(4); bytes.put_u8(b'X'); diff --git a/src/stats.rs b/src/stats.rs index efc7dc40..38c65aee 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -2,49 +2,88 @@ use arc_swap::ArcSwap; /// Statistics and reporting. use log::{error, info, trace, warn}; use once_cell::sync::Lazy; -use parking_lot::Mutex; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::time::Instant; -use crate::pool::get_number_of_addresses; +use crate::pool::{get_all_pools, get_number_of_addresses}; -type ClientInformationLookup = HashMap; +type ClientStatesLookup = HashMap; +type ServerStatesLookup = HashMap; +type PoolStatsLookup = HashMap<(String, String), HashMap>; +type AddressStatsLookup = HashMap>; /// Latest client stats updated every second; used in SHOW CLIENTS. -static LATEST_CLIENT_STATS: Lazy> = - Lazy::new(|| ArcSwap::from_pointee(ClientInformationLookup::default())); +static LATEST_CLIENT_STATS: Lazy> = + Lazy::new(|| ArcSwap::from_pointee(ClientStatesLookup::default())); + +/// Latest client stats updated every second; used in SHOW CLIENTS. +static LATEST_SERVER_STATS: Lazy> = + Lazy::new(|| ArcSwap::from_pointee(ServerStatesLookup::default())); + +static LATEST_POOL_STATS: Lazy> = + Lazy::new(|| ArcSwap::from_pointee(PoolStatsLookup::default())); + +static LATEST_ADDRESS_STATS: Lazy> = + Lazy::new(|| ArcSwap::from_pointee(AddressStatsLookup::default())); pub static REPORTER: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); -/// Latest stats updated every second; used in SHOW STATS and other admin commands. -static LATEST_STATS: Lazy>>> = - Lazy::new(|| Mutex::new(HashMap::new())); - /// Statistics period used for average calculations. /// 15 seconds. static STAT_PERIOD: u64 = 15000; /// The various states that a client can be in -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum ClientState { - ClientWaiting, - ClientActive, - ClientIdle, + Idle, + Waiting, + Active, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ServerState { + Login, + Active, + Tested, + Idle, } /// Information we keep track off which can be queried by SHOW CLIENTS #[derive(Debug, Clone)] pub struct ClientInformation { pub state: ClientState, - pub process_id: i32, + pub connect_time: Instant, + pub client_id: i32, pub application_name: String, pub username: String, pub pool_name: String, + pub total_wait_time: u64, + + pub transaction_count: u64, + pub query_count: u64, + pub error_count: u64, +} + +#[derive(Debug, Clone)] +pub struct ServerInformation { + pub state: ServerState, + pub connect_time: Instant, + pub address_name: String, + pub address_id: usize, + pub server_id: i32, + + pub username: String, + pub pool_name: String, + + pub bytes_sent: u64, + pub bytes_received: u64, + pub transaction_count: u64, pub query_count: u64, pub error_count: u64, @@ -54,23 +93,93 @@ pub struct ClientInformation { /// to the statistics collector. #[derive(Debug, Clone)] enum EventName { - CheckoutTime, - Query, - Transaction, - DataSent, - DataReceived, - ClientRegistered(String, String, String), - ClientWaiting, - ClientActive, - ClientIdle, - ClientDisconnecting, - ServerActive, - ServerIdle, - ServerTested, - ServerLogin, - ServerDisconnecting, - UpdateStats, - UpdateAverages, + CheckoutTime { + client_id: i32, + server_id: i32, + }, + Query { + client_id: i32, + server_id: i32, + }, + Transaction { + client_id: i32, + server_id: i32, + }, + + DataSentToServer { + server_id: i32, + }, + + DataReceivedFromServer { + server_id: i32, + }, + + ClientRegistered { + client_id: i32, + pool_name: String, + username: String, + application_name: String, + }, + ClientIdle { + client_id: i32, + }, + ClientWaiting { + client_id: i32, + }, + ClientActive { + client_id: i32, + #[allow(dead_code)] + server_id: i32, + }, + ClientDisconnecting { + client_id: i32, + }, + + ClientCheckoutError { + client_id: i32, + #[allow(dead_code)] + address_id: usize, + }, + + ClientBanError { + client_id: i32, + #[allow(dead_code)] + address_id: usize, + }, + + ServerRegistered { + server_id: i32, + address_id: usize, + address_name: String, + pool_name: String, + username: String, + }, + ServerLogin { + server_id: i32, + }, + ServerIdle { + server_id: i32, + }, + ServerTested { + server_id: i32, + }, + ServerActive { + #[allow(dead_code)] + client_id: i32, + server_id: i32, + }, + ServerDisconnecting { + server_id: i32, + }, + + UpdateStats { + pool_name: String, + username: String, + }, + + UpdateAverages { + address_id: usize, + }, } /// Event data sent to the collector @@ -82,12 +191,6 @@ pub struct Event { /// The value being reported. Meaning differs based on event name. value: i64, - - /// The client or server connection reporting the event. - process_id: i32, - - /// The server the client is connected to. - address_id: usize, } /// The statistics reporter. An instance is given @@ -131,196 +234,218 @@ impl Reporter { }; } - pub fn register_client( - &self, - process_id: i32, - pool_name: String, - username: String, - app_name: String, - ) { - let event = Event { - name: EventName::ClientRegistered( - pool_name.clone(), - username.clone(), - app_name.clone(), - ), - value: 1, - process_id: process_id, - address_id: 0, - }; - - self.send(event); - } - /// Report a query executed by a client against /// a server identified by the `address_id`. - pub fn query(&self, process_id: i32, address_id: usize) { + pub fn query(&self, client_id: i32, server_id: i32) { let event = Event { - name: EventName::Query, + name: EventName::Query { + client_id, + server_id, + }, value: 1, - process_id: process_id, - address_id: address_id, }; - self.send(event); } /// Report a transaction executed by a client against /// a server identified by the `address_id`. - pub fn transaction(&self, process_id: i32, address_id: usize) { + pub fn transaction(&self, client_id: i32, server_id: i32) { let event = Event { - name: EventName::Transaction, + name: EventName::Transaction { + client_id, + server_id, + }, value: 1, - process_id: process_id, - address_id: address_id, }; - - self.send(event) + self.send(event); } /// Report data sent to a server identified by `address_id`. /// The `amount` is measured in bytes. - pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) { + pub fn data_sent(&self, amount: usize, server_id: i32) { let event = Event { - name: EventName::DataSent, + name: EventName::DataSentToServer { server_id }, value: amount as i64, - process_id: process_id, - address_id: address_id, }; - self.send(event) } /// Report data received from a server identified by `address_id`. /// The `amount` is measured in bytes. - pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) { + pub fn data_received(&self, amount: usize, server_id: i32) { let event = Event { - name: EventName::DataReceived, + name: EventName::DataReceivedFromServer { server_id }, value: amount as i64, - process_id: process_id, - address_id: address_id, }; - self.send(event) } /// Time spent waiting to get a healthy connection from the pool /// for a server identified by `address_id`. /// Measured in milliseconds. - pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) { + pub fn checkout_time(&self, ms: u128, client_id: i32, server_id: i32) { let event = Event { - name: EventName::CheckoutTime, + name: EventName::CheckoutTime { + client_id, + server_id, + }, value: ms as i64, - process_id: process_id, - address_id: address_id, }; - self.send(event) } - /// Reports a client identified by `process_id` waiting for a connection + pub fn client_register( + &self, + client_id: i32, + pool_name: String, + username: String, + app_name: String, + ) { + let event = Event { + name: EventName::ClientRegistered { + client_id, + pool_name: pool_name.clone(), + username: username.clone(), + application_name: app_name.clone(), + }, + value: 1, + }; + self.send(event); + } + + /// Reports a client identified by `client_id` waiting for a connection /// to a server identified by `address_id`. - pub fn client_waiting(&self, process_id: i32, address_id: usize) { + pub fn client_waiting(&self, client_id: i32) { let event = Event { - name: EventName::ClientWaiting, + name: EventName::ClientWaiting { client_id }, value: 1, - process_id: process_id, - address_id: address_id, }; + self.send(event) + } + /// Reports a client identified by `process_id` is done waiting for a connection + /// to a server identified by `address_id` and is about to query the server. + pub fn client_ban_error(&self, client_id: i32, address_id: usize) { + let event = Event { + name: EventName::ClientBanError { + client_id, + address_id, + }, + value: 1, + }; self.send(event) } /// Reports a client identified by `process_id` is done waiting for a connection /// to a server identified by `address_id` and is about to query the server. - pub fn client_active(&self, process_id: i32, address_id: usize) { + pub fn client_checkout_error(&self, client_id: i32, address_id: usize) { let event = Event { - name: EventName::ClientActive, + name: EventName::ClientCheckoutError { + client_id, + address_id, + }, value: 1, - process_id: process_id, - address_id: address_id, }; + self.send(event) + } + /// Reports a client identified by `process_id` is done waiting for a connection + /// to a server identified by `address_id` and is about to query the server. + pub fn client_active(&self, client_id: i32, server_id: i32) { + let event = Event { + name: EventName::ClientActive { + client_id, + server_id, + }, + value: 1, + }; self.send(event) } /// Reports a client identified by `process_id` is done querying the server /// identified by `address_id` and is no longer active. - pub fn client_idle(&self, process_id: i32, address_id: usize) { + pub fn client_idle(&self, client_id: i32) { let event = Event { - name: EventName::ClientIdle, + name: EventName::ClientIdle { client_id }, value: 1, - process_id: process_id, - address_id: address_id, }; - self.send(event) } /// Reports a client identified by `process_id` is disconecting from the pooler. /// The last server it was connected to is identified by `address_id`. - pub fn client_disconnecting(&self, process_id: i32, address_id: usize) { + pub fn client_disconnecting(&self, client_id: i32) { let event = Event { - name: EventName::ClientDisconnecting, + name: EventName::ClientDisconnecting { client_id }, value: 1, - process_id: process_id, - address_id: address_id, // No used }; - self.send(event) } + pub fn server_register( + &self, + server_id: i32, + address_id: usize, + address_name: String, + pool_name: String, + username: String, + ) { + let event = Event { + name: EventName::ServerRegistered { + server_id, + address_id, + address_name, + pool_name, + username, + }, + value: 1, + }; + self.send(event); + } + /// Reports a server connection identified by `process_id` for /// a configured server identified by `address_id` is actively used /// by a client. - pub fn server_active(&self, process_id: i32, address_id: usize) { + pub fn server_active(&self, client_id: i32, server_id: i32) { let event = Event { - name: EventName::ServerActive, + name: EventName::ServerActive { + client_id, + server_id, + }, value: 1, - process_id: process_id, - address_id: address_id, }; - self.send(event) } /// Reports a server connection identified by `process_id` for /// a configured server identified by `address_id` is no longer /// actively used by a client and is now idle. - pub fn server_idle(&self, process_id: i32, address_id: usize) { + pub fn server_idle(&self, server_id: i32) { let event = Event { - name: EventName::ServerIdle, + name: EventName::ServerIdle { server_id }, value: 1, - process_id: process_id, - address_id: address_id, }; - self.send(event) } /// Reports a server connection identified by `process_id` for /// a configured server identified by `address_id` is attempting /// to login. - pub fn server_login(&self, process_id: i32, address_id: usize) { + pub fn server_login(&self, server_id: i32) { let event = Event { - name: EventName::ServerLogin, + name: EventName::ServerLogin { server_id }, value: 1, - process_id: process_id, - address_id: address_id, }; - self.send(event) } /// Reports a server connection identified by `process_id` for /// a configured server identified by `address_id` is being /// tested before being given to a client. - pub fn server_tested(&self, process_id: i32, address_id: usize) { + pub fn server_tested(&self, server_id: i32) { let event = Event { - name: EventName::ServerTested, + name: EventName::ServerTested { server_id }, value: 1, - process_id: process_id, - address_id: address_id, }; self.send(event) @@ -328,14 +453,11 @@ impl Reporter { /// Reports a server connection identified by `process_id` is disconecting from the pooler. /// The configured server it was connected to is identified by `address_id`. - pub fn server_disconnecting(&self, process_id: i32, address_id: usize) { + pub fn server_disconnecting(&self, server_id: i32) { let event = Event { - name: EventName::ServerDisconnecting, + name: EventName::ServerDisconnecting { server_id }, value: 1, - process_id: process_id, - address_id: address_id, }; - self.send(event) } } @@ -363,57 +485,26 @@ impl Collector { pub async fn collect(&mut self) { info!("Events reporter started"); - let stats_template = HashMap::from([ - ("total_query_count", 0), - ("total_query_time", 0), - ("total_received", 0), - ("total_sent", 0), - ("total_xact_count", 0), - ("total_xact_time", 0), - ("total_wait_time", 0), - ("avg_query_count", 0), - ("avg_query_time", 0), - ("avg_recv", 0), - ("avg_sent", 0), - ("avg_xact_count", 0), - ("avg_xact_time", 0), - ("avg_wait_time", 0), - ("maxwait_us", 0), - ("maxwait", 0), - ("cl_waiting", 0), - ("cl_active", 0), - ("cl_idle", 0), - ("sv_idle", 0), - ("sv_active", 0), - ("sv_login", 0), - ("sv_tested", 0), - ]); - - let mut stats = HashMap::new(); - - // Stats saved after each iteration of the flush event. Used in calculation - // of averages in the last flush period. - let mut old_stats: HashMap> = HashMap::new(); - - // Track which state the client and server are at any given time. - let mut client_server_states: HashMap> = HashMap::new(); - - let mut client_states = ClientInformationLookup::default(); - - // Flush stats to StatsD and calculate averages every 15 seconds. + let mut client_states = ClientStatesLookup::default(); + let mut server_states = ServerStatesLookup::default(); + let mut pool_stat_lookup = PoolStatsLookup::default(); + + let mut address_stat_lookup = AddressStatsLookup::default(); + let mut address_old_stat_lookup = AddressStatsLookup::default(); + let tx = self.tx.clone(); tokio::task::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15)); loop { interval.tick().await; - let address_count = get_number_of_addresses(); - for address_id in 0..address_count { + for ((pool_name, username), _pool) in get_all_pools() { let _ = tx.try_send(Event { - name: EventName::UpdateStats, + name: EventName::UpdateStats { + pool_name, + username, + }, value: 0, - process_id: -1, - address_id: address_id, }); } } @@ -425,13 +516,10 @@ impl Collector { tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); loop { interval.tick().await; - let address_count = get_number_of_addresses(); - for address_id in 0..address_count { + for address_id in 0..get_number_of_addresses() { let _ = tx.try_send(Event { - name: EventName::UpdateAverages, + name: EventName::UpdateAverages { address_id }, value: 0, - process_id: -1, - address_id: address_id, }); } } @@ -447,79 +535,156 @@ impl Collector { } }; - let stats = stats - .entry(stat.address_id) - .or_insert(stats_template.clone()); - let client_server_states = client_server_states - .entry(stat.address_id) - .or_insert(HashMap::new()); - let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new()); - // Some are counters, some are gauges... match stat.name { - EventName::Query => { - match client_states.get_mut(&stat.process_id) { - Some(client) => client.query_count += stat.value as u64, + EventName::Query { + client_id, + server_id, + } => { + // Update client stats + match client_states.get_mut(&client_id) { + Some(client_info) => client_info.query_count += stat.value as u64, None => (), } - let counter = stats.entry("total_query_count").or_insert(0); - *counter += stat.value; - } - EventName::Transaction => { - match client_states.get_mut(&stat.process_id) { - Some(client) => client.transaction_count += stat.value as u64, + // Update server stats and pool aggergation stats + match server_states.get_mut(&server_id) { + Some(server_info) => { + server_info.query_count += stat.value as u64; + + let pool_stats = address_stat_lookup + .entry(server_info.address_id) + .or_insert(HashMap::default()); + let counter = pool_stats + .entry("total_query_count".to_string()) + .or_insert(0); + *counter += stat.value; + } None => (), } - let counter = stats.entry("total_xact_count").or_insert(0); - *counter += stat.value; } - EventName::DataSent => { - let counter = stats.entry("total_sent").or_insert(0); - *counter += stat.value; + EventName::Transaction { + client_id, + server_id, + } => { + // Update client stats + match client_states.get_mut(&client_id) { + Some(client_info) => client_info.transaction_count += stat.value as u64, + None => (), + } + // Update server stats and pool aggergation stats + match server_states.get_mut(&server_id) { + Some(server_info) => { + server_info.transaction_count += stat.value as u64; + + let address_stats = address_stat_lookup + .entry(server_info.address_id) + .or_insert(HashMap::default()); + let counter = address_stats + .entry("total_xact_count".to_string()) + .or_insert(0); + *counter += stat.value; + } + None => (), + } } - EventName::DataReceived => { - let counter = stats.entry("total_received").or_insert(0); - *counter += stat.value; + EventName::DataSentToServer { server_id } => { + // Update server stats and pool aggergation stats + match server_states.get_mut(&server_id) { + Some(server_info) => { + server_info.bytes_sent += stat.value as u64; + + let address_stats = address_stat_lookup + .entry(server_info.address_id) + .or_insert(HashMap::default()); + let counter = + address_stats.entry("total_sent".to_string()).or_insert(0); + *counter += stat.value; + } + None => (), + } } - EventName::CheckoutTime => { - let counter = stats.entry("total_wait_time").or_insert(0); - *counter += stat.value; - - let counter = stats.entry("maxwait_us").or_insert(0); - let mic_part = stat.value % 1_000_000; + EventName::DataReceivedFromServer { server_id } => { + // Update server states and server aggergation stats + match server_states.get_mut(&server_id) { + Some(server_info) => { + server_info.bytes_received += stat.value as u64; + + let address_stats = address_stat_lookup + .entry(server_info.address_id) + .or_insert(HashMap::default()); + let counter = address_stats + .entry("total_received".to_string()) + .or_insert(0); + *counter += stat.value; + } + None => (), + } + } - // Report max time here - if mic_part > *counter { - *counter = mic_part; + EventName::CheckoutTime { + client_id, + server_id, + } => { + // Update client stats + match client_states.get_mut(&client_id) { + Some(client_info) => client_info.total_wait_time += stat.value as u64, + None => (), } + // Update server stats and pool aggergation stats + match server_states.get_mut(&server_id) { + Some(server_info) => { + let pool_stats = address_stat_lookup + .entry(server_info.address_id) + .or_insert(HashMap::default()); + let counter = + pool_stats.entry("total_wait_time".to_string()).or_insert(0); + *counter += stat.value; + + let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0); + let mic_part = stat.value % 1_000_000; + + // Report max time here + if mic_part > *counter { + *counter = mic_part; + } - let counter = stats.entry("maxwait").or_insert(0); - let seconds = *counter / 1_000_000; + let counter = pool_stats.entry("maxwait".to_string()).or_insert(0); + let seconds = *counter / 1_000_000; - if seconds > *counter { - *counter = seconds; + if seconds > *counter { + *counter = seconds; + } + } + None => (), } } - EventName::ClientRegistered(pool_name, username, app_name) => { - match client_states.get_mut(&stat.process_id) { + EventName::ClientRegistered { + client_id, + pool_name, + username, + application_name, + } => { + match client_states.get_mut(&client_id) { Some(_) => { warn!("Client double registered!"); } None => { client_states.insert( - stat.process_id, + client_id, ClientInformation { - state: ClientState::ClientIdle, - process_id: stat.process_id, + state: ClientState::Idle, + connect_time: Instant::now(), + client_id, pool_name: pool_name.clone(), username: username.clone(), - application_name: app_name.clone(), + application_name: application_name.clone(), + total_wait_time: 0, transaction_count: 0, query_count: 0, error_count: 0, @@ -529,93 +694,134 @@ impl Collector { }; } - EventName::ClientActive | EventName::ClientWaiting | EventName::ClientIdle => { - client_server_states.insert(stat.process_id, stat.name.clone()); - let new_state = match stat.name { - EventName::ClientActive => ClientState::ClientActive, - EventName::ClientWaiting => ClientState::ClientWaiting, - EventName::ClientIdle => ClientState::ClientIdle, - _ => unreachable!(), - }; - - match client_states.get_mut(&stat.process_id) { - Some(client_state) => { - client_state.state = new_state; - } + EventName::ClientBanError { + client_id, + address_id, + } => { + match client_states.get_mut(&client_id) { + Some(client_info) => client_info.error_count += stat.value as u64, + None => (), + } - None => { - warn!("Stats on unregistered client!"); - } - }; + let address_stats = address_stat_lookup + .entry(address_id) + .or_insert(HashMap::default()); + let counter = address_stats.entry("total_errors".to_string()).or_insert(0); + *counter += stat.value; } - EventName::ClientDisconnecting => { - client_server_states.remove(&stat.process_id); - client_states.remove(&stat.process_id); + EventName::ClientCheckoutError { + client_id, + address_id, + } => { + match client_states.get_mut(&client_id) { + Some(client_info) => client_info.error_count += stat.value as u64, + None => (), + } + let address_stats = address_stat_lookup + .entry(address_id) + .or_insert(HashMap::default()); + let counter = address_stats.entry("total_errors".to_string()).or_insert(0); + *counter += stat.value; } - EventName::ServerActive - | EventName::ServerIdle - | EventName::ServerTested - | EventName::ServerLogin => { - client_server_states.insert(stat.process_id, stat.name); + EventName::ClientIdle { client_id } => { + match client_states.get_mut(&client_id) { + Some(client_state) => client_state.state = ClientState::Idle, + None => warn!("Stats on unregistered client!"), + }; } - EventName::ServerDisconnecting => { - client_server_states.remove(&stat.process_id); + EventName::ClientWaiting { client_id } => { + match client_states.get_mut(&client_id) { + Some(client_state) => client_state.state = ClientState::Waiting, + None => warn!("Stats on unregistered client!"), + }; } - EventName::UpdateStats => { - // Calculate connection states - for (_, state) in client_server_states.iter() { - match state { - EventName::ClientActive => { - let counter = stats.entry("cl_active").or_insert(0); - *counter += 1; - } - - EventName::ClientWaiting => { - let counter = stats.entry("cl_waiting").or_insert(0); - *counter += 1; - } + EventName::ClientActive { + client_id, + server_id: _, + } => { + match client_states.get_mut(&client_id) { + Some(client_state) => client_state.state = ClientState::Active, + None => warn!("Stats on unregistered client!"), + }; + } - EventName::ServerIdle => { - let counter = stats.entry("sv_idle").or_insert(0); - *counter += 1; - } + EventName::ClientDisconnecting { client_id } => { + client_states.remove(&client_id); + } - EventName::ServerActive => { - let counter = stats.entry("sv_active").or_insert(0); - *counter += 1; - } + EventName::ServerRegistered { + address_name, + server_id, + address_id, + pool_name, + username, + } => { + server_states.insert( + server_id, + ServerInformation { + address_id, + address_name, + server_id, + username, + pool_name, + + state: ServerState::Idle, + connect_time: Instant::now(), + bytes_sent: 0, + bytes_received: 0, + transaction_count: 0, + query_count: 0, + error_count: 0, + }, + ); + } - EventName::ServerTested => { - let counter = stats.entry("sv_tested").or_insert(0); - *counter += 1; - } + EventName::ServerLogin { server_id } => { + match server_states.get_mut(&server_id) { + Some(server_state) => server_state.state = ServerState::Login, + None => warn!("Stats on unregistered Server!"), + }; + } - EventName::ServerLogin => { - let counter = stats.entry("sv_login").or_insert(0); - *counter += 1; - } + EventName::ServerTested { server_id } => { + match server_states.get_mut(&server_id) { + Some(server_state) => server_state.state = ServerState::Tested, + None => warn!("Stats on unregistered Server!"), + }; + } - EventName::ClientIdle => { - let counter = stats.entry("cl_idle").or_insert(0); - *counter += 1; - } + EventName::ServerIdle { server_id } => { + match server_states.get_mut(&server_id) { + Some(server_state) => server_state.state = ServerState::Idle, + None => warn!("Stats on unregistered Server!"), + }; + } - _ => unreachable!(), - }; - } + EventName::ServerActive { + client_id: _, + server_id, + } => { + match server_states.get_mut(&server_id) { + Some(server_state) => server_state.state = ServerState::Active, + None => warn!("Stats on unregistered Server!"), + }; + } - // Update latest stats used in SHOW STATS - let mut guard = LATEST_STATS.lock(); - for (key, value) in stats.iter() { - let entry = guard.entry(stat.address_id).or_insert(HashMap::new()); - entry.insert(key.to_string(), value.clone()); - } + EventName::ServerDisconnecting { server_id } => { + server_states.remove(&server_id); + } - LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); + EventName::UpdateStats { + pool_name, + username, + } => { + let pool_stats = pool_stat_lookup + .entry((pool_name.clone(), username.clone())) + .or_insert(HashMap::default()); // These are re-calculated every iteration of the loop, so we don't want to add values // from the last iteration. @@ -630,17 +836,77 @@ impl Collector { "maxwait", "maxwait_us", ] { - stats.insert(stat, 0); + pool_stats.insert(stat.to_string(), 0); } + + for (_, client_info) in client_states.iter() { + if client_info.pool_name != pool_name || client_info.username != username { + continue; + } + match client_info.state { + ClientState::Idle => { + let counter = pool_stats.entry("cl_idle".to_string()).or_insert(0); + *counter += 1; + } + ClientState::Waiting => { + let counter = + pool_stats.entry("cl_waiting".to_string()).or_insert(0); + *counter += 1; + } + ClientState::Active => { + let counter = + pool_stats.entry("cl_active".to_string()).or_insert(0); + *counter += 1; + } + }; + } + + for (_, server_info) in server_states.iter() { + if server_info.pool_name != pool_name || server_info.username != username { + continue; + } + match server_info.state { + ServerState::Login => { + let counter = pool_stats.entry("sv_login".to_string()).or_insert(0); + *counter += 1; + } + ServerState::Tested => { + let counter = + pool_stats.entry("sv_tested".to_string()).or_insert(0); + *counter += 1; + } + ServerState::Active => { + let counter = + pool_stats.entry("sv_active".to_string()).or_insert(0); + *counter += 1; + } + ServerState::Idle => { + let counter = pool_stats.entry("sv_idle".to_string()).or_insert(0); + *counter += 1; + } + }; + } + + LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); + LATEST_SERVER_STATS.store(Arc::new(server_states.clone())); + LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone())); } - EventName::UpdateAverages => { + EventName::UpdateAverages { address_id } => { + let stats = address_stat_lookup + .entry(address_id) + .or_insert(HashMap::default()); + let old_stats = address_old_stat_lookup + .entry(address_id) + .or_insert(HashMap::default()); + // Calculate averages for stat in &[ "avg_query_count", "avg_query_time", "avg_recv", "avg_sent", + "avg_errors", "avg_xact_time", "avg_xact_count", "avg_wait_time", @@ -654,27 +920,34 @@ impl Collector { let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second - stats.insert(stat, avg); + stats.insert(stat.to_string(), avg); *old_value = new_value; } + LATEST_ADDRESS_STATS.store(Arc::new(address_stat_lookup.clone())); } }; } } } -/// Get a snapshot of statistics. Updated once a second -/// by the `Collector`. -pub fn get_stats() -> HashMap> { - LATEST_STATS.lock().clone() -} - /// Get a snapshot of client statistics. Updated once a second /// by the `Collector`. -pub fn get_client_stats() -> ClientInformationLookup { +pub fn get_client_stats() -> ClientStatesLookup { (*(*LATEST_CLIENT_STATS.load())).clone() } +pub fn get_server_stats() -> ServerStatesLookup { + (*(*LATEST_SERVER_STATS.load())).clone() +} + +pub fn get_pool_stats() -> PoolStatsLookup { + (*(*LATEST_POOL_STATS.load())).clone() +} + +pub fn get_address_stats() -> AddressStatsLookup { + (*(*LATEST_ADDRESS_STATS.load())).clone() +} + /// Get the statistics reporter used to update stats across the pools/clients. pub fn get_reporter() -> Reporter { (*(*REPORTER.load())).clone() diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index dbdda7f1..a7398fd3 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -1,14 +1,129 @@ # frozen_string_literal: true +require 'uri' require_relative 'spec_helper' describe "Admin" do - let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) } + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) } + let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") } after do processes.all_databases.map(&:reset) processes.pgcat.shutdown end + describe "SHOW POOLS" do + context "bad credentials" do + it "does not change any stats" do + bad_passsword_url = URI(pgcat_conn_str) + bad_passsword_url.password = "wrong" + expect { PG::connect("#{bad_passsword_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad) + + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["sv_idle"]).to eq("1") + end + end + + context "bad database name" do + it "does not change any stats" do + bad_db_url = URI(pgcat_conn_str) + bad_db_url.path = "/wrong_db" + expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad) + + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["sv_idle"]).to eq("1") + end + end + + context "client connects but issues no queries" do + it "only affects cl_idle stats" do + connections = Array.new(20) { PG::connect(pgcat_conn_str) } + sleep(1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("20") + expect(results["sv_idle"]).to eq("1") + + connections.map(&:close) + sleep(1.1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq("1") + end + end + + context "clients connect and make one query" do + it "only affects cl_idle, sv_idle stats" do + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(2.5)") } + end + + sleep(1.1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_active"]).to eq("5") + expect(results["sv_active"]).to eq("5") + + sleep(3) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("5") + expect(results["sv_idle"]).to eq("5") + + connections.map(&:close) + sleep(1) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq("5") + end + end + + context "client connects and opens a transaction and closes connection uncleanly" do + it "produces correct statistics" do + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new do + c.async_exec("BEGIN") + c.async_exec("SELECT pg_sleep(0.01)") + c.close + end + end + + sleep(1.1) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["sv_idle"]).to eq("5") + end + end + end + describe "SHOW CLIENTS" do it "reports correct number and application names" do conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user") @@ -42,6 +157,8 @@ c.async_exec("SELECT 2") c.async_exec("SELECT 3") c.async_exec("BEGIN") + c.async_exec("SELECT 4") + c.async_exec("SELECT 5") c.async_exec("COMMIT") end @@ -52,9 +169,9 @@ expect(results.count).to eq(3) normal_client_results = results.reject { |r| r["database"] == "pgcat" } expect(normal_client_results[0]["transaction_count"]).to eq("4") - expect(normal_client_results[1]["transaction_count"]).to eq("4") # Obviously wrong, will fix in a follow up PR - expect(normal_client_results[0]["query_count"]).to eq("5") - expect(normal_client_results[1]["query_count"]).to eq("5") + expect(normal_client_results[1]["transaction_count"]).to eq("4") + expect(normal_client_results[0]["query_count"]).to eq("7") + expect(normal_client_results[1]["query_count"]).to eq("7") # puts processes.pgcat.logs @@ -62,5 +179,4 @@ connections.map(&:close) end end - end diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb index 80ac9dab..55847ed6 100644 --- a/tests/ruby/helpers/pgcat_helper.rb +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -46,6 +46,50 @@ def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction") end end + def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction") + user = { + "password" => "sharding_user", + "pool_size" => pool_size, + "statement_timeout" => 0, + "username" => "sharding_user" + } + + pgcat = PgcatProcess.new("trace") + pgcat_cfg = pgcat.current_config + + primary = PgInstance.new(5432, user["username"], user["password"], "shard0") + + # Main proxy configs + pgcat_cfg["pools"] = { + "#{pool_name}" => { + "default_role" => "primary", + "pool_mode" => pool_mode, + "primary_reads_enabled" => false, + "query_parser_enabled" => false, + "sharding_function" => "pg_bigint_hash", + "shards" => { + "0" => { + "database" => "shard0", + "servers" => [ + ["localhost", primary.port.to_s, "primary"] + ] + }, + }, + "users" => { "0" => user } + } + } + pgcat_cfg["general"]["port"] = pgcat.port + pgcat.update_config(pgcat_cfg) + pgcat.start + pgcat.wait_until_ready + + OpenStruct.new.tap do |struct| + struct.pgcat = pgcat + struct.primary = primary + struct.all_databases = [primary] + end + end + def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction") user = { "password" => "sharding_user", From 4669a2a852bb5ec2afe099fd145bd202b41a240d Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 02:04:02 -0500 Subject: [PATCH 15/25] prometheus --- src/prometheus.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/prometheus.rs b/src/prometheus.rs index 44acd276..d54ba329 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -8,6 +8,7 @@ use std::net::SocketAddr; use crate::config::Address; use crate::pool::get_all_pools; +use crate::stats::get_address_stats; struct MetricHelpType { help: &'static str, @@ -163,7 +164,7 @@ impl PrometheusMetric { async fn prometheus_stats(request: Request) -> Result, hyper::http::Error> { match (request.method(), request.uri().path()) { (&Method::GET, "/metrics") => { - let stats: HashMap> = HashMap::default(); //get_stats(); + let stats: HashMap> = get_address_stats(); let mut lines = Vec::new(); for (_, pool) in get_all_pools() { From c395eec5e55c6af46b00949fc18ca4b73dfa993a Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 08:00:40 -0500 Subject: [PATCH 16/25] add state to clients and servers --- src/admin.rs | 4 ++++ src/stats.rs | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/admin.rs b/src/admin.rs index 6e02bd1b..fdfef67e 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -459,6 +459,7 @@ where ("database", DataType::Text), ("user", DataType::Text), ("application_name", DataType::Text), + ("state", DataType::Text), ("transaction_count", DataType::Numeric), ("query_count", DataType::Numeric), ("error_count", DataType::Numeric), @@ -475,6 +476,7 @@ where client.pool_name, client.username, client.application_name.clone(), + client.state.to_string(), client.transaction_count.to_string(), client.query_count.to_string(), client.error_count.to_string(), @@ -507,6 +509,7 @@ where ("database_name", DataType::Text), ("user", DataType::Text), ("address_id", DataType::Text), + ("state", DataType::Text), ("transaction_count", DataType::Numeric), ("query_count", DataType::Numeric), ("bytes_sent", DataType::Numeric), @@ -524,6 +527,7 @@ where server.pool_name, server.username, server.address_name, + server.state.to_string(), server.transaction_count.to_string(), server.query_count.to_string(), server.bytes_sent.to_string(), diff --git a/src/stats.rs b/src/stats.rs index 38c65aee..2448bfd1 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -43,6 +43,16 @@ pub enum ClientState { Waiting, Active, } +impl std::fmt::Display for ClientState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match *self { + ClientState::Idle => write!(f, "idle"), + ClientState::Waiting => write!(f, "waiting"), + ClientState::Active => write!(f, "active"), + } + } +} + #[derive(Debug, Clone, Copy, PartialEq)] pub enum ServerState { @@ -51,6 +61,16 @@ pub enum ServerState { Tested, Idle, } +impl std::fmt::Display for ServerState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match *self { + ServerState::Login => write!(f, "login"), + ServerState::Active => write!(f, "active"), + ServerState::Tested => write!(f, "test"), + ServerState::Idle => write!(f, "idle"), + } + } +} /// Information we keep track off which can be queried by SHOW CLIENTS #[derive(Debug, Clone)] From badd914904ea1411ea7000a307fe613a17d60ecd Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 08:03:32 -0500 Subject: [PATCH 17/25] fmt --- src/stats.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stats.rs b/src/stats.rs index 2448bfd1..911682a5 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -53,7 +53,6 @@ impl std::fmt::Display for ClientState { } } - #[derive(Debug, Clone, Copy, PartialEq)] pub enum ServerState { Login, From 04b747212a1baa9034e86f821ffdeb85e07610f3 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 09:01:20 -0500 Subject: [PATCH 18/25] Add application_name to server stats --- src/admin.rs | 2 ++ src/stats.rs | 79 +++++++++++++++++++++++++++++++++++----------------- 2 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index fdfef67e..790cc49a 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -509,6 +509,7 @@ where ("database_name", DataType::Text), ("user", DataType::Text), ("address_id", DataType::Text), + ("application_name", DataType::Text), ("state", DataType::Text), ("transaction_count", DataType::Numeric), ("query_count", DataType::Numeric), @@ -527,6 +528,7 @@ where server.pool_name, server.username, server.address_name, + server.application_name, server.state.to_string(), server.transaction_count.to_string(), server.query_count.to_string(), diff --git a/src/stats.rs b/src/stats.rs index 911682a5..795901ee 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -89,6 +89,7 @@ pub struct ClientInformation { pub error_count: u64, } +/// Information we keep track off which can be queried by SHOW SERVERS #[derive(Debug, Clone)] pub struct ServerInformation { pub state: ServerState, @@ -99,6 +100,7 @@ pub struct ServerInformation { pub username: String, pub pool_name: String, + pub application_name: String, pub bytes_sent: u64, pub bytes_received: u64, @@ -128,7 +130,6 @@ enum EventName { DataSentToServer { server_id: i32, }, - DataReceivedFromServer { server_id: i32, }, @@ -153,13 +154,11 @@ enum EventName { ClientDisconnecting { client_id: i32, }, - ClientCheckoutError { client_id: i32, #[allow(dead_code)] address_id: usize, }, - ClientBanError { client_id: i32, #[allow(dead_code)] @@ -195,7 +194,6 @@ enum EventName { pool_name: String, username: String, }, - UpdateAverages { address_id: usize, }, @@ -561,15 +559,19 @@ impl Collector { server_id, } => { // Update client stats - match client_states.get_mut(&client_id) { - Some(client_info) => client_info.query_count += stat.value as u64, - None => (), - } + let app_name = match client_states.get_mut(&client_id) { + Some(client_info) => { + client_info.query_count += stat.value as u64; + client_info.application_name.to_string() + } + None => String::from("Undefined"), + }; // Update server stats and pool aggergation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.query_count += stat.value as u64; + server_info.application_name = app_name; let pool_stats = address_stat_lookup .entry(server_info.address_id) @@ -588,14 +590,19 @@ impl Collector { server_id, } => { // Update client stats - match client_states.get_mut(&client_id) { - Some(client_info) => client_info.transaction_count += stat.value as u64, - None => (), - } + let app_name = match client_states.get_mut(&client_id) { + Some(client_info) => { + client_info.transaction_count += stat.value as u64; + client_info.application_name.to_string() + } + None => String::from("Undefined"), + }; + // Update server stats and pool aggergation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.transaction_count += stat.value as u64; + server_info.application_name = app_name; let address_stats = address_stat_lookup .entry(server_info.address_id) @@ -649,13 +656,19 @@ impl Collector { server_id, } => { // Update client stats - match client_states.get_mut(&client_id) { - Some(client_info) => client_info.total_wait_time += stat.value as u64, - None => (), - } + let app_name = match client_states.get_mut(&client_id) { + Some(client_info) => { + client_info.total_wait_time += stat.value as u64; + client_info.application_name.to_string() + } + None => String::from("Undefined"), + }; + // Update server stats and pool aggergation stats match server_states.get_mut(&server_id) { Some(server_info) => { + server_info.application_name = app_name; + let pool_stats = address_stat_lookup .entry(server_info.address_id) .or_insert(HashMap::default()); @@ -689,10 +702,7 @@ impl Collector { application_name, } => { match client_states.get_mut(&client_id) { - Some(_) => { - warn!("Client double registered!"); - } - + Some(_) => warn!("Client {:?} was double registered!", client_id), None => { client_states.insert( client_id, @@ -789,6 +799,7 @@ impl Collector { pool_name, state: ServerState::Idle, + application_name: String::from("Undefined"), connect_time: Instant::now(), bytes_sent: 0, bytes_received: 0, @@ -801,31 +812,49 @@ impl Collector { EventName::ServerLogin { server_id } => { match server_states.get_mut(&server_id) { - Some(server_state) => server_state.state = ServerState::Login, + Some(server_state) => { + server_state.state = ServerState::Login; + server_state.application_name = String::from("Undefined"); + } None => warn!("Stats on unregistered Server!"), }; } EventName::ServerTested { server_id } => { match server_states.get_mut(&server_id) { - Some(server_state) => server_state.state = ServerState::Tested, + Some(server_state) => { + server_state.state = ServerState::Tested; + server_state.application_name = String::from("Undefined"); + } None => warn!("Stats on unregistered Server!"), }; } EventName::ServerIdle { server_id } => { match server_states.get_mut(&server_id) { - Some(server_state) => server_state.state = ServerState::Idle, + Some(server_state) => { + server_state.state = ServerState::Idle; + server_state.application_name = String::from("Undefined"); + } None => warn!("Stats on unregistered Server!"), }; } EventName::ServerActive { - client_id: _, + client_id, server_id, } => { + // Update client stats + let app_name = match client_states.get_mut(&client_id) { + Some(client_info) => client_info.application_name.to_string(), + None => String::from("Undefined"), + }; + match server_states.get_mut(&server_id) { - Some(server_state) => server_state.state = ServerState::Active, + Some(server_state) => { + server_state.state = ServerState::Active; + server_state.application_name = app_name; + } None => warn!("Stats on unregistered Server!"), }; } From 2e0df10c2606baad3d6a4bcf37dd2573301d4f9e Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 17:50:31 -0500 Subject: [PATCH 19/25] Add tests for waiting clients --- src/admin.rs | 4 ++-- tests/ruby/admin_spec.rb | 36 ++++++++++++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 790cc49a..ed2d3de3 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -472,7 +472,7 @@ where for (_, client) in new_map { let row = vec![ - format!("{:#08X}", client.client_id), + format!("{:#010X}", client.client_id), client.pool_name, client.username, client.application_name.clone(), @@ -524,7 +524,7 @@ where for (_, server) in new_map { let row = vec![ - format!("{:#08X}", server.server_id), + format!("{:#010X}", server.server_id), server.pool_name, server.username, server.address_name, diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index a7398fd3..b5709845 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -122,6 +122,40 @@ expect(results["sv_idle"]).to eq("5") end end + + context "clients overwhelm server pools" do + let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } + + it "cl_waiting is updated to show it" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } + end + + sleep(1.1) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + + expect(results["cl_waiting"]).to eq("2") + expect(results["cl_active"]).to eq("2") + expect(results["sv_active"]).to eq("2") + + sleep(2.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("4") + expect(results["sv_idle"]).to eq("2") + + threads.map(&:join) + connections.map(&:close) + end + end end describe "SHOW CLIENTS" do @@ -173,8 +207,6 @@ expect(normal_client_results[0]["query_count"]).to eq("7") expect(normal_client_results[1]["query_count"]).to eq("7") - # puts processes.pgcat.logs - admin_conn.close connections.map(&:close) end From e9392d0cea8273ad244f500e21931e8c582c7a69 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 18:24:03 -0500 Subject: [PATCH 20/25] Docs --- src/stats.rs | 106 ++++++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 47 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 795901ee..493185fd 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -10,25 +10,34 @@ use tokio::time::Instant; use crate::pool::{get_all_pools, get_number_of_addresses}; +/// Convenience types for various stats type ClientStatesLookup = HashMap; type ServerStatesLookup = HashMap; type PoolStatsLookup = HashMap<(String, String), HashMap>; type AddressStatsLookup = HashMap>; -/// Latest client stats updated every second; used in SHOW CLIENTS. +/// Stats for individual client connections updated every second +/// Used in SHOW CLIENTS. static LATEST_CLIENT_STATS: Lazy> = Lazy::new(|| ArcSwap::from_pointee(ClientStatesLookup::default())); -/// Latest client stats updated every second; used in SHOW CLIENTS. +/// Stats for individual server connections updated every second +/// Used in SHOW SERVERS. static LATEST_SERVER_STATS: Lazy> = Lazy::new(|| ArcSwap::from_pointee(ServerStatesLookup::default())); +/// Aggregate stats for each pool (a pool is identified by database name and username) updated every second +/// Used in SHOW POOLS. static LATEST_POOL_STATS: Lazy> = Lazy::new(|| ArcSwap::from_pointee(PoolStatsLookup::default())); +/// Aggregate stats for individual database instances, updated every second, averages are calculated every 15 +/// Used in SHOW STATS. static LATEST_ADDRESS_STATS: Lazy> = Lazy::new(|| ArcSwap::from_pointee(AddressStatsLookup::default())); +/// The statistics reporter. An instance is given to each possible source of statistics, +/// e.g. clients, servers, connection pool. pub static REPORTER: Lazy> = Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); @@ -53,6 +62,7 @@ impl std::fmt::Display for ClientState { } } +/// The various states that a server can be in #[derive(Debug, Clone, Copy, PartialEq)] pub enum ServerState { Login, @@ -65,7 +75,7 @@ impl std::fmt::Display for ServerState { match *self { ServerState::Login => write!(f, "login"), ServerState::Active => write!(f, "active"), - ServerState::Tested => write!(f, "test"), + ServerState::Tested => write!(f, "tested"), ServerState::Idle => write!(f, "idle"), } } @@ -76,12 +86,15 @@ impl std::fmt::Display for ServerState { pub struct ClientInformation { pub state: ClientState, pub connect_time: Instant, + + /// A random integer assigned to the client and used by stats to track the client pub client_id: i32, pub application_name: String, pub username: String, pub pool_name: String, + /// Total time spent waiting for a connection from pool, measures in microseconds pub total_wait_time: u64, pub transaction_count: u64, @@ -94,9 +107,12 @@ pub struct ClientInformation { pub struct ServerInformation { pub state: ServerState, pub connect_time: Instant, + + /// A random integer assigned to the server and used by stats to track the server + pub server_id: i32, + pub address_name: String, pub address_id: usize, - pub server_id: i32, pub username: String, pub pool_name: String, @@ -251,8 +267,7 @@ impl Reporter { }; } - /// Report a query executed by a client against - /// a server identified by the `address_id`. + /// Report a query executed by a client against a server pub fn query(&self, client_id: i32, server_id: i32) { let event = Event { name: EventName::Query { @@ -264,8 +279,10 @@ impl Reporter { self.send(event); } - /// Report a transaction executed by a client against - /// a server identified by the `address_id`. + /// Report a transaction executed by a client a server + /// we report each individual queries outside a transaction as a transaction + /// We only count the initial BEGIN as a transaction, all queries within do not + /// count as transactions pub fn transaction(&self, client_id: i32, server_id: i32) { let event = Event { name: EventName::Transaction { @@ -277,40 +294,38 @@ impl Reporter { self.send(event); } - /// Report data sent to a server identified by `address_id`. - /// The `amount` is measured in bytes. - pub fn data_sent(&self, amount: usize, server_id: i32) { + /// Report data sent to a server + pub fn data_sent(&self, amount_bytes: usize, server_id: i32) { let event = Event { name: EventName::DataSentToServer { server_id }, - value: amount as i64, + value: amount_bytes as i64, }; self.send(event) } - /// Report data received from a server identified by `address_id`. - /// The `amount` is measured in bytes. - pub fn data_received(&self, amount: usize, server_id: i32) { + /// Report data received from a server + pub fn data_received(&self, amount_bytes: usize, server_id: i32) { let event = Event { name: EventName::DataReceivedFromServer { server_id }, - value: amount as i64, + value: amount_bytes as i64, }; self.send(event) } - /// Time spent waiting to get a healthy connection from the pool - /// for a server identified by `address_id`. - /// Measured in milliseconds. - pub fn checkout_time(&self, ms: u128, client_id: i32, server_id: i32) { + /// Reportes the time spent by a client waiting to get a healthy connection from the pool + pub fn checkout_time(&self, microseconds: u128, client_id: i32, server_id: i32) { let event = Event { name: EventName::CheckoutTime { client_id, server_id, }, - value: ms as i64, + value: microseconds as i64, }; self.send(event) } + /// Register a client with the stats system. The stats system we use client_id + /// to track and aggregate statistics from all source that relate to that client pub fn client_register( &self, client_id: i32, @@ -330,8 +345,7 @@ impl Reporter { self.send(event); } - /// Reports a client identified by `client_id` waiting for a connection - /// to a server identified by `address_id`. + /// Reports a client is waiting for a connection pub fn client_waiting(&self, client_id: i32) { let event = Event { name: EventName::ClientWaiting { client_id }, @@ -340,8 +354,7 @@ impl Reporter { self.send(event) } - /// Reports a client identified by `process_id` is done waiting for a connection - /// to a server identified by `address_id` and is about to query the server. + /// Reports a client has had the server assigned to it be banned pub fn client_ban_error(&self, client_id: i32, address_id: usize) { let event = Event { name: EventName::ClientBanError { @@ -353,8 +366,7 @@ impl Reporter { self.send(event) } - /// Reports a client identified by `process_id` is done waiting for a connection - /// to a server identified by `address_id` and is about to query the server. + /// Reports a client has failed to obtain a connection from a connection pool pub fn client_checkout_error(&self, client_id: i32, address_id: usize) { let event = Event { name: EventName::ClientCheckoutError { @@ -366,8 +378,7 @@ impl Reporter { self.send(event) } - /// Reports a client identified by `process_id` is done waiting for a connection - /// to a server identified by `address_id` and is about to query the server. + /// Reports a client is done waiting for a connection and is about to query the server. pub fn client_active(&self, client_id: i32, server_id: i32) { let event = Event { name: EventName::ClientActive { @@ -379,8 +390,7 @@ impl Reporter { self.send(event) } - /// Reports a client identified by `process_id` is done querying the server - /// identified by `address_id` and is no longer active. + /// Reports a client is done querying the server and is no longer assigned a server connection pub fn client_idle(&self, client_id: i32) { let event = Event { name: EventName::ClientIdle { client_id }, @@ -389,8 +399,7 @@ impl Reporter { self.send(event) } - /// Reports a client identified by `process_id` is disconecting from the pooler. - /// The last server it was connected to is identified by `address_id`. + /// Reports a client is disconecting from the pooler. pub fn client_disconnecting(&self, client_id: i32) { let event = Event { name: EventName::ClientDisconnecting { client_id }, @@ -399,6 +408,8 @@ impl Reporter { self.send(event) } + /// Register a server connection with the stats system. The stats system we use server_id + /// to track and aggregate statistics from all source that relate to that server pub fn server_register( &self, server_id: i32, @@ -420,9 +431,8 @@ impl Reporter { self.send(event); } - /// Reports a server connection identified by `process_id` for - /// a configured server identified by `address_id` is actively used - /// by a client. + /// Reports a server connection has been assigned to a client that + /// is about to query the server pub fn server_active(&self, client_id: i32, server_id: i32) { let event = Event { name: EventName::ServerActive { @@ -434,9 +444,8 @@ impl Reporter { self.send(event) } - /// Reports a server connection identified by `process_id` for - /// a configured server identified by `address_id` is no longer - /// actively used by a client and is now idle. + /// Reports a server connection is no longer assigned to a client + /// and is available for the next client to pick it up pub fn server_idle(&self, server_id: i32) { let event = Event { name: EventName::ServerIdle { server_id }, @@ -445,9 +454,7 @@ impl Reporter { self.send(event) } - /// Reports a server connection identified by `process_id` for - /// a configured server identified by `address_id` is attempting - /// to login. + /// Reports a server connection is attempting to login. pub fn server_login(&self, server_id: i32) { let event = Event { name: EventName::ServerLogin { server_id }, @@ -456,9 +463,7 @@ impl Reporter { self.send(event) } - /// Reports a server connection identified by `process_id` for - /// a configured server identified by `address_id` is being - /// tested before being given to a client. + /// Reports a server connection is being tested before being given to a client. pub fn server_tested(&self, server_id: i32) { let event = Event { name: EventName::ServerTested { server_id }, @@ -468,8 +473,7 @@ impl Reporter { self.send(event) } - /// Reports a server connection identified by `process_id` is disconecting from the pooler. - /// The configured server it was connected to is identified by `address_id`. + /// Reports a server connection is disconecting from the pooler. pub fn server_disconnecting(&self, server_id: i32) { let event = Event { name: EventName::ServerDisconnecting { server_id }, @@ -935,6 +939,8 @@ impl Collector { }; } + // The following calls publish the internal stats making it visible + // to clients using admin database to issue queries like `SHOW STATS` LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); LATEST_SERVER_STATS.store(Arc::new(server_states.clone())); LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone())); @@ -984,14 +990,20 @@ pub fn get_client_stats() -> ClientStatesLookup { (*(*LATEST_CLIENT_STATS.load())).clone() } +/// Get a snapshot of server statistics. Updated once a second +/// by the `Collector`. pub fn get_server_stats() -> ServerStatesLookup { (*(*LATEST_SERVER_STATS.load())).clone() } +/// Get a snapshot of pool statistics. Updated once a second +/// by the `Collector`. pub fn get_pool_stats() -> PoolStatsLookup { (*(*LATEST_POOL_STATS.load())).clone() } +/// Get a snapshot of address statistics. Updated once a second +/// by the `Collector`. pub fn get_address_stats() -> AddressStatsLookup { (*(*LATEST_ADDRESS_STATS.load())).clone() } From 3059ec1065adca9c63477f5b6dd86957cfa3e654 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 18:32:30 -0500 Subject: [PATCH 21/25] remove comment --- src/pool.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 99d13f5b..34af354a 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -563,8 +563,6 @@ impl ManageConnection for ServerPool { info!("Creating a new server connection {:?}", self.address); let server_id = rand::random::(); - // Put a temporary process_id into the stats - // for server login. self.stats.server_register( server_id, self.address.id, @@ -590,7 +588,6 @@ impl ManageConnection for ServerPool { Ok(conn) } Err(err) => { - // Remove the temporary process_id from the stats. self.stats.server_disconnecting(server_id); Err(err) } From 79d1e3c39cff31d0b4ad9a60ec941cd140d1e25d Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 18:35:36 -0500 Subject: [PATCH 22/25] comments --- src/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 3204fa94..d8d23d6c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -631,7 +631,8 @@ impl Server { self.address.clone() } - /// Get the server's unique identifier. + /// Get the server connection identifier + /// Used to uniquely identify connection in statistics pub fn server_id(&self) -> i32 { self.server_id } From b2c5ee6f1dc6166ab59fb02a15a1065eaf92b73c Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 18:41:20 -0500 Subject: [PATCH 23/25] typo --- src/stats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 493185fd..2a201a5b 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -324,7 +324,7 @@ impl Reporter { self.send(event) } - /// Register a client with the stats system. The stats system we use client_id + /// Register a client with the stats system. The stats system uses client_id /// to track and aggregate statistics from all source that relate to that client pub fn client_register( &self, @@ -408,7 +408,7 @@ impl Reporter { self.send(event) } - /// Register a server connection with the stats system. The stats system we use server_id + /// Register a server connection with the stats system. The stats system uses server_id /// to track and aggregate statistics from all source that relate to that server pub fn server_register( &self, From f6cca640004ddb1f4de563ce05d9dd7b8d1175af Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 18:48:28 -0500 Subject: [PATCH 24/25] cleanup --- src/stats.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 2a201a5b..9ab71677 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -9,6 +9,7 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::Instant; use crate::pool::{get_all_pools, get_number_of_addresses}; +use crate::server; /// Convenience types for various stats type ClientStatesLookup = HashMap; @@ -621,7 +622,7 @@ impl Collector { } EventName::DataSentToServer { server_id } => { - // Update server stats and pool aggergation stats + // Update server stats and address aggergation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.bytes_sent += stat.value as u64; @@ -638,7 +639,7 @@ impl Collector { } EventName::DataReceivedFromServer { server_id } => { - // Update server states and server aggergation stats + // Update server states and address aggergation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.bytes_received += stat.value as u64; @@ -668,7 +669,7 @@ impl Collector { None => String::from("Undefined"), }; - // Update server stats and pool aggergation stats + // Update server stats and address aggergation stats match server_states.get_mut(&server_id) { Some(server_info) => { server_info.application_name = app_name; @@ -733,9 +734,10 @@ impl Collector { } => { match client_states.get_mut(&client_id) { Some(client_info) => client_info.error_count += stat.value as u64, - None => (), + None => warn!("Got event {:?} for unregistered client", stat.name), } + // Update address aggregation stats let address_stats = address_stat_lookup .entry(address_id) .or_insert(HashMap::default()); @@ -749,8 +751,10 @@ impl Collector { } => { match client_states.get_mut(&client_id) { Some(client_info) => client_info.error_count += stat.value as u64, - None => (), + None => warn!("Got event {:?} for unregistered client", stat.name), } + + // Update address aggregation stats let address_stats = address_stat_lookup .entry(address_id) .or_insert(HashMap::default()); @@ -761,14 +765,14 @@ impl Collector { EventName::ClientIdle { client_id } => { match client_states.get_mut(&client_id) { Some(client_state) => client_state.state = ClientState::Idle, - None => warn!("Stats on unregistered client!"), + None => warn!("Got event {:?} for unregistered client", stat.name), }; } EventName::ClientWaiting { client_id } => { match client_states.get_mut(&client_id) { Some(client_state) => client_state.state = ClientState::Waiting, - None => warn!("Stats on unregistered client!"), + None => warn!("Got event {:?} for unregistered client", stat.name), }; } @@ -778,7 +782,7 @@ impl Collector { } => { match client_states.get_mut(&client_id) { Some(client_state) => client_state.state = ClientState::Active, - None => warn!("Stats on unregistered client!"), + None => warn!("Got event {:?} for unregistered client", stat.name), }; } @@ -820,7 +824,7 @@ impl Collector { server_state.state = ServerState::Login; server_state.application_name = String::from("Undefined"); } - None => warn!("Stats on unregistered Server!"), + None => warn!("Got event {:?} for unregistered server", stat.name), }; } @@ -830,7 +834,7 @@ impl Collector { server_state.state = ServerState::Tested; server_state.application_name = String::from("Undefined"); } - None => warn!("Stats on unregistered Server!"), + None => warn!("Got event {:?} for unregistered server", stat.name), }; } @@ -840,7 +844,7 @@ impl Collector { server_state.state = ServerState::Idle; server_state.application_name = String::from("Undefined"); } - None => warn!("Stats on unregistered Server!"), + None => warn!("Got event {:?} for unregistered server", stat.name), }; } @@ -854,12 +858,13 @@ impl Collector { None => String::from("Undefined"), }; + // Update server stats match server_states.get_mut(&server_id) { Some(server_state) => { server_state.state = ServerState::Active; server_state.application_name = app_name; } - None => warn!("Stats on unregistered Server!"), + None => warn!("Got event {:?} for unregistered server", stat.name), }; } From 08af06d6402f31fb105ea01bf656667c7eff7099 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 11 Sep 2022 19:03:09 -0500 Subject: [PATCH 25/25] CI