From 2864b11640deeceaf59e89928280b0426ee1a1db Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 17 Jan 2024 10:49:36 -0600 Subject: [PATCH 1/4] Report waiting time only for currently waiting clients --- src/pool.rs | 12 ++++-------- src/stats/client.rs | 31 ++++++++++++++++++++++++++----- src/stats/pool.rs | 9 +++++++-- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 00f4dc2a..c03aaf4c 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -769,7 +769,6 @@ impl ConnectionPool { ); self.ban(address, BanReason::FailedCheckout, Some(client_stats)); address.stats.error(); - client_stats.idle(); client_stats.checkout_error(); continue; } @@ -788,7 +787,7 @@ impl ConnectionPool { // Health checks are pretty expensive. if !require_healthcheck { let checkout_time = now.elapsed().as_micros() as u64; - client_stats.checkout_time(checkout_time); + client_stats.checkout_success(); server .stats() .checkout_time(checkout_time, client_stats.application_name()); @@ -802,7 +801,7 @@ impl ConnectionPool { .await { let checkout_time = now.elapsed().as_micros() as u64; - client_stats.checkout_time(checkout_time); + client_stats.checkout_success(); server .stats() .checkout_time(checkout_time, client_stats.application_name()); @@ -814,10 +813,7 @@ impl ConnectionPool { } } - client_stats.idle(); - - let checkout_time = now.elapsed().as_micros() as u64; - client_stats.checkout_time(checkout_time); + client_stats.checkout_success(); Err(Error::AllServersDown) } @@ -843,7 +839,7 @@ impl ConnectionPool { Ok(res) => match res { Ok(_) => { let checkout_time: u64 = start.elapsed().as_micros() as u64; - client_info.checkout_time(checkout_time); + client_info.checkout_success(); server .stats() .checkout_time(checkout_time, client_info.application_name()); diff --git a/src/stats/client.rs b/src/stats/client.rs index 6a30ec15..15c64d3a 100644 --- a/src/stats/client.rs +++ b/src/stats/client.rs @@ -41,6 +41,11 @@ pub struct ClientStats { ///
Maximum time spent waiting for a connection from pool, measures in microseconds pub max_wait_time: Arc, + // Time when the client started waiting for a connection from pool, measures in microseconds + // We use connect_time as the reference point for this value + // U64 can represent ~60 centuries in microseconds, so we should be fine + pub wait_start_us: Arc, + /// Current state of the client pub state: Arc, @@ -64,6 +69,7 @@ impl Default for ClientStats { pool_name: String::new(), total_wait_time: Arc::new(AtomicU64::new(0)), max_wait_time: Arc::new(AtomicU64::new(0)), + wait_start_us: Arc::new(AtomicU64::new(0)), state: Arc::new(AtomicClientState::new(ClientState::Idle)), transaction_count: Arc::new(AtomicU64::new(0)), query_count: Arc::new(AtomicU64::new(0)), @@ -111,6 +117,9 @@ impl ClientStats { /// Reports a client is waiting for a connection pub fn waiting(&self) { + let wait_start = self.connect_time.elapsed().as_micros() as u64; + + self.wait_start_us.store(wait_start, Ordering::Relaxed); self.state.store(ClientState::Waiting, Ordering::Relaxed); } @@ -122,6 +131,13 @@ impl ClientStats { /// Reports a client has failed to obtain a connection from a connection pool pub fn checkout_error(&self) { self.state.store(ClientState::Idle, Ordering::Relaxed); + self.update_wait_times(); + } + + /// Reports a client has succeeded in obtaining a connection from a connection pool + pub fn checkout_success(&self) { + self.state.store(ClientState::Active, Ordering::Relaxed); + self.update_wait_times(); } /// Reports a client has had the server assigned to it be banned @@ -130,12 +146,17 @@ impl ClientStats { self.error_count.fetch_add(1, Ordering::Relaxed); } - /// Reporters the time spent by a client waiting to get a healthy connection from the pool - pub fn checkout_time(&self, microseconds: u64) { + fn update_wait_times(&self) { + if self.wait_start_us.load(Ordering::Relaxed) == 0 { + return; + } + let wait_total = self.connect_time.elapsed().as_micros() as u64 + - 
self.wait_start_us.load(Ordering::Relaxed); + self.total_wait_time - .fetch_add(microseconds, Ordering::Relaxed); - self.max_wait_time - .fetch_max(microseconds, Ordering::Relaxed); + .fetch_add(wait_total, Ordering::Relaxed); + self.max_wait_time.fetch_max(wait_total, Ordering::Relaxed); + self.wait_start_us.fetch_max(0, Ordering::Relaxed); } /// Report a query executed by a client against a server diff --git a/src/stats/pool.rs b/src/stats/pool.rs index 46c74632..85254c48 100644 --- a/src/stats/pool.rs +++ b/src/stats/pool.rs @@ -64,8 +64,13 @@ impl PoolStats { ClientState::Idle => pool_stats.cl_idle += 1, ClientState::Waiting => pool_stats.cl_waiting += 1, } - let max_wait = client.max_wait_time.load(Ordering::Relaxed); - pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait); + let wait_start_us = client.wait_start_us.load(Ordering::Relaxed); + if wait_start_us > 0 { + let current_wait_time_us = + wait_start_us - client.connect_time().elapsed().as_micros() as u64; + pool_stats.maxwait = + std::cmp::max(pool_stats.maxwait, current_wait_time_us); + } } None => debug!("Client from an obselete pool"), } From f37a208cf84dd8feea6fd36f94990208311c33cd Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 17 Jan 2024 19:25:42 -0600 Subject: [PATCH 2/4] reset --- src/stats/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stats/client.rs b/src/stats/client.rs index 15c64d3a..9404a540 100644 --- a/src/stats/client.rs +++ b/src/stats/client.rs @@ -156,7 +156,7 @@ impl ClientStats { self.total_wait_time .fetch_add(wait_total, Ordering::Relaxed); self.max_wait_time.fetch_max(wait_total, Ordering::Relaxed); - self.wait_start_us.fetch_max(0, Ordering::Relaxed); + self.wait_start_us.store(0, Ordering::Relaxed); } /// Report a query executed by a client against a server From 2434b4974949ed610f23f2902972ad86c1a1c6c9 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 17 Jan 2024 20:28:18 -0600 Subject: [PATCH 3/4] fix tests 
--- src/stats/client.rs | 17 +++++++++++++---- src/stats/pool.rs | 6 ++---- tests/ruby/stats_spec.rb | 21 ++++++++++++--------- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/stats/client.rs b/src/stats/client.rs index 9404a540..356f08bd 100644 --- a/src/stats/client.rs +++ b/src/stats/client.rs @@ -150,15 +150,24 @@ impl ClientStats { if self.wait_start_us.load(Ordering::Relaxed) == 0 { return; } - let wait_total = self.connect_time.elapsed().as_micros() as u64 - - self.wait_start_us.load(Ordering::Relaxed); + let wait_time_us = self.get_current_wait_time_us(); self.total_wait_time - .fetch_add(wait_total, Ordering::Relaxed); - self.max_wait_time.fetch_max(wait_total, Ordering::Relaxed); + .fetch_add(wait_time_us, Ordering::Relaxed); + self.max_wait_time + .fetch_max(wait_time_us, Ordering::Relaxed); self.wait_start_us.store(0, Ordering::Relaxed); } + pub fn get_current_wait_time_us(&self) -> u64 { + let wait_start_us = self.wait_start_us.load(Ordering::Relaxed); + let microseconds_since_connection_epoch = self.connect_time.elapsed().as_micros() as u64; + if wait_start_us == 0 || microseconds_since_connection_epoch < wait_start_us { + return 0; + } + microseconds_since_connection_epoch - wait_start_us + } + /// Report a query executed by a client against a server pub fn query(&self) { self.query_count.fetch_add(1, Ordering::Relaxed); diff --git a/src/stats/pool.rs b/src/stats/pool.rs index 85254c48..b5c6ff5b 100644 --- a/src/stats/pool.rs +++ b/src/stats/pool.rs @@ -66,10 +66,8 @@ impl PoolStats { } let wait_start_us = client.wait_start_us.load(Ordering::Relaxed); if wait_start_us > 0 { - let current_wait_time_us = - wait_start_us - client.connect_time().elapsed().as_micros() as u64; - pool_stats.maxwait = - std::cmp::max(pool_stats.maxwait, current_wait_time_us); + let wait_time_us = client.get_current_wait_time_us(); + pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time_us); } } None => debug!("Client from an obselete pool"), 
diff --git a/tests/ruby/stats_spec.rb b/tests/ruby/stats_spec.rb index ddf63cd3..8a683a01 100644 --- a/tests/ruby/stats_spec.rb +++ b/tests/ruby/stats_spec.rb @@ -233,17 +233,19 @@ sleep(1.1) # Allow time for stats to update admin_conn = PG::connect(processes.pgcat.admin_connection_string) results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s| + + %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s| raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" end + expect(results["maxwait"]).to eq("1") expect(results["cl_waiting"]).to eq("2") expect(results["cl_active"]).to eq("2") expect(results["sv_active"]).to eq("2") sleep(2.5) # Allow time for stats to update results = admin_conn.async_exec("SHOW POOLS")[0] - %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s| + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" end expect(results["cl_idle"]).to eq("4") @@ -255,22 +257,23 @@ it "show correct max_wait" do threads = [] + admin_conn = PG::connect(processes.pgcat.admin_connection_string) connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } connections.each do |c| threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil } end - - sleep(2.5) # Allow time for stats to update - admin_conn = PG::connect(processes.pgcat.admin_connection_string) + sleep(1.1) results = admin_conn.async_exec("SHOW POOLS")[0] - + # Value is only reported when there are clients waiting expect(results["maxwait"]).to eq("1") - expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000) - connections.map(&:close) + expect(results["maxwait_us"].to_i).to be_within(20_000).of(100_000) - sleep(4.5) # Allow time for 
stats to update + sleep(2.5) # Allow time for stats to update results = admin_conn.async_exec("SHOW POOLS")[0] + # no clients are waiting so value is 0 expect(results["maxwait"]).to eq("0") + expect(results["maxwait_us"]).to eq("0") + connections.map(&:close) threads.map(&:join) end From 77b11e236c32c9e088b13aa11da8f69d28ba60d6 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 17 Jan 2024 20:56:12 -0600 Subject: [PATCH 4/4] fix comment --- src/stats/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stats/client.rs b/src/stats/client.rs index 356f08bd..bd59a2f5 100644 --- a/src/stats/client.rs +++ b/src/stats/client.rs @@ -43,7 +43,7 @@ pub struct ClientStats { // Time when the client started waiting for a connection from pool, measures in microseconds // We use connect_time as the reference point for this value - // U64 can represent ~60 centuries in microseconds, so we should be fine + // U64 can represent ~5850 centuries in microseconds, so we should be fine pub wait_start_us: Arc, /// Current state of the client