Skip to content

Commit

Permalink
feat(http): allow specifying custom body streams
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Feb 16, 2017
1 parent 2266d86 commit 1b1311a
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 269 deletions.
2 changes: 1 addition & 1 deletion benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
req.headers_mut().set(ContentLength(post.len() as u64));
req.set_body(post);

let work = client.get(url.clone()).and_then(|res| {
let work = client.request(req).and_then(|res| {
res.body().for_each(|_chunk| {
Ok(())
})
Expand Down
171 changes: 125 additions & 46 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use futures::{Poll, Async, Future};
use futures::{Poll, Async, Future, Stream};
use relay;
use tokio::io::Io;
use tokio::reactor::Handle;
Expand Down Expand Up @@ -37,14 +38,21 @@ mod response;

/// A Client to make outgoing HTTP requests.
// If the Connector is clone, then the Client can be clone easily.
#[derive(Clone)]
pub struct Client<C> {
pub struct Client<C, B = http::Body> {
connector: C,
handle: Handle,
pool: Pool<TokioClient>,
pool: Pool<TokioClient<B>>,
}

impl Client<HttpConnector> {
impl Client<HttpConnector, http::Body> {
/// Create a new Client with the default config.
#[inline]
pub fn new(handle: &Handle) -> Client<HttpConnector, http::Body> {
Config::default().build(handle)
}
}

impl Client<HttpConnector, http::Body> {
/// Configure a Client.
///
/// # Example
Expand All @@ -63,30 +71,28 @@ impl Client<HttpConnector> {
/// # }
/// ```
#[inline]
pub fn configure() -> Config<UseDefaultConnector> {
pub fn configure() -> Config<UseDefaultConnector, http::Body> {
Config::default()
}
}

impl Client<HttpConnector> {
/// Create a new Client with the default config.
#[inline]
pub fn new(handle: &Handle) -> Client<HttpConnector> {
Client::configure().build(handle)
}
}

impl<C: Connect> Client<C> {
impl<C, B> Client<C, B> {
/// Create a new client with a specific connector.
#[inline]
fn configured(config: Config<C>, handle: &Handle) -> Client<C> {
fn configured(config: Config<C, B>, handle: &Handle) -> Client<C, B> {
Client {
connector: config.connector,
handle: handle.clone(),
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
}
}
}

impl<C, B> Client<C, B>
where C: Connect,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
/// Send a GET Request using this Client.
#[inline]
pub fn get(&self, url: Url) -> FutureResponse {
Expand All @@ -95,7 +101,7 @@ impl<C: Connect> Client<C> {

/// Send a constructed Request using this Client.
#[inline]
pub fn request(&self, req: Request) -> FutureResponse {
pub fn request(&self, req: Request<B>) -> FutureResponse {
self.call(req)
}
}
Expand All @@ -118,13 +124,17 @@ impl Future for FutureResponse {
}
}

impl<C: Connect> Service for Client<C> {
type Request = Request;
impl<C, B> Service for Client<C, B>
where C: Connect,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
type Future = FutureResponse;

fn call(&self, req: Request) -> Self::Future {
fn call(&self, req: Self::Request) -> Self::Future {
let url = req.url().clone();
let (mut head, body) = request::split(req);
let mut headers = Headers::new();
Expand Down Expand Up @@ -178,26 +188,40 @@ impl<C: Connect> Service for Client<C> {

}

impl<C> fmt::Debug for Client<C> {
impl<C: Clone, B> Clone for Client<C, B> {
fn clone(&self) -> Client<C, B> {
Client {
connector: self.connector.clone(),
handle: self.handle.clone(),
pool: self.pool.clone(),
}
}
}

impl<C, B> fmt::Debug for Client<C, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Client")
}
}

type TokioClient = ClientProxy<Message<http::RequestHead, TokioBody>, Message<http::ResponseHead, TokioBody>, ::Error>;
type TokioClient<B> = ClientProxy<Message<http::RequestHead, B>, Message<http::ResponseHead, TokioBody>, ::Error>;

struct HttpClient {
client_rx: RefCell<Option<relay::Receiver<Pooled<TokioClient>>>>,
struct HttpClient<B> {
client_rx: RefCell<Option<relay::Receiver<Pooled<TokioClient<B>>>>>,
}

impl<T: Io + 'static> ClientProto<T> for HttpClient {
impl<T, B> ClientProto<T> for HttpClient<B>
where T: Io + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Request = http::RequestHead;
type RequestBody = http::Chunk;
type RequestBody = B::Item;
type Response = http::ResponseHead;
type ResponseBody = http::Chunk;
type Error = ::Error;
type Transport = http::Conn<T, http::ClientTransaction, Pooled<TokioClient>>;
type BindTransport = BindingClient<T>;
type Transport = http::Conn<T, B::Item, http::ClientTransaction, Pooled<TokioClient<B>>>;
type BindTransport = BindingClient<T, B>;

fn bind_transport(&self, io: T) -> Self::BindTransport {
BindingClient {
Expand All @@ -207,13 +231,17 @@ impl<T: Io + 'static> ClientProto<T> for HttpClient {
}
}

struct BindingClient<T> {
rx: relay::Receiver<Pooled<TokioClient>>,
struct BindingClient<T, B> {
rx: relay::Receiver<Pooled<TokioClient<B>>>,
io: Option<T>,
}

impl<T: Io + 'static> Future for BindingClient<T> {
type Item = http::Conn<T, http::ClientTransaction, Pooled<TokioClient>>;
impl<T, B> Future for BindingClient<T, B>
where T: Io + 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Item = http::Conn<T, B::Item, http::ClientTransaction, Pooled<TokioClient<B>>>;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand All @@ -228,8 +256,8 @@ impl<T: Io + 'static> Future for BindingClient<T> {
}

/// Configuration for a Client
#[derive(Debug, Clone)]
pub struct Config<C> {
pub struct Config<C, B> {
_body_type: PhantomData<B>,
//connect_timeout: Duration,
connector: C,
keep_alive: bool,
Expand All @@ -242,9 +270,10 @@ pub struct Config<C> {
#[derive(Debug, Clone, Copy)]
pub struct UseDefaultConnector(());

impl Config<UseDefaultConnector> {
fn default() -> Config<UseDefaultConnector> {
impl Default for Config<UseDefaultConnector, http::Body> {
fn default() -> Config<UseDefaultConnector, http::Body> {
Config {
_body_type: PhantomData::<http::Body>,
//connect_timeout: Duration::from_secs(10),
connector: UseDefaultConnector(()),
keep_alive: true,
Expand All @@ -254,11 +283,33 @@ impl Config<UseDefaultConnector> {
}
}

impl<C> Config<C> {
impl<C, B> Config<C, B> {
/// Set the body stream to be used by the `Client`.
///
/// # Example
///
/// ```rust
/// # use hyper::client::Config;
/// let cfg = Config::default()
/// .body::<hyper::Body>();
/// # drop(cfg);
#[inline]
pub fn body<BB>(self) -> Config<C, BB> {
Config {
_body_type: PhantomData::<BB>,
//connect_timeout: self.connect_timeout,
connector: self.connector,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
}
}

/// Set the `Connect` type to be used.
#[inline]
pub fn connector<CC: Connect>(self, val: CC) -> Config<CC> {
pub fn connector<CC: Connect>(self, val: CC) -> Config<CC, B> {
Config {
_body_type: self._body_type,
//connect_timeout: self.connect_timeout,
connector: val,
keep_alive: self.keep_alive,
Expand All @@ -271,7 +322,7 @@ impl<C> Config<C> {
///
/// Default is enabled.
#[inline]
pub fn keep_alive(mut self, val: bool) -> Config<C> {
pub fn keep_alive(mut self, val: bool) -> Config<C, B> {
self.keep_alive = val;
self
}
Expand All @@ -280,9 +331,9 @@ impl<C> Config<C> {
///
/// Pass `None` to disable timeout.
///
/// Default is 2 minutes.
/// Default is 90 seconds.
#[inline]
pub fn keep_alive_timeout(mut self, val: Option<Duration>) -> Config<C> {
pub fn keep_alive_timeout(mut self, val: Option<Duration>) -> Config<C, B> {
self.keep_alive_timeout = val;
self
}
Expand All @@ -292,29 +343,57 @@ impl<C> Config<C> {
///
/// Default is 10 seconds.
#[inline]
pub fn connect_timeout(mut self, val: Duration) -> Config<C> {
pub fn connect_timeout(mut self, val: Duration) -> Config<C, B> {
self.connect_timeout = val;
self
}
*/
}

impl<C: Connect> Config<C> {
impl<C, B> Config<C, B>
where C: Connect,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
/// Construct the Client with this configuration.
#[inline]
pub fn build(self, handle: &Handle) -> Client<C> {
pub fn build(self, handle: &Handle) -> Client<C, B> {
Client::configured(self, handle)
}
}

impl Config<UseDefaultConnector> {
impl<B> Config<UseDefaultConnector, B>
where B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
/// Construct the Client with this configuration.
#[inline]
pub fn build(self, handle: &Handle) -> Client<HttpConnector> {
pub fn build(self, handle: &Handle) -> Client<HttpConnector, B> {
self.connector(HttpConnector::new(4, handle)).build(handle)
}
}

impl<C, B> fmt::Debug for Config<C, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Config")
.field("keep_alive", &self.keep_alive)
.field("keep_alive_timeout", &self.keep_alive_timeout)
.field("max_idle", &self.max_idle)
.finish()
}
}

impl<C: Clone, B> Clone for Config<C, B> {
fn clone(&self) -> Config<C, B> {
Config {
_body_type: PhantomData::<B>,
connector: self.connector.clone(),
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
}
}
}

#[cfg(test)]
mod tests {
Expand Down
Loading

0 comments on commit 1b1311a

Please sign in to comment.