diff --git a/CONFIG.md b/CONFIG.md index fc118cb4..7635bf6b 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -259,22 +259,6 @@ Password to be used for connecting to servers to obtain the hash used for md5 au specified in `auth_query_user`. The connection will be established using the database configured in the pool. This parameter is inherited by every pool and can be redefined in pool configuration. -### prepared_statements -``` -path: general.prepared_statements -default: false -``` - -Whether to use prepared statements or not. - -### prepared_statements_cache_size -``` -path: general.prepared_statements_cache_size -default: 500 -``` - -Size of the prepared statements cache. - ### dns_cache_enabled ``` path: general.dns_cache_enabled @@ -324,6 +308,15 @@ If the client doesn't specify, PgCat routes traffic to this role by default. `replica` round-robin between replicas only without touching the primary, `primary` all queries go to the primary unless otherwise specified. +### prepared_statements_cache_size +``` +path: general.prepared_statements_cache_size +default: 0 +``` + +Size of the prepared statements cache. 0 means disabled. +TODO: update documentation + ### query_parser_enabled ``` path: pools..query_parser_enabled diff --git a/Cargo.lock b/Cargo.lock index 929f1a81..eeaf106c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "1.0.2" @@ -26,6 +37,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -553,6 +570,10 @@ name = "hashbrown" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -821,6 +842,15 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +[[package]] +name = "lru" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efa59af2ddfad1854ae27d75009d538d0998b4b2fd47083e743ac1a10e46c60" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -1008,6 +1038,7 @@ dependencies = [ "itertools", "jemallocator", "log", + "lru", "md-5", "nix", "num_cpus", diff --git a/Cargo.toml b/Cargo.toml index 805a4c7a..3e5f7069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ itertools = "0.10" clap = { version = "4.3.1", features = ["derive", "env"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter", "std"]} +lru = "0.12.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5.0" diff --git a/pgcat.toml b/pgcat.toml index 772a1365..841649ee 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -60,12 +60,6 @@ tcp_keepalives_count = 5 # Number of 
seconds between keepalive packets. tcp_keepalives_interval = 5 -# Handle prepared statements. -prepared_statements = true - -# Prepared statements server cache size. -prepared_statements_cache_size = 500 - # Path to TLS Certificate file to use for TLS connections # tls_certificate = ".circleci/server.cert" # Path to TLS private key file to use for TLS connections @@ -156,6 +150,10 @@ load_balancing_mode = "random" # `primary` all queries go to the primary unless otherwise specified. default_role = "any" +# Prepared statements cache size. +# TODO: update documentation +prepared_statements_cache_size = 500 + # If Query Parser is enabled, we'll attempt to parse # every incoming query to determine if it's a read or a write. # If it's a read query, we'll direct it to a replica. Otherwise, if it's a write, diff --git a/src/admin.rs b/src/admin.rs index f1b0c63f..80baa3fb 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -744,6 +744,7 @@ where ("age_seconds", DataType::Numeric), ("prepare_cache_hit", DataType::Numeric), ("prepare_cache_miss", DataType::Numeric), + ("prepare_cache_eviction", DataType::Numeric), ("prepare_cache_size", DataType::Numeric), ]; @@ -776,6 +777,10 @@ where .prepared_miss_count .load(Ordering::Relaxed) .to_string(), + server + .prepared_eviction_count + .load(Ordering::Relaxed) + .to_string(), server .prepared_cache_size .load(Ordering::Relaxed) diff --git a/src/client.rs b/src/client.rs index c3fd747c..31dcb4bd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ use crate::pool::BanReason; use bytes::{Buf, BufMut, BytesMut}; use log::{debug, error, info, trace, warn}; use once_cell::sync::Lazy; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; use tokio::io::{split, AsyncReadExt, BufReader, ReadHalf, WriteHalf}; @@ -14,9 +14,7 @@ use tokio::sync::mpsc::Sender; use crate::admin::{generate_server_parameters_for_admin, handle_admin}; use crate::auth_passthrough::refetch_auth_hash; -use crate::config::{ - get_config, get_idle_client_in_transaction_timeout, get_prepared_statements, Address, PoolMode, -}; +use crate::config::{get_config, get_idle_client_in_transaction_timeout, Address, PoolMode}; use crate::constants::*; use crate::messages::*; use crate::plugins::PluginOutput; @@ -53,6 +51,9 @@ pub struct Client { /// them to the backend. buffer: BytesMut, + /// Used to buffer response messages to the client + response_message_queue_buffer: BytesMut, + /// Address addr: std::net::SocketAddr, @@ -102,8 +103,14 @@ pub struct Client { /// Used to notify clients about an impending shutdown shutdown: Receiver<()>, - /// Prepared statements - prepared_statements: HashMap, + /// Whether prepared statements are enabled for this client + prepared_statements_enabled: bool, + + /// Mapping of client named prepared statement to rewritten parse messages + prepared_statements: HashMap, u64)>, + + /// Buffered extended protocol data + extended_protocol_data_buffer: VecDeque, } /// Client entrypoint. @@ -518,6 +525,8 @@ where } }; + let mut prepared_statements_enabled = false; + // Authenticate admin user. let (transaction_mode, mut server_parameters) = if admin { let config = get_config(); @@ -651,6 +660,8 @@ where } let transaction_mode = pool.settings.pool_mode == PoolMode::Transaction; + prepared_statements_enabled = + transaction_mode && pool.prepared_statement_cache.is_some(); // If the pool hasn't been validated yet, // connect to the servers and figure out what's what. 
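
Note on the client-side changes above: prepared-statement handling is no longer switched by a global `prepared_statements` flag. It is enabled per client only when the pool runs in transaction mode and has a non-zero `prepared_statements_cache_size`, and each client keeps a map from the client-given statement name to the rewritten `Parse` plus its query hash. A minimal sketch of that mapping idea, using plain strings and a standard hasher instead of the PR's `Parse`/`Arc<Parse>` types (names such as `RewrittenParse` and `hash_query` are illustrative only):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for the rewritten Parse message kept per client.
#[derive(Clone, Debug)]
struct RewrittenParse {
    /// Pool-wide name, e.g. "PGCAT_1".
    name: String,
    query: String,
}

fn hash_query(query: &str) -> u64 {
    // The real hash also folds in the parameter count and parameter types.
    let mut hasher = DefaultHasher::new();
    query.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // client-given name -> (rewritten parse, query hash), mirroring the new
    // `prepared_statements` field on `Client`.
    let mut client_cache: HashMap<String, (RewrittenParse, u64)> = HashMap::new();

    let query = "SELECT * FROM users WHERE email = $1";
    let rewritten = RewrittenParse {
        name: "PGCAT_1".into(),
        query: query.into(),
    };
    client_cache.insert("client_stmt_1".into(), (rewritten, hash_query(query)));

    // A later Bind or Describe for "client_stmt_1" is rewritten to "PGCAT_1"
    // before being buffered for the server.
    let (parse, _hash) = &client_cache["client_stmt_1"];
    println!("rewriting bind to use `{}`", parse.name);
}
```
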
@@ -682,7 +693,7 @@ where auth_ok(&mut write).await?; write_all(&mut write, (&server_parameters).into()).await?; backend_key_data(&mut write, process_id, secret_key).await?; - ready_for_query(&mut write).await?; + send_ready_for_query(&mut write).await?; trace!("Startup OK"); let stats = Arc::new(ClientStats::new( @@ -696,8 +707,9 @@ where Ok(Client { read: BufReader::new(read), write, - addr, buffer: BytesMut::with_capacity(8196), + response_message_queue_buffer: BytesMut::with_capacity(8196), + addr, cancel_mode: false, transaction_mode, process_id, @@ -708,12 +720,14 @@ where admin, last_address_id: None, last_server_stats: None, + connected_to_server: false, pool_name: pool_name.clone(), username: username.clone(), server_parameters, shutdown, - connected_to_server: false, + prepared_statements_enabled, prepared_statements: HashMap::new(), + extended_protocol_data_buffer: VecDeque::new(), }) } @@ -731,8 +745,9 @@ where Ok(Client { read: BufReader::new(read), write, - addr, buffer: BytesMut::with_capacity(8196), + response_message_queue_buffer: BytesMut::with_capacity(8196), + addr, cancel_mode: true, transaction_mode: false, process_id, @@ -743,12 +758,14 @@ where admin: false, last_address_id: None, last_server_stats: None, + connected_to_server: false, pool_name: String::from("undefined"), username: String::from("undefined"), server_parameters: ServerParameters::new(), shutdown, - connected_to_server: false, + prepared_statements_enabled: false, prepared_statements: HashMap::new(), + extended_protocol_data_buffer: VecDeque::new(), }) } @@ -790,10 +807,6 @@ where // Result returned by one of the plugins. let mut plugin_output = None; - // Prepared statement being executed - let mut prepared_statement = None; - let mut will_prepare = false; - let client_identifier = ClientIdentifier::new( self.server_parameters.get_application_name(), &self.username, @@ -821,16 +834,13 @@ where self.transaction_mode ); - // Should we rewrite prepared statements and bind messages? - let mut prepared_statements_enabled = get_prepared_statements(); - // Read a complete message from the client, which normally would be // either a `Q` (query) or `P` (prepare, extended protocol). // We can parse it here before grabbing a server from the pool, // in case the client is sending some custom protocol messages, e.g. // SET SHARDING KEY TO 'bigint'; - let mut message = tokio::select! { + let message = tokio::select! { _ = self.shutdown.recv() => { if !self.admin { error_response_terminal( @@ -865,35 +875,18 @@ where continue; } + // Handle all custom protocol commands, if any. + if self + .handle_custom_protocol(&mut query_router, &message, &pool) + .await? + { + continue; + } + let mut initial_parsed_ast = None; match message[0] as char { - // Buffer extended protocol messages even if we do not have - // a server connection yet. Hopefully, when we get the S message - // we'll be able to allocate a connection. 
Also, clients do not expect - // the server to respond to these messages so even if we were not able to - // allocate a connection, we wouldn't be able to send back an error message - // to the client so we buffer them and defer the decision to error out or not - // to when we get the S message - 'D' => { - if prepared_statements_enabled { - let name; - (name, message) = self.rewrite_describe(message).await?; - - if let Some(name) = name { - prepared_statement = Some(name); - } - } - - self.buffer.put(&message[..]); - continue; - } - - 'E' => { - self.buffer.put(&message[..]); - continue; - } - + // Query 'Q' => { if query_router.query_parser_enabled() { match query_router.parse(&message) { @@ -928,14 +921,15 @@ where } } + // Buffer extended protocol messages even if we do not have + // a server connection yet. Hopefully, when we get the S message + // we'll be able to allocate a connection. Also, clients do not expect + // the server to respond to these messages so even if we were not able to + // allocate a connection, we wouldn't be able to send back an error message + // to the client so we buffer them and defer the decision to error out or not + // to when we get the S message + // Parse 'P' => { - if prepared_statements_enabled { - (prepared_statement, message) = self.rewrite_parse(message)?; - will_prepare = true; - } - - self.buffer.put(&message[..]); - if query_router.query_parser_enabled() { match query_router.parse(&message) { Ok(ast) => { @@ -954,34 +948,41 @@ where }; } + self.buffer_parse(message, &pool)?; + continue; } + // Bind 'B' => { - if prepared_statements_enabled { - (prepared_statement, message) = self.rewrite_bind(message).await?; - } - - self.buffer.put(&message[..]); - if query_router.query_parser_enabled() { query_router.infer_shard_from_bind(&message); } + self.buffer_bind(message).await?; + + continue; + } + + // Describe + 'D' => { + self.buffer_describe(message).await?; + continue; + } + + 'E' => { + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_execute(message)); continue; } // Close (F) 'C' => { - if prepared_statements_enabled { - let close: Close = (&message).try_into()?; + let close: Close = (&message).try_into()?; - if close.is_prepared_statement() && !close.anonymous() { - self.prepared_statements.remove(&close.name); - write_all_flush(&mut self.write, &close_complete()).await?; - continue; - } - } + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_close(message, close)); + continue; } _ => (), @@ -989,7 +990,7 @@ where // Check on plugin results. if let Some(PluginOutput::Deny(error)) = plugin_output { - self.buffer.clear(); + self.reset_buffered_state(); error_response(&mut self.write, &error).await?; plugin_output = None; continue; @@ -1002,77 +1003,6 @@ where pool = self.get_pool().await?; query_router.update_pool_settings(&pool.settings); - let current_shard = query_router.shard(); - - // Handle all custom protocol commands, if any. - match query_router.try_execute_command(&message) { - // Normal query, not a custom command. - None => (), - - // SET SHARD TO - Some((Command::SetShard, _)) => { - match query_router.shard() { - None => (), - Some(selected_shard) => { - if selected_shard >= pool.shards() { - // Bad shard number, send error message to client. 
- query_router.set_shard(current_shard); - - error_response( - &mut self.write, - &format!( - "shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)", - selected_shard, - pool.shards(), - current_shard, - ), - ) - .await?; - } else { - custom_protocol_response_ok(&mut self.write, "SET SHARD").await?; - } - } - } - continue; - } - - // SET PRIMARY READS TO - Some((Command::SetPrimaryReads, _)) => { - custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?; - continue; - } - - // SET SHARDING KEY TO - Some((Command::SetShardingKey, _)) => { - custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?; - continue; - } - - // SET SERVER ROLE TO - Some((Command::SetServerRole, _)) => { - custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; - continue; - } - - // SHOW SERVER ROLE - Some((Command::ShowServerRole, value)) => { - show_response(&mut self.write, "server role", &value).await?; - continue; - } - - // SHOW SHARD - Some((Command::ShowShard, value)) => { - show_response(&mut self.write, "shard", &value).await?; - continue; - } - - // SHOW PRIMARY READS - Some((Command::ShowPrimaryReads, value)) => { - show_response(&mut self.write, "primary reads", &value).await?; - continue; - } - }; - debug!("Waiting for connection from pool"); if !self.admin { self.stats.waiting(); @@ -1096,7 +1026,7 @@ where if message[0] as char == 'S' { error!("Got Sync message but failed to get a connection from the pool"); - self.buffer.clear(); + self.reset_buffered_state(); } error_response( @@ -1162,58 +1092,7 @@ where // If the client is in session mode, no more custom protocol // commands will be accepted. loop { - // Only check if we should rewrite prepared statements - // in session mode. In transaction mode, we check at the beginning of - // each transaction. - if !self.transaction_mode { - prepared_statements_enabled = get_prepared_statements(); - } - - debug!("Prepared statement active: {:?}", prepared_statement); - - // We are processing a prepared statement. - if let Some(ref name) = prepared_statement { - debug!("Checking prepared statement is on server"); - // Get the prepared statement the server expects to see. - let statement = match self.prepared_statements.get(name) { - Some(statement) => { - debug!("Prepared statement `{}` found in cache", name); - statement - } - None => { - return Err(Error::ClientError(format!( - "prepared statement `{}` not found", - name - ))) - } - }; - - // Since it's already in the buffer, we don't need to prepare it on this server. - if will_prepare { - server.will_prepare(&statement.name); - will_prepare = false; - } else { - // The statement is not prepared on the server, so we need to prepare it. - if server.should_prepare(&statement.name) { - match server.prepare(statement).await { - Ok(_) => (), - Err(err) => { - pool.ban( - &address, - BanReason::MessageSendFailed, - Some(&self.stats), - ); - return Err(err); - } - } - } - } - - // Done processing the prepared statement. - prepared_statement = None; - } - - let mut message = match initial_message { + let message = match initial_message { None => { trace!("Waiting for message inside transaction or in session mode"); @@ -1344,21 +1223,12 @@ where self.stats.disconnect(); self.release(); - if prepared_statements_enabled { - server.maintain_cache().await?; - } - return Ok(()); } // Parse // The query with placeholders is here, e.g. `SELECT * FROM users WHERE email = $1 AND active = $2`. 
'P' => { - if prepared_statements_enabled { - (prepared_statement, message) = self.rewrite_parse(message)?; - will_prepare = true; - } - if query_router.query_parser_enabled() { if let Ok(ast) = query_router.parse(&message) { if let Ok(output) = query_router.execute_plugins(&ast).await { @@ -1367,55 +1237,35 @@ where } } - self.buffer.put(&message[..]); + self.buffer_parse(message, &pool)?; } // Bind // The placeholder's replacements are here, e.g. 'user@email.com' and 'true' 'B' => { - if prepared_statements_enabled { - (prepared_statement, message) = self.rewrite_bind(message).await?; - } - - self.buffer.put(&message[..]); + self.buffer_bind(message).await?; } // Describe // Command a client can issue to describe a previously prepared named statement. 'D' => { - if prepared_statements_enabled { - let name; - (name, message) = self.rewrite_describe(message).await?; - - if let Some(name) = name { - prepared_statement = Some(name); - } - } + self.buffer_describe(message).await?; + } - self.buffer.put(&message[..]); + // Execute + // Execute a prepared statement prepared in `P` and bound in `B`. + 'E' => { + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_execute(message)); } + // Close // Close the prepared statement. 'C' => { - if prepared_statements_enabled { - let close: Close = (&message).try_into()?; - - if close.is_prepared_statement() && !close.anonymous() { - if let Some(parse) = self.prepared_statements.get(&close.name) { - server.will_close(&parse.generated_name); - } else { - // A prepared statement slipped through? Not impossible, since we don't support PREPARE yet. - }; - } - } - - self.buffer.put(&message[..]); - } + let close: Close = (&message).try_into()?; - // Execute - // Execute a prepared statement prepared in `P` and bound in `B`. - 'E' => { - self.buffer.put(&message[..]); + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_close(message, close)); } // Sync @@ -1427,47 +1277,182 @@ where Some(PluginOutput::Deny(error)) => { error_response(&mut self.write, &error).await?; plugin_output = None; - self.buffer.clear(); + self.reset_buffered_state(); continue; } Some(PluginOutput::Intercept(result)) => { write_all(&mut self.write, result).await?; plugin_output = None; - self.buffer.clear(); + self.reset_buffered_state(); continue; } _ => (), }; + // Prepared statements can arrive like this + // 1. Without named describe + // Client: Parse, with name, query and params + // Sync + // Server: ParseComplete + // ReadyForQuery + // 3. Without named describe + // Client: Parse, with name, query and params + // Describe, with no name + // Sync + // Server: ParseComplete + // ParameterDescription + // RowDescription + // ReadyForQuery + // 2. 
With named describe + // Client: Parse, with name, query and params + // Describe, with name + // Sync + // Server: ParseComplete + // ParameterDescription + // RowDescription + // ReadyForQuery + + // Iterate over our extended protocol data that we've buffered + while let Some(protocol_data) = + self.extended_protocol_data_buffer.pop_front() + { + match protocol_data { + ExtendedProtocolData::Parse { data, metadata } => { + let (parse, hash) = match metadata { + Some(metadata) => metadata, + None => { + let first_char_in_name = *data.get(5).unwrap_or(&0); + if first_char_in_name != 0 { + // This is a named prepared statement while prepared statements are disabled + // Server connection state will need to be cleared at checkin + server.mark_dirty(); + } + // Not a prepared statement + self.buffer.put(&data[..]); + continue; + } + }; + + // This is a prepared statement we already have on the checked out server + if server.has_prepared_statement(&parse.name) { + debug!( + "Prepared statement `{}` found in server cache", + parse.name + ); + + // We don't want to send the parse message to the server + // Instead queue up a parse complete message to send to the client + self.response_message_queue_buffer.put(parse_complete()); + } else { + debug!( + "Prepared statement `{}` not found in server cache", + parse.name + ); + + // TODO: Consider adding the close logic that this function can send for eviction to the client buffer instead + // In this case we don't want to send the parse message to the server since the client is sending it + self.register_parse_to_server_cache( + false, &hash, &parse, &pool, server, &address, + ) + .await?; + + // Add parse message to buffer + self.buffer.put(&data[..]); + } + } + ExtendedProtocolData::Bind { data, metadata } => { + // This is using a prepared statement + if let Some(client_given_name) = metadata { + self.ensure_prepared_statement_is_on_server( + client_given_name, + &pool, + server, + &address, + ) + .await?; + } + + self.buffer.put(&data[..]); + } + ExtendedProtocolData::Describe { data, metadata } => { + // This is using a prepared statement + if let Some(client_given_name) = metadata { + self.ensure_prepared_statement_is_on_server( + client_given_name, + &pool, + server, + &address, + ) + .await?; + } + + self.buffer.put(&data[..]); + } + ExtendedProtocolData::Execute { data } => { + self.buffer.put(&data[..]) + } + ExtendedProtocolData::Close { data, close } => { + // We don't send the close message to the server if prepared statements are enabled + // and it's a close with a prepared statement name provided + if self.prepared_statements_enabled + && close.is_prepared_statement() + && !close.anonymous() + { + self.prepared_statements.remove(&close.name); + + // Queue up a close complete message to send to the client + self.response_message_queue_buffer.put(close_complete()); + } else { + self.buffer.put(&data[..]); + } + } + } + } + + // Add the sync message self.buffer.put(&message[..]); - let first_message_code = (*self.buffer.first().unwrap_or(&0)) as char; - - // Almost certainly true - if first_message_code == 'P' && !prepared_statements_enabled { - // Message layout - // P followed by 32 int followed by null-terminated statement name - // So message code should be in offset 0 of the buffer, first character - // in prepared statement name would be index 5 - let first_char_in_name = *self.buffer.get(5).unwrap_or(&0); - if first_char_in_name != 0 { - // This is a named prepared statement - // Server connection state will need to be cleared at 
checkin - server.mark_dirty(); + let mut should_send_to_server = true; + + // If we have just a sync message left (maybe after omitting sending some messages to the server) no need to send it to the server + if *self.buffer.first().unwrap() == b'S' { + should_send_to_server = false; + // queue up a ready for query message to send to the client, respecting the transaction state of the server + self.response_message_queue_buffer + .put(ready_for_query(server.in_transaction())); + } + + // Send all queued messages to the client + // NOTE: it's possible we don't perfectly send things back in the same order as postgres would, + // however clients should be able to handle this + if !self.response_message_queue_buffer.is_empty() { + if let Err(err) = write_all_flush( + &mut self.write, + &self.response_message_queue_buffer, + ) + .await + { + // We might be in some kind of error/in between protocol state + server.mark_bad(); + return Err(err); } + + self.response_message_queue_buffer.clear(); } - self.send_and_receive_loop( - code, - None, - server, - &address, - &pool, - &self.stats.clone(), - ) - .await?; + if should_send_to_server { + self.send_and_receive_loop( + code, + None, + server, + &address, + &pool, + &self.stats.clone(), + ) + .await?; + } self.buffer.clear(); @@ -1549,10 +1534,6 @@ where server.checkin_cleanup().await?; - if prepared_statements_enabled { - server.maintain_cache().await?; - } - server.stats().idle(); self.connected_to_server = false; @@ -1586,68 +1567,237 @@ where } } - /// Rewrite Parse (F) message to set the prepared statement name to one we control. - /// Save it into the client cache. - fn rewrite_parse(&mut self, message: BytesMut) -> Result<(Option, BytesMut), Error> { - let parse: Parse = (&message).try_into()?; + /// Handles custom protocol messages + /// Returns true if the message is custom protocol message, false otherwise + /// Does not work with prepared statements, only simple and extended protocol without parameters + async fn handle_custom_protocol( + &mut self, + query_router: &mut QueryRouter, + message: &BytesMut, + pool: &ConnectionPool, + ) -> Result { + let current_shard = query_router.shard(); + + match query_router.try_execute_command(message) { + None => Ok(false), + + Some(custom) => { + match custom { + // SET SHARD TO + (Command::SetShard, _) => { + match query_router.shard() { + None => {} + Some(selected_shard) => { + if selected_shard >= pool.shards() { + // Bad shard number, send error message to client. 
+ query_router.set_shard(current_shard); + + error_response( + &mut self.write, + &format!( + "shard {} is not configured {}, staying on shard {:?} (shard numbers start at 0)", + selected_shard, + pool.shards(), + current_shard, + ), + ) + .await?; + } else { + custom_protocol_response_ok(&mut self.write, "SET SHARD") + .await?; + } + } + } + } + + // SET PRIMARY READS TO + (Command::SetPrimaryReads, _) => { + custom_protocol_response_ok(&mut self.write, "SET PRIMARY READS").await?; + } + + // SET SHARDING KEY TO + (Command::SetShardingKey, _) => { + custom_protocol_response_ok(&mut self.write, "SET SHARDING KEY").await?; + } + + // SET SERVER ROLE TO + (Command::SetServerRole, _) => { + custom_protocol_response_ok(&mut self.write, "SET SERVER ROLE").await?; + } + + // SHOW SERVER ROLE + (Command::ShowServerRole, value) => { + show_response(&mut self.write, "server role", &value).await?; + } + + // SHOW SHARD + (Command::ShowShard, value) => { + show_response(&mut self.write, "shard", &value).await?; + } + + // SHOW PRIMARY READS + (Command::ShowPrimaryReads, value) => { + show_response(&mut self.write, "primary reads", &value).await?; + } + }; + + Ok(true) + } + } + } - let name = parse.name.clone(); + /// Makes sure the the checked out server has the prepared statement and sends it to the server if it doesn't + async fn ensure_prepared_statement_is_on_server( + &mut self, + client_name: String, + pool: &ConnectionPool, + server: &mut Server, + address: &Address, + ) -> Result<(), Error> { + match self.prepared_statements.get(&client_name) { + Some((parse, hash)) => { + debug!("Prepared statement `{}` found in cache", parse.name); + // In this case we want to send the parse message to the server + // since pgcat is initiating the prepared statement on this specific server + self.register_parse_to_server_cache(true, hash, parse, pool, server, address) + .await?; + } + + None => { + return Err(Error::ClientError(format!( + "prepared statement `{}` not found", + client_name + ))) + } + }; + + Ok(()) + } + + /// Register the parse to the server cache and send it to the server if requested (ie. requested by pgcat) + /// + /// Also updates the pool LRU that this parse was used recently + async fn register_parse_to_server_cache( + &self, + should_send_parse_to_server: bool, + hash: &u64, + parse: &Arc, + pool: &ConnectionPool, + server: &mut Server, + address: &Address, + ) -> Result<(), Error> { + // We want to promote this in the pool's LRU + pool.promote_prepared_statement_hash(hash); - // Don't rewrite anonymous prepared statements - if parse.anonymous() { - debug!("Anonymous prepared statement"); - return Ok((None, message)); + if let Err(err) = server + .register_prepared_statement(parse, should_send_parse_to_server) + .await + { + pool.ban(address, BanReason::MessageSendFailed, Some(&self.stats)); + return Err(err); } - let parse = parse.rename(); + Ok(()) + } + + /// Register and rewrite the parse statement to the clients statement cache + /// and also the pool's statement cache. Add it to extended protocol data. 
+ fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> { + // Avoid parsing if prepared statements not enabled + let client_given_name = match self.prepared_statements_enabled { + true => Parse::get_name(&message)?, + false => "".to_string(), + }; + + if client_given_name.is_empty() { + debug!("Anonymous parse message"); + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_parse(message, None)); + return Ok(()); + } + + let parse: Parse = (&message).try_into()?; + + // Compute the hash of the parse statement + let hash = parse.get_hash(); + + // Add the statement to the cache or check if we already have it + let new_parse = match pool.register_parse_to_cache(hash, &parse) { + Some(parse) => parse, + None => { + return Err(Error::ClientError(format!( + "Could not store Prepared statement `{}`", + client_given_name + ))) + } + }; debug!( "Renamed prepared statement `{}` to `{}` and saved to cache", - name, parse.name + client_given_name, new_parse.name ); - self.prepared_statements.insert(name.clone(), parse.clone()); + self.prepared_statements + .insert(client_given_name, (new_parse.clone(), hash)); - Ok((Some(name), parse.try_into()?)) + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_parse( + new_parse.as_ref().try_into()?, + Some((new_parse.clone(), hash)), + )); + + Ok(()) } /// Rewrite the Bind (F) message to use the prepared statement name /// saved in the client cache. - async fn rewrite_bind( - &mut self, - message: BytesMut, - ) -> Result<(Option, BytesMut), Error> { - let bind: Bind = (&message).try_into()?; - let name = bind.prepared_statement.clone(); + async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> { + // Avoid parsing if prepared statements not enabled + let client_given_name = match self.prepared_statements_enabled { + true => Bind::get_name(&message)?, + false => "".to_string(), + }; - if bind.anonymous() { + if client_given_name.is_empty() { debug!("Anonymous bind message"); - return Ok((None, message)); + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_bind(message, None)); + return Ok(()); } - match self.prepared_statements.get(&name) { - Some(prepared_stmt) => { - let bind = bind.reassign(prepared_stmt); + match self.prepared_statements.get(&client_given_name) { + Some((rewritten_parse, _)) => { + let message = Bind::rename(message, &rewritten_parse.name)?; - debug!("Rewrote bind `{}` to `{}`", name, bind.prepared_statement); + debug!( + "Rewrote bind `{}` to `{}`", + client_given_name, rewritten_parse.name + ); - Ok((Some(name), bind.try_into()?)) + self.extended_protocol_data_buffer.push_back( + ExtendedProtocolData::create_new_bind(message, Some(client_given_name)), + ); + + Ok(()) } None => { - debug!("Got bind for unknown prepared statement {:?}", bind); + debug!( + "Got bind for unknown prepared statement {:?}", + client_given_name + ); error_response( &mut self.write, &format!( "prepared statement \"{}\" does not exist", - bind.prepared_statement + client_given_name ), ) .await?; Err(Error::ClientError(format!( "Prepared statement `{}` doesn't exist", - name + client_given_name ))) } } @@ -1655,38 +1805,68 @@ where /// Rewrite the Describe (F) message to use the prepared statement name /// saved in the client cache. 
- async fn rewrite_describe( - &mut self, - message: BytesMut, - ) -> Result<(Option, BytesMut), Error> { - let describe: Describe = (&message).try_into()?; - let name = describe.statement_name.clone(); + async fn buffer_describe(&mut self, message: BytesMut) -> Result<(), Error> { + // Avoid parsing if prepared statements not enabled + let describe: Describe = match self.prepared_statements_enabled { + true => (&message).try_into()?, + false => Describe::empty_new(), + }; if describe.anonymous() { - debug!("Anonymous describe"); - return Ok((None, message)); + debug!("Anonymous describe message"); + self.extended_protocol_data_buffer + .push_back(ExtendedProtocolData::create_new_describe(message, None)); + + return Ok(()); } - match self.prepared_statements.get(&name) { - Some(prepared_stmt) => { - let describe = describe.rename(&prepared_stmt.name); + let client_given_name = describe.statement_name.clone(); + + match self.prepared_statements.get(&client_given_name) { + Some((rewritten_parse, _)) => { + let describe = describe.rename(&rewritten_parse.name); debug!( "Rewrote describe `{}` to `{}`", - name, describe.statement_name + client_given_name, describe.statement_name ); - Ok((Some(name), describe.try_into()?)) + self.extended_protocol_data_buffer.push_back( + ExtendedProtocolData::create_new_describe( + describe.try_into()?, + Some(client_given_name), + ), + ); + + Ok(()) } None => { debug!("Got describe for unknown prepared statement {:?}", describe); - Ok((None, message)) + error_response( + &mut self.write, + &format!( + "prepared statement \"{}\" does not exist", + client_given_name + ), + ) + .await?; + + Err(Error::ClientError(format!( + "Prepared statement `{}` doesn't exist", + client_given_name + ))) } } } + fn reset_buffered_state(&mut self) { + self.buffer.clear(); + self.extended_protocol_data_buffer.clear(); + self.response_message_queue_buffer.clear(); + } + /// Release the server from the client: it can't cancel its queries anymore. 
pub fn release(&self) { let mut guard = self.client_server_map.lock(); @@ -1723,6 +1903,7 @@ where match write_all_flush(&mut self.write, &response).await { Ok(_) => (), Err(err) => { + // We might be in some kind of error/in between protocol state, better to just kill this server server.mark_bad(); return Err(err); } diff --git a/src/config.rs b/src/config.rs index f91e488e..3d140b9b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -116,10 +116,10 @@ impl Default for Address { host: String::from("127.0.0.1"), port: 5432, shard: 0, - address_index: 0, - replica_number: 0, database: String::from("database"), role: Role::Replica, + replica_number: 0, + address_index: 0, username: String::from("username"), pool_name: String::from("pool_name"), mirrors: Vec::new(), @@ -337,12 +337,6 @@ pub struct General { pub auth_query: Option, pub auth_query_user: Option, pub auth_query_password: Option, - - #[serde(default)] - pub prepared_statements: bool, - - #[serde(default = "General::default_prepared_statements_cache_size")] - pub prepared_statements_cache_size: usize, } impl General { @@ -424,10 +418,6 @@ impl General { pub fn default_server_round_robin() -> bool { true } - - pub fn default_prepared_statements_cache_size() -> usize { - 500 - } } impl Default for General { @@ -439,35 +429,33 @@ impl Default for General { prometheus_exporter_port: 9930, connect_timeout: General::default_connect_timeout(), idle_timeout: General::default_idle_timeout(), - shutdown_timeout: Self::default_shutdown_timeout(), - healthcheck_timeout: Self::default_healthcheck_timeout(), - healthcheck_delay: Self::default_healthcheck_delay(), - ban_time: Self::default_ban_time(), - worker_threads: Self::default_worker_threads(), - idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(), tcp_keepalives_idle: Self::default_tcp_keepalives_idle(), tcp_keepalives_count: Self::default_tcp_keepalives_count(), tcp_keepalives_interval: Self::default_tcp_keepalives_interval(), tcp_user_timeout: Self::default_tcp_user_timeout(), log_client_connections: false, log_client_disconnections: false, - autoreload: None, dns_cache_enabled: false, dns_max_ttl: Self::default_dns_max_ttl(), + shutdown_timeout: Self::default_shutdown_timeout(), + healthcheck_timeout: Self::default_healthcheck_timeout(), + healthcheck_delay: Self::default_healthcheck_delay(), + ban_time: Self::default_ban_time(), + idle_client_in_transaction_timeout: Self::default_idle_client_in_transaction_timeout(), + server_lifetime: Self::default_server_lifetime(), + server_round_robin: Self::default_server_round_robin(), + worker_threads: Self::default_worker_threads(), + autoreload: None, tls_certificate: None, tls_private_key: None, server_tls: false, verify_server_certificate: false, admin_username: String::from("admin"), admin_password: String::from("admin"), + validate_config: true, auth_query: None, auth_query_user: None, auth_query_password: None, - server_lifetime: Self::default_server_lifetime(), - server_round_robin: Self::default_server_round_robin(), - validate_config: true, - prepared_statements: false, - prepared_statements_cache_size: 500, } } } @@ -568,6 +556,9 @@ pub struct Pool { #[serde(default)] // False pub log_client_parameter_status_changes: bool, + #[serde(default = "Pool::default_prepared_statements_cache_size")] + pub prepared_statements_cache_size: usize, + pub plugins: Option, pub shards: BTreeMap, pub users: BTreeMap, @@ -617,6 +608,10 @@ impl Pool { true } + pub fn default_prepared_statements_cache_size() -> usize { + 
0 + } + pub fn validate(&mut self) -> Result<(), Error> { match self.default_role.as_ref() { "any" => (), @@ -708,17 +703,16 @@ impl Default for Pool { Pool { pool_mode: Self::default_pool_mode(), load_balancing_mode: Self::default_load_balancing_mode(), - shards: BTreeMap::from([(String::from("1"), Shard::default())]), - users: BTreeMap::default(), default_role: String::from("any"), query_parser_enabled: false, query_parser_max_length: None, query_parser_read_write_splitting: false, primary_reads_enabled: false, - sharding_function: ShardingFunction::PgBigintHash, - automatic_sharding_key: None, connect_timeout: None, idle_timeout: None, + server_lifetime: None, + sharding_function: ShardingFunction::PgBigintHash, + automatic_sharding_key: None, sharding_key_regex: None, shard_id_regex: None, regex_search_limit: Some(1000), @@ -726,10 +720,12 @@ impl Default for Pool { auth_query: None, auth_query_user: None, auth_query_password: None, - server_lifetime: None, - plugins: None, cleanup_server_connections: true, log_client_parameter_status_changes: false, + prepared_statements_cache_size: Self::default_prepared_statements_cache_size(), + plugins: None, + shards: BTreeMap::from([(String::from("1"), Shard::default())]), + users: BTreeMap::default(), } } } @@ -841,13 +837,13 @@ impl Shard { impl Default for Shard { fn default() -> Shard { Shard { + database: String::from("postgres"), + mirrors: None, servers: vec![ServerConfig { host: String::from("localhost"), port: 5432, role: Role::Primary, }], - mirrors: None, - database: String::from("postgres"), } } } @@ -1018,8 +1014,8 @@ impl Default for Config { Config { path: Self::default_path(), general: General::default(), - pools: HashMap::default(), plugins: None, + pools: HashMap::default(), } } } @@ -1128,6 +1124,7 @@ impl From<&Config> for std::collections::HashMap { impl Config { /// Print current configuration. pub fn show(&self) { + info!("Config path: {}", self.path); info!("Ban time: {}s", self.general.ban_time); info!( "Idle client in transaction timeout: {}ms", @@ -1174,13 +1171,6 @@ impl Config { "Server TLS certificate verification: {}", self.general.verify_server_certificate ); - info!("Prepared statements: {}", self.general.prepared_statements); - if self.general.prepared_statements { - info!( - "Prepared statements server cache size: {}", - self.general.prepared_statements_cache_size - ); - } info!( "Plugins: {}", match self.plugins { @@ -1271,6 +1261,10 @@ impl Config { "[pool: {}] Log client parameter status changes: {}", pool_name, pool_config.log_client_parameter_status_changes ); + info!( + "[pool: {}] Prepared statements server cache size: {}", + pool_name, pool_config.prepared_statements_cache_size + ); info!( "[pool: {}] Plugins: {}", pool_name, @@ -1413,14 +1407,6 @@ pub fn get_idle_client_in_transaction_timeout() -> u64 { CONFIG.load().general.idle_client_in_transaction_timeout } -pub fn get_prepared_statements() -> bool { - CONFIG.load().general.prepared_statements -} - -pub fn get_prepared_statements_cache_size() -> usize { - CONFIG.load().general.prepared_statements_cache_size -} - /// Parse the configuration file located at the path. 
pub async fn parse(path: &str) -> Result<(), Error> { let mut contents = String::new(); diff --git a/src/messages.rs b/src/messages.rs index 86036a92..3a26f42a 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -12,13 +12,16 @@ use crate::config::get_config; use crate::errors::Error; use crate::constants::MESSAGE_TERMINATOR; +use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; use std::ffi::CString; use std::fmt::{Display, Formatter}; +use std::hash::{Hash, Hasher}; use std::io::{BufRead, Cursor}; use std::mem; use std::str::FromStr; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::time::Duration; /// Postgres data type mappings @@ -114,19 +117,11 @@ pub fn simple_query(query: &str) -> BytesMut { } /// Tell the client we're ready for another query. -pub async fn ready_for_query(stream: &mut S) -> Result<(), Error> +pub async fn send_ready_for_query(stream: &mut S) -> Result<(), Error> where S: tokio::io::AsyncWrite + std::marker::Unpin, { - let mut bytes = BytesMut::with_capacity( - mem::size_of::() + mem::size_of::() + mem::size_of::(), - ); - - bytes.put_u8(b'Z'); - bytes.put_i32(5); - bytes.put_u8(b'I'); // Idle - - write_all(stream, bytes).await + write_all(stream, ready_for_query(false)).await } /// Send the startup packet the server. We're pretending we're a Pg client. @@ -320,7 +315,7 @@ where res.put_slice(&set_complete[..]); write_all_half(stream, &res).await?; - ready_for_query(stream).await + send_ready_for_query(stream).await } /// Send a custom error message to the client. @@ -331,7 +326,7 @@ where S: tokio::io::AsyncWrite + std::marker::Unpin, { error_response_terminal(stream, message).await?; - ready_for_query(stream).await + send_ready_for_query(stream).await } /// Send a custom error message to the client. @@ -432,7 +427,7 @@ where res.put(command_complete("SELECT 1")); write_all_half(stream, &res).await?; - ready_for_query(stream).await + send_ready_for_query(stream).await } pub fn row_description(columns: &Vec<(&str, DataType)>) -> BytesMut { @@ -562,6 +557,37 @@ pub fn flush() -> BytesMut { bytes } +pub fn sync() -> BytesMut { + let mut bytes = BytesMut::with_capacity(mem::size_of::() + mem::size_of::()); + bytes.put_u8(b'S'); + bytes.put_i32(4); + bytes +} + +pub fn parse_complete() -> BytesMut { + let mut bytes = BytesMut::with_capacity(mem::size_of::() + mem::size_of::()); + + bytes.put_u8(b'1'); + bytes.put_i32(4); + bytes +} + +pub fn ready_for_query(in_transaction: bool) -> BytesMut { + let mut bytes = BytesMut::with_capacity( + mem::size_of::() + mem::size_of::() + mem::size_of::(), + ); + + bytes.put_u8(b'Z'); + bytes.put_i32(5); + if in_transaction { + bytes.put_u8(b'T'); + } else { + bytes.put_u8(b'I'); + } + + bytes +} + /// Write all data in the buffer to the TcpStream. 
pub async fn write_all(stream: &mut S, buf: BytesMut) -> Result<(), Error> where @@ -740,6 +766,51 @@ impl BytesMutReader for BytesMut { } } } + +pub enum ExtendedProtocolData { + Parse { + data: BytesMut, + metadata: Option<(Arc, u64)>, + }, + Bind { + data: BytesMut, + metadata: Option, + }, + Describe { + data: BytesMut, + metadata: Option, + }, + Execute { + data: BytesMut, + }, + Close { + data: BytesMut, + close: Close, + }, +} + +impl ExtendedProtocolData { + pub fn create_new_parse(data: BytesMut, metadata: Option<(Arc, u64)>) -> Self { + Self::Parse { data, metadata } + } + + pub fn create_new_bind(data: BytesMut, metadata: Option) -> Self { + Self::Bind { data, metadata } + } + + pub fn create_new_describe(data: BytesMut, metadata: Option) -> Self { + Self::Describe { data, metadata } + } + + pub fn create_new_execute(data: BytesMut) -> Self { + Self::Execute { data } + } + + pub fn create_new_close(data: BytesMut, close: Close) -> Self { + Self::Close { data, close } + } +} + /// Parse (F) message. /// See: #[derive(Clone, Debug)] @@ -748,7 +819,6 @@ pub struct Parse { #[allow(dead_code)] len: i32, pub name: String, - pub generated_name: String, query: String, num_params: i16, param_types: Vec, @@ -774,7 +844,6 @@ impl TryFrom<&BytesMut> for Parse { code, len, name, - generated_name: prepared_statement_name(), query, num_params, param_types, @@ -823,11 +892,44 @@ impl TryFrom<&Parse> for BytesMut { } impl Parse { - pub fn rename(mut self) -> Self { - self.name = self.generated_name.to_string(); + /// Renames the prepared statement to a new name based on the global counter + pub fn rewrite(mut self) -> Self { + self.name = format!( + "PGCAT_{}", + PREPARED_STATEMENT_COUNTER.fetch_add(1, Ordering::SeqCst) + ); self } + /// Gets the name of the prepared statement from the buffer + pub fn get_name(buf: &BytesMut) -> Result { + let mut cursor = Cursor::new(buf); + // Skip the code and length + cursor.advance(mem::size_of::() + mem::size_of::()); + cursor.read_string() + } + + /// Hashes the parse statement to be used as a key in the global cache + pub fn get_hash(&self) -> u64 { + // TODO_ZAIN: Take a look at which hashing function is being used + let mut hasher = DefaultHasher::new(); + + let concatenated = format!( + "{}{}{}", + self.query, + self.num_params, + self.param_types + .iter() + .map(ToString::to_string) + .collect::>() + .join(",") + ); + + concatenated.hash(&mut hasher); + + hasher.finish() + } + pub fn anonymous(&self) -> bool { self.name.is_empty() } @@ -958,9 +1060,42 @@ impl TryFrom for BytesMut { } impl Bind { - pub fn reassign(mut self, parse: &Parse) -> Self { - self.prepared_statement = parse.name.clone(); - self + /// Gets the name of the prepared statement from the buffer + pub fn get_name(buf: &BytesMut) -> Result { + let mut cursor = Cursor::new(buf); + // Skip the code and length + cursor.advance(mem::size_of::() + mem::size_of::()); + cursor.read_string()?; + cursor.read_string() + } + + /// Renames the prepared statement to a new name + pub fn rename(buf: BytesMut, new_name: &str) -> Result { + let mut cursor = Cursor::new(&buf); + // Read basic data from the cursor + let code = cursor.get_u8(); + let current_len = cursor.get_i32(); + let portal = cursor.read_string()?; + let prepared_statement = cursor.read_string()?; + + // Calculate new length + let new_len = current_len + new_name.len() as i32 - prepared_statement.len() as i32; + + // Begin building the response buffer + let mut response_buf = BytesMut::with_capacity(new_len as usize + 1); + 
response_buf.put_u8(code); + response_buf.put_i32(new_len); + + // Put the portal and new name into the buffer + // Note: panic if the provided string contains null byte + response_buf.put_slice(CString::new(portal)?.as_bytes_with_nul()); + response_buf.put_slice(CString::new(new_name)?.as_bytes_with_nul()); + + // Add the remainder of the original buffer into the response + response_buf.put_slice(&buf[cursor.position() as usize..]); + + // Return the buffer + Ok(response_buf) } pub fn anonymous(&self) -> bool { @@ -1016,6 +1151,15 @@ impl TryFrom for BytesMut { } impl Describe { + pub fn empty_new() -> Describe { + Describe { + code: 'D', + len: 4 + 1 + 1, + target: 'S', + statement_name: "".to_string(), + } + } + pub fn rename(mut self, name: &str) -> Self { self.statement_name = name.to_string(); self @@ -1104,13 +1248,6 @@ pub fn close_complete() -> BytesMut { bytes } -pub fn prepared_statement_name() -> String { - format!( - "P_{}", - PREPARED_STATEMENT_COUNTER.fetch_add(1, Ordering::SeqCst) - ) -} - // from https://www.postgresql.org/docs/12/protocol-error-fields.html #[derive(Debug, Default, PartialEq)] pub struct PgErrorMsg { @@ -1193,7 +1330,7 @@ impl Display for PgErrorMsg { } impl PgErrorMsg { - pub fn parse(error_msg: Vec) -> Result { + pub fn parse(error_msg: &[u8]) -> Result { let mut out = PgErrorMsg { severity_localized: "".to_string(), severity: "".to_string(), @@ -1341,7 +1478,7 @@ mod tests { info!( "full message: {}", - PgErrorMsg::parse(complete_msg.clone()).unwrap() + PgErrorMsg::parse(&complete_msg).unwrap() ); assert_eq!( PgErrorMsg { @@ -1364,7 +1501,7 @@ mod tests { line: Some(335), routine: Some(routine_msg.to_string()), }, - PgErrorMsg::parse(complete_msg).unwrap() + PgErrorMsg::parse(&complete_msg).unwrap() ); let mut only_mandatory_msg = vec![]; @@ -1374,7 +1511,7 @@ mod tests { only_mandatory_msg.extend(field('M', message)); only_mandatory_msg.extend(field('D', detail_msg)); - let err_fields = PgErrorMsg::parse(only_mandatory_msg.clone()).unwrap(); + let err_fields = PgErrorMsg::parse(&only_mandatory_msg).unwrap(); info!("only mandatory fields: {}", &err_fields); error!( "server error: {}: {}", @@ -1401,7 +1538,7 @@ mod tests { line: None, routine: None, }, - PgErrorMsg::parse(only_mandatory_msg).unwrap() + PgErrorMsg::parse(&only_mandatory_msg).unwrap() ); } } diff --git a/src/mirrors.rs b/src/mirrors.rs index f704a8cd..5c39504e 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -23,14 +23,15 @@ impl MirroredClient { async fn create_pool(&self) -> Pool { let config = get_config(); let default = std::time::Duration::from_millis(10_000).as_millis() as u64; - let (connection_timeout, idle_timeout, _cfg) = + let (connection_timeout, idle_timeout, _cfg, prepared_statement_cache_size) = match config.pools.get(&self.address.pool_name) { Some(cfg) => ( cfg.connect_timeout.unwrap_or(default), cfg.idle_timeout.unwrap_or(default), cfg.clone(), + cfg.prepared_statements_cache_size, ), - None => (default, default, crate::config::Pool::default()), + None => (default, default, crate::config::Pool::default(), 0), }; let manager = ServerPool::new( @@ -42,6 +43,7 @@ impl MirroredClient { None, true, false, + prepared_statement_cache_size, ); Pool::builder() diff --git a/src/pool.rs b/src/pool.rs index 77394070..0de16c2a 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use bb8::{ManageConnection, Pool, PooledConnection, QueueStrategy}; use chrono::naive::NaiveDateTime; use log::{debug, error, info, warn}; +use lru::LruCache; use 
once_cell::sync::Lazy; use parking_lot::{Mutex, RwLock}; use rand::seq::SliceRandom; @@ -10,6 +11,7 @@ use rand::thread_rng; use regex::Regex; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::num::NonZeroUsize; use std::sync::atomic::AtomicU64; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -24,6 +26,7 @@ use crate::config::{ use crate::errors::Error; use crate::auth_passthrough::AuthPassthrough; +use crate::messages::Parse; use crate::plugins::prewarmer; use crate::server::{Server, ServerParameters}; use crate::sharding::ShardingFunction; @@ -54,6 +57,57 @@ pub enum BanReason { AdminBan(i64), } +pub type PreparedStatementCacheType = Arc>; + +// TODO: Add stats the this cache +// TODO: Add application name to the cache value to help identify which application is using the cache +// TODO: Create admin command to show which statements are in the cache +#[derive(Debug)] +pub struct PreparedStatementCache { + cache: LruCache>, +} + +impl PreparedStatementCache { + pub fn new(mut size: usize) -> Self { + // Cannot be zeros + if size == 0 { + size = 1; + } + + PreparedStatementCache { + cache: LruCache::new(NonZeroUsize::new(size).unwrap()), + } + } + + /// Adds the prepared statement to the cache if it doesn't exist with a new name + /// if it already exists will give you the existing parse + /// + /// Pass the hash to this so that we can do the compute before acquiring the lock + pub fn get_or_insert(&mut self, parse: &Parse, hash: u64) -> Arc { + match self.cache.get(&hash) { + Some(rewritten_parse) => rewritten_parse.clone(), + None => { + let new_parse = Arc::new(parse.clone().rewrite()); + let evicted = self.cache.push(hash, new_parse.clone()); + + if let Some((_, evicted_parse)) = evicted { + debug!( + "Evicted prepared statement {} from cache", + evicted_parse.name + ); + } + + new_parse + } + } + } + + /// Marks the hash as most recently used if it exists + pub fn promote(&mut self, hash: &u64) { + self.cache.promote(hash); + } +} + /// An identifier for a PgCat pool, /// a database visible to clients. 
#[derive(Hash, Debug, Clone, PartialEq, Eq, Default)] @@ -223,6 +277,9 @@ pub struct ConnectionPool { /// AuthInfo pub auth_hash: Arc>>, + + /// Cache + pub prepared_statement_cache: Option, } impl ConnectionPool { @@ -376,6 +433,7 @@ impl ConnectionPool { }, pool_config.cleanup_server_connections, pool_config.log_client_parameter_status_changes, + pool_config.prepared_statements_cache_size, ); let connect_timeout = match pool_config.connect_timeout { @@ -498,6 +556,12 @@ impl ConnectionPool { validated: Arc::new(AtomicBool::new(false)), paused: Arc::new(AtomicBool::new(false)), paused_waiter: Arc::new(Notify::new()), + prepared_statement_cache: match pool_config.prepared_statements_cache_size { + 0 => None, + _ => Some(Arc::new(Mutex::new(PreparedStatementCache::new( + pool_config.prepared_statements_cache_size, + )))), + }, }; // Connect to the servers to make sure pool configuration is valid @@ -998,6 +1062,29 @@ impl ConnectionPool { Some(shard) => shard < self.shards(), } } + + /// Register a parse statement to the pool's cache and return the rewritten parse + /// + /// Do not pass an anonymous parse statement to this function + pub fn register_parse_to_cache(&self, hash: u64, parse: &Parse) -> Option> { + // We should only be calling this function if the cache is enabled + match self.prepared_statement_cache { + Some(ref prepared_statement_cache) => { + let mut cache = prepared_statement_cache.lock(); + Some(cache.get_or_insert(parse, hash)) + } + None => None, + } + } + + /// Promote a prepared statement hash in the LRU + pub fn promote_prepared_statement_hash(&self, hash: &u64) { + // We should only be calling this function if the cache is enabled + if let Some(ref prepared_statement_cache) = self.prepared_statement_cache { + let mut cache = prepared_statement_cache.lock(); + cache.promote(hash); + } + } } /// Wrapper for the bb8 connection pool. 
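
Note on `PreparedStatementCache` above: the pool now owns a single LRU keyed by a hash of the query text, parameter count, and parameter types, so identical statements from different clients resolve to one shared, pool-generated name (`PGCAT_<n>`), and `promote_prepared_statement_hash` bumps an entry on reuse. A condensed sketch of that behaviour with the `lru` crate added in this PR, storing strings in place of `Parse` messages (the `PreparedStatementCache` shown here is a simplified stand-in, not the PR's exact type):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::Arc;

/// Simplified pool-level statement cache: identical queries (keyed by hash)
/// share one rewritten statement name across all clients of the pool.
struct PreparedStatementCache {
    cache: LruCache<u64, Arc<String>>, // the real cache stores Arc<Parse>
    counter: u64,
}

impl PreparedStatementCache {
    fn new(size: usize) -> Self {
        Self {
            cache: LruCache::new(NonZeroUsize::new(size.max(1)).unwrap()),
            counter: 0,
        }
    }

    /// Returns the shared rewritten statement, inserting it on first use.
    fn get_or_insert(&mut self, hash: u64) -> Arc<String> {
        if let Some(existing) = self.cache.get(&hash) {
            return existing.clone();
        }
        self.counter += 1;
        let rewritten = Arc::new(format!("PGCAT_{}", self.counter));
        if let Some((_, evicted)) = self.cache.push(hash, rewritten.clone()) {
            // The real code only logs this; server connections that already
            // prepared the evicted statement clean it up via their own caches.
            println!("evicted {} from the pool cache", evicted);
        }
        rewritten
    }
}

fn main() {
    let mut cache = PreparedStatementCache::new(2);
    let a = cache.get_or_insert(1);
    let b = cache.get_or_insert(1); // same hash -> same shared name
    assert!(Arc::ptr_eq(&a, &b));
    cache.get_or_insert(2);
    cache.get_or_insert(3); // capacity 2 -> hash 1 is evicted
}
```
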
@@ -1025,6 +1112,9 @@ pub struct ServerPool { /// Log client parameter status changes log_client_parameter_status_changes: bool, + + /// Prepared statement cache size + prepared_statement_cache_size: usize, } impl ServerPool { @@ -1038,6 +1128,7 @@ impl ServerPool { plugins: Option, cleanup_connections: bool, log_client_parameter_status_changes: bool, + prepared_statement_cache_size: usize, ) -> ServerPool { ServerPool { address, @@ -1048,6 +1139,7 @@ impl ServerPool { plugins, cleanup_connections, log_client_parameter_status_changes, + prepared_statement_cache_size, } } } @@ -1078,6 +1170,7 @@ impl ManageConnection for ServerPool { self.auth_hash.clone(), self.cleanup_connections, self.log_client_parameter_status_changes, + self.prepared_statement_cache_size, ) .await { diff --git a/src/server.rs b/src/server.rs index 3394cda7..dff6a769 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,12 +3,14 @@ use bytes::{Buf, BufMut, BytesMut}; use fallible_iterator::FallibleIterator; use log::{debug, error, info, trace, warn}; +use lru::LruCache; use once_cell::sync::Lazy; use parking_lot::{Mutex, RwLock}; use postgres_protocol::message; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use std::mem; use std::net::IpAddr; +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::SystemTime; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, BufStream}; @@ -16,7 +18,7 @@ use tokio::net::TcpStream; use tokio_rustls::rustls::{OwnedTrustAnchor, RootCertStore}; use tokio_rustls::{client::TlsStream, TlsConnector}; -use crate::config::{get_config, get_prepared_statements_cache_size, Address, User}; +use crate::config::{get_config, Address, User}; use crate::constants::*; use crate::dns_cache::{AddrSet, CACHED_RESOLVER}; use crate::errors::{Error, ServerIdentifier}; @@ -322,7 +324,7 @@ pub struct Server { log_client_parameter_status_changes: bool, /// Prepared statements - prepared_statements: BTreeSet, + prepared_statement_cache: Option>, } impl Server { @@ -338,6 +340,7 @@ impl Server { auth_hash: Arc>>, cleanup_connections: bool, log_client_parameter_status_changes: bool, + prepared_statement_cache_size: usize, ) -> Result { let cached_resolver = CACHED_RESOLVER.load(); let mut addr_set: Option = None; @@ -713,7 +716,7 @@ impl Server { } }; - let fields = match PgErrorMsg::parse(error) { + let fields = match PgErrorMsg::parse(&error) { Ok(f) => f, Err(err) => { return Err(err); @@ -818,7 +821,12 @@ impl Server { }, cleanup_connections, log_client_parameter_status_changes, - prepared_statements: BTreeSet::new(), + prepared_statement_cache: match prepared_statement_cache_size { + 0 => None, + _ => Some(LruCache::new( + NonZeroUsize::new(prepared_statement_cache_size).unwrap(), + )), + }, }; return Ok(server); @@ -957,6 +965,20 @@ impl Server { if self.in_copy_mode { self.in_copy_mode = false; } + + if self.prepared_statement_cache.is_some() { + let error_message = PgErrorMsg::parse(&message)?; + if error_message.message == "cached plan must not change result type" { + warn!("Server {:?} changed schema, dropping connection to clean up prepared statements", self.address); + // This will still result in an error to the client, but this server connection will drop all cached prepared statements + // so that any new queries will be re-prepared + // TODO: Other ideas to solve errors when there are DDL changes after a statement has been prepared + // - Recreate entire connection pool to force recreation of all server connections + // - Clear the 
+                            //  - Implement a retry (re-prepare) so the client doesn't see an error
+                            self.cleanup_state.needs_cleanup_prepare = true;
+                        }
+                    }
                 }

                 // CommandComplete
@@ -1067,115 +1089,92 @@ impl Server {
         Ok(bytes)
     }

-    /// Add the prepared statement to being tracked by this server.
-    /// The client is processing data that will create a prepared statement on this server.
-    pub fn will_prepare(&mut self, name: &str) {
-        debug!("Will prepare `{}`", name);
-
-        self.prepared_statements.insert(name.to_string());
-        self.stats.prepared_cache_add();
-    }
-
-    /// Check if we should prepare a statement on the server.
-    pub fn should_prepare(&self, name: &str) -> bool {
-        let should_prepare = !self.prepared_statements.contains(name);
-
-        debug!("Should prepare `{}`: {}", name, should_prepare);
+    /// Determines if the server already has a prepared statement with the given name.
+    /// Increments the prepared statement cache hit or miss counter accordingly.
+    pub fn has_prepared_statement(&mut self, name: &str) -> bool {
+        let cache = match &mut self.prepared_statement_cache {
+            Some(cache) => cache,
+            None => return false,
+        };

-        if should_prepare {
-            self.stats.prepared_cache_miss();
-        } else {
+        let has_it = cache.get(name).is_some();
+        if has_it {
             self.stats.prepared_cache_hit();
+        } else {
+            self.stats.prepared_cache_miss();
         }

-        should_prepare
+        has_it
     }

-    /// Create a prepared statement on the server.
-    pub async fn prepare(&mut self, parse: &Parse) -> Result<(), Error> {
-        debug!("Preparing `{}`", parse.name);
-
-        let bytes: BytesMut = parse.try_into()?;
-        self.send(&bytes).await?;
-        self.send(&flush()).await?;
-
-        // Read and discard ParseComplete (B)
-        match read_message(&mut self.stream).await {
-            Ok(_) => (),
-            Err(err) => {
-                self.bad = true;
-                return Err(err);
-            }
-        }
+    /// Add a prepared statement to this server's cache, returning the name of the
+    /// statement that was evicted to make room (if any).
+    pub fn add_prepared_statement_to_cache(&mut self, name: &str) -> Option {
+        let cache = match &mut self.prepared_statement_cache {
+            Some(cache) => cache,
+            None => return None,
+        };

-        self.prepared_statements.insert(parse.name.to_string());
         self.stats.prepared_cache_add();
-        debug!("Prepared `{}`", parse.name);
-
-        Ok(())
-    }
-
-    /// Maintain adequate cache size on the server.
-    pub async fn maintain_cache(&mut self) -> Result<(), Error> {
-        debug!("Cache maintenance run");
-
-        let max_cache_size = get_prepared_statements_cache_size();
-        let mut names = Vec::new();
-
-        while self.prepared_statements.len() >= max_cache_size {
-            // The prepared statmeents are alphanumerically sorted by the BTree.
-            // FIFO.
-            if let Some(name) = self.prepared_statements.pop_last() {
-                names.push(name);
+        // If we evict something, we need to close it on the server
+        if let Some((evicted_name, _)) = cache.push(name.to_string(), ()) {
+            if evicted_name != name {
+                debug!(
+                    "Evicted prepared statement {} from cache, replaced with {}",
+                    evicted_name, name
+                );
+                return Some(evicted_name);
             }
-        }
-
-        if !names.is_empty() {
-            self.deallocate(names).await?;
-        }
+        };

-        Ok(())
+        None
     }

-    /// Remove the prepared statement from being tracked by this server.
-    /// The client is processing data that will cause the server to close the prepared statement.
-    pub fn will_close(&mut self, name: &str) {
-        debug!("Will close `{}`", name);
+    /// Remove a prepared statement from this server's cache and update the stats.
+    pub fn remove_prepared_statement_from_cache(&mut self, name: &str) {
+        let cache = match &mut self.prepared_statement_cache {
+            Some(cache) => cache,
+            None => return,
+        };

-        self.prepared_statements.remove(name);
+        self.stats.prepared_cache_remove();
+        cache.pop(name);
     }

-    /// Close a prepared statement on the server.
-    pub async fn deallocate(&mut self, names: Vec) -> Result<(), Error> {
-        for name in &names {
-            debug!("Deallocating prepared statement `{}`", name);
+    /// Register `parse` with this server connection: optionally send the Parse itself,
+    /// queue a Close for any statement evicted to make room, then Sync and drain.
+    pub async fn register_prepared_statement(
+        &mut self,
+        parse: &Parse,
+        should_send_parse_to_server: bool,
+    ) -> Result<(), Error> {
+        if !self.has_prepared_statement(&parse.name) {
+            let mut bytes = BytesMut::new();

-            let close = Close::new(name);
-            let bytes: BytesMut = close.try_into()?;
+            if should_send_parse_to_server {
+                let parse_bytes: BytesMut = parse.try_into()?;
+                bytes.extend_from_slice(&parse_bytes);
+            }

-            self.send(&bytes).await?;
-        }
+            // If we evict something, we need to close it on the server
+            // We do this by adding it to the messages we're sending to the server before the sync
+            if let Some(evicted_name) = self.add_prepared_statement_to_cache(&parse.name) {
+                self.remove_prepared_statement_from_cache(&evicted_name);
+                let close_bytes: BytesMut = Close::new(&evicted_name).try_into()?;
+                bytes.extend_from_slice(&close_bytes);
+            };

-        if !names.is_empty() {
-            self.send(&flush()).await?;
-        }
+            // If we have a parse or close we need to send to the server, send them and sync
+            if !bytes.is_empty() {
+                bytes.extend_from_slice(&sync());

-        // Read and discard CloseComplete (3)
-        for name in &names {
-            match read_message(&mut self.stream).await {
-                Ok(_) => {
-                    self.prepared_statements.remove(name);
-                    self.stats.prepared_cache_remove();
-                    debug!("Closed `{}`", name);
-                }
+                self.send(&bytes).await?;

-                Err(err) => {
-                    self.bad = true;
-                    return Err(err);
+                loop {
+                    self.recv(None).await?;
+
+                    if !self.is_data_available() {
+                        break;
+                    }
                 }
-            };
-        }
+            }
+        };

         Ok(())
     }
@@ -1312,6 +1311,10 @@ impl Server {

         if self.cleanup_state.needs_cleanup_prepare {
             reset_string.push_str("DEALLOCATE ALL;");
+            // Since we deallocated all prepared statements, we need to clear the cache
+            if let Some(cache) = &mut self.prepared_statement_cache {
+                cache.clear();
+            }
         };

         self.query(&reset_string).await?;
@@ -1377,6 +1380,7 @@ impl Server {
             Arc::new(RwLock::new(None)),
             true,
             false,
+            0,
         )
         .await?;
         debug!("Connected!, sending query.");
diff --git a/src/stats/server.rs b/src/stats/server.rs
index 443c0b6a..5d255994 100644
--- a/src/stats/server.rs
+++ b/src/stats/server.rs
@@ -49,6 +49,7 @@ pub struct ServerStats {
     pub error_count: Arc,
     pub prepared_hit_count: Arc,
     pub prepared_miss_count: Arc,
+    pub prepared_eviction_count: Arc,
     pub prepared_cache_size: Arc,
 }

@@ -68,6 +69,7 @@ impl Default for ServerStats {
             reporter: get_reporter(),
             prepared_hit_count: Arc::new(AtomicU64::new(0)),
             prepared_miss_count: Arc::new(AtomicU64::new(0)),
+            prepared_eviction_count: Arc::new(AtomicU64::new(0)),
             prepared_cache_size: Arc::new(AtomicU64::new(0)),
         }
     }
@@ -221,6 +223,7 @@ impl ServerStats {
     }

     pub fn prepared_cache_remove(&self) {
+        self.prepared_eviction_count.fetch_add(1, Ordering::Relaxed);
         self.prepared_cache_size.fetch_sub(1, Ordering::Relaxed);
     }
 }
diff --git a/tests/pgbench/simple.sql b/tests/pgbench/simple.sql
index ad5e6139..a8429d2b 100644
--- a/tests/pgbench/simple.sql
+++ b/tests/pgbench/simple.sql
@@ -36,4 +36,4 @@ SELECT abalance FROM pgbench_accounts WHERE aid = :aid;

 SET SERVER ROLE TO 'replica'; -- Read load balancing

-SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
+SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
\ No newline at end of file
diff --git a/tests/ruby/prepared_spec.rb b/tests/ruby/prepared_spec.rb
index 58a30006..8a31243b 100644
--- a/tests/ruby/prepared_spec.rb
+++ b/tests/ruby/prepared_spec.rb
@@ -1,29 +1,214 @@
 require_relative 'spec_helper'

 describe 'Prepared statements' do
-  let(:processes) { Helpers::Pgcat.three_shard_setup('sharded_db', 5) }
+  let(:pool_size) { 5 }
+  let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", pool_size) }
+  let(:prepared_statements_cache_size) { 100 }
+  let(:server_round_robin) { false }

-  context 'enabled' do
-    it 'will work over the same connection' do
+  before do
+    new_configs = processes.pgcat.current_config
+    new_configs["general"]["server_round_robin"] = server_round_robin
+    new_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = prepared_statements_cache_size
+    new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = pool_size
+    processes.pgcat.update_config(new_configs)
+    processes.pgcat.reload_config
+  end
+
+  context 'when trying prepared statements' do
+    it 'allows unparameterized statements to succeed' do
+      conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+      conn2 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      prepared_query = "SELECT 1"
+
+      # prepare query on server 1 and client 1
+      conn1.prepare('statement1', prepared_query)
+      conn1.exec_prepared('statement1')
+
+      conn2.transaction do
+        # Claim server 1 with client 2
+        conn2.exec("SELECT 2")
+
+        # Client 1 now runs the prepared query, and it's automatically
+        # prepared on server 2
+        conn1.prepare('statement2', prepared_query)
+        conn1.exec_prepared('statement2')
+
+        # Client 2 now prepares the same query that was already
+        # prepared on server 1. And PgCat reuses that already
+        # prepared query for this different client.
+        conn2.prepare('statement3', prepared_query)
+        conn2.exec_prepared('statement3')
+      end
+    ensure
+      conn1.close if conn1
+      conn2.close if conn2
+    end
+
+    it 'allows parameterized statements to succeed' do
+      conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+      conn2 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      prepared_query = "SELECT $1"
+
+      # prepare query on server 1 and client 1
+      conn1.prepare('statement1', prepared_query)
+      conn1.exec_prepared('statement1', [1])
+
+      conn2.transaction do
+        # Claim server 1 with client 2
+        conn2.exec("SELECT 2")
+
+        # Client 1 now runs the prepared query, and it's automatically
+        # prepared on server 2
+        conn1.prepare('statement2', prepared_query)
+        conn1.exec_prepared('statement2', [1])
+
+        # Client 2 now prepares the same query that was already
+        # prepared on server 1. And PgCat reuses that already
+        # prepared query for this different client.
+        conn2.prepare('statement3', prepared_query)
+        conn2.exec_prepared('statement3', [1])
+      end
+    ensure
+      conn1.close if conn1
+      conn2.close if conn2
+
+    end
+  end
+
+  context 'when trying large packets' do
+    it "works with large parse" do
+      conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      long_string = "1" * 4096 * 10
+      prepared_query = "SELECT '#{long_string}'"
+
+      # prepare query on server 1 and client 1
+      conn1.prepare('statement1', prepared_query)
+      result = conn1.exec_prepared('statement1')
+
+      # assert result matches long_string
+      expect(result.getvalue(0, 0)).to eq(long_string)
+    ensure
+      conn1.close if conn1
+    end
+
+    it "works with large bind" do
+      conn1 = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      long_string = "1" * 4096 * 10
+      prepared_query = "SELECT $1::text"
+
+      # prepare query on server 1 and client 1
+      conn1.prepare('statement1', prepared_query)
+      result = conn1.exec_prepared('statement1', [long_string])
+
+      # assert result matches long_string
+      expect(result.getvalue(0, 0)).to eq(long_string)
+    ensure
+      conn1.close if conn1
+    end
+  end
+
+  context 'when statement cache is smaller than set of unique statements' do
+    let(:prepared_statements_cache_size) { 1 }
+    let(:pool_size) { 1 }
+
+    it "evicts all but 1 statement from the server cache" do
+      conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      5.times do |i|
+        prepared_query = "SELECT '#{i}'"
+        conn.prepare("statement#{i}", prepared_query)
+        result = conn.exec_prepared("statement#{i}")
+        expect(result.getvalue(0, 0)).to eq(i.to_s)
+      end
+
+      # Check number of prepared statements (expected: 1)
+      n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
+      expect(n_statements).to eq(1)
+    end
+  end
+
+  context 'when statement cache is larger than set of unique statements' do
+    let(:pool_size) { 1 }
+
+    it "does not evict any of the statements from the cache" do
+      # cache size is 100 here (the default for these specs), larger than the 5 unique statements
+      conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      5.times do |i|
+        prepared_query = "SELECT '#{i}'"
+        conn.prepare("statement#{i}", prepared_query)
+        result = conn.exec_prepared("statement#{i}")
+        expect(result.getvalue(0, 0)).to eq(i.to_s)
+      end
+
+      # Check number of prepared statements (expected: 5)
+      n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
+      expect(n_statements).to eq(5)
+    end
+  end
+
+  context 'when preparing the same query' do
+    let(:prepared_statements_cache_size) { 5 }
+    let(:pool_size) { 5 }
+
+    it "reuses statement cache when there are different statement names on the same connection" do
       conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))

       10.times do |i|
         statement_name = "statement_#{i}"
         conn.prepare(statement_name, 'SELECT $1::int')
         conn.exec_prepared(statement_name, [1])
-        conn.describe_prepared(statement_name)
       end
+
+      # Check number of prepared statements (expected: 1)
+      n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
+      expect(n_statements).to eq(1)
     end

-    it 'will work with new connections' do
-      10.times do
+    it "reuses statement cache when there are different statement names on different connections" do
+      10.times do |i|
         conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
-
-        statement_name = 'statement1'
-        conn.prepare('statement1', 'SELECT $1::int')
-        conn.exec_prepared('statement1', [1])
-        conn.describe_prepared('statement1')
+        statement_name = "statement_#{i}"
+        conn.prepare(statement_name, 'SELECT $1::int')
+        conn.exec_prepared(statement_name, [1])
       end
+
+      # Check number of prepared statements (expected: 1)
+      conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+      n_statements = conn.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
+      expect(n_statements).to eq(1)
+    end
+  end
+
+  context 'when reloading config' do
+    let(:pool_size) { 1 }
+
+    it "keeps prepared statements working after a config reload recreates the pool" do
+      conn = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+
+      # prepare query
+      conn.prepare('statement1', 'SELECT 1')
+      conn.exec_prepared('statement1')
+
+      # Reload config which triggers pool recreation
+      new_configs = processes.pgcat.current_config
+      new_configs["pools"]["sharded_db"]["prepared_statements_cache_size"] = prepared_statements_cache_size + 1
+      processes.pgcat.update_config(new_configs)
+      processes.pgcat.reload_config
+
+      # check that we're starting with no prepared statements on the server
+      conn_check = PG.connect(processes.pgcat.connection_string('sharded_db', 'sharding_user'))
+      n_statements = conn_check.exec("SELECT count(*) FROM pg_prepared_statements").getvalue(0, 0).to_i
+      expect(n_statements).to eq(0)
+
+      # still able to run prepared query
+      conn.exec_prepared('statement1')
     end
   end
 end
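For reference, a small self-contained sketch of the `lru` crate behavior that the server-side cache in `src/server.rs` relies on: `LruCache::push` reports the entry it displaced, which is what lets `register_prepared_statement` queue a Close for the statement that just fell out of the cache (as exercised by the eviction spec above). The statement names below are illustrative, not PgCat's actual naming scheme.

```rust
use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
    // Server-side cache keyed by statement name, capacity 1 as in the eviction spec above.
    let mut cache: LruCache<String, ()> = LruCache::new(NonZeroUsize::new(1).unwrap());

    // First statement fits; nothing is evicted.
    assert!(cache.push("PGCAT_1".to_string(), ()).is_none());

    // A second statement displaces the least recently used one. The server would now
    // append a Close message for "PGCAT_1" before the next Sync, mirroring
    // add_prepared_statement_to_cache / register_prepared_statement.
    let evicted = cache.push("PGCAT_2".to_string(), ());
    assert_eq!(evicted, Some(("PGCAT_1".to_string(), ())));

    // Note: push also returns Some((key, old_value)) when the key is already present,
    // which is why the server code checks `evicted_name != name` before closing anything.

    // Explicit removal, as remove_prepared_statement_from_cache does for an evicted name.
    cache.pop("PGCAT_2");
    assert!(cache.is_empty());

    println!("ok");
}
```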