Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve max_lifetime handling #3065

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;

use super::inner::{DecrementSizeGuard, PoolInner};
use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
use crate::pool::options::PoolConnectionMetadata;
use std::future::Future;

Expand Down Expand Up @@ -239,6 +239,13 @@ impl<DB: Database> Floating<DB, Live<DB>> {
return false;
}

// If the connection is beyond max lifetime, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&self.inner, &self.guard.pool.options) {
self.close().await;
return false;
}

if let Some(test) = &self.guard.pool.options.after_release {
let meta = self.metadata();
match (test)(&mut self.inner.raw, meta).await {
Expand Down
64 changes: 22 additions & 42 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<DB: Database> PoolInner<DB> {
}

pub(super) fn release(&self, floating: Floating<DB, Live<DB>>) {
// `options.after_release` is invoked by `PoolConnection::release_to_pool()`.
// `options.after_release` and other checks are in `PoolConnection::return_to_pool()`.

let Floating { inner: idle, guard } = floating.into_idle();

Expand Down Expand Up @@ -273,7 +273,7 @@ impl<DB: Database> PoolInner<DB> {
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio
// and so we should yield to let any spawned release_to_pool() tasks
// and so we should yield to let any spawned return_to_pool() tasks
// execute.
crate::rt::yield_now().await;
continue;
Expand Down Expand Up @@ -417,7 +417,10 @@ impl<DB: Database> Drop for PoolInner<DB> {
}

/// Returns `true` if the connection has exceeded `options.max_lifetime` if set, `false` otherwise.
fn is_beyond_max_lifetime<DB: Database>(live: &Live<DB>, options: &PoolOptions<DB>) -> bool {
pub(super) fn is_beyond_max_lifetime<DB: Database>(
live: &Live<DB>,
options: &PoolOptions<DB>,
) -> bool {
options
.max_lifetime
.map_or(false, |max| live.created_at.elapsed() > max)
Expand All @@ -434,12 +437,6 @@ async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_max_lifetime(&conn, options) {
return Err(conn.close().await);
}

if options.test_before_acquire {
// Check that the connection is still live
if let Err(error) = conn.ping().await {
Expand Down Expand Up @@ -503,22 +500,30 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
crate::rt::spawn(async move {
let _ = close_event
.do_until(async {
let mut slept = true;

// If the last handle to the pool was dropped while we were sleeping
while let Some(pool) = pool_weak.upgrade() {
if pool.is_closed() {
return;
}

// Don't run the reaper right away.
if slept && !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}

let next_run = Instant::now() + period;

pool.min_connections_maintenance(Some(next_run)).await;
// Go over all idle connections, check for idleness and lifetime,
// and if we have fewer than min_connections after reaping a connection,
// open a new one immediately. Note that other connections may be popped from
// the queue in the meantime - that's fine, there is no harm in checking more
for _ in 0..pool.num_idle() {
if let Some(conn) = pool.try_acquire() {
if is_beyond_idle_timeout(&conn, &pool.options)
|| is_beyond_max_lifetime(&conn, &pool.options)
{
let _ = conn.close().await;
pool.min_connections_maintenance(Some(next_run)).await;
} else {
pool.release(conn.into_live());
}
}
}

// Don't hold a reference to the pool while sleeping.
drop(pool);
Expand All @@ -530,37 +535,12 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
// `next_run` is in the past, just yield.
crate::rt::yield_now().await;
}

slept = true;
}
})
.await;
});
}

async fn do_reap<DB: Database>(pool: &Arc<PoolInner<DB>>) {
// reap at most the current size minus the minimum idle
let max_reaped = pool.size().saturating_sub(pool.options.min_connections);

// collect connections to reap
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.try_acquire())
.partition::<Vec<_>, _>(|conn| {
is_beyond_idle_timeout(conn, &pool.options)
|| is_beyond_max_lifetime(conn, &pool.options)
});

for conn in keep {
// return valid connections to the pool first
pool.release(conn.into_live());
}

for conn in reap {
let _ = conn.close().await;
}
}

/// RAII guard returned by `Pool::try_increment_size()` and others.
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
Expand Down
74 changes: 68 additions & 6 deletions tests/any/pool.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use sqlx::any::{AnyConnectOptions, AnyPoolOptions};
use sqlx::Executor;
use std::sync::atomic::AtomicI32;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
atomic::{AtomicI32, AtomicUsize, Ordering},
Arc, Mutex,
};
use std::time::Duration;

Expand Down Expand Up @@ -116,7 +115,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
CREATE TEMPORARY TABLE conn_stats(
id int primary key,
before_acquire_calls int default 0,
after_release_calls int default 0
after_release_calls int default 0
);
INSERT INTO conn_stats(id) VALUES ({});
"#,
Expand All @@ -137,7 +136,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
// MySQL and MariaDB don't support UPDATE ... RETURNING
sqlx::query(
r#"
UPDATE conn_stats
UPDATE conn_stats
SET before_acquire_calls = before_acquire_calls + 1
"#,
)
Expand All @@ -161,7 +160,7 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {
Box::pin(async move {
sqlx::query(
r#"
UPDATE conn_stats
UPDATE conn_stats
SET after_release_calls = after_release_calls + 1
"#,
)
Expand Down Expand Up @@ -216,3 +215,66 @@ async fn test_pool_callbacks() -> anyhow::Result<()> {

Ok(())
}

#[sqlx_macros::test]
async fn test_connection_maintenance() -> anyhow::Result<()> {
sqlx::any::install_default_drivers();
sqlx_test::setup_if_needed();
let conn_options: AnyConnectOptions = std::env::var("DATABASE_URL")?.parse()?;

let last_meta = Arc::new(Mutex::new(None));
let last_meta_ = last_meta.clone();
let pool = AnyPoolOptions::new()
.max_lifetime(Duration::from_millis(400))
.min_connections(3)
.before_acquire(move |_conn, _meta| {
*last_meta_.lock().unwrap() = Some(_meta);
Box::pin(async { Ok(true) })
})
.connect_lazy_with(conn_options);

// Open and release 5 connections
let conns = vec![
pool.acquire().await?,
pool.acquire().await?,
pool.acquire().await?,
pool.acquire().await?,
pool.acquire().await?,
];
assert_eq!(pool.size(), 5);
assert_eq!(pool.num_idle(), 0);
for mut conn in conns {
conn.return_to_pool().await;
}

assert_eq!(pool.size(), 5);
assert_eq!(pool.num_idle(), 5);

// Wait for at least two iterations of maintenance task
sqlx_core::rt::sleep(Duration::from_secs(1)).await;

// Existing connections should have been closed due to max lifetime
// and the pool should have reopened min_connections new ones.
// One connection might be in the process of being replaced so we assert 2-3.
assert!(
pool.size() >= 2 && pool.size() <= 3,
"pool.size() = {}",
pool.size()
);
for _ in 0..2 {
// Check that the connections was both acquired from the pool AND it's new
let _ = pool.acquire().await.expect("failed to acquire connection");
let meta = last_meta
.lock()
.unwrap()
.take()
.expect("expected a connection from the pool");
assert!(
meta.age < Duration::from_secs(1),
"expected a fresh connection (age {:?})",
meta.age
);
}

Ok(())
}
Loading