From 2a23869bcc61a14d49faae1590960c0d9a92730e Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 5 Oct 2022 19:43:06 -0400 Subject: [PATCH 01/14] Don't send discard all when state is changed in transaction --- src/server.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/server.rs b/src/server.rs index dbac9bc0..a19747f6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -450,6 +450,11 @@ impl Server { // CommandComplete 'C' => { + + if self.in_transaction { + continue; + } + let mut command_tag = String::new(); match message.reader().read_to_string(&mut command_tag) { Ok(_) => { From 558da534a859c839e70c098fe5bfe8a203a62b21 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 11 Oct 2022 15:17:02 -0400 Subject: [PATCH 02/14] Remove unnecessary clone --- src/client.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/client.rs b/src/client.rs index 64dfa8ac..f5d4fbd4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -799,7 +799,7 @@ where // If the client is in session mode, no more custom protocol // commands will be accepted. loop { - let mut message = if message.len() == 0 { + let message = if message.len() == 0 { trace!("Waiting for message inside transaction or in session mode"); match read_message(&mut self.read).await { @@ -820,10 +820,9 @@ where // The message will be forwarded to the server intact. We still would like to // parse it below to figure out what to do with it. - let original = message.clone(); - let code = message.get_u8() as char; - let _len = message.get_i32() as usize; + // Safe to unwrap because we know this message has a certain length and has the code + let code = *message.get(0).unwrap() as char; trace!("Message: {}", code); @@ -832,7 +831,7 @@ where 'Q' => { debug!("Sending query to server"); - self.send_and_receive_loop(code, original, server, &address, &pool) + self.send_and_receive_loop(code, message, server, &address, &pool) .await?; if !server.in_transaction() { @@ -858,25 +857,25 @@ where // Parse // The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`. 'P' => { - self.buffer.put(&original[..]); + self.buffer.put(&message[..]); } // Bind // The placeholder's replacements are here, e.g. 'user@email.com' and 'true' 'B' => { - self.buffer.put(&original[..]); + self.buffer.put(&message[..]); } // Describe // Command a client can issue to describe a previously prepared named statement. 'D' => { - self.buffer.put(&original[..]); + self.buffer.put(&message[..]); } // Execute // Execute a prepared statement prepared in `P` and bound in `B`. 'E' => { - self.buffer.put(&original[..]); + self.buffer.put(&message[..]); } // Sync @@ -884,7 +883,7 @@ where 'S' => { debug!("Sending query to server"); - self.buffer.put(&original[..]); + self.buffer.put(&message[..]); // Clone after freeze does not allocate let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char; @@ -929,14 +928,14 @@ where 'd' => { // Forward the data to the server, // don't buffer it since it can be rather large. - self.send_server_message(server, original, &address, &pool) + self.send_server_message(server, message, &address, &pool) .await?; } // CopyDone or CopyFail // Copy is done, successfully or not. 'c' | 'f' => { - self.send_server_message(server, original, &address, &pool) + self.send_server_message(server, message, &address, &pool) .await?; let response = self.receive_server_message(server, &address, &pool).await?; From e9f8ba71a53318407d39bf607fdf8b0dc9864d78 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 11 Oct 2022 23:41:27 -0400 Subject: [PATCH 03/14] spelling --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index f5d4fbd4..ea6d8795 100644 --- a/src/client.rs +++ b/src/client.rs @@ -827,7 +827,7 @@ where trace!("Message: {}", code); match code { - // ReadyForQuery + // Query 'Q' => { debug!("Sending query to server"); From 881d812f046ff0cfcf1cba64b920d77fedd2b05f Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 11 Oct 2022 23:49:10 -0400 Subject: [PATCH 04/14] Move transaction check to SET command --- src/server.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/server.rs b/src/server.rs index a19747f6..65512aaf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -450,11 +450,6 @@ impl Server { // CommandComplete 'C' => { - - if self.in_transaction { - continue; - } - let mut command_tag = String::new(); match message.reader().read_to_string(&mut command_tag) { Ok(_) => { @@ -462,7 +457,17 @@ impl Server { // which can leak between clients. This is a best effort to block bad clients // from poisoning a transaction-mode pool by setting inappropriate session variables match command_tag.as_str() { - "SET\0" | "PREPARE\0" => { + "SET\0" => { + // We don't detect set statements in transactions + // No great way to differentiate between set and set local + // As a result, we will miss cases when set statements are used in transactions + if self.in_transaction { + continue; + } + debug!("Server connection marked for clean up"); + self.needs_cleanup = true; + } + "PREPARE\0" => { debug!("Server connection marked for clean up"); self.needs_cleanup = true; } From 8219618d83a35b431189f07580f04fe7a6dedf2c Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 11 Oct 2022 23:49:27 -0400 Subject: [PATCH 05/14] Add test for set command in transaction --- tests/ruby/misc_spec.rb | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 6e79e1a4..cf5860e6 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -189,5 +189,20 @@ expect(processes.primary.count_query("DISCARD ALL")).to eq(10) end end + + context "transaction mode with transactions" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") } + it "Does not clear set statement state when declared in a transaction" do + 10.time do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("BEGIN") + conn.async_exec("SET statement_timeout to 1000") + conn.async_exec("COMMIT") + conn.close + end + expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + end + end end end From 2a6a08926cb737e58eefbfcb0aa1113fbbef5115 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Tue, 11 Oct 2022 23:56:13 -0400 Subject: [PATCH 06/14] type --- tests/ruby/misc_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index cf5860e6..052c2d86 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -193,7 +193,7 @@ context "transaction mode with transactions" do let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") } it "Does not clear set statement state when declared in a transaction" do - 10.time do + 10.times do conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) conn.async_exec("SET SERVER ROLE to 'primary'") conn.async_exec("BEGIN") From 4a192464475dbbf83ecf1ea12bb3b6a3f48bdd62 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 12 Oct 2022 11:36:42 -0400 Subject: [PATCH 07/14] Update comments --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index ea6d8795..9f1cbd66 100644 --- a/src/client.rs +++ b/src/client.rs @@ -822,6 +822,7 @@ where // parse it below to figure out what to do with it. // Safe to unwrap because we know this message has a certain length and has the code + // This reads the first byte without advancing the internal pointer and mutating the bytes let code = *message.get(0).unwrap() as char; trace!("Message: {}", code); @@ -885,7 +886,6 @@ where self.buffer.put(&message[..]); - // Clone after freeze does not allocate let first_message_code = (*self.buffer.get(0).unwrap_or(&0)) as char; // Almost certainly true From 69645979f8fe7449d98b8cd4589ca8983018b2d4 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 12 Oct 2022 12:24:51 -0400 Subject: [PATCH 08/14] Update comments --- src/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.rs b/src/server.rs index 65512aaf..be2d1141 100644 --- a/src/server.rs +++ b/src/server.rs @@ -461,6 +461,7 @@ impl Server { // We don't detect set statements in transactions // No great way to differentiate between set and set local // As a result, we will miss cases when set statements are used in transactions + // This will reduce amount of discard statements sent if self.in_transaction { continue; } From 23191d2e286644077516b52cf3578d26ee4e0ba2 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 12 Oct 2022 12:40:05 -0400 Subject: [PATCH 09/14] use moves instead of clones for initial message --- src/client.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/client.rs b/src/client.rs index 9f1cbd66..b0f9c325 100644 --- a/src/client.rs +++ b/src/client.rs @@ -792,6 +792,8 @@ where // Set application_name. server.set_name(&self.application_name).await?; + let mut initial_message = Some(message); + // Transaction loop. Multiple queries can be issued by the client here. // The connection belongs to the client until the transaction is over, // or until the client disconnects if we are in session mode. @@ -799,23 +801,25 @@ where // If the client is in session mode, no more custom protocol // commands will be accepted. loop { - let message = if message.len() == 0 { - trace!("Waiting for message inside transaction or in session mode"); + let message = match initial_message { + None => { + trace!("Waiting for message inside transaction or in session mode"); - match read_message(&mut self.read).await { - Ok(message) => message, - Err(err) => { - // Client disconnected inside a transaction. - // Clean up the server and re-use it. - server.checkin_cleanup().await?; + match read_message(&mut self.read).await { + Ok(message) => message, + Err(err) => { + // Client disconnected inside a transaction. + // Clean up the server and re-use it. + server.checkin_cleanup().await?; - return Err(err); + return Err(err); + } } } - } else { - let msg = message.clone(); - message.clear(); - msg + Some(message) => { + initial_message = None; + message + } }; // The message will be forwarded to the server intact. We still would like to From c358c4240e8b3673f4d82c5e89f13e3bb3ef3314 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 12 Oct 2022 12:57:22 -0400 Subject: [PATCH 10/14] don't make message mutable --- src/client.rs | 2 +- src/server.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index b0f9c325..e72dbf79 100644 --- a/src/client.rs +++ b/src/client.rs @@ -601,7 +601,7 @@ where // in case the client is sending some custom protocol messages, e.g. // SET SHARDING KEY TO 'bigint'; - let mut message = tokio::select! { + let message = tokio::select! { _ = self.shutdown.recv() => { if !self.admin { error_response_terminal( diff --git a/src/server.rs b/src/server.rs index be2d1141..deed82db 100644 --- a/src/server.rs +++ b/src/server.rs @@ -606,7 +606,7 @@ impl Server { self.query("ROLLBACK").await?; } - // Client disconnected but it perfromed session-altering operations such as + // Client disconnected but it performed session-altering operations such as // SET statement_timeout to 1 or create a prepared statement. We clear that // to avoid leaking state between clients. For performance reasons we only // send `DISCARD ALL` if we think the session is altered instead of just sending From 5126a45ea52c63bb3c040e205c82cdb9b22497d4 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 13 Oct 2022 14:13:52 -0400 Subject: [PATCH 11/14] Update unwrap --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index e72dbf79..0981cac7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -827,7 +827,7 @@ where // Safe to unwrap because we know this message has a certain length and has the code // This reads the first byte without advancing the internal pointer and mutating the bytes - let code = *message.get(0).unwrap() as char; + let code = *message.get(0).unwrap_or(&('$' as u8)) as char; trace!("Message: {}", code); From 061ee131fd65e8a34680ce0c5a57a4aef58d8e40 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 13 Oct 2022 15:53:53 -0400 Subject: [PATCH 12/14] but i'm not a wrapper --- src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index 0981cac7..e72dbf79 100644 --- a/src/client.rs +++ b/src/client.rs @@ -827,7 +827,7 @@ where // Safe to unwrap because we know this message has a certain length and has the code // This reads the first byte without advancing the internal pointer and mutating the bytes - let code = *message.get(0).unwrap_or(&('$' as u8)) as char; + let code = *message.get(0).unwrap() as char; trace!("Message: {}", code); From 87ff7cbbf9894d01caf40cba14b8a0f598e58ec6 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 13 Oct 2022 17:47:09 -0400 Subject: [PATCH 13/14] Add set local test --- tests/ruby/misc_spec.rb | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index 052c2d86..1f5bf421 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -202,6 +202,16 @@ conn.close end expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + + 10.times do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + conn.async_exec("SET SERVER ROLE to 'primary'") + conn.async_exec("BEGIN") + conn.async_exec("SET LOCAL statement_timeout to 1000") + conn.async_exec("COMMIT") + conn.close + end + expect(processes.primary.count_query("DISCARD ALL")).to eq(0) end end end From 45a580b8dd3a2e6f089342849786326c2dac20b7 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 13 Oct 2022 19:56:39 -0400 Subject: [PATCH 14/14] change continue --- src/server.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/server.rs b/src/server.rs index deed82db..d191eb74 100644 --- a/src/server.rs +++ b/src/server.rs @@ -462,11 +462,10 @@ impl Server { // No great way to differentiate between set and set local // As a result, we will miss cases when set statements are used in transactions // This will reduce amount of discard statements sent - if self.in_transaction { - continue; + if !self.in_transaction { + debug!("Server connection marked for clean up"); + self.needs_cleanup = true; } - debug!("Server connection marked for clean up"); - self.needs_cleanup = true; } "PREPARE\0" => { debug!("Server connection marked for clean up");