Skip to content

Commit

Permalink
Use futures 0.3-compatible hyper and tokio and the tokio runtime instead
Browse files Browse the repository at this point in the history
of futures-rs executor.

Despite this change, using body_bytes_wait on (for example) a File will
still fail due to tokio-rs/tokio#1356.
  • Loading branch information
jebrosen authored and SergioBenitez committed Jul 11, 2020
1 parent 999a5c6 commit 0c91080
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 118 deletions.
2 changes: 1 addition & 1 deletion core/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ private-cookies = ["cookie/private", "cookie/key-expansion"]
[dependencies]
smallvec = "1.0"
percent-encoding = "1"
hyper = { version = "0.12.31", default-features = false, features = ["runtime"] }
hyper = { git = "https://github.com/hyperium/hyper", rev = "a22dabd", default-features = false, features = ["runtime"] }
http = "0.1.17"
mime = "0.3.13"
time = "0.2.11"
Expand Down
4 changes: 2 additions & 2 deletions core/http/src/hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
//! while necessary.

#[doc(hidden)] pub use hyper::{Body, Request, Response, Server};
#[doc(hidden)] pub use hyper::body::Payload as Payload;
#[doc(hidden)] pub use hyper::body::{Payload, Sender as BodySender};
#[doc(hidden)] pub use hyper::error::Error;
#[doc(hidden)] pub use hyper::service::{make_service_fn, MakeService, Service};
#[doc(hidden)] pub use hyper::service::{make_service_fn, service_fn, MakeService, Service};
#[doc(hidden)] pub use hyper::server::conn::{AddrIncoming, AddrStream};

#[doc(hidden)] pub use hyper::Chunk;
Expand Down
5 changes: 3 additions & 2 deletions core/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ private-cookies = ["rocket_http/private-cookies"]
[dependencies]
rocket_codegen = { version = "0.5.0-dev", path = "../codegen" }
rocket_http = { version = "0.5.0-dev", path = "../http" }
futures-preview = { version = "0.3.0-alpha.18", features = ["compat", "io-compat"] }
tokio = "0.1.16"
futures-preview = "0.3.0-alpha.18"
futures-tokio-compat = { git = "https://github.com/Nemo157/futures-tokio-compat", rev = "8a93702" }
tokio = "0.2.0-alpha.2"
yansi = "0.5"
log = { version = "0.4", features = ["std"] }
toml = "0.4.7"
Expand Down
10 changes: 5 additions & 5 deletions core/lib/src/data/data.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::path::Path;

use futures::compat::{Future01CompatExt, Stream01CompatExt, AsyncWrite01CompatExt};
use futures::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite};
use futures::future::Future;
use futures::stream::TryStreamExt;
use futures_tokio_compat::Compat as TokioCompat;

use super::data_stream::DataStream;

Expand Down Expand Up @@ -171,10 +171,10 @@ impl Data {
/// }
/// ```
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path> + Send + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
pub fn stream_to_file<P: AsRef<Path> + Send + Unpin + 'static>(self, path: P) -> impl Future<Output = io::Result<u64>> {
Box::pin(async move {
let file = tokio::fs::File::create(path).compat().await?.compat();
self.stream_to(file).await
let mut file = TokioCompat::new(tokio::fs::File::create(path).await?);
self.stream_to(&mut file).await
})
}

