more pool fixes #1211

Merged · 1 commit · May 18, 2021
sqlx-core/src/pool/inner.rs (141 changes: 88 additions & 53 deletions)
@@ -15,14 +15,12 @@ use std::sync::{Arc, Weak};
 use std::task::Context;
 use std::time::{Duration, Instant};
 
-/// Waiters should wake at least every this often to check if a connection has not come available
-/// since they went to sleep.
-const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);
+type Waiters = SegQueue<Weak<WaiterInner>>;
 
 pub(crate) struct SharedPool<DB: Database> {
     pub(super) connect_options: <DB::Connection as Connection>::Options,
     pub(super) idle_conns: ArrayQueue<Idle<DB>>,
-    waiters: SegQueue<Weak<Waiter>>,
+    waiters: Waiters,
     pub(super) size: AtomicU32,
     is_closed: AtomicBool,
     pub(super) options: PoolOptions<DB>,
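
The new `Waiters` alias also pins down why the queue holds `Weak<WaiterInner>` rather than strong references: a task abandons its place in line simply by dropping its strong handle, and the queue entry dies with it. Below is a minimal sketch of that pattern, with `std::collections::VecDeque` standing in for crossbeam's lock-free `SegQueue` and a bare struct standing in for the real waker-holding `WaiterInner`:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Weak};

struct WaiterInner; // stand-in for the struct that holds the Waker

fn main() {
    // stand-in for the pool's lock-free SegQueue<Weak<WaiterInner>>
    let mut waiters: VecDeque<Weak<WaiterInner>> = VecDeque::new();

    let alive = Arc::new(WaiterInner);
    let cancelled = Arc::new(WaiterInner);

    waiters.push_back(Arc::downgrade(&cancelled));
    waiters.push_back(Arc::downgrade(&alive));

    // a task that gives up on acquiring just drops its strong ref...
    drop(cancelled);

    // ...so the wake loop skips it for free: upgrade() returns None
    while let Some(weak) = waiters.pop_front() {
        match weak.upgrade() {
            Some(_waiter) => {
                println!("woke a live waiter");
                break;
            }
            None => println!("skipped a cancelled waiter"),
        }
    }
}
```
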
@@ -152,7 +150,7 @@ impl<DB: Database> SharedPool<DB> {
         // the strong ref of the `Weak<Waiter>` that we push to the queue
         // initialized during the `timeout()` call below
         // as long as we own this, we keep our place in line
-        let mut waiter = None;
+        let mut waiter: Option<Waiter<'_>> = None;
 
         // Unless the pool has been closed ...
         while !self.is_closed() {
@@ -173,32 +171,33 @@
                 }
             }
 
-            // Wait for a connection to become available (or we are allowed to open a new one)
-            let timeout_duration = cmp::min(
-                // Returns an error if `deadline` passes
-                deadline_as_timeout::<DB>(deadline)?,
-                MIN_WAKE_PERIOD,
-            );
+            if let Some(ref waiter) = waiter {
+                // return the waiter to the queue, note that this does put it to the back
+                // of the queue when it should ideally stay at the front
+                self.waiters.push(Arc::downgrade(&waiter.inner));
+            }
 
             sqlx_rt::timeout(
-                timeout_duration,
+                // Returns an error if `deadline` passes
+                deadline_as_timeout::<DB>(deadline)?,
                 // `poll_fn` gets us easy access to a `Waker` that we can push to our queue
                 future::poll_fn(|cx| -> Poll<()> {
-                    let waiter = waiter.get_or_insert_with(|| {
-                        let waiter = Waiter::new(cx);
-                        self.waiters.push(Arc::downgrade(&waiter));
-                        waiter
-                    });
+                    let waiter = waiter.get_or_insert_with(|| Waiter::push_new(cx, &self.waiters));
 
                     if waiter.is_woken() {
+                        waiter.actually_woke = true;
                         Poll::Ready(())
                     } else {
                         Poll::Pending
                     }
                 }),
             )
             .await
-            .ok(); // timeout is no longer fatal here; we check if the deadline expired above
+            .map_err(|_| Error::PoolTimedOut)?;
+
+            if let Some(ref mut waiter) = waiter {
+                waiter.reset();
+            }
 
             waited = true;
         }
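
On the waiting side, `acquire` now hands the remaining deadline directly to `sqlx_rt::timeout` and treats expiry as `Error::PoolTimedOut`, instead of waking every `MIN_WAKE_PERIOD` to re-check. The sketch below reproduces that shape — `poll_fn` exposing the task's `Waker`, wrapped in a timeout — assuming a tokio runtime in place of `sqlx_rt`; the self-wake in the `Pending` branch is a simplification of handing `cx.waker()` to the waiter queue:

```rust
use std::future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // stand-in for WaiterInner::woken
    let woken = Arc::new(AtomicBool::new(false));

    // another task flips the flag; the real pool would also call waker.wake()
    let setter = Arc::clone(&woken);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        setter.store(true, Ordering::Release);
    });

    // wait for the flag, but give up at the deadline
    let result = tokio::time::timeout(
        Duration::from_secs(1),
        future::poll_fn(|cx| {
            if woken.load(Ordering::Acquire) {
                Poll::Ready(())
            } else {
                // simplified: self-wake and re-poll; the real code parks
                // until a released connection wakes the stored Waker
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }),
    )
    .await;

    assert!(result.is_ok(), "this is where acquire would return PoolTimedOut");
}
```
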
@@ -329,37 +328,40 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
     let pool = Arc::clone(&pool);
 
     sqlx_rt::spawn(async move {
-        while !pool.is_closed.load(Ordering::Acquire) {
-            // reap at most the current size minus the minimum idle
-            let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
-
-            // collect connections to reap
-            let (reap, keep) = (0..max_reaped)
-                // only connections waiting in the queue
-                .filter_map(|_| pool.pop_idle())
-                .partition::<Vec<_>, _>(|conn| {
-                    is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
-                });
-
-            for conn in keep {
-                // return these connections to the pool first
-                let is_ok = pool.idle_conns.push(conn.into_leakable()).is_ok();
-
-                if !is_ok {
-                    panic!("BUG: connection queue overflow in spawn_reaper");
-                }
-            }
-
-            for conn in reap {
-                let _ = conn.close().await;
+        while !pool.is_closed() {
+            // only reap idle connections when no tasks are waiting
+            if pool.waiters.is_empty() {
+                do_reap(&pool).await;
             }
 
             sqlx_rt::sleep(period).await;
         }
     });
 }
 
-fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
+async fn do_reap<DB: Database>(pool: &SharedPool<DB>) {
+    // reap at most the current size minus the minimum idle
+    let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
+
+    // collect connections to reap
+    let (reap, keep) = (0..max_reaped)
+        // only connections waiting in the queue
+        .filter_map(|_| pool.pop_idle())
+        .partition::<Vec<_>, _>(|conn| {
+            is_beyond_idle(conn, &pool.options) || is_beyond_lifetime(conn, &pool.options)
+        });
+
+    for conn in keep {
+        // return valid connections to the pool first
+        pool.release(conn.into_live());
+    }
+
+    for conn in reap {
+        let _ = conn.close().await;
+    }
+}
+
+fn wake_one(waiters: &Waiters) {
     while let Some(weak) = waiters.pop() {
         if let Some(waiter) = weak.upgrade() {
             if waiter.wake() {
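
Hoisting the body into `do_reap` does not change the reaping policy: drain at most `size - min_connections` idle connections and split them in a single pass with `Iterator::partition`. A standalone sketch of that split, using a hypothetical `IdleConn` carrying a precomputed idle duration:

```rust
use std::time::Duration;

// hypothetical stand-in for the pool's idle connection wrapper
struct IdleConn {
    idle_for: Duration, // how long this connection has sat in the idle queue
}

fn main() {
    let max_idle = Duration::from_secs(30);

    let conns = vec![
        IdleConn { idle_for: Duration::from_secs(60) },
        IdleConn { idle_for: Duration::from_secs(5) },
        IdleConn { idle_for: Duration::from_secs(45) },
    ];

    // same shape as do_reap: one pass splits the drained idle queue into
    // connections to close (`reap`) and connections to return (`keep`)
    let (reap, keep): (Vec<_>, Vec<_>) =
        conns.into_iter().partition(|conn| conn.idle_for > max_idle);

    println!("reaping {}, keeping {}", reap.len(), keep.len()); // reaping 2, keeping 1
}
```
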
@@ -375,7 +377,7 @@ fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
 /// (where the pool thinks it has more connections than it does).
 pub(in crate::pool) struct DecrementSizeGuard<'a> {
     size: &'a AtomicU32,
-    waiters: &'a SegQueue<Weak<Waiter>>,
+    waiters: &'a Waiters,
     dropped: bool,
 }
 
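`DecrementSizeGuard` only changes a field type here, but the pattern it implements is the reason the fix works: the pool's `size` is corrected in a `Drop` impl, so early returns and error paths cannot leak a slot, and the real guard also calls `wake_one` so the freed slot is immediately claimable. A simplified, self-contained sketch of such a guard — `SizeGuard` and its `cancel` method are illustrative, not the actual sqlx API:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

struct SizeGuard<'a> {
    size: &'a AtomicU32,
    armed: bool,
}

impl<'a> SizeGuard<'a> {
    fn new(size: &'a AtomicU32) -> Self {
        size.fetch_add(1, Ordering::AcqRel); // claim a slot up front
        Self { size, armed: true }
    }

    /// Disarm the guard, e.g. once the connection the slot was
    /// reserved for has been successfully opened and handed out.
    fn cancel(mut self) {
        self.armed = false;
    }
}

impl Drop for SizeGuard<'_> {
    fn drop(&mut self) {
        if self.armed {
            self.size.fetch_sub(1, Ordering::AcqRel);
            // the real guard also calls wake_one(self.waiters) here
        }
    }
}

fn main() {
    let size = AtomicU32::new(0);
    {
        let _guard = SizeGuard::new(&size);
        assert_eq!(size.load(Ordering::Acquire), 1);
        // a connect error would drop the guard here
    }
    assert_eq!(size.load(Ordering::Acquire), 0); // slot released, nothing leaked

    SizeGuard::new(&size).cancel(); // success path: keep the slot claimed
    assert_eq!(size.load(Ordering::Acquire), 1);
}
```
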
@@ -407,19 +409,12 @@ impl Drop for DecrementSizeGuard<'_> {
     }
 }
 
-struct Waiter {
+struct WaiterInner {
     woken: AtomicBool,
     waker: Waker,
 }
 
-impl Waiter {
-    fn new(cx: &mut Context<'_>) -> Arc<Self> {
-        Arc::new(Self {
-            woken: AtomicBool::new(false),
-            waker: cx.waker().clone(),
-        })
-    }
-
+impl WaiterInner {
     /// Wake this waiter if it has not previously been woken.
     ///
     /// Return `true` if this waiter was newly woken, or `false` if it was already woken.
@@ -435,8 +430,48 @@ impl Waiter {
 
         false
     }
+}
+
+struct Waiter<'a> {
+    inner: Arc<WaiterInner>,
+    queue: &'a Waiters,
+    actually_woke: bool,
+}
+
+impl<'a> Waiter<'a> {
+    fn push_new(cx: &mut Context<'_>, queue: &'a Waiters) -> Self {
+        let inner = Arc::new(WaiterInner {
+            woken: AtomicBool::new(false),
+            waker: cx.waker().clone(),
+        });
+
+        queue.push(Arc::downgrade(&inner));
+
+        Self {
+            inner,
+            queue,
+            actually_woke: false,
+        }
+    }
 
     fn is_woken(&self) -> bool {
-        self.woken.load(Ordering::Acquire)
+        self.inner.woken.load(Ordering::Acquire)
     }
+
+    fn reset(&mut self) {
+        self.inner
+            .woken
+            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
+            .ok();
+        self.actually_woke = false;
+    }
 }
+
+impl Drop for Waiter<'_> {
+    fn drop(&mut self) {
+        // if we didn't actually wake to get a connection, wake the next task instead
+        if self.is_woken() && !self.actually_woke {
+            wake_one(self.queue);
+        }
+    }
+}
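
Splitting the old `Waiter` into `WaiterInner` (shared, queued) and a task-owned `Waiter` handle is what makes that `Drop` impl possible: a task that was woken but never used the wakeup forwards it via `wake_one` instead of swallowing it. The handshake underneath is a single atomic `compare_exchange`, which guarantees each waiter consumes at most one wakeup; a minimal sketch:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// the wake-once idiom from WaiterInner::wake: only the caller that flips
// `woken` from false to true counts as having delivered the wakeup
fn wake(woken: &AtomicBool) -> bool {
    woken
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}

fn main() {
    let woken = AtomicBool::new(false);
    assert!(wake(&woken)); // first wake wins and would call waker.wake()...
    assert!(!wake(&woken)); // ...later wakes are no-ops, so no waiter is woken twice
}
```
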
tests/postgres/postgres.rs (19 changes: 12 additions & 7 deletions)
@@ -509,26 +509,31 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
     eprintln!("starting pool");
 
     let pool = PgPoolOptions::new()
-        .connect_timeout(Duration::from_secs(30))
-        .min_connections(5)
-        .max_connections(10)
+        .connect_timeout(Duration::from_secs(5))
+        .min_connections(1)
+        .max_connections(1)
         .connect(&dotenv::var("DATABASE_URL")?)
         .await?;
 
     // spin up more tasks than connections available, and ensure we don't deadlock
-    for i in 0..20 {
+    for i in 0..200 {
         let pool = pool.clone();
         sqlx_rt::spawn(async move {
             loop {
                 if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await {
-                    eprintln!("pool task {} dying due to {}", i, e);
-                    break;
+                    // normal error at termination of the test
+                    if !matches!(e, sqlx::Error::PoolClosed) {
+                        eprintln!("pool task {} dying due to {}", i, e);
+                        break;
+                    }
                 }
             }
         });
     }
 
-    for _ in 0..5 {
+    // spawn a bunch of tasks that attempt to acquire but give up to ensure correct handling
+    // of cancellations
+    for _ in 0..50 {
         let pool = pool.clone();
         sqlx_rt::spawn(async move {
             while !pool.is_closed() {
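
The diff is truncated here, but the comment above describes tasks that start an `acquire` and then give up, which exercises exactly the cancellation path the new `Waiter` `Drop` impl covers. A self-contained sketch of that stress pattern, using a tokio `Semaphore` with one permit as a stand-in for a `max_connections = 1` pool — an illustration of the test's idea, not its actual body:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // one permit plays the role of the single pooled connection
    let pool = Arc::new(Semaphore::new(1));

    let mut tasks = Vec::new();
    for _ in 0..50 {
        let pool = Arc::clone(&pool);
        tasks.push(tokio::spawn(async move {
            for _ in 0..100 {
                // racing acquire() against a tiny timeout drops the
                // half-finished acquire future: a cancelled waiter
                match timeout(Duration::from_millis(1), pool.acquire()).await {
                    Ok(Ok(_permit)) => {}    // "got a connection"; drop returns it
                    Ok(Err(_)) | Err(_) => {} // closed or gave up
                }
            }
        }));
    }

    for task in tasks {
        task.await.unwrap();
    }
    println!("no deadlock: every task finished");
}
```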
Expand Down