pool.get() can acquire more connections than are necessary #221

Open
tneely opened this issue Oct 10, 2024 · 1 comment
tneely commented Oct 10, 2024

Issue

It's possible to trigger more approvals than are necessary, in turn grabbing more connections than we need. This happens when we drop a connection. The drop produces a notify, which doesn't get used until the pool is empty. The first `Pool::get()` call on an empty pool will spawn a connect task, immediately complete `notify.notified().await`, then spawn a second connect task. Both will connect, and we'll end up with one more connection than we need.

This gets worse if one of those connections is broken when it gets returned to the pool. In that case we call `notify.notify_waiters()`, which means every caller waiting in the `PoolInner::get()` loop could spawn an extra connect task. This should just be a single `notify.notify_one()` call:

self.inner.notify.notify_waiters();
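
The behavior hinges on how `tokio::sync::Notify` stores a permit: a `notify_one()` issued while no task is waiting is remembered, and the next `notified().await` then returns immediately instead of parking. A minimal standalone sketch of that effect (plain tokio, not bb8 code):

use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Notify::new();

    // A connection is dropped while nobody is waiting in `get()`:
    // the permit from `notify_one()` is stored inside the Notify.
    notify.notify_one();

    // Later, a `get()` finds the pool empty, spawns a connect task, and awaits.
    // The stored permit makes this complete immediately, so the get loop runs
    // again, still finds the pool empty, and spawns a second connect task --
    // the extra connection described above.
    notify.notified().await;
    println!("completed immediately thanks to the stored permit");
}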

Affected Versions

I have only tested on 0.8.5.

Fix

I'm not sure on this one. I think removing the `notify.notify_one()` call from the drop path would fix this specific issue, but it would hurt performance and could cause other problems (e.g. if the pool is at max capacity and we have no in-flight connect tasks).

We should definitely change `notify.notify_waiters()` to `notify.notify_one()` in the broken-connection case if possible.
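
For contrast, here is a small tokio sketch (again not bb8 code) of why `notify_waiters()` is too broad here: it wakes every task currently parked in `notified().await`, so each waiter in the get loop can go on to spawn its own connect task, whereas `notify_one()` wakes exactly one.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());

    // Three tasks parked in the get loop, all waiting for a connection.
    let waiters: Vec<_> = (0..3)
        .map(|i| {
            let notify = notify.clone();
            tokio::spawn(async move {
                notify.notified().await;
                println!("waiter {i} woke and may spawn a connect task");
            })
        })
        .collect();

    // Let the waiters register before notifying.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // One broken connection triggers this today: all three waiters wake.
    // With `notify_one()` only a single waiter would wake here.
    notify.notify_waiters();

    for waiter in waiters {
        waiter.await.unwrap();
    }
}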

Reproduction
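
(The test below uses the `OkManager`/`FakeConnection` helpers defined in bb8's own test suite, `bb8/tests/test.rs`.)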

#[tokio::test]
async fn test_notify() {
    let pool = Pool::builder()
        .min_idle(0)
        .max_size(100)
        .queue_strategy(QueueStrategy::Lifo)
        .build(OkManager::<FakeConnection>::new())
        .await
        .unwrap();

    // The first get should
    // 1) see nothing in the pool,
    // 2) spawn a single connect task,
    // 3) get notified of the new connection and grab it from the pool
    let conn_0 = pool.get().await.expect("should connect");
    // Dropping the connection queues up a notify
    drop(conn_0);
    // The second get should
    // 1) see the first connection in the pool and grab it
    // NB: This does not consume the notify
    let _conn_1: PooledConnection<OkManager<FakeConnection>> = pool.get().await.expect("should connect");
    // The third get will
    // 1) see nothing in the pool,
    // 2) spawn a single connect task,
    // 3) get notified from the dropped connection
    // 4) see nothing in the pool,
    // 5) spawn a single connect task (oh no!),
    // 6) get notified of the new connection and grab it from the pool
    let _conn_2: PooledConnection<OkManager<FakeConnection>> = pool.get().await.expect("should connect");

    assert_eq!(pool.state().connections, 2);
}

Running this panics with:

thread 'test_notify' panicked at bb8/tests/test.rs:1160:5:
assertion `left == right` failed
  left: 3
 right: 2
tneely commented Oct 11, 2024

I think I have a fix, but it involves a little more bookkeeping. I want to poke at this a bit more, but I can raise a PR soonish.

diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs
index be2ef52..90ff63e 100644
--- a/bb8/src/inner.rs
+++ b/bb8/src/inner.rs
@@ -91,6 +91,7 @@ where
         let mut wait_time_start = None;

         let future = async {
+            let _guard = self.inner.request();
             loop {
                 let (conn, approvals) = self.inner.pop();
                 self.spawn_replenishing_approvals(approvals);
@@ -158,7 +159,7 @@ where
                 }
                 let approvals = locked.dropped(1, &self.inner.statics);
                 self.spawn_replenishing_approvals(approvals);
-                self.inner.notify.notify_waiters();
+                self.inner.notify.notify_one();
             }
         }
     }
diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs
index 155e21a..c27647c 100644
--- a/bb8/src/internals.rs
+++ b/bb8/src/internals.rs
@@ -22,6 +22,29 @@ where
     pub(crate) statistics: AtomicStatistics,
 }

+pub(crate) struct GetGuard<M: ManageConnection + Send> {
+    inner: Arc<SharedPool<M>>,
+}
+
+impl<M: ManageConnection + Send> GetGuard<M> {
+    fn new(inner: Arc<SharedPool<M>>) -> Self {
+        {
+            let mut locked = inner.internals.lock();
+            locked.inflight_gets += 1;
+        }
+        GetGuard {
+            inner
+        }
+    }
+}
+
+impl<M: ManageConnection + Send> Drop for GetGuard<M> {
+    fn drop(&mut self) {
+        let mut locked = self.inner.internals.lock();
+        locked.inflight_gets -= 1;
+    }
+}
+
 impl<M> SharedPool<M>
 where
     M: ManageConnection + Send,
@@ -41,12 +64,19 @@ where
         let conn = locked.conns.pop_front().map(|idle| idle.conn);
         let approvals = match &conn {
             Some(_) => locked.wanted(&self.statics),
-            None => locked.approvals(&self.statics, 1),
+            None => {
+                let approvals = min(1, locked.inflight_gets.saturating_sub(locked.pending_conns));
+                locked.approvals(&self.statics, approvals)
+            }
         };

         (conn, approvals)
     }

+    pub(crate) fn request(self: &Arc<Self>) -> GetGuard<M> {
+        GetGuard::new(self.clone())
+    }
+
     pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
         let mut locked = self.internals.lock();
         let mut approvals = locked.approvals(&self.statics, 1);
@@ -81,6 +111,7 @@ where
     conns: VecDeque<IdleConn<M::Connection>>,
     num_conns: u32,
     pending_conns: u32,
+    inflight_gets: u32,
 }

 impl<M> PoolInternals<M>
@@ -202,6 +233,7 @@ where
             conns: VecDeque::new(),
             num_conns: 0,
             pending_conns: 0,
+            inflight_gets: 0,
         }
     }
 }
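
Stated compactly, the gating condition introduced in the `pop()` change above amounts to the following (a restatement of the patch's logic, not the exact merged code):

// Approve at most one new connect per empty pop(), and only when more get()
// calls are waiting than connections are already being established.
fn extra_connects_to_request(inflight_gets: u32, pending_conns: u32) -> u32 {
    inflight_gets.saturating_sub(pending_conns).min(1)
}

fn main() {
    assert_eq!(extra_connects_to_request(1, 0), 1); // one waiter, nothing pending: connect
    assert_eq!(extra_connects_to_request(1, 1), 0); // a connect is already in flight: wait
    assert_eq!(extra_connects_to_request(3, 1), 1); // still capped at one per pop()
}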

tneely added a commit to tneely/bb8 that referenced this issue Oct 14, 2024
Fixes djc#221

It's possible to trigger more approvals than are necessary, in turn
grabbing more connections than we need. This happens when we drop a
connection. The drop produces a notify, which doesn't get used until the
pool is empty. The first `Pool::get()` call on an empty pool will spawn
a connect task, immediately complete `notify.notified().await`, then
spawn a second connect task. Both will connect and we'll end up with 1
more connection than we need.

Rather than address the notify issue directly, this fix introduces some
bookkeeping that tracks the number of open `pool.get()` requests we have
waiting on connections. If the number of pending connections >= the
number of pending gets, we will not spawn any additional connect tasks.

I have additionally changed `notify.notify_waiters();` to a
`notify.notify_one();` call in the broken connection branch. A single
broken connection should only need to notify one pending task to spawn a
new connect task, not all of them.
djc pushed a commit that referenced this issue Oct 17, 2024
Fixes #221