Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(postgres): avoid recursively spawning tasks in PgListener::drop() #1393

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::database::Database;
use crate::error::Error;

use super::inner::{DecrementSizeGuard, SharedPool};
use std::future::Future;

/// A connection managed by a [`Pool`][crate::pool::Pool].
///
Expand Down Expand Up @@ -60,43 +61,79 @@ impl<DB: Database> DerefMut for PoolConnection<DB> {

impl<DB: Database> PoolConnection<DB> {
/// Explicitly release a connection from the pool
pub fn release(mut self) -> DB::Connection {
#[deprecated = "renamed to `.detach()` for clarity"]
pub fn release(self) -> DB::Connection {
self.detach()
}

/// Detach this connection from the pool, allowing it to open a replacement.
///
/// Note that if your application uses a single shared pool, this
/// effectively lets the application exceed the `max_connections` setting.
///
/// If you want the pool to treat this connection as permanently checked-out,
/// use [`.leak()`][Self::leak] instead.
pub fn detach(mut self) -> DB::Connection {
self.live
.take()
.expect("PoolConnection double-dropped")
.float(&self.pool)
.detach()
}

/// Detach this connection from the pool, treating it as permanently checked-out.
///
/// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
///
/// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
pub fn leak(mut self) -> DB::Connection {
self.live.take().expect("PoolConnection double-dropped").raw
}

/// Test the connection to make sure it is still live before returning it to the pool.
///
/// This effectively runs the drop handler eagerly instead of spawning a task to do it.
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
// we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway
// this also makes the returned future `'static`
let live = self.live.take();
let pool = self.pool.clone();

async move {
let mut floating = if let Some(live) = live {
live.float(&pool)
} else {
return;
};

// test the connection on-release to ensure it is still viable
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
// this is simply a band-aid as SQLx-next (0.6) connections should be able
// to recover from cancellations
if let Err(e) = floating.raw.ping().await {
log::warn!(
"error occurred while testing the connection on-release: {}",
e
);

// we now consider the connection to be broken; just drop it to close
// trying to close gracefully might cause something weird to happen
drop(floating);
} else {
// if the connection is still viable, release it to the pool
pool.release(floating);
}
}
}
}

/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(live) = self.live.take() {
let pool = self.pool.clone();
sqlx_rt::spawn(async move {
let mut floating = live.float(&pool);

// test the connection on-release to ensure it is still viable
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
// this is simply a band-aid as SQLx-next (0.6) connections should be able
// to recover from cancellations
if let Err(e) = floating.raw.ping().await {
log::warn!(
"error occurred while testing the connection on-release: {}",
e
);

// we now consider the connection to be broken; just drop it to close
// trying to close gracefully might cause something weird to happen
drop(floating);
} else {
// if the connection is still viable, release it to th epool
pool.release(floating);
}
});
if self.live.is_some() {
sqlx_rt::spawn(self.return_to_pool());
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/postgres/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ impl Drop for PgListener {
// Unregister any listeners before returning the connection to the pool.
sqlx_rt::spawn(async move {
let _ = conn.execute("UNLISTEN *").await;

// inline the drop handler from `PoolConnection` so it doesn't try to spawn another task
// otherwise, it may trigger a panic if this task is dropped because the runtime is going away:
// https://github.com/launchbadge/sqlx/issues/1389
conn.return_to_pool().await;
});
}
}
Expand Down