Skip to content

Commit

Permalink
feat(pool): implement close-event notification (#1776)
Browse files Browse the repository at this point in the history
fix(postgres): integrate pool close-event into `PgListener`

closes #1764
  • Loading branch information
abonander authored Apr 5, 2022
1 parent acb3da8 commit f1c635d
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 13 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sqlx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ hashlink = "0.7.0"
# https://github.com/tkaitchuck/aHash/issues/95#issuecomment-874150078
indexmap = "1.6.0"
hkdf = { version = "0.11.0", optional = true }
event-listener = "2.5.2"

[dev-dependencies]
sqlx = { version = "0.5.11", path = "..", features = ["postgres", "sqlite"] }
tokio = { version = "1", features = ["rt"] }
3 changes: 3 additions & 0 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) struct SharedPool<DB: Database> {
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
is_closed: AtomicBool,
pub(super) on_closed: event_listener::Event,
pub(super) options: PoolOptions<DB>,
}

Expand All @@ -50,6 +51,7 @@ impl<DB: Database> SharedPool<DB> {
semaphore: Semaphore::new(options.fair, capacity),
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
on_closed: event_listener::Event::new(),
options,
};

Expand Down Expand Up @@ -81,6 +83,7 @@ impl<DB: Database> SharedPool<DB> {
// we can't just do `usize::MAX` because that would overflow
// and we can't do this more than once cause that would _also_ overflow
self.semaphore.release(WAKE_ALL_PERMITS);
self.on_closed.notify(usize::MAX);
}

// wait for all permits to be released
Expand Down
149 changes: 149 additions & 0 deletions sqlx-core/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::transaction::Transaction;
use event_listener::EventListener;
use futures_core::FusedFuture;
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

#[macro_use]
Expand Down Expand Up @@ -229,6 +234,13 @@ pub use self::options::PoolOptions;
/// well and queries will generally benefit from these caches being "warm" (populated with data).
pub struct Pool<DB: Database>(pub(crate) Arc<SharedPool<DB>>);

/// A future that resolves when the pool is closed.
///
/// See [`Pool::close_event()`] for details.
pub struct CloseEvent {
listener: Option<EventListener>,
}

