diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 2c6ce758e5..9998a832c7 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -31,6 +31,7 @@ pub struct PgListener { buffer_tx: Option>, channels: Vec, ignore_close_event: bool, + eager_reconnect: bool, } /// An asynchronous notification from Postgres. @@ -69,6 +70,7 @@ impl PgListener { buffer_tx: None, channels: Vec::new(), ignore_close_event: false, + eager_reconnect: true, }) } @@ -95,6 +97,19 @@ impl PgListener { self.ignore_close_event = val; } + /// Set whether a lost connection in `try_recv()` should be re-established before it returns + /// `Ok(None)`, or on the next call to `try_recv()`. + /// + /// By default, this is `true` and the connection is re-established before returning `Ok(None)`. + /// + /// If this is set to `false` then notifications will continue to be lost until the next call + /// to `try_recv()`. If your recovery logic uses a different database connection then + /// notifications that occur after it completes may be lost without any way to tell that they + /// have been. + pub fn eager_reconnect(&mut self, val: bool) { + self.eager_reconnect = val; + } + /// Starts listening for notifications on a channel. /// The channel name is quoted here to ensure case sensitivity. pub async fn listen(&mut self, channel: &str) -> Result<(), Error> { @@ -214,7 +229,8 @@ impl PgListener { /// Receives the next notification available from any of the subscribed channels. /// /// If the connection to PostgreSQL is lost, `None` is returned, and the connection is - /// reconnected on the next call to `try_recv()`. + /// reconnected either immediately, or on the next call to `try_recv()`, depending on + /// the value of [`eager_reconnect`]. /// /// # Example /// @@ -234,6 +250,8 @@ impl PgListener { /// # Result::<(), sqlx::Error>::Ok(()) /// # }).unwrap(); /// ``` + /// + /// [`eager_reconnect`]: PgListener::eager_reconnect pub async fn try_recv(&mut self) -> Result, Error> { // Flush the buffer first, if anything // This would only fill up if this listener is used as a connection @@ -270,6 +288,10 @@ impl PgListener { conn.close_on_drop(); } + if self.eager_reconnect { + self.connect_if_needed().await?; + } + // lost connection return Ok(None); }