Skip to content

Commit

Permalink
fix(client): prevent pool checkout looping on not-ready connections
Browse files Browse the repository at this point in the history
Closes #1519
  • Loading branch information
seanmonstar committed May 16, 2018
1 parent 2c48101 commit ccec79d
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 214 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tokio-proto = { version = "0.1", optional = true }
tokio-service = "0.1"
tokio-io = "0.1"
unicase = "2.0"
want = "0.0.4"

[dev-dependencies]
num_cpus = "1.0"
Expand Down
4 changes: 0 additions & 4 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,6 @@ impl<B> SendRequest<B>
pub(super) fn is_ready(&self) -> bool {
self.dispatch.is_ready()
}

pub(super) fn is_closed(&self) -> bool {
self.dispatch.is_closed()
}
}

impl<B> SendRequest<B>
Expand Down
14 changes: 6 additions & 8 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use futures::{Async, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use want;

use common::Never;
use super::signal;

//pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>;

pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::channel(0);
let (giver, taker) = signal::new();
let (giver, taker) = want::new();
let tx = Sender {
giver: giver,
inner: tx,
Expand All @@ -27,7 +27,7 @@ pub struct Sender<T, U> {
// when the queue is empty. This helps us know when a request and
// response have been fully processed, and a connection is ready
// for more.
giver: signal::Giver,
giver: want::Giver,
inner: mpsc::Sender<(T, Callback<T, U>)>,
}

Expand All @@ -49,18 +49,16 @@ impl<T, U> Sender<T, U> {
self.giver.is_wanting()
}

pub fn is_closed(&self) -> bool {
self.giver.is_canceled()
}

pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
self.giver.give();
let (tx, rx) = oneshot::channel();
self.inner.try_send((val, Callback::Retry(tx)))
.map(move |_| rx)
.map_err(|e| e.into_inner().0)
}

pub fn send(&mut self, val: T) -> Result<Promise<U>, T> {
self.giver.give();
let (tx, rx) = oneshot::channel();
self.inner.try_send((val, Callback::NoRetry(tx)))
.map(move |_| rx)
Expand All @@ -70,7 +68,7 @@ impl<T, U> Sender<T, U> {

pub struct Receiver<T, U> {
inner: mpsc::Receiver<(T, Callback<T, U>)>,
taker: signal::Taker,
taker: want::Taker,
}

impl<T, U> Stream for Receiver<T, U> {
Expand Down
16 changes: 12 additions & 4 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ mod dns;
mod pool;
#[cfg(feature = "compat")]
pub mod compat;
mod signal;
#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -262,13 +261,22 @@ where C: Connect,
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let _ = executor.execute(future::poll_fn(move || {
pooled.tx.poll_ready().map_err(|_| ())
pooled.tx.poll_ready()
}).then(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
Ok(())
}));
} else {
// There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready...
let _ = executor.execute(
future::poll_fn(move || {
pooled.tx.poll_ready()
})
.then(|_| Ok(()))
);
}

res
Expand Down Expand Up @@ -395,8 +403,8 @@ impl<B> self::pool::Closed for PoolClient<B>
where
B: 'static,
{
fn is_closed(&self) -> bool {
self.tx.is_closed()
fn is_open(&self) -> bool {
self.tx.is_ready()
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Pool<T> {
//
// See https://github.com/hyperium/hyper/issues/1429
pub trait Closed {
fn is_closed(&self) -> bool;
fn is_open(&self) -> bool;
}

struct PoolInner<T> {
Expand Down Expand Up @@ -77,7 +77,7 @@ impl<T: Closed> Pool<T> {
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
while let Some(entry) = list.pop() {
if !expiration.expires(entry.idle_at) {
if !entry.value.is_closed() {
if entry.value.is_open() {
should_remove = list.is_empty();
return Some(entry);
}
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<T: Closed> PoolInner<T> {
self.idle.retain(|_key, values| {

values.retain(|entry| {
if entry.value.is_closed() {
if !entry.value.is_open() {
return false;
}
now - entry.idle_at < dur
Expand Down Expand Up @@ -293,6 +293,11 @@ impl<T: Closed> DerefMut for Pooled<T> {
impl<T: Closed> Drop for Pooled<T> {
fn drop(&mut self) {
if let Some(value) = self.value.take() {
if !value.is_open() {
// don't ever re-insert not-open connections back
// into the pool!
return;
}
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
inner.put(self.key.clone(), value);
Expand Down Expand Up @@ -331,7 +336,7 @@ impl<T: Closed> Checkout<T> {
if let Some(ref mut rx) = self.parked {
match rx.poll() {
Ok(Async::Ready(value)) => {
if !value.is_closed() {
if value.is_open() {
return Ok(Async::Ready(self.pool.reuse(&self.key, value)));
}
drop_parked = true;
Expand Down Expand Up @@ -434,8 +439,8 @@ mod tests {
use super::{Closed, Pool};

impl Closed for i32 {
fn is_closed(&self) -> bool {
false
fn is_open(&self) -> bool {
true
}
}

Expand Down
192 changes: 0 additions & 192 deletions src/client/signal.rs

This file was deleted.

1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extern crate tokio_core as tokio;
extern crate tokio_proto;
extern crate tokio_service;
extern crate unicase;
extern crate want;

#[cfg(all(test, feature = "nightly"))]
extern crate test;
Expand Down

0 comments on commit ccec79d

Please sign in to comment.