Expand All @@ -186,7 +186,7 @@ impl Data {
pub(crate) async fn new(body: hyper::Body) -> Data {
trace_!("Data::new({:?})", body);

let mut stream = body.compat().map_err(|e| {
let mut stream = body.map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
}).into_async_read();

Expand Down
5 changes: 1 addition & 4 deletions core/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,5 @@ pub fn custom(config: config::Config) -> Rocket {
/// WARNING: This is unstable! Do not use this method outside of Rocket!
#[doc(hidden)]
pub fn async_test(fut: impl std::future::Future<Output = ()> + Send + 'static) {
use futures::future::{FutureExt, TryFutureExt};

let mut runtime = tokio::runtime::Runtime::new().expect("create tokio runtime");
runtime.block_on(fut.boxed().unit_error().compat()).expect("unit_error future returned Err");
tokio::runtime::Runtime::new().expect("create tokio runtime").block_on(fut)
}
8 changes: 4 additions & 4 deletions core/lib/src/local/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,13 @@ impl<'c> LocalRequest<'c> {
request.set_uri(uri.into_owned());
} else {
error!("Malformed request URI: {}", uri);
return futures::executor::block_on(async move {
return tokio::runtime::Runtime::new().expect("create runtime").block_on(async move {
let res = client.rocket().handle_error(Status::BadRequest, request).await;
LocalResponse { _request: owned_request, response: res }
})
}

futures::executor::block_on(async move {
tokio::runtime::Runtime::new().expect("create runtime").block_on(async move {
// Actually dispatch the request.
let response = client.rocket().dispatch(request, Data::local(data)).await;

Expand Down Expand Up @@ -460,11 +460,11 @@ pub struct LocalResponse<'c> {

impl LocalResponse<'_> {
pub fn body_string_wait(&mut self) -> Option<String> {
futures::executor::block_on(self.body_string())
tokio::runtime::Runtime::new().expect("create runtime").block_on(self.body_string())
}

pub fn body_bytes_wait(&mut self) -> Option<Vec<u8>> {
futures::executor::block_on(self.body_bytes())
tokio::runtime::Runtime::new().expect("create runtime").block_on(self.body_bytes())
}
}

Expand Down
7 changes: 4 additions & 3 deletions core/lib/src/response/responder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::fs::File;
use std::io::{Cursor, BufReader};
use std::io::Cursor;

use futures::compat::AsyncRead01CompatExt;
use futures::io::BufReader;
use futures_tokio_compat::Compat as TokioCompat;

use crate::http::{Status, ContentType, StatusClass};
use crate::response::{self, Response, Body};
Expand Down Expand Up @@ -255,7 +256,7 @@ impl Responder<'_> for File {
fn respond_to(self, _: &Request<'_>) -> response::ResultFuture<'static> {
Box::pin(async move {
let metadata = self.metadata();
let stream = BufReader::new(tokio::fs::File::from_std(self)).compat();
let stream = BufReader::new(TokioCompat::new(tokio::fs::File::from_std(self)));
match metadata {
Ok(md) => Response::build().raw_body(Body::Sized(stream, md.len())).ok(),
Err(_) => Response::build().streamed_body(stream).ok()
Expand Down
157 changes: 61 additions & 96 deletions core/lib/src/rocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;

use futures::compat::{Compat, Executor01CompatExt, Sink01CompatExt};
use futures::future::{Future, FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::future::Future;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use futures_tokio_compat::Compat as TokioCompat;

use yansi::Paint;
use state::Container;
Expand Down Expand Up @@ -47,76 +46,53 @@ pub struct Rocket {
fairings: Fairings,
}

struct RocketHyperService {
// This function tries to hide all of the Hyper-ness from Rocket. It
// essentially converts Hyper types into Rocket types, then calls the
// `dispatch` function, which knows nothing about Hyper. Because responding
// depends on the `HyperResponse` type, this function does the actual
// response processing.
fn hyper_service_fn(
rocket: Arc<Rocket>,
spawn: Box<dyn futures::task::Spawn + Send>,
remote_addr: std::net::SocketAddr,
}

impl std::ops::Deref for RocketHyperService {
type Target = Rocket;

fn deref(&self) -> &Self::Target {
&*self.rocket
}
}

#[doc(hidden)]
impl hyper::Service for RocketHyperService {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = io::Error;
type Future = Compat<Pin<Box<dyn Future<Output = Result<hyper::Response<Self::ResBody>, Self::Error>> + Send>>>;

// This function tries to hide all of the Hyper-ness from Rocket. It
// essentially converts Hyper types into Rocket types, then calls the
// `dispatch` function, which knows nothing about Hyper. Because responding
// depends on the `HyperResponse` type, this function does the actual
// response processing.
fn call<'h>(
&mut self,
hyp_req: hyper::Request<Self::ReqBody>,
) -> Self::Future {
let rocket = self.rocket.clone();
let h_addr = self.remote_addr;

// This future must return a hyper::Response, but that's not easy
// because the response body might borrow from the request. Instead,
// we do the body writing in another future that will send us
// the response metadata (and a body channel) beforehand.
let (tx, rx) = futures::channel::oneshot::channel();

self.spawn.spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();

// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::dummy());
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.issue_response(r, tx).await;
}
};
h_addr: std::net::SocketAddr,
mut spawn: impl futures::task::Spawn,
hyp_req: hyper::Request<hyper::Body>,
) -> impl Future<Output = Result<hyper::Response<hyper::Body>, io::Error>> {
// This future must return a hyper::Response, but that's not easy
// because the response body might borrow from the request. Instead,
// we do the body writing in another future that will send us
// the response metadata (and a body channel) beforehand.
let (tx, rx) = futures::channel::oneshot::channel();

spawn.spawn(async move {
// Get all of the information from Hyper.
let (h_parts, h_body) = hyp_req.into_parts();

// Convert the Hyper request into a Rocket request.
let req_res = Request::from_hyp(&rocket, h_parts.method, h_parts.headers, h_parts.uri, h_addr);
let mut req = match req_res {
Ok(req) => req,
Err(e) => {
error!("Bad incoming request: {}", e);
// TODO: We don't have a request to pass in, so we just
// fabricate one. This is weird. We should let the user know
// that we failed to parse a request (by invoking some special
// handler) instead of doing this.
let dummy = Request::new(&rocket, Method::Get, Origin::dummy());
let r = rocket.handle_error(Status::BadRequest, &dummy).await;
return rocket.issue_response(r, tx).await;
}
};

// Retrieve the data from the hyper body.
let data = Data::from_hyp(h_body).await;
// Retrieve the data from the hyper body.
let data = Data::from_hyp(h_body).await;

// Dispatch the request to get a response, then write that response out.
let r = rocket.dispatch(&mut req, data).await;
rocket.issue_response(r, tx).await;
}).expect("failed to spawn handler");
// Dispatch the request to get a response, then write that response out.
let r = rocket.dispatch(&mut req, data).await;
rocket.issue_response(r, tx).await;
}).expect("failed to spawn handler");

async move {
Ok(rx.await.expect("TODO.async: sender was dropped, error instead"))
}.boxed().compat()
async move {
Ok(rx.await.expect("TODO.async: sender was dropped, error instead"))
}
}

Expand Down Expand Up @@ -170,40 +146,26 @@ impl Rocket {
}
Some(Body::Sized(body, size)) => {
hyp_res.header(header::CONTENT_LENGTH, size.to_string());
let (sender, hyp_body) = hyper::Body::channel();
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;

let mut stream = body.into_chunk_stream(4096);
let mut sink = sender.sink_compat().sink_map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
});

while let Some(next) = stream.next().await {
sink.send(next?).await?;
futures::future::poll_fn(|cx| sender.poll_ready(cx)).await.expect("TODO.async client gone?");
sender.send_data(next?).expect("send chunk");
}

// TODO.async: This should be better, but it creates an
// incomprehensible error messasge instead
// stream.forward(sink).await;
}
Some(Body::Chunked(body, chunk_size)) => {
// TODO.async: This is identical to Body::Sized except for the chunk size

let (sender, hyp_body) = hyper::Body::channel();
let (mut sender, hyp_body) = hyper::Body::channel();
send_response(hyp_res, hyp_body)?;

let mut stream = body.into_chunk_stream(chunk_size.try_into().expect("u64 -> usize overflow"));
let mut sink = sender.sink_compat().sink_map_err(|e| {
io::Error::new(io::ErrorKind::Other, e)
});

while let Some(next) = stream.next().await {
sink.send(next?).await?;
futures::future::poll_fn(|cx| sender.poll_ready(cx)).await.expect("TODO.async client gone?");
sender.send_data(next?).expect("send chunk");
}

// TODO.async: This should be better, but it creates an
// incomprehensible error messasge instead
// stream.forward(sink).await;
}
};

Expand Down Expand Up @@ -757,7 +719,7 @@ impl Rocket {

// TODO.async What meaning should config.workers have now?
// Initialize the tokio runtime
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.core_threads(self.config.workers as usize)
.build()
.expect("Cannot build runtime!");
Expand Down Expand Up @@ -802,13 +764,16 @@ impl Rocket {
logger::pop_max_level();

let rocket = Arc::new(self);
let spawn = Box::new(runtime.executor().compat());
let spawn = Box::new(TokioCompat::new(runtime.executor()));
let service = hyper::make_service_fn(move |socket: &hyper::AddrStream| {
futures::future::ok::<_, Box<dyn std::error::Error + Send + Sync>>(RocketHyperService {
rocket: rocket.clone(),
spawn: spawn.clone(),
remote_addr: socket.remote_addr(),
}).compat()
let rocket = rocket.clone();
let remote_addr = socket.remote_addr();
let spawn = spawn.clone();
async move {
Ok::<_, std::convert::Infallible>(hyper::service_fn(move |req| {
hyper_service_fn(rocket.clone(), remote_addr, spawn.clone(), req)
}))
}
});

// NB: executor must be passed manually here, see hyperium/hyper#1537
Expand Down
2 changes: 1 addition & 1 deletion core/lib/tests/head_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod head_handling_tests {
match body {
Body::Sized(mut body, size) => {
let mut buffer = vec![];
futures::executor::block_on(async {
tokio::runtime::Runtime::new().expect("create runtime").block_on(async {
body.read_to_end(&mut buffer).await.unwrap();
});
assert_eq!(size, expected_size);
Expand Down

0 comments on commit 0c91080

Please sign in to comment.