Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leaky connections #223

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bb8/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ where
let mut wait_time_start = None;

let future = async {
let _guard = self.inner.request();
loop {
let (conn, approvals) = self.inner.pop();
self.spawn_replenishing_approvals(approvals);
Expand Down
35 changes: 34 additions & 1 deletion bb8/src/internals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,27 @@ where
pub(crate) statistics: AtomicStatistics,
}

pub(crate) struct GetGuard<M: ManageConnection + Send> {
inner: Arc<SharedPool<M>>,
}

impl<M: ManageConnection + Send> GetGuard<M> {
fn new(inner: Arc<SharedPool<M>>) -> Self {
{
let mut locked = inner.internals.lock();
locked.in_flight += 1;
}
GetGuard { inner }
}
}

impl<M: ManageConnection + Send> Drop for GetGuard<M> {
fn drop(&mut self) {
let mut locked = self.inner.internals.lock();
locked.in_flight -= 1;
}
}

impl<M> SharedPool<M>
where
M: ManageConnection + Send,
Expand All @@ -41,12 +62,22 @@ where
let conn = locked.conns.pop_front().map(|idle| idle.conn);
let approvals = match &conn {
Some(_) => locked.wanted(&self.statics),
None => locked.approvals(&self.statics, 1),
None => {
let approvals = match locked.in_flight > locked.pending_conns {
true => 1,
false => 0,
};
locked.approvals(&self.statics, approvals)
}
};

(conn, approvals)
}

pub(crate) fn request(self: &Arc<Self>) -> GetGuard<M> {
GetGuard::new(self.clone())
}
djc marked this conversation as resolved.
Show resolved Hide resolved

pub(crate) fn try_put(self: &Arc<Self>, conn: M::Connection) -> Result<(), M::Connection> {
let mut locked = self.internals.lock();
let mut approvals = locked.approvals(&self.statics, 1);
Expand Down Expand Up @@ -81,6 +112,7 @@ where
conns: VecDeque<IdleConn<M::Connection>>,
num_conns: u32,
pending_conns: u32,
in_flight: u32,
}

impl<M> PoolInternals<M>
Expand Down Expand Up @@ -202,6 +234,7 @@ where
conns: VecDeque::new(),
num_conns: 0,
pending_conns: 0,
in_flight: 0,
}
}
}
Expand Down
34 changes: 34 additions & 0 deletions bb8/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,3 +1068,37 @@ async fn test_add_checks_broken_connections() {
let res = pool.add(conn);
assert!(matches!(res, Err(AddError::Broken(_))));
}

#[tokio::test]
async fn test_reuse_on_drop() {
let pool = Pool::builder()
.min_idle(0)
.max_size(100)
.queue_strategy(QueueStrategy::Lifo)
.build(OkManager::<FakeConnection>::new())
.await
.unwrap();

// The first get should
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection and grab it from the pool
let conn_0 = pool.get().await.expect("should connect");
// Dropping the connection queues up a notify
drop(conn_0);
// The second get should
// 1) see the first connection in the pool and grab it
let _conn_1: PooledConnection<OkManager<FakeConnection>> =
pool.get().await.expect("should connect");
// The third get will
// 1) see nothing in the pool,
// 2) spawn a single replenishing approval,
// 3) get notified of the new connection,
// 4) see nothing in the pool,
// 5) _not_ spawn a single replenishing approval,
// 6) get notified of the new connection and grab it from the pool
let _conn_2: PooledConnection<OkManager<FakeConnection>> =
pool.get().await.expect("should connect");

assert_eq!(pool.state().connections, 2);
}
Loading