Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more pool settings #416

Merged
merged 2 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CONFIG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ default: 30000 # milliseconds

How long an idle connection with a server is left open (ms).

### server_lifetime
```
path: general.server_lifetime
default: 86400000 # 24 hours
```

Max connection lifetime before it's closed, even if actively used.

### idle_client_in_transaction_timeout
```
path: general.idle_client_in_transaction_timeout
Expand Down
5 changes: 5 additions & 0 deletions pgcat.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ connect_timeout = 5000 # milliseconds
# How long an idle connection with a server is left open (ms).
idle_timeout = 30000 # milliseconds

# Max connection lifetime before it's closed, even if actively used.
server_lifetime = 86400000 # 24 hours

# How long a client is allowed to be idle while in a transaction (ms).
idle_client_in_transaction_timeout = 0 # milliseconds

Expand Down Expand Up @@ -206,6 +209,8 @@ sharding_function = "pg_bigint_hash"
username = "simple_user"
password = "simple_user"
pool_size = 5
min_pool_size = 3
server_lifetime = 60000
statement_timeout = 0

[pools.simple_db.shards.0]
Expand Down
2 changes: 2 additions & 0 deletions src/auth_passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ impl AuthPassthrough {
pool_size: 1,
statement_timeout: 0,
pool_mode: None,
server_lifetime: None,
min_pool_size: None,
};

let user = &address.username;
Expand Down
91 changes: 88 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ pub struct User {
pub server_username: Option<String>,
pub server_password: Option<String>,
pub pool_size: u32,
pub min_pool_size: Option<u32>,
pub pool_mode: Option<PoolMode>,
pub server_lifetime: Option<u64>,
#[serde(default)] // 0
pub statement_timeout: u64,
}
Expand All @@ -194,12 +196,34 @@ impl Default for User {
server_username: None,
server_password: None,
pool_size: 15,
min_pool_size: None,
statement_timeout: 0,
pool_mode: None,
server_lifetime: None,
}
}
}

impl User {
fn validate(&self) -> Result<(), Error> {
match self.min_pool_size {
Some(min_pool_size) => {
if min_pool_size > self.pool_size {
error!(
"min_pool_size of {} cannot be larger than pool_size of {}",
min_pool_size, self.pool_size
);
return Err(Error::BadConfig);
}
}

None => (),
};

Ok(())
}
}

/// General configuration.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct General {
Expand Down Expand Up @@ -246,6 +270,9 @@ pub struct General {
#[serde(default = "General::default_idle_client_in_transaction_timeout")]
pub idle_client_in_transaction_timeout: u64,

#[serde(default = "General::default_server_lifetime")]
pub server_lifetime: u64,

#[serde(default = "General::default_worker_threads")]
pub worker_threads: usize,

Expand All @@ -271,6 +298,10 @@ impl General {
5432
}

pub fn default_server_lifetime() -> u64 {
1000 * 60 * 60 * 24 // 24 hours
}

pub fn default_connect_timeout() -> u64 {
1000
}
Expand Down Expand Up @@ -347,6 +378,7 @@ impl Default for General {
auth_query: None,
auth_query_user: None,
auth_query_password: None,
server_lifetime: 1000 * 3600 * 24, // 24 hours,
}
}
}
Expand Down Expand Up @@ -411,6 +443,8 @@ pub struct Pool {

pub idle_timeout: Option<u64>,

pub server_lifetime: Option<u64>,

pub sharding_function: ShardingFunction,

#[serde(default = "Pool::default_automatic_sharding_key")]
Expand Down Expand Up @@ -515,6 +549,10 @@ impl Pool {
None => None,
};

for (_, user) in &self.users {
user.validate()?;
}

Ok(())
}
}
Expand All @@ -539,6 +577,7 @@ impl Default for Pool {
auth_query: None,
auth_query_user: None,
auth_query_password: None,
server_lifetime: None,
}
}
}
Expand Down Expand Up @@ -791,6 +830,10 @@ impl Config {
);
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
info!("Healthcheck delay: {}ms", self.general.healthcheck_delay);
info!(
"Default max server lifetime: {}ms",
self.general.server_lifetime
);
match self.general.tls_certificate.clone() {
Some(tls_certificate) => {
info!("TLS certificate: {}", tls_certificate);
Expand Down Expand Up @@ -867,12 +910,26 @@ impl Config {
pool_name,
pool_config.users.len()
);
info!(
"[pool: {}] Max server lifetime: {}",
pool_name,
match pool_config.server_lifetime {
Some(server_lifetime) => format!("{}ms", server_lifetime),
None => "default".to_string(),
}
);

for user in &pool_config.users {
info!(
"[pool: {}][user: {}] Pool size: {}",
pool_name, user.1.username, user.1.pool_size,
);
info!(
"[pool: {}][user: {}] Minimum pool size: {}",
pool_name,
user.1.username,
user.1.min_pool_size.unwrap_or(0)
);
info!(
"[pool: {}][user: {}] Statement timeout: {}",
pool_name, user.1.username, user.1.statement_timeout
Expand All @@ -886,6 +943,15 @@ impl Config {
None => pool_config.pool_mode.to_string(),
}
);
info!(
"[pool: {}][user: {}] Max server lifetime: {}",
pool_name,
user.1.username,
match user.1.server_lifetime {
Some(server_lifetime) => format!("{}ms", server_lifetime),
None => "default".to_string(),
}
);
}
}
}
Expand All @@ -896,15 +962,28 @@ impl Config {
&& (self.general.auth_query_user.is_none()
|| self.general.auth_query_password.is_none())
{
error!("If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`");
error!(
"If auth_query is specified, \
you need to provide a value \
for `auth_query_user`, \
`auth_query_password`"
);

