From 9f212410026c780ea2a76ba81705ed137022260d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 28 Nov 2017 16:17:20 -0800 Subject: [PATCH] fix(client): always wait on reads for pooled connections --- src/proto/conn.rs | 25 +++++-------------------- src/proto/dispatch.rs | 15 ++++++++++----- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/src/proto/conn.rs b/src/proto/conn.rs index f909fd8571..a7573947d8 100644 --- a/src/proto/conn.rs +++ b/src/proto/conn.rs @@ -102,16 +102,7 @@ where I: AsyncRead + AsyncWrite, pub fn can_read_head(&self) -> bool { match self.state.reading { - Reading::Init => { - if T::should_read_first() { - true - } else { - match self.state.writing { - Writing::Init => false, - _ => true, - } - } - } + Reading::Init => true, _ => false, } } @@ -245,19 +236,13 @@ where I: AsyncRead + AsyncWrite, Reading::Closed => false, }; - let wants_write = match self.state.writing { + match self.state.writing { Writing::Continue(..) | Writing::Body(..) | Writing::Ending(..) => return, - Writing::Init => true, - Writing::KeepAlive => false, - Writing::Closed => false, - }; - - // if the client is at Reading::Init and Writing::Init, - // it's not actually looking for a read, but a write. - if wants_write && !T::should_read_first() { - return; + Writing::Init | + Writing::KeepAlive | + Writing::Closed => (), } if !self.io.is_read_blocked() { diff --git a/src/proto/dispatch.rs b/src/proto/dispatch.rs index ea5ae79bf2..bd7b86caf2 100644 --- a/src/proto/dispatch.rs +++ b/src/proto/dispatch.rs @@ -1,3 +1,5 @@ +use std::io; + use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use futures::sync::{mpsc, oneshot}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -64,7 +66,7 @@ where } else { None }; - self.dispatch.recv_msg(Ok((head, body))).expect("recv_msg with Ok shouldn't error"); + self.dispatch.recv_msg(Ok((head, body)))?; }, Ok(Async::Ready(None)) => { // read eof, conn will start to shutdown automatically @@ -306,10 +308,13 @@ where fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Option)>) -> ::Result<()> { match msg { Ok((msg, body)) => { - let res = super::response::from_wire(msg, body); - let cb = self.callback.take().expect("recv_msg without callback"); - let _ = cb.send(Ok(res)); - Ok(()) + if let Some(cb) = self.callback.take() { + let res = super::response::from_wire(msg, body); + let _ = cb.send(Ok(res)); + Ok(()) + } else { + Err(::Error::Io(io::Error::new(io::ErrorKind::InvalidData, "response received without matching request"))) + } }, Err(err) => { if let Some(cb) = self.callback.take() {