From d04530b336b0c54ea27ddb87d2ab88c6e95c037d Mon Sep 17 00:00:00 2001 From: Gary Coady Date: Tue, 13 Feb 2024 09:44:02 +0100 Subject: [PATCH] feat: Implement before_connect callback to modify connect options. Allows the user to see and maybe modify the connect options before each attempt to connect to a database. May be used in a number of ways, e.g.: - adding jitter to connection lifetime - validating/setting a per-connection password - using a custom server discovery process --- sqlx-core/src/pool/inner.rs | 16 ++++++++- sqlx-core/src/pool/options.rs | 63 +++++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index c14e6a434b..cdfbca70d5 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -8,6 +8,7 @@ use crossbeam_queue::ArrayQueue; use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser}; +use std::borrow::Cow; use std::cmp; use std::future::Future; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; @@ -300,18 +301,31 @@ impl PoolInner { let mut backoff = Duration::from_millis(10); let max_backoff = deadline_as_timeout::(deadline)? / 5; + let mut num_attempts: u32 = 0; loop { let timeout = deadline_as_timeout::(deadline)?; + num_attempts += 1; // clone the connect options arc so it can be used without holding the RwLockReadGuard // across an async await point - let connect_options = self + let connect_options_arc = self .connect_options .read() .expect("write-lock holder panicked") .clone(); + let connect_options = if let Some(callback) = &self.options.before_connect { + callback(connect_options_arc.as_ref(), num_attempts) + .await + .map_err(|error| { + tracing::error!(%error, "error returned from before_connect"); + error + })? + } else { + Cow::Borrowed(connect_options_arc.as_ref()) + }; + // result here is `Result, TimeoutError>` // if this block does not return, sleep for the backoff timeout and try again match crate::rt::timeout(timeout, connect_options.connect()).await { diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index a15ceb40ba..b1ef9ad7c1 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -4,6 +4,7 @@ use crate::error::Error; use crate::pool::inner::PoolInner; use crate::pool::Pool; use futures_core::future::BoxFuture; +use std::borrow::Cow; use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -43,6 +44,18 @@ use std::time::{Duration, Instant}; /// the perspectives of both API designer and consumer. pub struct PoolOptions { pub(crate) test_before_acquire: bool, + pub(crate) before_connect: Option< + Arc< + dyn Fn( + &::Options, + u32, + ) + -> BoxFuture<'_, Result::Options>, Error>> + + 'static + + Send + + Sync, + >, + >, pub(crate) after_connect: Option< Arc< dyn Fn(&mut DB::Connection, PoolConnectionMetadata) -> BoxFuture<'_, Result<(), Error>> @@ -90,6 +103,7 @@ impl Clone for PoolOptions { fn clone(&self) -> Self { PoolOptions { test_before_acquire: self.test_before_acquire, + before_connect: self.before_connect.clone(), after_connect: self.after_connect.clone(), before_acquire: self.before_acquire.clone(), after_release: self.after_release.clone(), @@ -136,6 +150,7 @@ impl PoolOptions { pub fn new() -> Self { Self { // User-specifiable routines + before_connect: None, after_connect: None, before_acquire: None, after_release: None, @@ -292,6 +307,54 @@ impl PoolOptions { self } + /// Perform an asynchronous action before connecting to the database. + /// + /// This operation is performed on every attempt to connect, including retries. The + /// current `ConnectOptions` is passed, and this may be passed unchanged, or modified + /// after cloning. The current connection attempt is passed as the second parameter + /// (starting at 1). + /// + /// If the operation returns with an error, then the connection attempt fails without + /// attempting further retries. The operation therefore may need to implement error + /// handling and/or value caching to avoid failing the connection attempt. + /// + /// # Example: Per-Request Authentication + /// This callback may be used to modify values in the database's `ConnectOptions`, before + /// connecting to the database. + /// + /// This example is written for PostgreSQL but can likely be adapted to other databases. + /// + /// ```no_run + /// # async fn f() -> Result<(), Box> { + /// use std::borrow::Cow; + /// use sqlx::Executor; + /// use sqlx::postgres::PgPoolOptions; + /// + /// let pool = PgPoolOptions::new() + /// .after_connect(move |opts, _num_attempts| Box::pin(async move { + /// Ok(Cow::Owned(opts.clone().password("abc"))) + /// })) + /// .connect("postgres:// …").await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// For a discussion on why `Box::pin()` is required, see [the type-level docs][Self]. + pub fn before_connect(mut self, callback: F) -> Self + where + for<'c> F: Fn( + &'c ::Options, + u32, + ) + -> BoxFuture<'c, crate::Result::Options>>> + + 'static + + Send + + Sync, + { + self.before_connect = Some(Arc::new(callback)); + self + } + /// Perform an asynchronous action after connecting to the database. /// /// If the operation returns with an error then the error is logged, the connection is closed