From ab8d2af8c2d1fb0b2689cc899552b651090005cb Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 16 Oct 2018 13:19:48 -0700 Subject: [PATCH] feat(client): adds `HttpInfo` to responses when `HttpConnector` is used - Adds `client::connect::Connected::extra()`, which allows connectors to specify arbitrary custom information about a connected transport. If a connector provides this extra value, it will be set in the `Response` extensions. Closes #1402 --- examples/hello.rs | 1 - src/client/connect/http.rs | 51 ++++++++++++++++++++++++++- src/client/connect/mod.rs | 71 ++++++++++++++++++++++++++++++++++++-- src/client/mod.rs | 23 ++++++++---- tests/client.rs | 12 ++++++- 5 files changed, 145 insertions(+), 13 deletions(-) diff --git a/examples/hello.rs b/examples/hello.rs index 7740e8ee65..3a1d865a35 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -10,7 +10,6 @@ static PHRASE: &'static [u8] = b"Hello World!"; fn main() { pretty_env_logger::init(); - let addr = ([127, 0, 0, 1], 3000).into(); // new_service is run for each connection, creating a 'service' diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 2e69bcdcfd..31e865fcf7 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -24,6 +24,11 @@ use self::sealed::HttpConnectorBlockingTask; /// A connector for the `http` scheme. /// /// Performs DNS resolution in a thread pool, and then connects over TCP. +/// +/// # Note +/// +/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes +/// transport information such as the remote socket address used. #[derive(Clone)] pub struct HttpConnector { executor: HttpConnectExecutor, @@ -36,6 +41,37 @@ pub struct HttpConnector { reuse_address: bool, } +/// Extra information about the transport when an HttpConnector is used. +/// +/// # Example +/// +/// ```rust +/// use hyper::client::{Client, connect::HttpInfo}; +/// use hyper::rt::Future; +/// +/// let client = Client::new(); +/// +/// let fut = client.get("http://example.local".parse().unwrap()) +/// .inspect(|resp| { +/// resp +/// .extensions() +/// .get::() +/// .map(|info| { +/// println!("remote addr = {}", info.remote_addr()); +/// }); +/// }); +/// ``` +/// +/// # Note +/// +/// If a different connector is used besides [`HttpConnector`](HttpConnector), +/// this value will not exist in the extensions. Consult that specific +/// connector to see what "extra" information it might provide to responses. +#[derive(Clone, Debug)] +pub struct HttpInfo { + remote_addr: SocketAddr, +} + impl HttpConnector { /// Construct a new HttpConnector. /// @@ -187,6 +223,13 @@ impl Connect for HttpConnector { } } +impl HttpInfo { + /// Get the remote address of the transport used. + pub fn remote_addr(&self) -> SocketAddr { + self.remote_addr + } +} + #[inline] fn invalid_url(err: InvalidUrl, handle: &Option) -> HttpConnecting { HttpConnecting { @@ -277,7 +320,13 @@ impl Future for HttpConnecting { sock.set_nodelay(self.nodelay)?; - return Ok(Async::Ready((sock, Connected::new()))); + let extra = HttpInfo { + remote_addr: sock.peer_addr()?, + }; + let connected = Connected::new() + .extra(extra); + + return Ok(Async::Ready((sock, connected))); }, State::Error(ref mut e) => return Err(e.take().expect("polled more than once")), } diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 4fe0692254..a67d6d1079 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -6,16 +6,16 @@ //! establishes connections over TCP. //! - The [`Connect`](Connect) trait and related types to build custom connectors. use std::error::Error as StdError; -use std::mem; +use std::{fmt, mem}; use bytes::{BufMut, Bytes, BytesMut}; use futures::Future; -use http::{uri, Uri}; +use http::{uri, Response, Uri}; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] mod dns; #[cfg(feature = "runtime")] mod http; -#[cfg(feature = "runtime")] pub use self::http::HttpConnector; +#[cfg(feature = "runtime")] pub use self::http::{HttpConnector, HttpInfo}; /// Connect to a destination, returning an IO transport. /// @@ -48,8 +48,11 @@ pub struct Destination { pub struct Connected { //alpn: Alpn, pub(super) is_proxied: bool, + pub(super) extra: Option, } +pub(super) struct Extra(Box); + /*TODO: when HTTP1 Upgrades to H2 are added, this will be needed #[derive(Debug)] pub(super) enum Alpn { @@ -245,6 +248,7 @@ impl Connected { Connected { //alpn: Alpn::Http1, is_proxied: false, + extra: None, } } @@ -260,6 +264,12 @@ impl Connected { self } + /// Set extra connection information to be set in the extensions of every `Response`. + pub fn extra(mut self, extra: T) -> Connected { + self.extra = Some(Extra(Box::new(ExtraEnvelope(extra)))); + self + } + /* /// Set that the connected transport negotiated HTTP/2 as it's /// next protocol. @@ -268,6 +278,61 @@ impl Connected { self } */ + + // Don't public expose that `Connected` is `Clone`, unsure if we want to + // keep that contract... + pub(super) fn clone(&self) -> Connected { + Connected { + is_proxied: self.is_proxied, + extra: self.extra.clone(), + } + } +} + +// ===== impl Extra ===== + +impl Extra { + pub(super) fn set(&self, res: &mut Response<::Body>) { + self.0.set(res); + } +} + +impl Clone for Extra { + fn clone(&self) -> Extra { + Extra(self.0.clone_box()) + } +} + +impl fmt::Debug for Extra { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Extra") + .finish() + } +} + +// This indirection allows the `Connected` to have a type-erased "extra" value, +// while that type still knows its inner extra type. This allows the correct +// TypeId to be used when inserting into `res.extensions_mut()`. +#[derive(Clone)] +struct ExtraEnvelope(T); + +trait ExtraInner: Send + Sync { + fn clone_box(&self) -> Box; + fn set(&self, res: &mut Response<::Body>); +} + +impl ExtraInner for ExtraEnvelope +where + T: Clone + Send + Sync + 'static +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn set(&self, res: &mut Response<::Body>) { + let extra = self.0.clone(); + res.extensions_mut().insert(extra); + } } #[cfg(test)] diff --git a/src/client/mod.rs b/src/client/mod.rs index 8949a11643..2660089362 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -91,7 +91,7 @@ use http::uri::Scheme; use body::{Body, Payload}; use common::{Exec, lazy as hyper_lazy, Lazy}; -use self::connect::{Connect, Destination}; +use self::connect::{Connect, Connected, Destination}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; #[cfg(feature = "runtime")] pub use self::connect::HttpConnector; @@ -290,7 +290,7 @@ where C: Connect + Sync + 'static, // CONNECT always sends origin-form, so check it first... if req.method() == &Method::CONNECT { authority_form(req.uri_mut()); - } else if pooled.is_proxied { + } else if pooled.conn_info.is_proxied { absolute_form(req.uri_mut()); } else { origin_form(req.uri_mut()); @@ -305,6 +305,15 @@ where C: Connect + Sync + 'static, let fut = pooled.send_request_retryable(req) .map_err(ClientError::map_with_reused(pooled.is_reused())); + // If the Connector included 'extra' info, add to Response... + let extra_info = pooled.conn_info.extra.clone(); + let fut = fut.map(move |mut res| { + if let Some(extra) = extra_info { + extra.set(&mut res); + } + res + }); + // As of futures@0.1.21, there is a race condition in the mpsc // channel, such that sending when the receiver is closing can // result in the message being stuck inside the queue. It won't @@ -499,7 +508,7 @@ where C: Connect + Sync + 'static, }) .map(move |tx| { pool.pooled(connecting, PoolClient { - is_proxied: connected.is_proxied, + conn_info: connected, tx: match ver { Ver::Http1 => PoolTx::Http1(tx), Ver::Http2 => PoolTx::Http2(tx.into_http2()), @@ -565,7 +574,7 @@ impl Future for ResponseFuture { // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] struct PoolClient { - is_proxied: bool, + conn_info: Connected, tx: PoolTx, } @@ -624,17 +633,17 @@ where match self.tx { PoolTx::Http1(tx) => { Reservation::Unique(PoolClient { - is_proxied: self.is_proxied, + conn_info: self.conn_info, tx: PoolTx::Http1(tx), }) }, PoolTx::Http2(tx) => { let b = PoolClient { - is_proxied: self.is_proxied, + conn_info: self.conn_info.clone(), tx: PoolTx::Http2(tx.clone()), }; let a = PoolClient { - is_proxied: self.is_proxied, + conn_info: self.conn_info, tx: PoolTx::Http2(tx), }; Reservation::Shared(a, b) diff --git a/tests/client.rs b/tests/client.rs index 6d2efb6755..fa48d89919 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -274,7 +274,17 @@ macro_rules! test { let rx = rx.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)) + rt.block_on(res.join(rx).map(|r| r.0)).map(move |mut resp| { + // Always check that HttpConnector has set the "extra" info... + let extra = resp + .extensions_mut() + .remove::<::hyper::client::connect::HttpInfo>() + .expect("HttpConnector should set HttpInfo"); + + assert_eq!(extra.remote_addr(), addr, "HttpInfo should have server addr"); + + resp + }) }); }