diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index 4ea780610d..647171e764 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -12,7 +12,7 @@ use futures_util::ready; use http::{Request, Response}; use httparse::ParserConfig; -use super::super::dispatch; +use super::super::dispatch::{self, TrySendError}; use crate::body::{Body, Incoming as IncomingBody}; use crate::proto; @@ -208,33 +208,38 @@ where } } - /* - pub(super) fn send_request_retryable( + /// Sends a `Request` on the associated connection. + /// + /// Returns a future that if successful, yields the `Response`. + /// + /// # Error + /// + /// If there was an error before trying to serialize the request to the + /// connection, the message will be returned as part of this error. + pub fn try_send_request( &mut self, req: Request, - ) -> impl Future, (crate::Error, Option>)>> + Unpin - where - B: Send, - { - match self.dispatch.try_send(req) { - Ok(rx) => { - Either::Left(rx.then(move |res| { - match res { - Ok(Ok(res)) => future::ok(res), - Ok(Err(err)) => future::err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - } - })) - } - Err(req) => { - debug!("connection was not ready"); - let err = crate::Error::new_canceled().with("connection was not ready"); - Either::Right(future::err((err, Some(req)))) + ) -> impl Future, TrySendError>>> { + let sent = self.dispatch.try_send(req); + async move { + match sent { + Ok(rx) => match rx.await { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + Err(req) => { + debug!("connection was not ready"); + let error = crate::Error::new_canceled().with("connection was not ready"); + Err(TrySendError { + error, + message: Some(req), + }) + } } } } - */ } impl fmt::Debug for SendRequest { diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index 94c56e6e49..aee135f672 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -13,7 +13,7 @@ use crate::rt::{Read, Write}; use futures_util::ready; use http::{Request, Response}; -use super::super::dispatch; +use super::super::dispatch::{self, TrySendError}; use crate::body::{Body, Incoming as IncomingBody}; use crate::common::time::Time; use crate::proto; @@ -152,33 +152,38 @@ where } } - /* - pub(super) fn send_request_retryable( + /// Sends a `Request` on the associated connection. + /// + /// Returns a future that if successful, yields the `Response`. + /// + /// # Error + /// + /// If there was an error before trying to serialize the request to the + /// connection, the message will be returned as part of this error. + pub fn try_send_request( &mut self, req: Request, - ) -> impl Future, (crate::Error, Option>)>> + Unpin - where - B: Send, - { - match self.dispatch.try_send(req) { - Ok(rx) => { - Either::Left(rx.then(move |res| { - match res { - Ok(Ok(res)) => future::ok(res), - Ok(Err(err)) => future::err(err), - // this is definite bug if it happens, but it shouldn't happen! - Err(_) => panic!("dispatch dropped without returning error"), - } - })) - } - Err(req) => { - debug!("connection was not ready"); - let err = crate::Error::new_canceled().with("connection was not ready"); - Either::Right(future::err((err, Some(req)))) + ) -> impl Future, TrySendError>>> { + let sent = self.dispatch.try_send(req); + async move { + match sent { + Ok(rx) => match rx.await { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => Err(err), + // this is definite bug if it happens, but it shouldn't happen! + Err(_) => panic!("dispatch dropped without returning error"), + }, + Err(req) => { + debug!("connection was not ready"); + let error = crate::Error::new_canceled().with("connection was not ready"); + Err(TrySendError { + error, + message: Some(req), + }) + } } } } - */ } impl fmt::Debug for SendRequest { diff --git a/src/client/conn/mod.rs b/src/client/conn/mod.rs index 316aac9d48..f982ae6ddb 100644 --- a/src/client/conn/mod.rs +++ b/src/client/conn/mod.rs @@ -18,3 +18,5 @@ pub mod http1; #[cfg(feature = "http2")] pub mod http2; + +pub use super::dispatch::TrySendError; diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 7a291c716f..b52da60e72 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -13,10 +13,21 @@ use tokio::sync::{mpsc, oneshot}; #[cfg(feature = "http2")] use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; -#[cfg(test)] -pub(crate) type RetryPromise = oneshot::Receiver)>>; +pub(crate) type RetryPromise = oneshot::Receiver>>; pub(crate) type Promise = oneshot::Receiver>; +/// An error when calling `try_send_request`. +/// +/// There is a possibility of an error occuring on a connection in-between the +/// time that a request is queued and when it is actually written to the IO +/// transport. If that happens, it is safe to return the request back to the +/// caller, as it was never fully sent. +#[derive(Debug)] +pub struct TrySendError { + pub(crate) error: crate::Error, + pub(crate) message: Option, +} + pub(crate) fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded_channel(); let (giver, taker) = want::new(); @@ -92,7 +103,7 @@ impl Sender { } } - #[cfg(test)] + #[cfg(feature = "http1")] pub(crate) fn try_send(&mut self, val: T) -> Result, T> { if !self.can_send() { return Err(val); @@ -135,7 +146,6 @@ impl UnboundedSender { self.giver.is_canceled() } - #[cfg(test)] pub(crate) fn try_send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); self.inner @@ -210,17 +220,17 @@ struct Envelope(Option<(T, Callback)>); impl Drop for Envelope { fn drop(&mut self) { if let Some((val, cb)) = self.0.take() { - cb.send(Err(( - crate::Error::new_canceled().with("connection closed"), - Some(val), - ))); + cb.send(Err(TrySendError { + error: crate::Error::new_canceled().with("connection closed"), + message: Some(val), + })); } } } pub(crate) enum Callback { #[allow(unused)] - Retry(Option)>>>), + Retry(Option>>>), NoRetry(Option>>), } @@ -229,7 +239,10 @@ impl Drop for Callback { match self { Callback::Retry(tx) => { if let Some(tx) = tx.take() { - let _ = tx.send(Err((dispatch_gone(), None))); + let _ = tx.send(Err(TrySendError { + error: dispatch_gone(), + message: None, + })); } } Callback::NoRetry(tx) => { @@ -269,18 +282,34 @@ impl Callback { } } - pub(crate) fn send(mut self, val: Result)>) { + pub(crate) fn send(mut self, val: Result>) { match self { Callback::Retry(ref mut tx) => { let _ = tx.take().unwrap().send(val); } Callback::NoRetry(ref mut tx) => { - let _ = tx.take().unwrap().send(val.map_err(|e| e.0)); + let _ = tx.take().unwrap().send(val.map_err(|e| e.error)); } } } } +impl TrySendError { + /// Take the message from this error. + /// + /// The message will not always have been recovered. If an error occurs + /// after the message has been serialized onto the connection, it will not + /// be available here. + pub fn take_message(&mut self) -> Option { + self.message.take() + } + + /// Consumes this to return the inner error. + pub fn into_error(self) -> crate::Error { + self.error + } +} + #[cfg(feature = "http2")] pin_project! { pub struct SendWhen @@ -325,8 +354,8 @@ where trace!("send_when canceled"); Poll::Ready(()) } - Poll::Ready(Err(err)) => { - call_back.send(Err(err)); + Poll::Ready(Err((error, message))) => { + call_back.send(Err(TrySendError { error, message })); Poll::Ready(()) } } @@ -389,8 +418,8 @@ mod tests { let err = fulfilled .expect("fulfilled") .expect_err("promise should error"); - match (err.0.kind(), err.1) { - (&crate::error::Kind::Canceled, Some(_)) => (), + match (err.error.is_canceled(), err.message) { + (true, Some(_)) => (), e => panic!("expected Error::Cancel(_), found {:?}", e), } } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index dabcff6be3..79ea48be9f 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -13,6 +13,8 @@ use http::Request; use super::{Http1Transaction, Wants}; use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; +#[cfg(feature = "client")] +use crate::client::dispatch::TrySendError; use crate::common::task; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -655,7 +657,10 @@ cfg_client! { } Err(err) => { if let Some(cb) = self.callback.take() { - cb.send(Err((err, None))); + cb.send(Err(TrySendError { + error: err, + message: None, + })); Ok(()) } else if !self.rx_closed { self.rx.close(); @@ -663,7 +668,10 @@ cfg_client! { trace!("canceling queued request with connection error: {}", err); // in this case, the message was never even started, so it's safe to tell // the user that the request was completely canceled - cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); + cb.send(Err(TrySendError { + error: crate::Error::new_canceled().with(err), + message: Some(req), + })); Ok(()) } else { Err(err) @@ -729,9 +737,9 @@ mod tests { let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx)) .expect_err("callback should send error"); - match (err.0.kind(), err.1) { - (&crate::error::Kind::Canceled, Some(_)) => (), - other => panic!("expected Canceled, got {:?}", other), + match (err.error.is_canceled(), err.message.as_ref()) { + (true, Some(_)) => (), + _ => panic!("expected Canceled, got {:?}", err), } }); } diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 53f36a7f85..679b9dfada 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -22,7 +22,7 @@ use pin_project_lite::pin_project; use super::ping::{Ponger, Recorder}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::{Body, Incoming as IncomingBody}; -use crate::client::dispatch::{Callback, SendWhen}; +use crate::client::dispatch::{Callback, SendWhen, TrySendError}; use crate::common::io::Compat; use crate::common::time::Time; use crate::ext::Protocol; @@ -662,10 +662,10 @@ where .map_or(false, |len| len != 0) { warn!("h2 connect request with non-zero body not supported"); - cb.send(Err(( - crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), - None, - ))); + cb.send(Err(TrySendError { + error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()), + message: None, + })); continue; } @@ -677,7 +677,10 @@ where Ok(ok) => ok, Err(err) => { debug!("client send request error: {}", err); - cb.send(Err((crate::Error::new_h2(err), None))); + cb.send(Err(TrySendError { + error: crate::Error::new_h2(err), + message: None, + })); continue; } }; @@ -702,7 +705,10 @@ where } Poll::Ready(Ok(())) => (), Poll::Ready(Err(err)) => { - f.cb.send(Err((crate::Error::new_h2(err), None))); + f.cb.send(Err(TrySendError { + error: crate::Error::new_h2(err), + message: None, + })); continue; } } diff --git a/tests/client.rs b/tests/client.rs index 89b90796e0..6b6a6df2fe 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -2041,6 +2041,91 @@ mod conn { assert_eq!(vec, b"bar=foo"); } + #[tokio::test] + async fn test_try_send_request() { + use std::future::Future; + let (listener, addr) = setup_tk_test_server().await; + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>(); + + tokio::spawn(async move { + let mut sock = listener.accept().await.unwrap().0; + let mut buf = [0u8; 8192]; + sock.read(&mut buf).await.expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n") + .await + .expect("write 1"); + let _ = done_rx.await; + }); + + // make polling fair by putting both in spawns + tokio::spawn(async move { + let io = tcp_connect(&addr).await.expect("tcp connect"); + let (mut client, mut conn) = conn::http1::Builder::new() + .handshake::<_, Empty>(io) + .await + .expect("http handshake"); + + // get the conn ready + assert!( + future::poll_fn(|cx| Poll::Ready(Pin::new(&mut conn).poll(cx))) + .await + .is_pending() + ); + assert!(client.is_ready()); + + // use the connection once + let mut fut1 = std::pin::pin!(client.send_request(http::Request::new(Empty::new()))); + let _res1 = future::poll_fn(|cx| loop { + if let Poll::Ready(res) = fut1.as_mut().poll(cx) { + return Poll::Ready(res); + } + return match Pin::new(&mut conn).poll(cx) { + Poll::Ready(_) => panic!("ruh roh"), + Poll::Pending => Poll::Pending, + }; + }) + .await + .expect("resp 1"); + + assert!(client.is_ready()); + + // simulate the server dropping the conn + let _ = done_tx.send(()); + // let the server task die + tokio::task::yield_now().await; + + let mut fut2 = + std::pin::pin!(client.try_send_request(http::Request::new(Empty::new()))); + let poll1 = future::poll_fn(|cx| Poll::Ready(fut2.as_mut().poll(cx))).await; + assert!(poll1.is_pending(), "not already known to error"); + + let mut conn_opt = Some(conn); + // wasn't a known error, req is in queue, and now the next poll, the + // conn will be noticed as errored + let mut err = future::poll_fn(|cx| { + loop { + if let Poll::Ready(res) = fut2.as_mut().poll(cx) { + return Poll::Ready(res); + } + if let Some(ref mut conn) = conn_opt { + match Pin::new(conn).poll(cx) { + Poll::Ready(_) => { + conn_opt = None; + } // ok + Poll::Pending => return Poll::Pending, + }; + } + } + }) + .await + .expect_err("resp 2"); + + assert!(err.take_message().is_some(), "request was returned"); + }) + .await + .unwrap(); + } + #[tokio::test] async fn http2_detect_conn_eof() { use futures_util::future;