From 7973dd153fefdf8342fbdccf704135d750f583df Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 23 Sep 2022 22:59:33 -0500 Subject: [PATCH 1/8] Set client state to idle after error --- src/stats.rs | 10 ++++++++-- tests/ruby/admin_spec.rb | 27 +++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 90b55ddd..dc677c22 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -740,7 +740,10 @@ impl Collector { address_id, } => { match client_states.get_mut(&client_id) { - Some(client_info) => client_info.error_count += stat.value as u64, + Some(client_info) => { + client_info.state = ClientState::Idle; + client_info.error_count += stat.value as u64; + }, None => warn!("Got event {:?} for unregistered client", stat.name), } @@ -757,7 +760,10 @@ impl Collector { address_id, } => { match client_states.get_mut(&client_id) { - Some(client_info) => client_info.error_count += stat.value as u64, + Some(client_info) => { + client_info.state = ClientState::Idle; + client_info.error_count += stat.value as u64; + }, None => warn!("Got event {:?} for unregistered client", stat.name), } diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index a348146e..da262913 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -146,6 +146,33 @@ end end + context "client fail to checkout connection from the pool" do + it "produces counts clients as idle" do + new_configs = processes.pgcat.current_config + new_configs["general"]["connect_timeout"] = 500 + new_configs["general"]["ban_time"] = 1 + new_configs["general"]["shutdown_timeout"] = 1 + new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1 + processes.pgcat.update_config(new_configs) + processes.pgcat.reload_config + + + connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } + end + + sleep(2) + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s| + raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0" + end + expect(results["cl_idle"]).to eq("5") + expect(results["sv_idle"]).to eq("1") + end + end + context "clients overwhelm server pools" do let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) } From 39000e0fb75308be0e3574634cb81a2c9810f4d4 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 23 Sep 2022 23:01:43 -0500 Subject: [PATCH 2/8] fmt --- src/stats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index dc677c22..e37c88c7 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -743,7 +743,7 @@ impl Collector { Some(client_info) => { client_info.state = ClientState::Idle; client_info.error_count += stat.value as u64; - }, + } None => warn!("Got event {:?} for unregistered client", stat.name), } @@ -763,7 +763,7 @@ impl Collector { Some(client_info) => { client_info.state = ClientState::Idle; client_info.error_count += stat.value as u64; - }, + } None => warn!("Got event {:?} for unregistered client", stat.name), } From b2d957b2dd72a5cfba2c6cfff829d09a5e35eba9 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 23 Sep 2022 23:02:34 -0500 Subject: [PATCH 3/8] spelling --- tests/ruby/admin_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index da262913..0362697e 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -147,7 +147,7 @@ end context "client fail to checkout connection from the pool" do - it "produces counts clients as idle" do + it "counts clients as idle" do new_configs = processes.pgcat.current_config new_configs["general"]["connect_timeout"] = 500 new_configs["general"]["ban_time"] = 1 From d9729ed7ab7ebfe23927754d39dd73ce31c9020f Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Fri, 23 Sep 2022 23:07:55 -0500 Subject: [PATCH 4/8] clean up --- tests/ruby/admin_spec.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index 0362697e..3bc12641 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -156,10 +156,10 @@ processes.pgcat.update_config(new_configs) processes.pgcat.reload_config - + threads = [] connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } connections.each do |c| - Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } + threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError } end sleep(2) @@ -170,6 +170,9 @@ end expect(results["cl_idle"]).to eq("5") expect(results["sv_idle"]).to eq("1") + + threads.map(&:join) + connections.map(&:close) end end From 11ce83891a74ba6ba9021cd0d098e08c096ec96e Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 2 Oct 2022 08:30:30 -0500 Subject: [PATCH 5/8] Fix maxwait metric --- src/admin.rs | 6 +++++- src/pool.rs | 9 +++++---- src/stats.rs | 41 ++++++++++++++++++++-------------------- tests/ruby/admin_spec.rb | 22 +++++++++++++++++++++ 4 files changed, 53 insertions(+), 25 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index d794b86a..0c96654f 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -228,7 +228,11 @@ where pool_config.pool_mode.to_string(), ]; for column in &columns[3..columns.len()] { - let value = pool_stats.get(column.0).unwrap_or(&0).to_string(); + let value = match column.0 { + "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(), + "maxwait_us" => (pool_stats.get(column.0).unwrap_or(&0) % 1_000_000).to_string(), + _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string() + }; row.push(value); } res.put(data_row(&row)); diff --git a/src/pool.rs b/src/pool.rs index bb452537..41f88520 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -334,6 +334,10 @@ impl ConnectionPool { process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = Instant::now(); + + // Indicate we're waiting on a server connection from a pool. + self.stats.client_waiting(process_id); + let mut candidates: Vec<&Address> = self.addresses[shard] .iter() .filter(|address| address.role == role) @@ -357,9 +361,6 @@ impl ConnectionPool { continue; } - // Indicate we're waiting on a server connection from a pool. - self.stats.client_waiting(process_id); - // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] .get() @@ -397,7 +398,7 @@ impl ConnectionPool { match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), - server.query(";"), // Cheap query (query parser not used in PG) + server.query(";"), // Cheap query as it skips the query planner ) .await { diff --git a/src/stats.rs b/src/stats.rs index 7998e454..58561829 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -316,7 +316,7 @@ impl Reporter { /// Reportes the time spent by a client waiting to get a healthy connection from the pool pub fn checkout_time(&self, microseconds: u128, client_id: i32, server_id: i32) { - let event = Event { + let event = Event { name: EventName::CheckoutTime { client_id, server_id, @@ -580,15 +580,15 @@ impl Collector { server_info.query_count += stat.value as u64; server_info.application_name = app_name; - let pool_stats = address_stat_lookup + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert(HashMap::default()); - let counter = pool_stats + let counter = address_stats .entry("total_query_count".to_string()) .or_insert(0); *counter += stat.value; - let duration = pool_stats + let duration = address_stats .entry("total_query_time".to_string()) .or_insert(0); *duration += duration_ms as i64; @@ -681,26 +681,21 @@ impl Collector { Some(server_info) => { server_info.application_name = app_name; - let pool_stats = address_stat_lookup + let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert(HashMap::default()); let counter = - pool_stats.entry("total_wait_time".to_string()).or_insert(0); + address_stats.entry("total_wait_time".to_string()).or_insert(0); *counter += stat.value; - let counter = pool_stats.entry("maxwait_us".to_string()).or_insert(0); - let mic_part = stat.value % 1_000_000; + let pool_stats = pool_stat_lookup + .entry((server_info.pool_name.clone(), server_info.username.clone())) + .or_insert(HashMap::default()); - // Report max time here - if mic_part > *counter { - *counter = mic_part; - } - - let counter = pool_stats.entry("maxwait".to_string()).or_insert(0); - let seconds = *counter / 1_000_000; - - if seconds > *counter { - *counter = seconds; + // We record max wait in microseconds, we do the pgbouncer second/microsecond split on admin + let old_microseconds = pool_stats.entry("maxwait_us".to_string()).or_insert(0); + if stat.value > *old_microseconds { + *old_microseconds = stat.value; } } None => (), @@ -903,8 +898,6 @@ impl Collector { "sv_active", "sv_tested", "sv_login", - "maxwait", - "maxwait_us", ] { pool_stats.insert(stat.to_string(), 0); } @@ -957,11 +950,19 @@ impl Collector { }; } + // The following calls publish the internal stats making it visible // to clients using admin database to issue queries like `SHOW STATS` LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); LATEST_SERVER_STATS.store(Arc::new(server_states.clone())); LATEST_POOL_STATS.store(Arc::new(pool_stat_lookup.clone())); + + // Clear maxwait after reporting + pool_stat_lookup + .entry((pool_name.clone(), username.clone())) + .or_insert(HashMap::default()) + .insert("maxwait_us".to_string(), 0); + } EventName::UpdateAverages { address_id } => { diff --git a/tests/ruby/admin_spec.rb b/tests/ruby/admin_spec.rb index 3bc12641..40e7e1c2 100644 --- a/tests/ruby/admin_spec.rb +++ b/tests/ruby/admin_spec.rb @@ -208,6 +208,28 @@ threads.map(&:join) connections.map(&:close) end + + it "show correct max_wait" do + threads = [] + connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") } + connections.each do |c| + threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") } + end + + sleep(2.5) # Allow time for stats to update + admin_conn = PG::connect(processes.pgcat.admin_connection_string) + results = admin_conn.async_exec("SHOW POOLS")[0] + + expect(results["maxwait"]).to eq("1") + expect(results["maxwait_us"].to_i).to be_within(100_000).of(500_000) + + sleep(4.5) # Allow time for stats to update + results = admin_conn.async_exec("SHOW POOLS")[0] + expect(results["maxwait"]).to eq("0") + + threads.map(&:join) + connections.map(&:close) + end end end From f91f6a3b153f1d83626650354d854fb05002ede0 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 2 Oct 2022 08:43:16 -0500 Subject: [PATCH 6/8] fmt --- src/admin.rs | 2 +- src/stats.rs | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index 0c96654f..b89bb66e 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -231,7 +231,7 @@ where let value = match column.0 { "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(), "maxwait_us" => (pool_stats.get(column.0).unwrap_or(&0) % 1_000_000).to_string(), - _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string() + _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(), }; row.push(value); } diff --git a/src/stats.rs b/src/stats.rs index 58561829..f6f812aa 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -316,7 +316,7 @@ impl Reporter { /// Reportes the time spent by a client waiting to get a healthy connection from the pool pub fn checkout_time(&self, microseconds: u128, client_id: i32, server_id: i32) { - let event = Event { + let event = Event { name: EventName::CheckoutTime { client_id, server_id, @@ -684,16 +684,21 @@ impl Collector { let address_stats = address_stat_lookup .entry(server_info.address_id) .or_insert(HashMap::default()); - let counter = - address_stats.entry("total_wait_time".to_string()).or_insert(0); + let counter = address_stats + .entry("total_wait_time".to_string()) + .or_insert(0); *counter += stat.value; let pool_stats = pool_stat_lookup - .entry((server_info.pool_name.clone(), server_info.username.clone())) - .or_insert(HashMap::default()); + .entry(( + server_info.pool_name.clone(), + server_info.username.clone(), + )) + .or_insert(HashMap::default()); // We record max wait in microseconds, we do the pgbouncer second/microsecond split on admin - let old_microseconds = pool_stats.entry("maxwait_us".to_string()).or_insert(0); + let old_microseconds = + pool_stats.entry("maxwait_us".to_string()).or_insert(0); if stat.value > *old_microseconds { *old_microseconds = stat.value; } @@ -950,7 +955,6 @@ impl Collector { }; } - // The following calls publish the internal stats making it visible // to clients using admin database to issue queries like `SHOW STATS` LATEST_CLIENT_STATS.store(Arc::new(client_states.clone())); @@ -962,7 +966,6 @@ impl Collector { .entry((pool_name.clone(), username.clone())) .or_insert(HashMap::default()) .insert("maxwait_us".to_string(), 0); - } EventName::UpdateAverages { address_id } => { From 4286d5335ad55948291d97709dafdc2f4d74264a Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 2 Oct 2022 08:52:11 -0500 Subject: [PATCH 7/8] minor change --- src/admin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/admin.rs b/src/admin.rs index b89bb66e..eb3d3337 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -230,7 +230,7 @@ where for column in &columns[3..columns.len()] { let value = match column.0 { "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(), - "maxwait_us" => (pool_stats.get(column.0).unwrap_or(&0) % 1_000_000).to_string(), + "maxwait_us" => (pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string(), _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(), }; row.push(value); From 84b5bc989627a109451f0c7c1cd1e7728c096026 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Sun, 2 Oct 2022 08:54:15 -0500 Subject: [PATCH 8/8] move client waiting before pool checkout --- src/admin.rs | 4 +++- src/pool.rs | 9 ++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/admin.rs b/src/admin.rs index eb3d3337..42af315e 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -230,7 +230,9 @@ where for column in &columns[3..columns.len()] { let value = match column.0 { "maxwait" => (pool_stats.get("maxwait_us").unwrap_or(&0) / 1_000_000).to_string(), - "maxwait_us" => (pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string(), + "maxwait_us" => { + (pool_stats.get("maxwait_us").unwrap_or(&0) % 1_000_000).to_string() + } _other_values => pool_stats.get(column.0).unwrap_or(&0).to_string(), }; row.push(value); diff --git a/src/pool.rs b/src/pool.rs index 41f88520..edc549d8 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -333,11 +333,6 @@ impl ConnectionPool { role: Option, // primary or replica process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { - let now = Instant::now(); - - // Indicate we're waiting on a server connection from a pool. - self.stats.client_waiting(process_id); - let mut candidates: Vec<&Address> = self.addresses[shard] .iter() .filter(|address| address.role == role) @@ -361,6 +356,10 @@ impl ConnectionPool { continue; } + // Indicate we're waiting on a server connection from a pool. + let now = Instant::now(); + self.stats.client_waiting(process_id); + // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] .get()