Fix maxwait metric #183

Merged (9 commits) on Oct 6, 2022
8 changes: 7 additions & 1 deletion src/admin.rs
@@ -228,7 +228,13 @@ where
             pool_config.pool_mode.to_string(),
         ];
         for column in &columns[3..columns.len()] {
-            let value = pool_stats.get(column.0).unwrap_or(&0).to_string();
+            let value = match column.0 {
+                "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(),
Review comment (Contributor): We also show these in Prometheus metrics (prometheus.rs); it would be good to update this logic there too.
"maxwait_us" => {
(pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string()
}
_other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(),
};
row.push(value);
}
res.put(data_row(&row));
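The handler derives both SHOW POOLS columns from the single stored maxwait_us value, following pgbouncer's convention of reporting whole seconds in maxwait and the microsecond remainder in maxwait_us. A minimal standalone sketch of that split (illustrative only, not code from this PR):

fn split_maxwait(maxwait_us: u64) -> (u64, u64) {
    let seconds = maxwait_us / 1_000_000; // whole seconds, shown as "maxwait"
    let micros = maxwait_us % 1_000_000; // remainder, shown as "maxwait_us"
    (seconds, micros)
}

fn main() {
    // A client that waited 1.5 seconds is reported as maxwait=1, maxwait_us=500000,
    // which is exactly what the admin_spec.rb test below asserts.
    assert_eq!(split_maxwait(1_500_000), (1, 500_000));
}

The same raw microsecond value could also feed the Prometheus exporter mentioned in the review comment above, with the split (or no split at all) applied at render time.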
4 changes: 2 additions & 2 deletions src/pool.rs
@@ -333,7 +333,6 @@ impl ConnectionPool {
         role: Option<Role>, // primary or replica
         process_id: i32,    // client id
     ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> {
-        let now = Instant::now();
         let mut candidates: Vec<&Address> = self.addresses[shard]
             .iter()
             .filter(|address| address.role == role)
@@ -358,6 +357,7 @@
         }

         // Indicate we're waiting on a server connection from a pool.
+        let now = Instant::now();
         self.stats.client_waiting(process_id);

         // Check if we can connect
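Moving let now = Instant::now(); below the candidate filtering means the timer covers only the time the client actually spends waiting for a pool checkout, not the setup work above it. A standalone sketch of the pattern, where the acquire closure is an illustrative stand-in for the pool's checkout call:

use std::time::Instant;

// Start the clock only once the client is reported as waiting, then
// record the elapsed wait in microseconds when a connection is handed out.
fn checkout_with_wait<T>(acquire: impl FnOnce() -> T) -> (T, u128) {
    let now = Instant::now(); // the measured wait starts here
    let conn = acquire(); // blocks until a connection frees up
    let waited_us = now.elapsed().as_micros();
    (conn, waited_us)
}

fn main() {
    let (value, waited_us) = checkout_with_wait(|| 42);
    println!("got {value} after {waited_us}us");
}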
@@ -397,7 +397,7 @@

         match tokio::time::timeout(
             tokio::time::Duration::from_millis(healthcheck_timeout),
-            server.query(";"), // Cheap query (query parser not used in PG)
+            server.query(";"), // Cheap query as it skips the query planner
         )
         .await
         {
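For context, the healthcheck bounds a cheap ";" round-trip with tokio::time::timeout so a hung server cannot stall checkout indefinitely. A minimal sketch of the same pattern, where the ping future is an illustrative stand-in for server.query(";"):

use std::future::Future;
use tokio::time::{timeout, Duration};

// Returns true only if the ping both finishes in time and succeeds.
async fn healthcheck_ok<E>(timeout_ms: u64, ping: impl Future<Output = Result<(), E>>) -> bool {
    matches!(timeout(Duration::from_millis(timeout_ms), ping).await, Ok(Ok(())))
}

#[tokio::main]
async fn main() {
    // A ping that resolves immediately passes the check.
    assert!(healthcheck_ok(300, async { Ok::<(), ()>(()) }).await);
}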
44 changes: 24 additions & 20 deletions src/stats.rs
@@ -580,15 +580,15 @@ impl Collector {
                 server_info.query_count += stat.value as u64;
                 server_info.application_name = app_name;

-                let pool_stats = address_stat_lookup
+                let address_stats = address_stat_lookup
                     .entry(server_info.address_id)
                     .or_insert(HashMap::default());
-                let counter = pool_stats
+                let counter = address_stats
                     .entry("total_query_count".to_string())
                     .or_insert(0);
                 *counter += stat.value;

-                let duration = pool_stats
+                let duration = address_stats
                     .entry("total_query_time".to_string())
                     .or_insert(0);
                 *duration += duration_ms as i64;
@@ -681,26 +681,26 @@
                     Some(server_info) => {
                         server_info.application_name = app_name;

-                        let pool_stats = address_stat_lookup
+                        let address_stats = address_stat_lookup
                             .entry(server_info.address_id)
                             .or_insert(HashMap::default());
-                        let counter =
-                            pool_stats.entry("total_wait_time".to_string()).or_insert(0);
+                        let counter = address_stats
+                            .entry("total_wait_time".to_string())
+                            .or_insert(0);
                         *counter += stat.value;

-                        let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0);
-                        let mic_part = stat.value % 1_000_000;
-
-                        // Report max time here
-                        if mic_part > *counter {
-                            *counter = mic_part;
-                        }
-
-                        let counter = pool_stats.entry("maxwait".to_string()).or_insert(0);
-                        let seconds = *counter / 1_000_000;
-
-                        if seconds > *counter {
-                            *counter = seconds;
-                        }
+                        let pool_stats = pool_stat_lookup
+                            .entry((
+                                server_info.pool_name.clone(),
+                                server_info.username.clone(),
+                            ))
+                            .or_insert(HashMap::default());
+
+                        // We record max wait in microseconds; we do the pgbouncer second/microsecond split on admin
+                        let old_microseconds =
+                            pool_stats.entry("maxwait_us".to_string()).or_insert(0);
+                        if stat.value > *old_microseconds {
+                            *old_microseconds = stat.value;
+                        }
                     }
                     None => (),
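The old bookkeeping updated maxwait_us with only the sub-second remainder of each sample (stat.value % 1_000_000), so a smaller overall wait with a larger remainder could mask the true maximum, and it derived seconds from the very counter it compared against, so the maxwait column could never advance. The rewrite keeps a single running maximum in raw microseconds per (pool, user) key and leaves the split to the admin layer. A standalone sketch of the new bookkeeping, with the map types simplified:

use std::collections::HashMap;

// Keep only the raw maximum wait in microseconds; the seconds/microseconds
// split happens once, at display time.
fn record_wait(pool_stats: &mut HashMap<String, i64>, wait_us: i64) {
    let old = pool_stats.entry("maxwait_us".to_string()).or_insert(0);
    if wait_us > *old {
        *old = wait_us;
    }
}

fn main() {
    let mut pool_stats = HashMap::new();
    record_wait(&mut pool_stats, 1_500_000);
    record_wait(&mut pool_stats, 700_000); // a smaller sample does not regress the max
    assert_eq!(pool_stats["maxwait_us"], 1_500_000);
}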
@@ -903,8 +903,6 @@ impl Collector {
                     "sv_active",
                     "sv_tested",
                     "sv_login",
-                    "maxwait",
-                    "maxwait_us",
                 ] {
                     pool_stats.insert(stat.to_string(), 0);
                 }
@@ -962,6 +960,12 @@
                 LATEST_CLIENT_STATS.store(Arc::new(client_states.clone()));
                 LATEST_SERVER_STATS.store(Arc::new(server_states.clone()));
                 LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone()));
+
+                // Clear maxwait after reporting
+                pool_stat_lookup
+                    .entry((pool_name.clone(), username.clone()))
+                    .or_insert(HashMap::default())
+                    .insert("maxwait_us".to_string(), 0);
             }

             EventName::UpdateAverages { address_id } => {
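After this change maxwait behaves as a high-water mark per reporting period: the collector publishes a snapshot, then zeroes maxwait_us so the next period starts fresh, which is what lets the test below observe the value dropping back to 0. A sketch of those semantics, simplified to a single stats map:

use std::collections::HashMap;

// Publish the current max wait, then reset it so the metric reflects the
// maximum observed since the previous report rather than since startup.
fn report_and_reset(pool_stats: &mut HashMap<String, i64>) -> i64 {
    let snapshot = *pool_stats.get("maxwait_us").unwrap_or(&0);
    pool_stats.insert("maxwait_us".to_string(), 0);
    snapshot
}

fn main() {
    let mut pool_stats = HashMap::from([("maxwait_us".to_string(), 1_500_000)]);
    assert_eq!(report_and_reset(&mut pool_stats), 1_500_000);
    assert_eq!(report_and_reset(&mut pool_stats), 0); // the next period starts at zero
}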
22 changes: 22 additions & 0 deletions tests/ruby/admin_spec.rb
@@ -208,6 +208,28 @@
       threads.map(&:join)
       connections.map(&:close)
     end
+
+    it "shows the correct maxwait" do
+      threads = []
+      connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
+      connections.each do |c|
+        threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
+      end
+
+      sleep(2.5) # Allow time for stats to update
Review comment (Contributor): We should introduce a command to refresh stats immediately. I can see our CI getting slower and slower over time as we add features :)
+      admin_conn = PG::connect(processes.pgcat.admin_connection_string)
+      results = admin_conn.async_exec("SHOW POOLS")[0]
+
+      expect(results["maxwait"]).to eq("1")
+      expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000)
+
+      sleep(4.5) # Allow the queries to finish and the next stats period to reset maxwait
+      results = admin_conn.async_exec("SHOW POOLS")[0]
+      expect(results["maxwait"]).to eq("0")
+
+      threads.map(&:join)
+      connections.map(&:close)
+    end
   end
 end
