diff --git a/src/admin.rs b/src/admin.rs index d794b86a..42af315e 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -228,7 +228,13 @@ where pool_config.pool_mode.to_string(), ]; for column in &columns[3..columns.len()] { - let value = pool_stats.get(column.0).unwrap_or(&0).to_string(); + let value = match column.0 { + "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(), + "maxwait_us" => { + (pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string() + } + _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(), + }; row.push(value); } res.put(data_row(&row)); diff --git a/src/pool.rs b/src/pool.rs index bb452537..edc549d8 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -333,7 +333,6 @@ impl ConnectionPool { role: Option, // primary or replica process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { - let now = Instant::now(); let mut candidates: Vec<&Address> = self.addresses[shard] .iter() .filter(|address| address.role == role) @@ -358,6 +357,7 @@ impl ConnectionPool { } // Indicate we're waiting on a server connection from a pool. + let now = Instant::now(); self.stats.client_waiting(process_id); // Check if we can connect @@ -397,7 +397,7 @@ impl ConnectionPool { match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), - server.query(";"), // Cheap query (query parser not used in PG) + server.query(";"), // Cheap query as it skips the query planner ) .await { diff --git a/src/stats.rs b/src/stats.rs index 7998e454..f6f812aa 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -580,15 +580,15 @@ impl Collector { server_info.query_count += stat.value as u64; server_info.application_name = app_name; - let pool_stats = address_stat_lookup + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert(HashMap::default()); - let counter = pool_stats + let counter = address_stats .entry("total_query_count".to_string()) .or_insert(0); *counter += stat.value; - let duration = pool_stats + let duration = address_stats .entry("total_query_time".to_string()) .or_insert(0); *duration += duration_ms as i64; @@ -681,26 +681,26 @@ impl Collector { Some(server_info) => { server_info.application_name = app_name; - let pool_stats = address_stat_lookup + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert(HashMap::default()); - let counter = - pool_stats.entry("total_wait_time".to_string()).or_insert(0); + let counter = address_stats + .entry("total_wait_time".to_string()) + .or_insert(0); *counter += stat.value; - let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0); - let mic_part = stat.value % 1_000_000; - - // Report max time here - if mic_part > *counter { - *counter = mic_part; - } - - let counter = pool_stats.entry("maxwait".to_string()).or_insert(0); - let seconds = *counter / 1_000_000; + let pool_stats = pool_stat_lookup + .entry(( + server_info.pool_name.clone(), + server_info.username.clone(), + )) + .or_insert(HashMap::default()); - if seconds > *counter { - *counter = seconds; + // We record max wait in microseconds, we do the pgbouncer second/microsecond split on admin + let old_microseconds = + pool_stats.entry("maxwait_us".to_string()).or_insert(0); + if stat.value > *old_microseconds { + *old_microseconds = stat.value; } } None => (), @@ -903,8 +903,6 @@ impl Collector { "sv_active", "sv_tested", "sv_login", - "maxwait", - "maxwait_us", ] { pool_stats.insert(stat.to_string(), 0); } @@ -962,6 +960,12 @@ impl Collector { LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); LATEST_SERVER_STATS.store(Arc::new(server_states.clone())); LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone())); + + // Clear maxwait after reporting + pool_stat_lookup + .entry((pool_name.clone(), username.clone())) + .or_insert(HashMap::default()) + .insert("maxwait_us".to_string(), 0); } EventName::UpdateAverages { address_id } => { diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index 3bc12641..40e7e1c2 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -208,6 +208,28 @@ threads.map(&:join) connections.map(&:close) end + + it "show correct max_wait" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } + end + + sleep(2.5) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + + expect(results["maxwait"]).to eq("1") + expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000) + + sleep(4.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + expect(results["maxwait"]).to eq("0") + + threads.map(&:join) + connections.map(&:close) + end end end