
feat(lib): add support to disable tokio-proto internals #1362


Merged · 2 commits · Oct 27, 2017
2 changes: 2 additions & 0 deletions .travis.yml
@@ -8,6 +8,8 @@ matrix:
env: FEATURES="--features nightly"
- rust: beta
- rust: stable
- rust: stable
env: HYPER_NO_PROTO=1
- rust: stable
env: FEATURES="--features compat"
- rust: 1.17.0
6 changes: 4 additions & 2 deletions examples/client.rs
@@ -1,4 +1,4 @@
#![deny(warnings)]
//#![deny(warnings)]
extern crate futures;
extern crate hyper;
extern crate tokio_core;
@@ -32,7 +32,9 @@ fn main() {

let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
let client = Client::configure()
.no_proto()
.build(&handle);

let work = client.get(url).and_then(|res| {
println!("Response: {}", res.status());
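
For reference, a complete client that opts out of the tokio-proto path on this branch might look roughly like the sketch below. This assumes hyper 0.11 with this PR applied; the URL and the stdout body handling are illustrative, not part of the change.

```rust
// Sketch: a whole-program version of the example above, opting out of
// tokio-proto via the new `no_proto()` configuration flag.
extern crate futures;
extern crate hyper;
extern crate tokio_core;

use std::io::{self, Write};

use futures::{Future, Stream};
use hyper::Client;

fn main() {
    let mut core = tokio_core::reactor::Core::new().unwrap();
    let handle = core.handle();

    // The new opt-in: build the client with hyper's own dispatcher.
    let client = Client::configure()
        .no_proto()
        .build(&handle);

    let url: hyper::Uri = "http://httpbin.org/ip".parse().unwrap();
    let work = client.get(url).and_then(|res| {
        println!("Response: {}", res.status());
        // Stream the body to stdout so keep-alive can recycle the connection.
        res.body().for_each(|chunk| {
            io::stdout().write_all(&chunk).map_err(From::from)
        })
    });
    core.run(work).unwrap();
}
```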
3 changes: 2 additions & 1 deletion examples/hello.rs
@@ -31,7 +31,8 @@ impl Service for Hello {
fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:3000".parse().unwrap();
let server = Http::new().bind(&addr, || Ok(Hello)).unwrap();
let mut server = Http::new().bind(&addr, || Ok(Hello)).unwrap();
server.no_proto();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
3 changes: 2 additions & 1 deletion examples/server.rs
@@ -47,7 +47,8 @@ fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:1337".parse().unwrap();

let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
let mut server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
server.no_proto();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
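
Similarly, a complete server that enables the new dispatcher would look roughly like this sketch (assuming hyper 0.11 with this PR applied; the address and response body are illustrative):

```rust
// Sketch: a whole-program version of the hello/echo examples above,
// switching the bound server onto the new dispatcher with `no_proto()`.
extern crate futures;
extern crate hyper;

use futures::future;
use hyper::header::ContentLength;
use hyper::server::{Http, Request, Response, Service};

static BODY: &'static str = "Hello, no_proto!";

struct Hello;

impl Service for Hello {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = future::FutureResult<Response, hyper::Error>;

    fn call(&self, _req: Request) -> Self::Future {
        future::ok(
            Response::new()
                .with_header(ContentLength(BODY.len() as u64))
                .with_body(BODY)
        )
    }
}

fn main() {
    let addr = "127.0.0.1:3000".parse().unwrap();
    let mut server = Http::new().bind(&addr, || Ok(Hello)).unwrap();
    // The new opt-in: handle connections without the tokio-proto machinery.
    server.no_proto();
    println!("Listening on http://{}", server.local_addr().unwrap());
    server.run().unwrap();
}
```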
179 changes: 126 additions & 53 deletions src/client/mod.rs
@@ -20,7 +20,7 @@ use tokio_proto::util::client_proxy::ClientProxy;
pub use tokio_service::Service;

use header::{Headers, Host};
use proto::{self, TokioBody};
use proto::{self, RequestHead, TokioBody};
use proto::response;
use proto::request;
use method::Method;
@@ -45,7 +45,7 @@ pub mod compat;
pub struct Client<C, B = proto::Body> {
connector: C,
handle: Handle,
pool: Pool<TokioClient<B>>,
pool: Dispatch<B>,
}

impl Client<HttpConnector, proto::Body> {
@@ -93,7 +93,11 @@ impl<C, B> Client<C, B> {
Client {
connector: config.connector,
handle: handle.clone(),
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
pool: if config.no_proto {
Dispatch::Hyper(Pool::new(config.keep_alive, config.keep_alive_timeout))
} else {
Dispatch::Proto(Pool::new(config.keep_alive, config.keep_alive_timeout))
}
}
}
}
@@ -187,48 +191,100 @@ where C: Connect,
headers.extend(head.headers.iter());
head.headers = headers;

let checkout = self.pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
drop(tx.send(pooled.clone()));
pooled
})
};
match self.pool {
Dispatch::Proto(ref pool) => {
trace!("proto_dispatch");
let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
drop(tx.send(pooled.clone()));
pooled
})
};

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});
let resp = race.and_then(move |client| {
let msg = match body {
Some(body) => {
Message::WithBody(head, body.into())
},
None => Message::WithoutBody(head),
};
client.call(msg)
});
FutureResponse(Box::new(resp.map(|msg| {
match msg {
Message::WithoutBody(head) => response::from_wire(head, None),
Message::WithBody(head, body) => response::from_wire(head, Some(body.into())),
}
})))
},
Dispatch::Hyper(ref pool) => {
trace!("no_proto dispatch");
use futures::Sink;
use futures::sync::{mpsc, oneshot};

let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = mpsc::channel(1);
let pooled = pool.pooled(pool_key, RefCell::new(tx));
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
pooled
})
};

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});

let resp = race.and_then(move |client| {
let (callback, rx) = oneshot::channel();
client.borrow_mut().start_send((head, body, callback)).unwrap();
rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
})
});

FutureResponse(Box::new(resp))

}
}

}
@@ -238,7 +294,10 @@ impl<C: Clone, B> Clone for Client<C, B> {
Client {
connector: self.connector.clone(),
handle: self.handle.clone(),
pool: self.pool.clone(),
pool: match self.pool {
Dispatch::Proto(ref pool) => Dispatch::Proto(pool.clone()),
Dispatch::Hyper(ref pool) => Dispatch::Hyper(pool.clone()),
}
}
}
}
@@ -249,10 +308,16 @@ impl<C, B> fmt::Debug for Client<C, B> {
}
}

type TokioClient<B> = ClientProxy<Message<proto::RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
type HyperClient<B> = RefCell<::futures::sync::mpsc::Sender<(RequestHead, Option<B>, ::futures::sync::oneshot::Sender<::Result<::Response>>)>>;

enum Dispatch<B> {
Proto(Pool<ProtoClient<B>>),
Hyper(Pool<HyperClient<B>>),
}

struct HttpClient<B> {
client_rx: RefCell<Option<oneshot::Receiver<Pooled<TokioClient<B>>>>>,
client_rx: RefCell<Option<oneshot::Receiver<Pooled<ProtoClient<B>>>>>,
}

impl<T, B> ClientProto<T> for HttpClient<B>
@@ -265,7 +330,7 @@ where T: AsyncRead + AsyncWrite + 'static,
type Response = proto::ResponseHead;
type ResponseBody = proto::Chunk;
type Error = ::Error;
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>;
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type BindTransport = BindingClient<T, B>;

fn bind_transport(&self, io: T) -> Self::BindTransport {
@@ -277,7 +342,7 @@ where T: AsyncRead + AsyncWrite + 'static,
}

struct BindingClient<T, B> {
rx: oneshot::Receiver<Pooled<TokioClient<B>>>,
rx: oneshot::Receiver<Pooled<ProtoClient<B>>>,
io: Option<T>,
}

@@ -286,7 +351,7 @@ where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>;
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -309,6 +374,7 @@ pub struct Config<C, B> {
keep_alive_timeout: Option<Duration>,
//TODO: make use of max_idle config
max_idle: usize,
no_proto: bool,
}

/// Phantom type used to signal that `Config` should create a `HttpConnector`.
@@ -324,6 +390,7 @@ impl Default for Config<UseDefaultConnector, proto::Body> {
keep_alive: true,
keep_alive_timeout: Some(Duration::from_secs(90)),
max_idle: 5,
no_proto: false,
}
}
}
@@ -347,6 +414,7 @@ impl<C, B> Config<C, B> {
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
no_proto: self.no_proto,
}
}

@@ -360,6 +428,7 @@ impl<C, B> Config<C, B> {
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
no_proto: self.no_proto,
}
}

@@ -393,6 +462,13 @@ impl<C, B> Config<C, B> {
self
}
*/

/// Disable tokio-proto internal usage.
#[inline]
pub fn no_proto(mut self) -> Config<C, B> {
self.no_proto = true;
self
}
}

impl<C, B> Config<C, B>
Expand Down Expand Up @@ -431,11 +507,8 @@ impl<C, B> fmt::Debug for Config<C, B> {
impl<C: Clone, B> Clone for Config<C, B> {
fn clone(&self) -> Config<C, B> {
Config {
_body_type: PhantomData::<B>,
connector: self.connector.clone(),
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
.. *self
}
}
}
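
A note on the new client path: with `no_proto` set, the pooled item is no longer a tokio-proto `ClientProxy` but an `mpsc::Sender` into the new `Dispatcher`, and each request carries a `oneshot::Sender` that the dispatcher completes with the response. The sketch below shows that handshake pattern in isolation; it uses only futures 0.1, and the request/response types and names are stand-ins, not hyper's internals.

```rust
// Sketch: the mpsc + oneshot handshake used by the no_proto dispatch path.
// Each request is queued together with a one-shot "callback" sender; the
// dispatcher answers on that channel.
extern crate futures;

use futures::{Future, Sink, Stream};
use futures::sync::{mpsc, oneshot};

type Request = &'static str;
type Response = String;

fn main() {
    // The "client handle" side keeps the Sender; the dispatcher owns the Receiver.
    let (tx, rx) = mpsc::channel::<(Request, oneshot::Sender<Response>)>(1);

    // Queue one request along with its per-request callback, then drop the
    // Sender so the dispatcher's stream terminates after draining.
    let (callback, waiter) = oneshot::channel();
    let tx = tx.send(("GET /", callback))
        .map_err(|_| "dispatcher went away")
        .wait()
        .unwrap();
    drop(tx);

    // The dispatcher: take each queued request, produce a response, and
    // complete the matching oneshot.
    rx.for_each(|(req, cb)| {
        let _ = cb.send(format!("handled {}", req));
        Ok(())
    }).wait().unwrap();

    // The caller observes the response on its oneshot, much like FutureResponse.
    let res = waiter.wait().unwrap();
    println!("{}", res);
}
```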
1 change: 1 addition & 0 deletions src/proto/body.rs
@@ -7,6 +7,7 @@ use std::borrow::Cow;
use super::Chunk;

pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

/// A `Stream` for `Chunk`s used in requests and responses.
#[must_use = "streams do nothing unless polled"]