impl<DB: Database> Pool<DB> {
/// Creates a new connection pool with a default pool configuration and
/// the given connection URI; and, immediately establishes one connection.
Expand Down Expand Up @@ -324,6 +336,84 @@ impl<DB: Database> Pool<DB> {
self.0.is_closed()
}

/// Get a future that resolves when [`Pool::close()`] is called.
///
/// If the pool is already closed, the future resolves immediately.
///
/// This can be used to cancel long-running operations that hold onto a [`PoolConnection`]
/// so they don't prevent the pool from closing (which would otherwise wait until all
/// connections are returned).
///
/// Examples
/// ========
/// These examples use Postgres and Tokio, but should suffice to demonstrate the concept.
///
/// Do something when the pool is closed:
/// ```rust,no_run
/// # #[cfg(feature = "postgres")]
/// # async fn bleh() -> sqlx_core::error::Result<()> {
/// use sqlx::PgPool;
///
/// let pool = PgPool::connect("postgresql://...").await?;
///
/// let pool2 = pool.clone();
///
/// tokio::spawn(async move {
/// // Demonstrates that `CloseEvent` is itself a `Future` you can wait on.
/// // This lets you implement any kind of on-close event that you like.
/// pool2.close_event().await;
///
/// println!("Pool is closing!");
///
/// // Imagine maybe recording application statistics or logging a report, etc.
/// });
///
/// // The rest of the application executes normally...
///
/// // Close the pool before the application exits...
/// pool.close().await;
///
/// # Ok(())
/// # }
/// ```
///
/// Cancel a long-running operation:
/// ```rust,no_run
/// # #[cfg(feature = "postgres")]
/// # async fn bleh() -> sqlx_core::error::Result<()> {
/// use sqlx::{Executor, PgPool};
///
/// let pool = PgPool::connect("postgresql://...").await?;
///
/// let pool2 = pool.clone();
///
/// tokio::spawn(async move {
/// pool2.close_event().do_until(async {
/// // This statement normally won't return for 30 days!
/// // (Assuming the connection doesn't time out first, of course.)
/// pool2.execute("SELECT pg_sleep('30 days')").await;
///
/// // If the pool is closed before the statement completes, this won't be printed.
/// // This is because `.do_until()` cancels the future it's given if the
/// // pool is closed first.
/// println!("Waited!");
/// }).await;
/// });
///
/// // This normally wouldn't return until the above statement completed and the connection
/// // was returned to the pool. However, thanks to `.do_until()`, the operation was
/// // cancelled as soon as we called `.close().await`.
/// pool.close().await;
///
/// # Ok(())
/// # }
/// ```
pub fn close_event(&self) -> CloseEvent {
CloseEvent {
listener: (!self.is_closed()).then(|| self.0.on_closed.listen()),
}
}

/// Returns the number of connections currently active. This includes idle connections.
pub fn size(&self) -> u32 {
self.0.size()
Expand Down Expand Up @@ -368,6 +458,65 @@ impl<DB: Database> fmt::Debug for Pool<DB> {
}
}

impl CloseEvent {
/// Execute the given future until it returns or the pool is closed.
///
/// Cancels the future and returns `Err(PoolClosed)` if/when the pool is closed.
/// If the pool was already closed, the future is never run.
pub async fn do_until<Fut: Future>(&mut self, fut: Fut) -> Result<Fut::Output, Error> {
// Check that the pool wasn't closed already.
//
// We use `poll_immediate()` as it will use the correct waker instead of
// a no-op one like `.now_or_never()`, but it won't actually suspend execution here.
futures_util::future::poll_immediate(&mut *self)
.await
.map_or(Ok(()), |_| Err(Error::PoolClosed))?;

futures_util::pin_mut!(fut);

// I find that this is clearer in intent than `futures_util::future::select()`
// or `futures_util::select_biased!{}` (which isn't enabled anyway).
futures_util::future::poll_fn(|cx| {
// Poll `fut` first as the wakeup event is more likely for it than `self`.
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
return Poll::Ready(Ok(ret));
}

// Can't really factor out mapping to `Err(Error::PoolClosed)` though it seems like
// we should because that results in a different `Ok` type each time.
//
// Ideally we'd map to something like `Result<!, Error>` but using `!` as a type
// is not allowed on stable Rust yet.
self.poll_unpin(cx).map(|_| Err(Error::PoolClosed))
})
.await
}
}

impl Future for CloseEvent {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(listener) = &mut self.listener {
futures_core::ready!(listener.poll_unpin(cx));
}

// `EventListener` doesn't like being polled after it yields, and even if it did it
// would probably just wait for the next event, neither of which we want.
//
// So this way, once we get our close event, we fuse this future to immediately return.
self.listener = None;

Poll::Ready(())
}
}

impl FusedFuture for CloseEvent {
fn is_terminated(&self) -> bool {
self.listener.is_none()
}
}

/// get the time between the deadline and now and use that as our timeout
///
/// returns `Error::PoolTimedOut` if the deadline is in the past
Expand Down
62 changes: 53 additions & 9 deletions sqlx-core/src/postgres/listener.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::fmt::{self, Debug};
use std::io;
use std::str::from_utf8;

use either::Either;
use futures_channel::mpsc;
use futures_core::future::BoxFuture;
use futures_core::stream::{BoxStream, Stream};

use crate::describe::Describe;
use crate::error::Error;
use crate::executor::{Execute, Executor};
use crate::pool::PoolOptions;
use crate::pool::{Pool, PoolConnection};
use crate::postgres::message::{MessageFormat, Notification};
use crate::postgres::{PgConnection, PgQueryResult, PgRow, PgStatement, PgTypeInfo, Postgres};
use either::Either;
use futures_channel::mpsc;
use futures_core::future::BoxFuture;
use futures_core::stream::{BoxStream, Stream};
use std::fmt::{self, Debug};
use std::io;
use std::str::from_utf8;

/// A stream of asynchronous notifications from Postgres.
///
Expand All @@ -25,6 +27,7 @@ pub struct PgListener {
buffer_rx: mpsc::UnboundedReceiver<Notification>,
buffer_tx: Option<mpsc::UnboundedSender<Notification>>,
channels: Vec<String>,
ignore_close_event: bool,
}

