Skip to content

Commit

Permalink
fix(client): check for drained stream in Response::drop
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jun 17, 2015
1 parent 306f39c commit e689f20
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 34 deletions.
18 changes: 5 additions & 13 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo
Ok(PooledStream {
inner: Some((key, conn)),
is_closed: false,
is_drained: false,
pool: self.inner.clone()
})
}
Expand All @@ -130,20 +129,13 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo
pub struct PooledStream<S> {
inner: Option<(Key, S)>,
is_closed: bool,
is_drained: bool,
pool: Arc<Mutex<PoolImpl<S>>>
}

impl<S: NetworkStream> Read for PooledStream<S> {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match self.inner.as_mut().unwrap().1.read(buf) {
Ok(0) => {
self.is_drained = true;
Ok(0)
}
r => r
}
self.inner.as_mut().unwrap().1.read(buf)
}
}

Expand Down Expand Up @@ -174,8 +166,8 @@ impl<S: NetworkStream> NetworkStream for PooledStream<S> {

impl<S> Drop for PooledStream<S> {
fn drop(&mut self) {
trace!("PooledStream.drop, is_closed={}, is_drained={}", self.is_closed, self.is_drained);
if !self.is_closed && self.is_drained {
trace!("PooledStream.drop, is_closed={}", self.is_closed);
if !self.is_closed {
self.inner.take().map(|(key, conn)| {
if let Ok(mut pool) = self.pool.lock() {
pool.reuse(key, conn);
Expand Down Expand Up @@ -205,13 +197,13 @@ mod tests {
fn test_connect_and_drop() {
let pool = mocked!();
let key = key("127.0.0.1", 3000, "http");
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true;
pool.connect("127.0.0.1", 3000, "http").unwrap();
{
let locked = pool.inner.lock().unwrap();
assert_eq!(locked.conns.len(), 1);
assert_eq!(locked.conns.get(&key).unwrap().len(), 1);
}
pool.connect("127.0.0.1", 3000, "http").unwrap().is_drained = true; //reused
pool.connect("127.0.0.1", 3000, "http").unwrap(); //reused
{
let locked = pool.inner.lock().unwrap();
assert_eq!(locked.conns.len(), 1);
Expand Down
48 changes: 27 additions & 21 deletions src/client/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Response {
pub version: version::HttpVersion,
status_raw: RawStatus,
message: Box<HttpMessage>,
is_drained: bool,
}

impl Response {
Expand All @@ -43,41 +44,54 @@ impl Response {
headers: headers,
message: message,
status_raw: raw_status,
is_drained: false,
})
}

/// Get the raw status code and reason.
pub fn status_raw(&self) -> &RawStatus {
&self.status_raw
}

}

impl Read for Response {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let count = try!(self.message.read(buf));
match self.message.read(buf) {
Ok(0) => {
self.is_drained = true;
Ok(0)
},
r => r
}
}
}

if count == 0 {
if !http::should_keep_alive(self.version, &self.headers) {
try!(self.message.close_connection()
.map_err(|_| io::Error::new(io::ErrorKind::Other,
"Error closing connection")));
impl Drop for Response {
fn drop(&mut self) {
// if not drained, theres old bits in the Reader. we can't reuse this,
// since those old bits would end up in new Responses
//
// otherwise, the response has been drained. we should check that the
// server has agreed to keep the connection open
trace!("Response.is_drained = {:?}", self.is_drained);
if !(self.is_drained && http::should_keep_alive(self.version, &self.headers)) {
trace!("closing connection");
if let Err(e) = self.message.close_connection() {
error!("error closing connection: {}", e);
}
}

Ok(count)
}
}

#[cfg(test)]
mod tests {
use std::borrow::Cow::Borrowed;
use std::io::{self, Read};

use header::Headers;
use header::TransferEncoding;
use header::Encoding;
use http::RawStatus;
use http::HttpMessage;
use mock::MockStream;
use status;
use version;
Expand All @@ -94,18 +108,10 @@ mod tests {

#[test]
fn test_into_inner() {
let res = Response {
status: status::StatusCode::Ok,
headers: Headers::new(),
version: version::HttpVersion::Http11,
message: Box::new(Http11Message::with_stream(Box::new(MockStream::new()))),
status_raw: RawStatus(200, Borrowed("OK")),
};

let message = res.message.downcast::<Http11Message>().ok().unwrap();
let message: Box<HttpMessage> = Box::new(Http11Message::with_stream(Box::new(MockStream::new())));
let message = message.downcast::<Http11Message>().ok().unwrap();
let b = message.into_inner().downcast::<MockStream>().ok().unwrap();
assert_eq!(b, Box::new(MockStream::new()));

}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct RawStatus(pub u16, pub Cow<'static, str>);
/// Checks if a connection should be kept alive.
#[inline]
pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool {
trace!("should_keep_alive( {:?}, {:?} )", version, headers.get::<Connection>());
match (version, headers.get::<Connection>()) {
(Http10, Some(conn)) if !conn.contains(&KeepAlive) => false,
(Http11, Some(conn)) if conn.contains(&Close) => false,
Expand Down

0 comments on commit e689f20

Please sign in to comment.