Skip to content

Commit

Permalink
Fix leaky connections
Browse files Browse the repository at this point in the history
Fixes #221

It's possible to trigger more approvals than are necessary, in turn
grabbing more connections than we need. This happens when we drop a
connection. The drop produces a notify, which doesn't get used until the
pool is empty. The first `Pool::get()` call on an empty pool will spawn
an connect task, immediately complete `notify.notified().await`, then
spawn a second connect task. Both will connect and we'll end up with 1
more connection than we need.

Rather than address the notify issue directly, this fix introduces some
bookkeeping that tracks the number of open `pool.get()` requests we have
waiting on connections. If the number of pending connections >= the
number of pending gets, we will not spawn any additional connect tasks.
  • Loading branch information
tneely committed Oct 17, 2024
1 parent cb99697 commit c679d79
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 1 deletion.
1 change: 1 addition & 0 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ where
let mut wait_time_start = None;

let future = async {
let _guard = self.inner.request();
loop {
let (conn, approvals) = self.inner.pop();
self.spawn_replenishing_approvals(approvals);
Expand Down
35 changes: 34 additions & 1 deletion bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ where
pub(crate) statistics: AtomicStatistics,
}

pub(crate) struct GetGuard<M: ManageConnection + Send> {
inner: Arc<SharedPool<M>>,
}

impl<M: ManageConnection + Send> GetGuard<M> {
fn new(inner: Arc<SharedPool<M>>) -> Self {
{
let mut locked = inner.internals.lock();
locked.in_flight += 1;
}
GetGuard { inner }
}
}

impl<M: ManageConnection + Send> Drop for GetGuard<M> {
fn drop(&mut self) {
let mut locked = self.inner.internals.lock();
locked.in_flight -= 1;
}
}

impl<M> SharedPool<M>
where
M: ManageConnection + Send,
Expand All @@ -41,12 +62,22 @@ where
let conn = locked.conns.pop_front().map(|idle| idle.conn);
let approvals = match &conn {
Some(_) => locked.wanted(&self.statics),
None => locked.approvals(&self.statics, 1),
None => {
let approvals = match locked.in_flight < locked.pending_conns {
true => 1,
false => 0,
};
locked.approvals(&self.statics, approvals)
}
};

(conn, approvals)
}

pub(crate) fn request(self: &Arc<Self>) -> GetGuard<M> {
GetGuard::new(self.clone())
}

pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
let mut locked = self.internals.lock();
let mut approvals = locked.approvals(&self.statics, 1);
Expand Down Expand Up @@ -81,6 +112,7 @@ where
conns: VecDeque<IdleConn<M::Connection>>,
num_conns: u32,
pending_conns: u32,
in_flight: u32,
}

impl<M> PoolInternals<M>
Expand Down Expand Up @@ -202,6 +234,7 @@ where
conns: VecDeque::new(),
num_conns: 0,
pending_conns: 0,
in_flight: 0,
}
}
}
Expand Down
34 changes: 34 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,3 +1068,37 @@ async fn test_add_checks_broken_connections() {
let res = pool.add(conn);
assert!(matches!(res, Err(AddError::Broken(_))));
}

#[tokio::test]
async fn test_reuse_on_drop() {
let pool = Pool::builder()
.min_idle(0)
.max_size(100)
.queue_strategy(QueueStrategy::Lifo)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();

// The first get should
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection and grab it from the pool
let conn_0 = pool.get().await.expect("should connect");
// Dropping the connection queues up a notify
drop(conn_0);
// The second get should
// 1) see the first connection in the pool and grab it
let _conn_1: PooledConnection<OkManager<FakeConnection>> =
pool.get().await.expect("should connect");
// The third get will
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection,
// 4) see nothing in the pool,
// 5) _not_ spawn a single replenishing approval,
// 6) get notified of the new connection and grab it from the pool
let _conn_2: PooledConnection<OkManager<FakeConnection>> =
pool.get().await.expect("should connect");

assert_eq!(pool.state().connections, 2);
}

0 comments on commit c679d79

Please sign in to comment.