I'm not sure if this is a hyper, tokio_timer or general futures issue. It seems similar to #1353, but that suggests this is fixed by using `no_proto` on the Client, and I see the same problem whether or not I use `no_proto`.
Originally raised as jimmycuadra/rust-etcd#20, but I managed to isolate it to hyper.
The trigger seems to be using `tokio_timer::Timeout` on a `Client.get()` that has returned the headers with `Transfer-Encoding: chunked` but has not yet received any chunks.
Nothing obvious in trace-level logs.
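To make the trigger concrete without etcd, here is a minimal std-only sketch of a server that sends the response head with `Transfer-Encoding: chunked` and then never sends a chunk. It is an assumption that this exercises the same path in hyper as an etcd watch does; it has not been checked against the leak itself. The names `STALLED_HEAD`, `stall` and `serve` are made up for illustration.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

/// Response head only: chunked encoding is announced, but no chunk follows.
const STALLED_HEAD: &[u8] = b"HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n";

/// Read the request head, send the response head, then hold the socket open.
fn stall(mut stream: TcpStream, hold: Duration) -> std::io::Result<()> {
    let mut buf = [0u8; 1024];
    let mut seen = Vec::new();
    // Consume bytes until the blank line that ends the request head.
    while !seen.windows(4).any(|w| w == b"\r\n\r\n") {
        let n = stream.read(&mut buf)?;
        if n == 0 {
            return Ok(()); // client went away before finishing the request
        }
        seen.extend_from_slice(&buf[..n]);
    }
    stream.write_all(STALLED_HEAD)?;
    stream.flush()?;
    // Never send a chunk; just keep the connection open for `hold`.
    thread::sleep(hold);
    Ok(())
}

/// Accept connections forever, stalling each one on its own thread.
fn serve(listener: TcpListener, hold: Duration) {
    for stream in listener.incoming() {
        if let Ok(stream) = stream {
            thread::spawn(move || {
                let _ = stall(stream, hold);
            });
        }
    }
}

fn main() -> std::io::Result<()> {
    // Ephemeral port for this demo (an assumption, not part of the repro).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    thread::spawn(move || serve(listener, Duration::from_secs(60)));

    // Act as a client: send a watch-style request and read the head back.
    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"GET /v2/keys/foo?wait=true HTTP/1.1\r\nHost: localhost\r\n\r\n")?;
    let mut head = vec![0u8; STALLED_HEAD.len()];
    client.read_exact(&mut head)?;
    print!("{}", String::from_utf8_lossy(&head));

    // No chunk ever arrives, so a bounded read ends in an error, not data.
    client.set_read_timeout(Some(Duration::from_millis(500)))?;
    let mut byte = [0u8; 1];
    println!("body read: {:?}", client.read(&mut byte).map_err(|e| e.kind()));
    Ok(())
}
```

To try it in place of etcd, bind the listener to `127.0.0.1:2379` and call `serve` on the main thread instead of running the demo client.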
Repro instructions (on Linux; not tried on any other platforms), mostly cribbed from the rust-etcd repo:
- Start an etcd instance running locally (assuming you have docker, `docker run --net=host -d quay.io/coreos/etcd:v2.2.0` works).
- Run this simple etcd client code (below) that loops watching for changes to a key.
- Observe leaking TCP connections. I view them through `sudo netstat -plant | grep 2379` (and eventually you'll run out of ports).

```rust
extern crate hyper;
extern crate tokio_core;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate futures;
extern crate tokio_timer;

use std::time::Duration;
use futures::future::Future;
use futures::Stream;
use tokio_core::reactor;
use tokio_timer::{Timer, TimeoutError};
use hyper::{Client, Uri};
use std::error::Error;

// Dummy error type to handle this
#[derive(Debug)]
enum MyError {
    Timeout,
    Other(String),
}

impl std::fmt::Display for MyError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
        write!(f, "{}", self.description())
    }
}

impl Error for MyError {
    fn description(&self) -> &str {
        match *self {
            MyError::Timeout => "Timeout",
            MyError::Other(ref s) => s,
        }
    }
}

impl<T> From<TimeoutError<T>> for MyError {
    fn from(_error: TimeoutError<T>) -> MyError {
        MyError::Timeout
    }
}

impl From<hyper::Error> for MyError {
    fn from(error: hyper::Error) -> MyError {
        MyError::Other(error.description().to_string())
    }
}

fn main() {
    env_logger::init().unwrap();
    let mut core = reactor::Core::new().unwrap();
    let client = Client::new(&core.handle());
    let mut idx = 0;
    loop {
        let url_str = format!(
            "http://localhost:2379/v2/keys/foo?waitIndex={}&wait=true&recursive=true",
            idx
        );
        let url = url_str.parse::<Uri>().unwrap();
        debug!("Kicking off watch for index: {}", idx);
        let timer = Timer::default();
        let cur_idx = idx;
        let work: Box<Future<Item = (), Error = MyError>> = Box::new(
            timer.timeout(
                client
                    .get(url)
                    .map_err(|e| MyError::Other(e.description().to_string()))
                    .and_then(|response| {
                        let status = response.status();
                        response
                            .body()
                            .concat2()
                            .map_err(|e| MyError::Other(format!("Body parse error: {}", e)))
                            .and_then(move |ref _body| if status == hyper::StatusCode::Ok {
                                idx += 1;
                                Ok(())
                            } else {
                                Err(MyError::Other("Bad response from etcd".to_string()))
                            })
                    })
                    .map(|_| {
                        debug!("Completed watch for index {}", cur_idx);
                    }),
                Duration::from_secs(3),
            ),
        );
        let res = core.run(work);
        match res {
            Ok(_) => debug!("Spotted change from etcd"),
            Err(MyError::Timeout) => info!("Timeout"),
            Err(MyError::Other(e)) => error!(" error from etcd: {}", e),
        }
    }
}
```
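
As an alternative to eyeballing the `netstat` output from the repro steps, the leak can be tallied by TCP state. This is a rough, Linux-only sketch that parses `/proc/net/tcp` (IPv4 only) and counts sockets whose remote port is 2379. The helpers `state_name` and `count_by_state` are hypothetical names, and it assumes the usual whitespace-separated `/proc/net/tcp` layout with the remote address in the third column and the state in the fourth.

```rust
use std::collections::BTreeMap;
use std::fs;

/// Map the hex state codes used by /proc/net/tcp to their usual names.
fn state_name(code: &str) -> &'static str {
    match code {
        "01" => "ESTABLISHED",
        "02" => "SYN_SENT",
        "03" => "SYN_RECV",
        "04" => "FIN_WAIT1",
        "05" => "FIN_WAIT2",
        "06" => "TIME_WAIT",
        "07" => "CLOSE",
        "08" => "CLOSE_WAIT",
        "09" => "LAST_ACK",
        "0A" => "LISTEN",
        "0B" => "CLOSING",
        _ => "UNKNOWN",
    }
}

/// Count sockets whose remote port is `port`, grouped by TCP state.
fn count_by_state(table: &str, port: u16) -> BTreeMap<&'static str, usize> {
    let mut counts = BTreeMap::new();
    // The first line is the column header, so skip it.
    for line in table.lines().skip(1) {
        let fields: Vec<&str> = line.split_whitespace().collect();
        if fields.len() < 4 {
            continue;
        }
        // fields[2] is the remote address as HEXIP:HEXPORT.
        let remote_port = fields[2]
            .rsplit(':')
            .next()
            .and_then(|p| u16::from_str_radix(p, 16).ok());
        if remote_port == Some(port) {
            *counts.entry(state_name(fields[3])).or_insert(0) += 1;
        }
    }
    counts
}

fn main() {
    // IPv4 only; IPv6 sockets are listed separately in /proc/net/tcp6.
    let table = fs::read_to_string("/proc/net/tcp").expect("Linux-only: /proc/net/tcp");
    for (state, n) in count_by_state(&table, 2379) {
        println!("{:<12} {}", state, n);
    }
}
```

Running this every few seconds while the client loops should show the per-state counts growing if connections are leaking. Unlike `netstat -p`, it does not attribute sockets to a process.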