/// An asynchronous notification from Postgres.
Expand All @@ -41,7 +44,11 @@ impl PgListener {
.connect(uri)
.await?;

Self::connect_with(&pool).await
let mut this = Self::connect_with(&pool).await?;
// We don't need to handle close events
this.ignore_close_event = true;

Ok(this)
}

pub async fn connect_with(pool: &Pool<Postgres>) -> Result<Self, Error> {
Expand All @@ -58,9 +65,33 @@ impl PgListener {
buffer_rx: receiver,
buffer_tx: None,
channels: Vec::new(),
ignore_close_event: false,
})
}

/// Set whether or not to ignore [`Pool::close_event()`]. Defaults to `false`.
///
/// By default, when [`Pool::close()`] is called on the pool this listener is using
/// while [`Self::recv()`] or [`Self::try_recv()`] are waiting for a message, the wait is
/// cancelled and `Err(PoolClosed)` is returned.
///
/// This is because `Pool::close()` will wait until _all_ connections are returned and closed,
/// including the one being used by this listener.
///
/// Otherwise, `pool.close().await` would have to wait until `PgListener` encountered a
/// need to acquire a new connection (timeout, error, etc.) and dropped the one it was
/// currently holding, at which point `.recv()` or `.try_recv()` would return `Err(PoolClosed)`
/// on the attempt to acquire a new connection anyway.
///
/// However, if you want `PgListener` to ignore the close event and continue waiting for a
/// message as long as it can, set this to `true`.
///
/// Does nothing if this was constructed with [`PgListener::connect()`], as that creates an
/// internal pool just for the new instance of `PgListener` which cannot be closed manually.
pub fn ignore_pool_close_event(&mut self, val: bool) {
self.ignore_close_event = val;
}

/// Starts listening for notifications on a channel.
/// The channel name is quoted here to ensure case sensitivity.
pub async fn listen(&mut self, channel: &str) -> Result<(), Error> {
Expand Down Expand Up @@ -202,11 +233,24 @@ impl PgListener {
return Ok(Some(PgNotification(notification)));
}

// Fetch our `CloseEvent` listener, if applicable.
let mut close_event = (!self.ignore_close_event).then(|| self.pool.close_event());

loop {
// Ensure we have an active connection to work with.
self.connect_if_needed().await?;

let message = match self.connection().stream.recv_unchecked().await {
let next_message = self.connection().stream.recv_unchecked();

let res = if let Some(ref mut close_event) = close_event {
// cancels the wait and returns `Err(PoolClosed)` if the pool is closed
// before `next_message` returns, or if the pool was already closed
close_event.do_until(next_message).await?
} else {
next_message.await
};

let message = match res {
Ok(message) => message,

// The connection is dead, ensure that it is dropped,
Expand Down
22 changes: 20 additions & 2 deletions tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use futures::{StreamExt, TryStreamExt};
use sqlx::postgres::{
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgSeverity,
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgListener,
PgSeverity,
};
use sqlx::postgres::{PgConnectionInfo, PgPoolOptions, PgRow, Postgres};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_test::{new, setup_if_needed};
use sqlx_test::{new, pool, setup_if_needed};
use std::env;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -967,6 +968,23 @@ async fn test_listener_cleanup() -> anyhow::Result<()> {
Ok(())
}

#[sqlx_macros::test]
async fn test_pg_listener_allows_pool_to_close() -> anyhow::Result<()> {
let pool = pool::<Postgres>().await?;

// acquires and holds a connection which would normally prevent the pool from closing
let mut listener = PgListener::connect_with(&pool).await?;

sqlx_rt::spawn(async move {
listener.recv().await;
});

// would previously hang forever since `PgListener` had no way to know the pool wanted to close
pool.close().await;

Ok(())
}

#[sqlx_macros::test]
async fn it_supports_domain_types_in_composite_domain_types() -> anyhow::Result<()> {
// Only supported in Postgres 11+
Expand Down

0 comments on commit f1c635d

Please sign in to comment.