return Err(Error::BadConfig);
}

for (name, pool) in self.pools.iter() {
if pool.auth_query.is_some()
&& (pool.auth_query_user.is_none() || pool.auth_query_password.is_none())
{
error!("Error in pool {{ {} }}. If auth_query is specified, you need to provide a value for `auth_query_user`, `auth_query_password`", name);
error!(
"Error in pool {{ {} }}. \
If auth_query is specified, you need \
to provide a value for `auth_query_user`, \
`auth_query_password`",
name
);

return Err(Error::BadConfig);
}

Expand All @@ -914,7 +993,13 @@ impl Config {
|| pool.auth_query_user.is_none())
&& user_data.password.is_none()
{
error!("Error in pool {{ {} }}. You have to specify a user password for every pool if auth_query is not specified", name);
error!(
"Error in pool {{ {} }}. \
You have to specify a user password \
for every pool if auth_query is not specified",
name
);

return Err(Error::BadConfig);
}
}
Expand Down
53 changes: 38 additions & 15 deletions src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,34 @@ impl ConnectionPool {

if let Some(apt) = &auth_passthrough {
match apt.fetch_hash(&address).await {
Ok(ok) => {
if let Some(ref pool_auth_hash_value) = *(pool_auth_hash.read()) {
if ok != *pool_auth_hash_value {
warn!("Hash is not the same across shards of the same pool, client auth will \
be done using last obtained hash. Server: {}:{}, Database: {}", server.host, server.port, shard.database);
}
}
debug!("Hash obtained for {:?}", address);
{
let mut pool_auth_hash = pool_auth_hash.write();
*pool_auth_hash = Some(ok.clone());
}
},
Err(err) => warn!("Could not obtain password hashes using auth_query config, ignoring. Error: {:?}", err),
}
Ok(ok) => {
if let Some(ref pool_auth_hash_value) = *(pool_auth_hash.read())
{
if ok != *pool_auth_hash_value {
warn!(
"Hash is not the same across shards \
of the same pool, client auth will \
be done using last obtained hash. \
Server: {}:{}, Database: {}",
server.host, server.port, shard.database,
);
}
}

debug!("Hash obtained for {:?}", address);

{
let mut pool_auth_hash = pool_auth_hash.write();
*pool_auth_hash = Some(ok.clone());
}
}
Err(err) => warn!(
"Could not obtain password hashes \
using auth_query config, ignoring. \
Error: {:?}",
err,
),
}
}

let manager = ServerPool::new(
Expand All @@ -347,10 +360,20 @@ impl ConnectionPool {
None => config.general.idle_timeout,
};

let server_lifetime = match user.server_lifetime {
Some(server_lifetime) => server_lifetime,
None => match pool_config.server_lifetime {
Some(server_lifetime) => server_lifetime,
None => config.general.server_lifetime,
},
};

let pool = Pool::builder()
.max_size(user.pool_size)
.min_idle(user.min_pool_size)
.connection_timeout(std::time::Duration::from_millis(connect_timeout))
.idle_timeout(Some(std::time::Duration::from_millis(idle_timeout)))
.max_lifetime(Some(std::time::Duration::from_millis(server_lifetime)))
.test_on_check_out(false)
.build(manager)
.await
Expand Down