From 2c250290797607a05d33cd922d2c874ddf5b5c18 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 31 Aug 2022 12:56:44 -0500 Subject: [PATCH 01/16] Send DISCARD ALL even if client is not in transaction --- .circleci/run_tests.sh | 6 +++--- src/client.rs | 20 ++++---------------- src/server.rs | 12 ++++++++++++ tests/ruby/helpers/pgcat_helper.rb | 8 ++++---- tests/ruby/misc_spec.rb | 30 ++++++++++++++++++++++++++++++ 5 files changed, 53 insertions(+), 23 deletions(-) diff --git a/.circleci/run_tests.sh b/.circleci/run_tests.sh index 1585ebd8..6ffef8ba 100644 --- a/.circleci/run_tests.sh +++ b/.circleci/run_tests.sh @@ -90,8 +90,8 @@ kill -SIGHUP $(pgrep pgcat) # Reload config again cd tests/ruby sudo gem install bundler bundle install -bundle exec ruby tests.rb -bundle exec rspec *_spec.rb +bundle exec ruby tests.rb || exit 1 +bundle exec rspec *_spec.rb || exit 1 cd ../.. # @@ -99,7 +99,7 @@ cd ../.. # These tests will start and stop the pgcat server so it will need to be restarted after the tests # pip3 install -r tests/python/requirements.txt -python3 tests/python/tests.py +python3 tests/python/tests.py || exit 1 start_pgcat "info" diff --git a/src/client.rs b/src/client.rs index 419448fb..5b46a5c9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -782,12 +782,7 @@ where Err(err) => { // Client disconnected inside a transaction. // Clean up the server and re-use it. - // This prevents connection thrashing by bad clients. - if server.in_transaction() { - server.query("ROLLBACK").await?; - server.query("DISCARD ALL").await?; - server.set_name("pgcat").await?; - } + server.checkin_cleanup().await?; return Err(err); } @@ -829,16 +824,7 @@ where // Terminate 'X' => { - // Client closing. Rollback and clean up - // connection before releasing into the pool. - // Pgbouncer closes the connection which leads to - // connection thrashing when clients misbehave. - if server.in_transaction() { - server.query("ROLLBACK").await?; - server.query("DISCARD ALL").await?; - server.set_name("pgcat").await?; - } - + server.checkin_cleanup().await?; self.release(); return Ok(()); @@ -942,8 +928,10 @@ where // The server is no longer bound to us, we can't cancel it's queries anymore. debug!("Releasing server back into the pool"); + server.checkin_cleanup().await?; self.stats.server_idle(server.process_id(), address.id); self.connected_to_server = false; + self.release(); self.stats.client_idle(self.process_id, address.id); } diff --git a/src/server.rs b/src/server.rs index 3134a65d..37d9ca07 100644 --- a/src/server.rs +++ b/src/server.rs @@ -553,6 +553,17 @@ impl Server { Ok(()) } + pub async fn checkin_cleanup(&mut self) -> Result<(), Error> { + if self.in_transaction() { + self.query("ROLLBACK").await?; + } + + self.query("DISCARD ALL").await?; + self.set_name("pgcat").await?; + + return Ok(()); + } + /// A shorthand for `SET application_name = $1`. #[allow(dead_code)] pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { @@ -581,6 +592,7 @@ impl Server { pub fn last_activity(&self) -> SystemTime { self.last_activity } + } impl Drop for Server { diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb index 30b2bc82..80ac9dab 100644 --- a/tests/ruby/helpers/pgcat_helper.rb +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -5,7 +5,7 @@ module Helpers module Pgcat - def self.three_shard_setup(pool_name, pool_size) + def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction") user = { "password" => "sharding_user", "pool_size" => pool_size, @@ -22,7 +22,7 @@ def self.three_shard_setup(pool_name, pool_size) pgcat_cfg["pools"] = { "#{pool_name}" => { "default_role" => "any", - "pool_mode" => "transaction", + "pool_mode" => pool_mode, "primary_reads_enabled" => false, "query_parser_enabled" => false, "sharding_function" => "pg_bigint_hash", @@ -46,7 +46,7 @@ def self.three_shard_setup(pool_name, pool_size) end end - def self.single_shard_setup(pool_name, pool_size) + def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction") user = { "password" => "sharding_user", "pool_size" => pool_size, @@ -66,7 +66,7 @@ def self.single_shard_setup(pool_name, pool_size) pgcat_cfg["pools"] = { "#{pool_name}" => { "default_role" => "any", - "pool_mode" => "transaction", + "pool_mode" => pool_mode, "primary_reads_enabled" => false, "query_parser_enabled" => false, "sharding_function" => "pg_bigint_hash", diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 9aee49af..40530fe3 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -106,4 +106,34 @@ admin_conn.close end end + + describe "State clearance" do + context "session mode" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") } + + it "Clears state before connection checkin" do + # Both modes of operation should not raise + # ERROR: prepared statement "prepared_q" already exists + 15.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("PREPARE prepared_q (int) AS SELECT $1") + conn.close + end + end + end + + context "transaction mode" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") } + + it "Clears state before connection checkin" do + # Both modes of operation should not raise + # ERROR: prepared statement "prepared_q" already exists + 15.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("PREPARE prepared_q (int) AS SELECT $1") + conn.close + end + end + end + end end From ed64eb9cdca06bcf7d8dd2ec85a5a725abab5cde Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 31 Aug 2022 13:01:48 -0500 Subject: [PATCH 02/16] fmt --- src/server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 37d9ca07..c2987095 100644 --- a/src/server.rs +++ b/src/server.rs @@ -592,7 +592,6 @@ impl Server { pub fn last_activity(&self) -> SystemTime { self.last_activity } - } impl Drop for Server { From 4a88ba66dac59e2329ac6bac5d58cf0165feca17 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Wed, 31 Aug 2022 19:19:56 -0500 Subject: [PATCH 03/16] Added tests + avoided sending extra discard all --- src/server.rs | 38 +++++++++++++++++++++++++++--- tests/ruby/misc_spec.rb | 51 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/src/server.rs b/src/server.rs index c2987095..208385ef 100644 --- a/src/server.rs +++ b/src/server.rs @@ -48,6 +48,9 @@ pub struct Server { /// Is the server broken? We'll remote it from the pool if so. bad: bool, + /// If server connection requires a DISCARD ALL before checkin + needs_cleanup: bool, + /// Mapping of clients and servers used for query cancellation. client_server_map: ClientServerMap, @@ -316,6 +319,7 @@ impl Server { in_transaction: false, data_available: false, bad: false, + needs_cleanup: false, client_server_map: client_server_map, connected_at: chrono::offset::Utc::now().naive_utc(), stats: stats, @@ -440,6 +444,24 @@ impl Server { break; } + // CommandComplete + 'C' => { + let full_message = String::from_utf8_lossy(message.as_ref()); + let mut it = full_message.split_ascii_whitespace(); + let command_tag = it.next().unwrap().trim_end_matches(char::from(0)); + + // Non-exhuastive list of commands that are likely to change session variables/resources + // which can leak between client. This is a best effort to block bad clients + // from poisoning a transaction-mode pool by setting inappropriate session variables + match command_tag { + "SET" | "PREPARE" => { + debug!("Server connection marked for clean up"); + self.needs_cleanup = true; + } + _ => (), + } + } + // DataRow 'D' => { // More data is available after this message, this is not the end of the reply. @@ -558,7 +580,11 @@ impl Server { self.query("ROLLBACK").await?; } - self.query("DISCARD ALL").await?; + if self.needs_cleanup { + self.query("DISCARD ALL").await?; + self.needs_cleanup = false; + } + self.set_name("pgcat").await?; return Ok(()); @@ -569,9 +595,15 @@ impl Server { pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { if self.application_name != name { self.application_name = name.to_string(); - Ok(self + // We don't want `SET application_name` to mark the server connection + // as needing cleanup + let needs_cleanup_before = self.needs_cleanup; + + let result = Ok(self .query(&format!("SET application_name = '{}'", name)) - .await?) + .await?); + self.needs_cleanup = needs_cleanup_before; + return result; } else { Ok(()) } diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 40530fe3..d5b529a9 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -91,7 +91,6 @@ conn.close expect(processes.primary.count_query("ROLLBACK")).to eq(1) - expect(processes.primary.count_query("DISCARD ALL")).to eq(1) end end @@ -119,12 +118,39 @@ conn.async_exec("PREPARE prepared_q (int) AS SELECT $1") conn.close end + + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"] + conn.async_exec("SET statement_timeout to 1000") + current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"] + expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s") + conn.close + end + + it "Does not send DISCARD ALL unless necessary" do + 10.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("SELECT 1") + conn.close + end + + expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + + 10.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("SELECT 1") + conn.async_exec("SET statement_timeout to 5000") + conn.close + end + + expect(processes.primary.count_query("DISCARD ALL")).to eq(10) end end context "transaction mode" do let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") } - it "Clears state before connection checkin" do # Both modes of operation should not raise # ERROR: prepared statement "prepared_q" already exists @@ -134,6 +160,27 @@ conn.close end end + + it "Does not send DISCARD ALL unless necessary" do + 10.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("SELECT 1") + conn.close + end + + expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + + 10.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("SELECT 1") + conn.async_exec("SET statement_timeout to 5000") + conn.close + end + + expect(processes.primary.count_query("DISCARD ALL")).to eq(10) + end end end end From 70c6048289251fe890a933880359d16f57ff378b Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 31 Aug 2022 22:48:44 -0400 Subject: [PATCH 04/16] Adds set name logic to beginning of handle client --- src/client.rs | 26 +++++++++++++++++--------- src/server.rs | 3 --- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5b46a5c9..57fd95c2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -59,7 +59,7 @@ pub struct Client { client_server_map: ClientServerMap, /// Client parameters, e.g. user, client_encoding, etc. - parameters: HashMap, + _parameters: HashMap, /// Statistics stats: Reporter, @@ -82,6 +82,9 @@ pub struct Client { /// Postgres user for this client (This comes from the user in the connection string) username: String, + /// Application name for this client (defaults to pgcat) + application_name: String, + /// Used to notify clients about an impending shutdown shutdown: Receiver<()>, } @@ -365,6 +368,11 @@ where None => return Err(Error::ClientError), }; + let application_name = match parameters.get("application_name") { + Some(application_name) => application_name, + None => "pgcat", + }; + let admin = ["pgcat", "pgbouncer"] .iter() .filter(|db| *db == &pool_name) @@ -486,13 +494,14 @@ where process_id, secret_key, client_server_map, - parameters: parameters.clone(), + _parameters: parameters.clone(), stats: stats, admin: admin, last_address_id: None, last_server_id: None, pool_name: pool_name.clone(), username: username.clone(), + application_name: application_name.to_string(), shutdown, connected_to_server: false, }); @@ -519,13 +528,14 @@ where process_id, secret_key, client_server_map, - parameters: HashMap::new(), + _parameters: HashMap::new(), stats: get_reporter(), admin: false, last_address_id: None, last_server_id: None, pool_name: String::from("undefined"), username: String::from("undefined"), + application_name: String::from("undefined"), shutdown, connected_to_server: false, }); @@ -759,13 +769,11 @@ where server.address() ); - // Set application_name if any. + // TODO: investigate other parameters and set them too. - if self.parameters.contains_key("application_name") { - server - .set_name(&self.parameters["application_name"]) - .await?; - } + + // Set application_name. + server.set_name(&self.application_name).await?; // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, diff --git a/src/server.rs b/src/server.rs index 208385ef..a35c8ec2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -585,13 +585,10 @@ impl Server { self.needs_cleanup = false; } - self.set_name("pgcat").await?; - return Ok(()); } /// A shorthand for `SET application_name = $1`. - #[allow(dead_code)] pub async fn set_name(&mut self, name: &str) -> Result<(), Error> { if self.application_name != name { self.application_name = name.to_string(); From 0e60a8c64737bdcac330389c82e0639abe3a2902 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 31 Aug 2022 22:51:59 -0400 Subject: [PATCH 05/16] fmt --- src/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 57fd95c2..76c597a6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -769,7 +769,6 @@ where server.address() ); - // TODO: investigate other parameters and set them too. // Set application_name. From fac78a35773848e0f05de9de936545c1713dd14f Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 31 Aug 2022 23:10:21 -0400 Subject: [PATCH 06/16] refactor dead code handling --- src/client.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 76c597a6..806547d9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -59,7 +59,8 @@ pub struct Client { client_server_map: ClientServerMap, /// Client parameters, e.g. user, client_encoding, etc. - _parameters: HashMap, + #[allow(dead_code)] + parameters: HashMap, /// Statistics stats: Reporter, @@ -494,7 +495,7 @@ where process_id, secret_key, client_server_map, - _parameters: parameters.clone(), + parameters: parameters.clone(), stats: stats, admin: admin, last_address_id: None, @@ -528,7 +529,7 @@ where process_id, secret_key, client_server_map, - _parameters: HashMap::new(), + parameters: HashMap::new(), stats: get_reporter(), admin: false, last_address_id: None, From f5f29d94b5764c4ff20989a08fd5361e0736bd8c Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 1 Sep 2022 00:40:30 -0400 Subject: [PATCH 07/16] Refactor reading command tag --- src/server.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/server.rs b/src/server.rs index a35c8ec2..db86fa44 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,6 +2,7 @@ /// Here we are pretending to the a Postgres client. use bytes::{Buf, BufMut, BytesMut}; use log::{debug, error, info, trace}; +use std::io::Read; use std::time::SystemTime; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ @@ -446,15 +447,15 @@ impl Server { // CommandComplete 'C' => { - let full_message = String::from_utf8_lossy(message.as_ref()); - let mut it = full_message.split_ascii_whitespace(); - let command_tag = it.next().unwrap().trim_end_matches(char::from(0)); + let mut command_tag = String::new(); + message.reader().read_to_string(&mut command_tag).unwrap(); // Non-exhuastive list of commands that are likely to change session variables/resources // which can leak between client. This is a best effort to block bad clients // from poisoning a transaction-mode pool by setting inappropriate session variables - match command_tag { - "SET" | "PREPARE" => { + match command_tag.as_str().trim() { + "SET\0" | "PREPARE\0" => { + println!("MATCHED {}", command_tag); debug!("Server connection marked for clean up"); self.needs_cleanup = true; } From 00929e990f2cc6bcf017d7a2b5bf7d029dc15acb Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 1 Sep 2022 00:41:16 -0400 Subject: [PATCH 08/16] remove unnecessary trim --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index db86fa44..0c995496 100644 --- a/src/server.rs +++ b/src/server.rs @@ -453,7 +453,7 @@ impl Server { // Non-exhuastive list of commands that are likely to change session variables/resources // which can leak between client. This is a best effort to block bad clients // from poisoning a transaction-mode pool by setting inappropriate session variables - match command_tag.as_str().trim() { + match command_tag.as_str() { "SET\0" | "PREPARE\0" => { println!("MATCHED {}", command_tag); debug!("Server connection marked for clean up"); From 2adeba052f8560830fd607d8d1c6748b52081106 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 1 Sep 2022 00:44:05 -0400 Subject: [PATCH 09/16] Removing debugging statement --- src/server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 0c995496..01a7433a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -455,7 +455,6 @@ impl Server { // from poisoning a transaction-mode pool by setting inappropriate session variables match command_tag.as_str() { "SET\0" | "PREPARE\0" => { - println!("MATCHED {}", command_tag); debug!("Server connection marked for clean up"); self.needs_cleanup = true; } From 623c4535fd353124b66043b9636bb25680f18459 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 1 Sep 2022 01:05:47 -0400 Subject: [PATCH 10/16] typo --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 01a7433a..3e5cf90a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -450,7 +450,7 @@ impl Server { let mut command_tag = String::new(); message.reader().read_to_string(&mut command_tag).unwrap(); - // Non-exhuastive list of commands that are likely to change session variables/resources + // Non-exhaustive list of commands that are likely to change session variables/resources // which can leak between client. This is a best effort to block bad clients // from poisoning a transaction-mode pool by setting inappropriate session variables match command_tag.as_str() { From 2493f978ea2c6b4d712e12034fbc2ed4505b7c9d Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Thu, 1 Sep 2022 07:12:43 -0500 Subject: [PATCH 11/16] typo{ --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 3e5cf90a..82bbd2ff 100644 --- a/src/server.rs +++ b/src/server.rs @@ -451,7 +451,7 @@ impl Server { message.reader().read_to_string(&mut command_tag).unwrap(); // Non-exhaustive list of commands that are likely to change session variables/resources - // which can leak between client. This is a best effort to block bad clients + // which can leak between clients. This is a best effort to block bad clients // from poisoning a transaction-mode pool by setting inappropriate session variables match command_tag.as_str() { "SET\0" | "PREPARE\0" => { From bbef0428e56d8cbee6172d5e43d3e9280b70b619 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Thu, 1 Sep 2022 07:22:40 -0500 Subject: [PATCH 12/16] documentation --- src/server.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/server.rs b/src/server.rs index 82bbd2ff..376659fa 100644 --- a/src/server.rs +++ b/src/server.rs @@ -575,11 +575,22 @@ impl Server { Ok(()) } + /// Perform any necessary cleanup before putting the server + /// connection back in the pool pub async fn checkin_cleanup(&mut self) -> Result<(), Error> { + // Client disconnected with an open transaction on the server connection + // Pgbouncer behavior is to close the server connection but that can cause + // server connection thrashing if clients repeatedly do this. + // Instead, we Roll that transaction back before putting the connection back in the pool if self.in_transaction() { self.query("ROLLBACK").await?; } + // Client disconnected but it perfromed session-altering operations such as + // SET statement_timeout to 1 or create a prepared statement. We clear that + // to avoid leaking state between clients. For performance reasons we only + // send `DISCARD ALL` if we think the session is altered instead of just sending + // it before each checkin, if self.needs_cleanup { self.query("DISCARD ALL").await?; self.needs_cleanup = false; From afc9f5a1dcad696cac8361e581412789f0a14b00 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Thu, 1 Sep 2022 07:26:23 -0500 Subject: [PATCH 13/16] edit text --- src/server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/server.rs b/src/server.rs index 376659fa..49d480d4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -578,10 +578,10 @@ impl Server { /// Perform any necessary cleanup before putting the server /// connection back in the pool pub async fn checkin_cleanup(&mut self) -> Result<(), Error> { - // Client disconnected with an open transaction on the server connection + // Client disconnected with an open transaction on the server connection. // Pgbouncer behavior is to close the server connection but that can cause // server connection thrashing if clients repeatedly do this. - // Instead, we Roll that transaction back before putting the connection back in the pool + // Instead, we ROLLBACK that transaction before putting the connection back in the pool if self.in_transaction() { self.query("ROLLBACK").await?; } @@ -590,7 +590,7 @@ impl Server { // SET statement_timeout to 1 or create a prepared statement. We clear that // to avoid leaking state between clients. For performance reasons we only // send `DISCARD ALL` if we think the session is altered instead of just sending - // it before each checkin, + // it before each checkin. if self.needs_cleanup { self.query("DISCARD ALL").await?; self.needs_cleanup = false; From 213d284441a080f434a5c9f70d1a48a1a1ce19a0 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Thu, 1 Sep 2022 10:46:13 -0500 Subject: [PATCH 14/16] un-unwrap --- src/server.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/server.rs b/src/server.rs index 49d480d4..0a53f115 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ /// Implementation of the PostgreSQL server (database) protocol. /// Here we are pretending to the a Postgres client. use bytes::{Buf, BufMut, BytesMut}; -use log::{debug, error, info, trace}; +use log::{debug, error, info, trace, warn}; use std::io::Read; use std::time::SystemTime; use tokio::io::{AsyncReadExt, BufReader}; @@ -448,17 +448,23 @@ impl Server { // CommandComplete 'C' => { let mut command_tag = String::new(); - message.reader().read_to_string(&mut command_tag).unwrap(); - - // Non-exhaustive list of commands that are likely to change session variables/resources - // which can leak between clients. This is a best effort to block bad clients - // from poisoning a transaction-mode pool by setting inappropriate session variables - match command_tag.as_str() { - "SET\0" | "PREPARE\0" => { - debug!("Server connection marked for clean up"); - self.needs_cleanup = true; + match message.reader().read_to_string(&mut command_tag) { + Ok(_) => { + // Non-exhaustive list of commands that are likely to change session variables/resources + // which can leak between clients. This is a best effort to block bad clients + // from poisoning a transaction-mode pool by setting inappropriate session variables + match command_tag.as_str() { + "SET\0" | "PREPARE\0" => { + debug!("Server connection marked for clean up"); + self.needs_cleanup = true; + } + _ => (), + } + } + + Err(err) => { + warn!("Encountered an error while parsing CommandTag {}", err); } - _ => (), } } From 042b8e6d68dc1c2b1531e6096db6634984bf5339 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Thu, 1 Sep 2022 12:50:57 -0500 Subject: [PATCH 15/16] run ci From 98835cf285e6007bacd834c59ebe0ac060b9e464 Mon Sep 17 00:00:00 2001 From: Mostafa Abdelraouf Date: Thu, 1 Sep 2022 13:04:43 -0500 Subject: [PATCH 16/16] run ci