Skip to content

Commit

Permalink
Pool: check idle connections while opening a new one
Browse files Browse the repository at this point in the history
  • Loading branch information
joeydewaal committed Sep 21, 2024
1 parent 9e03327 commit e32c121
Showing 1 changed file with 60 additions and 21 deletions.
81 changes: 60 additions & 21 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::database::Database;
use crate::error::Error;
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
use crossbeam_queue::ArrayQueue;
use either::Either;

use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};

Expand Down Expand Up @@ -251,7 +252,7 @@ impl<DB: Database> PoolInner<DB> {
}

let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;
let connect_deadline = acquire_started_at + self.options.connect_timeout;

let acquired = crate::rt::timeout(
self.options.acquire_timeout,
Expand All @@ -260,21 +261,18 @@ impl<DB: Database> PoolInner<DB> {
// Handles the close-event internally
let permit = self.acquire_permit().await?;

// First attempt to pop a connection from the idle queue and check if we can
// use it.
let guard = match self.get_live_idle(permit).await {
// All good!
Ok(Either::Left(conn)) => return Ok(conn),

// First attempt to pop a connection from the idle queue.
let guard = match self.pop_idle(permit) {
// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => guard,

// Then, check that we can use it...
Ok(conn) => match check_idle_conn(conn, &self.options).await {

// All good!
Ok(live) => return Ok(live),

// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => guard,
},
Err(permit) => if let Ok(guard) = self.try_increment_size(permit) {
// we can open a new connection
Ok(Either::Right(permit)) => if let Ok(guard) = self.try_increment_size(permit) {
// we can open a new connection
guard
} else {
Expand All @@ -290,14 +288,19 @@ impl<DB: Database> PoolInner<DB> {
}
};

// Attempt to connect...

let pool = self.clone();
return crate::rt::spawn(async move {
crate::rt::timeout(pool.options.connect_timeout, async move {
pool.connect(deadline, guard).await
}).await
}).await
.map_err(|_| Error::PoolTimedOut)?;
let pool2 = self.clone();

// Future that tries to get a live idle connection.
let idle_fut = std::pin::pin!(pool.get_idle_conn(connect_deadline.clone()));

// Future that tries to open a new connection.
let conn_fut = crate::rt::spawn(async move {
pool2.connect(connect_deadline, guard).await
});

return future::select(idle_fut, conn_fut).await.factor_first().0;
}
}
)
Expand Down Expand Up @@ -404,6 +407,28 @@ impl<DB: Database> PoolInner<DB> {
}
}

// Tries to get a live connection that was idle in a loop.
pub async fn get_idle_conn(
self: &Arc<Self>,
deadline: Instant,
) -> Result<Floating<DB, Live<DB>>, Error> {

let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout(deadline)? / 5;

loop {
let new_permit = self.acquire_permit().await?;
match self.get_live_idle(new_permit).await {
Ok(Either::Left(live)) => {
return Ok(live)
},
_ => (),
};
crate::rt::sleep(backoff).await;
backoff = cmp::min(backoff * 2, max_backoff);
}
}

/// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`).
pub async fn try_min_connections(self: &Arc<Self>, deadline: Instant) -> Result<(), Error> {
while self.size() < self.options.min_connections {
Expand All @@ -428,6 +453,20 @@ impl<DB: Database> PoolInner<DB> {
Ok(())
}

// Tries to get a live idle connection.
pub async fn get_live_idle<'a>(self: &'a Arc<Self>, permit: AsyncSemaphoreReleaser<'a>) -> Result<Either<Floating<DB, Live<DB>>, AsyncSemaphoreReleaser<'a>>, DecrementSizeGuard<DB>> {
match self.pop_idle(permit) {
Ok(conn) => match check_idle_conn(conn, &self.options).await {
// All good!
Ok(live) => Ok(Either::Left(live)),
// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => Err(guard)
},
Err(permit) => Ok(Either::Right(permit))
}
}

/// Attempt to maintain `min_connections`, logging if unable.
pub async fn min_connections_maintenance(self: &Arc<Self>, deadline: Option<Instant>) {
let deadline = deadline.unwrap_or_else(|| {
Expand Down

0 comments on commit e32c121

Please sign in to comment.