From 913ccf5b6320203804b36fe927330b4d3f422f05 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 26 Apr 2023 16:08:37 -0700 Subject: [PATCH 1/2] Add some pool settings --- CONFIG.md | 8 ++++ pgcat.toml | 5 +++ src/auth_passthrough.rs | 2 + src/config.rs | 89 +++++++++++++++++++++++++++++++++++++++-- src/pool.rs | 53 +++++++++++++++++------- 5 files changed, 139 insertions(+), 18 deletions(-) diff --git a/CONFIG.md b/CONFIG.md index 72d5683a..1a05a776 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -49,6 +49,14 @@ default: 30000 # milliseconds How long an idle connection with a server is left open (ms). +### server_lifetime +``` +path: general.server_lifetime +default: 86400000 # 24 hours +``` + +Max connection lifetime before it's closed, even if actively used. + ### idle_client_in_transaction_timeout ``` path: general.idle_client_in_transaction_timeout diff --git a/pgcat.toml b/pgcat.toml index 6cb9a299..9203cb60 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -23,6 +23,9 @@ connect_timeout = 5000 # milliseconds # How long an idle connection with a server is left open (ms). idle_timeout = 30000 # milliseconds +# Max connection lifetime before it's closed, even if actively used. +server_lifetime = 86400000 # 24 hours + # How long a client is allowed to be idle while in a transaction (ms). idle_client_in_transaction_timeout = 0 # milliseconds @@ -206,6 +209,8 @@ sharding_function = "pg_bigint_hash" username = "simple_user" password = "simple_user" pool_size = 5 +min_pool_size = 3 +server_lifetime = 60000 statement_timeout = 0 [pools.simple_db.shards.0] diff --git a/src/auth_passthrough.rs b/src/auth_passthrough.rs index 4e936262..fc0f6dc6 100644 --- a/src/auth_passthrough.rs +++ b/src/auth_passthrough.rs @@ -77,6 +77,8 @@ impl AuthPassthrough { pool_size: 1, statement_timeout: 0, pool_mode: None, + server_lifetime: None, + min_pool_size: None, }; let user = &address.username; diff --git a/src/config.rs b/src/config.rs index 5ff09ee1..41bed467 100644 --- a/src/config.rs +++ b/src/config.rs @@ -181,7 +181,9 @@ pub struct User { pub server_username: Option, pub server_password: Option, pub pool_size: u32, + pub min_pool_size: Option, pub pool_mode: Option, + pub server_lifetime: Option, #[serde(default)] // 0 pub statement_timeout: u64, } @@ -194,12 +196,31 @@ impl Default for User { server_username: None, server_password: None, pool_size: 15, + min_pool_size: None, statement_timeout: 0, pool_mode: None, + server_lifetime: None, } } } +impl User { + fn validate(&self) -> Result<(), Error> { + match self.min_pool_size { + Some(min_pool_size) => { + if min_pool_size > self.pool_size { + error!("min_pool_size of {} cannot be larger than pool_size of {}", min_pool_size, self.pool_size); + return Err(Error::BadConfig); + } + }, + + None => (), + }; + + Ok(()) + } +} + /// General configuration. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct General { @@ -246,6 +267,9 @@ pub struct General { #[serde(default = "General::default_idle_client_in_transaction_timeout")] pub idle_client_in_transaction_timeout: u64, + #[serde(default = "General::default_server_lifetime")] + pub server_lifetime: u64, + #[serde(default = "General::default_worker_threads")] pub worker_threads: usize, @@ -271,6 +295,10 @@ impl General { 5432 } + pub fn default_server_lifetime() -> u64 { + 1000 * 60 * 60 * 24 // 24 hours + } + pub fn default_connect_timeout() -> u64 { 1000 } @@ -347,6 +375,7 @@ impl Default for General { auth_query: None, auth_query_user: None, auth_query_password: None, + server_lifetime: 1000 * 3600 * 24, // 24 hours, } } } @@ -411,6 +440,8 @@ pub struct Pool { pub idle_timeout: Option, + pub server_lifetime: Option, + pub sharding_function: ShardingFunction, #[serde(default = "Pool::default_automatic_sharding_key")] @@ -515,6 +546,11 @@ impl Pool { None => None, }; + for (_, user) in &self.users { + user.validate()?; + } + + Ok(()) } } @@ -539,6 +575,7 @@ impl Default for Pool { auth_query: None, auth_query_user: None, auth_query_password: None, + server_lifetime: None, } } } @@ -791,6 +828,10 @@ impl Config { ); info!("Shutdown timeout: {}ms", self.general.shutdown_timeout); info!("Healthcheck delay: {}ms", self.general.healthcheck_delay); + info!( + "Default max server lifetime: {}ms", + self.general.server_lifetime + ); match self.general.tls_certificate.clone() { Some(tls_certificate) => { info!("TLS certificate: {}", tls_certificate); @@ -867,12 +908,26 @@ impl Config { pool_name, pool_config.users.len() ); + info!( + "[pool: {}] Max server lifetime: {}", + pool_name, + match pool_config.server_lifetime { + Some(server_lifetime) => format!("{}ms", server_lifetime), + None => "default".to_string(), + } + ); for user in &pool_config.users { info!( "[pool: {}][user: {}] Pool size: {}", pool_name, user.1.username, user.1.pool_size, ); + info!( + "[pool: {}][user: {}] Minimum pool size: {}", + pool_name, + user.1.username, + user.1.min_pool_size.unwrap_or(0) + ); info!( "[pool: {}][user: {}] Statement timeout: {}", pool_name, user.1.username, user.1.statement_timeout @@ -886,6 +941,15 @@ impl Config { None => pool_config.pool_mode.to_string(), } ); + info!( + "[pool: {}][user: {}] Max server lifetime: {}", + pool_name, + user.1.username, + match user.1.server_lifetime { + Some(server_lifetime) => format!("{}ms", server_lifetime), + None => "default".to_string(), + } + ); } } } @@ -896,7 +960,13 @@ impl Config { && (self.general.auth_query_user.is_none() || self.general.auth_query_password.is_none()) { - error!("If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`"); + error!( + "If auth_query is specified, \ + you need to provide a value \ + for `auth_query_user`, \ + `auth_query_password`" + ); + return Err(Error::BadConfig); } @@ -904,7 +974,14 @@ impl Config { if pool.auth_query.is_some() && (pool.auth_query_user.is_none() || pool.auth_query_password.is_none()) { - error!("Error in pool {{ {} }}. If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`", name); + error!( + "Error in pool {{ {} }}. \ + If auth_query is specified, you need \ + to provide a value for `auth_query_user`, \ + `auth_query_password`", + name + ); + return Err(Error::BadConfig); } @@ -914,7 +991,13 @@ impl Config { || pool.auth_query_user.is_none()) && user_data.password.is_none() { - error!("Error in pool {{ {} }}. You have to specify a user password for every pool if auth_query is not specified", name); + error!( + "Error in pool {{ {} }}. \ + You have to specify a user password \ + for every pool if auth_query is not specified", + name + ); + return Err(Error::BadConfig); } } diff --git a/src/pool.rs b/src/pool.rs index 7f8e41c0..8ec88604 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -311,21 +311,34 @@ impl ConnectionPool { if let Some(apt) = &auth_passthrough { match apt.fetch_hash(&address).await { - Ok(ok) => { - if let Some(ref pool_auth_hash_value) = *(pool_auth_hash.read()) { - if ok != *pool_auth_hash_value { - warn!("Hash is not the same across shards of the same pool, client auth will \ - be done using last obtained hash. Server: {}:{}, Database: {}", server.host, server.port, shard.database); - } - } - debug!("Hash obtained for {:?}", address); - { - let mut pool_auth_hash = pool_auth_hash.write(); - *pool_auth_hash = Some(ok.clone()); - } - }, - Err(err) => warn!("Could not obtain password hashes using auth_query config, ignoring. Error: {:?}", err), - } + Ok(ok) => { + if let Some(ref pool_auth_hash_value) = *(pool_auth_hash.read()) + { + if ok != *pool_auth_hash_value { + warn!( + "Hash is not the same across shards \ + of the same pool, client auth will \ + be done using last obtained hash. \ + Server: {}:{}, Database: {}", + server.host, server.port, shard.database, + ); + } + } + + debug!("Hash obtained for {:?}", address); + + { + let mut pool_auth_hash = pool_auth_hash.write(); + *pool_auth_hash = Some(ok.clone()); + } + } + Err(err) => warn!( + "Could not obtain password hashes \ + using auth_query config, ignoring. \ + Error: {:?}", + err, + ), + } } let manager = ServerPool::new( @@ -347,10 +360,20 @@ impl ConnectionPool { None => config.general.idle_timeout, }; + let server_lifetime = match user.server_lifetime { + Some(server_lifetime) => server_lifetime, + None => match pool_config.server_lifetime { + Some(server_lifetime) => server_lifetime, + None => config.general.server_lifetime, + }, + }; + let pool = Pool::builder() .max_size(user.pool_size) + .min_idle(user.min_pool_size) .connection_timeout(std::time::Duration::from_millis(connect_timeout)) .idle_timeout(Some(std::time::Duration::from_millis(idle_timeout))) + .max_lifetime(Some(std::time::Duration::from_millis(server_lifetime))) .test_on_check_out(false) .build(manager) .await From 26b15b8e01e3720ee283cdec48d6c4a895d46e0e Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Wed, 26 Apr 2023 16:10:23 -0700 Subject: [PATCH 2/2] fmt --- src/config.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/config.rs b/src/config.rs index 41bed467..d822486d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -209,10 +209,13 @@ impl User { match self.min_pool_size { Some(min_pool_size) => { if min_pool_size > self.pool_size { - error!("min_pool_size of {} cannot be larger than pool_size of {}", min_pool_size, self.pool_size); + error!( + "min_pool_size of {} cannot be larger than pool_size of {}", + min_pool_size, self.pool_size + ); return Err(Error::BadConfig); } - }, + } None => (), }; @@ -550,7 +553,6 @@ impl Pool { user.validate()?; } - Ok(()) } }