Report `maxwait` in SHOW POOLS as the current wait time of clients that are
still waiting for a connection, tracked through ClientStats::wait_start_us.

NOTE(review): indentation and blank context lines in this patch were
reconstructed from a whitespace-collapsed copy; verify them against the target
tree before applying.

diff --git a/src/pool.rs b/src/pool.rs
index 00f4dc2a..c03aaf4c 100644
--- a/src/pool.rs
+++ b/src/pool.rs
@@ -769,7 +769,6 @@ impl ConnectionPool {
                     );
                     self.ban(address, BanReason::FailedCheckout, Some(client_stats));
                     address.stats.error();
-                    client_stats.idle();
                     client_stats.checkout_error();
                     continue;
                 }
@@ -788,7 +787,7 @@ impl ConnectionPool {
             // Health checks are pretty expensive.
             if !require_healthcheck {
                 let checkout_time = now.elapsed().as_micros() as u64;
-                client_stats.checkout_time(checkout_time);
+                client_stats.checkout_success();
                 server
                     .stats()
                     .checkout_time(checkout_time, client_stats.application_name());
@@ -802,7 +801,7 @@ impl ConnectionPool {
                 .await
             {
                 let checkout_time = now.elapsed().as_micros() as u64;
-                client_stats.checkout_time(checkout_time);
+                client_stats.checkout_success();
                 server
                     .stats()
                     .checkout_time(checkout_time, client_stats.application_name());
@@ -814,10 +813,7 @@ impl ConnectionPool {
             }
         }
 
-        client_stats.idle();
-
-        let checkout_time = now.elapsed().as_micros() as u64;
-        client_stats.checkout_time(checkout_time);
+        client_stats.checkout_error();
 
         Err(Error::AllServersDown)
     }
@@ -843,7 +839,7 @@ impl ConnectionPool {
             Ok(res) => match res {
                 Ok(_) => {
                     let checkout_time: u64 = start.elapsed().as_micros() as u64;
-                    client_info.checkout_time(checkout_time);
+                    client_info.checkout_success();
                     server
                         .stats()
                         .checkout_time(checkout_time, client_info.application_name());
diff --git a/src/stats/client.rs b/src/stats/client.rs
index 6a30ec15..bd59a2f5 100644
--- a/src/stats/client.rs
+++ b/src/stats/client.rs
@@ -41,6 +41,11 @@ pub struct ClientStats {
     /// Maximum time spent waiting for a connection from pool, measures in microseconds
     pub max_wait_time: Arc<AtomicU64>,
 
+    /// Time when the client started waiting for a connection from pool, measured in microseconds
+    /// since connect_time; 0 means the client is not currently waiting.
+    /// U64 can represent ~5850 centuries in microseconds, so we should be fine
+    pub wait_start_us: Arc<AtomicU64>,
+
     /// Current state of the client
     pub state: Arc<AtomicClientState>,
 
@@ -64,6 +69,7 @@ impl Default for ClientStats {
             pool_name: String::new(),
             total_wait_time: Arc::new(AtomicU64::new(0)),
             max_wait_time: Arc::new(AtomicU64::new(0)),
+            wait_start_us: Arc::new(AtomicU64::new(0)),
             state: Arc::new(AtomicClientState::new(ClientState::Idle)),
             transaction_count: Arc::new(AtomicU64::new(0)),
             query_count: Arc::new(AtomicU64::new(0)),
@@ -111,6 +117,9 @@ impl ClientStats {
 
     /// Reports a client is waiting for a connection
     pub fn waiting(&self) {
+        let wait_start = self.connect_time.elapsed().as_micros() as u64;
+
+        self.wait_start_us.store(wait_start, Ordering::Relaxed);
         self.state.store(ClientState::Waiting, Ordering::Relaxed);
     }
 
@@ -122,6 +131,13 @@ impl ClientStats {
     /// Reports a client has failed to obtain a connection from a connection pool
     pub fn checkout_error(&self) {
         self.state.store(ClientState::Idle, Ordering::Relaxed);
+        self.update_wait_times();
+    }
+
+    /// Reports a client has succeeded in obtaining a connection from a connection pool
+    pub fn checkout_success(&self) {
+        self.state.store(ClientState::Active, Ordering::Relaxed);
+        self.update_wait_times();
     }
 
     /// Reports a client has had the server assigned to it be banned
@@ -130,12 +146,30 @@ impl ClientStats {
         self.error_count.fetch_add(1, Ordering::Relaxed);
     }
 
-    /// Reporters the time spent by a client waiting to get a healthy connection from the pool
-    pub fn checkout_time(&self, microseconds: u64) {
+    /// Adds the wait that started at wait_start_us to total_wait_time and max_wait_time,
+    /// then resets wait_start_us to 0. Does nothing if no wait is in progress.
+    fn update_wait_times(&self) {
+        if self.wait_start_us.load(Ordering::Relaxed) == 0 {
+            return;
+        }
+
+        let wait_time_us = self.get_current_wait_time_us();
         self.total_wait_time
-            .fetch_add(microseconds, Ordering::Relaxed);
+            .fetch_add(wait_time_us, Ordering::Relaxed);
         self.max_wait_time
-            .fetch_max(microseconds, Ordering::Relaxed);
+            .fetch_max(wait_time_us, Ordering::Relaxed);
+        self.wait_start_us.store(0, Ordering::Relaxed);
+    }
+
+    /// Returns the time the client has currently been waiting for a connection from pool,
+    /// in microseconds, or 0 if the client is not waiting.
+    pub fn get_current_wait_time_us(&self) -> u64 {
+        let wait_start_us = self.wait_start_us.load(Ordering::Relaxed);
+        let microseconds_since_connection_epoch = self.connect_time.elapsed().as_micros() as u64;
+        if wait_start_us == 0 || microseconds_since_connection_epoch < wait_start_us {
+            return 0;
+        }
+        microseconds_since_connection_epoch - wait_start_us
     }
 
     /// Report a query executed by a client against a server
diff --git a/src/stats/pool.rs b/src/stats/pool.rs
index 46c74632..b5c6ff5b 100644
--- a/src/stats/pool.rs
+++ b/src/stats/pool.rs
@@ -64,8 +64,11 @@ impl PoolStats {
                         ClientState::Idle => pool_stats.cl_idle += 1,
                         ClientState::Waiting => pool_stats.cl_waiting += 1,
                     }
-                    let max_wait = client.max_wait_time.load(Ordering::Relaxed);
-                    pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, max_wait);
+                    let wait_start_us = client.wait_start_us.load(Ordering::Relaxed);
+                    if wait_start_us > 0 {
+                        let wait_time_us = client.get_current_wait_time_us();
+                        pool_stats.maxwait = std::cmp::max(pool_stats.maxwait, wait_time_us);
+                    }
                 }
                 None => debug!("Client from an obselete pool"),
             }
diff --git a/tests/ruby/stats_spec.rb b/tests/ruby/stats_spec.rb
index ddf63cd3..8a683a01 100644
--- a/tests/ruby/stats_spec.rb
+++ b/tests/ruby/stats_spec.rb
@@ -233,17 +233,19 @@
         sleep(1.1) # Allow time for stats to update
         admin_conn = PG::connect(processes.pgcat.admin_connection_string)
         results = admin_conn.async_exec("SHOW POOLS")[0]
-        %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
+
+        %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login].each do |s|
           raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
         end
 
+        expect(results["maxwait"]).to eq("1")
         expect(results["cl_waiting"]).to eq("2")
         expect(results["cl_active"]).to eq("2")
         expect(results["sv_active"]).to eq("2")
 
         sleep(2.5) # Allow time for stats to update
         results = admin_conn.async_exec("SHOW POOLS")[0]
-        %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
+        %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
           raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
         end
         expect(results["cl_idle"]).to eq("4")
@@ -255,22 +257,23 @@
 
       it "show correct max_wait" do
         threads = []
+        admin_conn = PG::connect(processes.pgcat.admin_connection_string)
         connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
         connections.each do |c|
           threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") rescue nil }
         end
-
-        sleep(2.5) # Allow time for stats to update
-        admin_conn = PG::connect(processes.pgcat.admin_connection_string)
+        sleep(1.1)
         results = admin_conn.async_exec("SHOW POOLS")[0]
-
+        # Value is only reported when there are clients waiting
         expect(results["maxwait"]).to eq("1")
-        expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
-        connections.map(&:close)
+        expect(results["maxwait_us"].to_i).to be_within(20_000).of(100_000)
 
-        sleep(4.5) # Allow time for stats to update
+        sleep(2.5) # Allow time for stats to update
         results = admin_conn.async_exec("SHOW POOLS")[0]
+        # no clients are waiting so value is 0
         expect(results["maxwait"]).to eq("0")
+        expect(results["maxwait_us"]).to eq("0")
+        connections.map(&:close)
         threads.map(&:join)
       end
 