From 89f34b764af2d8637eba9f206cb9a9d333195bf8 Mon Sep 17 00:00:00 2001 From: Kevin Stone Date: Fri, 8 Nov 2019 12:17:53 -0600 Subject: [PATCH] Migrate Gotham to std Futures to support Async/Await With the anticipated release of async-await in stable rust this week, I took an effort to migrate Gotham to run on std futures using the pre-release versions of the dependencies (tokio, hyper, etc). *NOTE*: This doesn't attempt to introduce `await` or `async fn` into the codebase (there was a single unavoidable case due to an `tokio::File::metadata` API change). That is designed as a future activity. This migration involved a few key efforts: 1. Convert from Futures 0.1 to Futures-preview 0.3 (mostly `Future` to `Future>`). 2. Update dependencies to pre-release versions (`tokio-0.2` and `hyper-0.13`). There's still other dependencies that are outstanding and blocking the full release. 3. Migrate Handler trait to a pinned box HandlerFuture. This is a **work-in-progress** with a few blockers before this would be ready: Gotham Dependencies: - [ ] Update Futures from `futures-preview` to `futures = 0.3` when the other dependencies (hyper, tokio, etc) update in concert. - [ ] Update Tokio to `0.2` from alpha pre-releases - [ ] Update Hyper to `0.13` from alpha pre-releases - [ ] Update Tower-Service to `0.3` from alpha pre-releases. Hyper is migrating many of its traits to `tower-service::Service` and so is now a direct dependency. - [ ] Released version of `futures_rustls` which is currently a branch of `tokio-rustls` ported to Futures-preview - [ ] Released version of `futures-tokio-compat` or suggested `tokio::compat` library for bridging `futures::AsyncRead` and `tokio::AsyncRead`. See https://github.com/tokio-rs/tokio/issues/1297 and https://github.com/async-rs/async-std/issues/54 Middleware Dependencies: - [ ] Diesel - Requires updated release of `tokio-threadpool` - [ ] JWT - Requires updated release of `jsonwebtoken` since it's dependency `ring` conflicts withs `futures-rustls`. --- Cargo.toml | 17 +- gotham/Cargo.toml | 13 +- gotham/src/handler/assets/mod.rs | 115 +++++---- gotham/src/handler/error.rs | 15 +- gotham/src/handler/mod.rs | 45 ++-- gotham/src/lib.rs | 18 +- gotham/src/middleware/chain.rs | 13 +- gotham/src/middleware/cookie/mod.rs | 5 +- gotham/src/middleware/logger/mod.rs | 15 +- gotham/src/middleware/mod.rs | 62 +++-- gotham/src/middleware/security/mod.rs | 10 +- .../src/middleware/session/backend/memory.rs | 29 ++- gotham/src/middleware/session/backend/mod.rs | 7 +- gotham/src/middleware/session/mod.rs | 68 +++--- gotham/src/middleware/state/mod.rs | 5 +- gotham/src/middleware/timer/mod.rs | 10 +- gotham/src/pipeline/chain.rs | 17 +- gotham/src/pipeline/mod.rs | 49 ++-- gotham/src/plain.rs | 26 +- gotham/src/plain/test.rs | 98 +++++--- gotham/src/router/builder/draw.rs | 9 +- gotham/src/router/builder/mod.rs | 28 ++- gotham/src/router/builder/single.rs | 8 +- gotham/src/router/mod.rs | 31 +-- gotham/src/router/response/finalizer.rs | 7 +- gotham/src/router/route/dispatch.rs | 21 +- gotham/src/router/route/mod.rs | 27 +-- gotham/src/service/mod.rs | 27 ++- gotham/src/service/trap.rs | 224 ++++++++---------- gotham/src/test.rs | 52 ++-- gotham/src/test/request.rs | 2 +- gotham/src/tls.rs | 39 +-- gotham/src/tls/test.rs | 150 +++++++----- middleware/diesel/Cargo.toml | 6 +- middleware/jwt/Cargo.toml | 4 +- middleware/template/Cargo.toml | 2 +- middleware/template/src/lib.rs | 30 +-- 37 files changed, 719 insertions(+), 585 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7c2fed96d..93f190834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,13 +7,15 @@ members = [ ## Middleware "middleware/template", - "middleware/diesel", - "middleware/jwt", + # TODO: Re-enable middleware when their dependencies are updated + # "middleware/diesel", + # "middleware/jwt", ## Examples (these crates are not published) "examples/hello_world", "examples/hello_world_tls", - "examples/hello_world_until", + # TODO: Re-enable when the tokio-signal dependency is updated + # "examples/hello_world_until", "examples/shared_state", # Tera template @@ -65,15 +67,18 @@ members = [ "examples/static_assets", # diesel - "examples/diesel", + # TODO: Re-enable when the middleware is updated + # "examples/diesel", # openssl - "examples/openssl", + # TODO: Re-enable when this example is updated + # "examples/openssl", # example_contribution_template "examples/example_contribution_template/name", - "examples/websocket", + # TODO: Re-enable when tokio-tungstenite is updated + # "examples/websocket", ] [patch.crates-io] diff --git a/gotham/Cargo.toml b/gotham/Cargo.toml index f1cae42d8..8ba1a16d3 100644 --- a/gotham/Cargo.toml +++ b/gotham/Cargo.toml @@ -17,11 +17,11 @@ edition = "2018" [features] default = ["rustls"] -rustls = ["tokio-rustls"] +rustls = ["futures-rustls"] [dependencies] log = "0.4" -hyper = "0.12" +hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] } serde = "1.0" serde_derive = "1.0" bincode = "1.0" @@ -29,8 +29,8 @@ mime = "0.3" # Using alpha version of mime_guess until mime crate stabilizes (releases 1.0). # see https://github.com/hyperium/mime/issues/52 mime_guess = "2.0.1" -futures = "0.1" -tokio = "0.1" +futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] } +tokio = "=0.2.0-alpha.6" bytes = "0.4" mio = "0.6" borrow-bag = "1.0" @@ -47,8 +47,9 @@ cookie = "0.12" http = "0.1" httpdate = "0.3" failure = "0.1" -tokio-rustls = {version = "0.9", optional = true } -tokio-io = "0.1" +futures-rustls = { git = "https://github.com/quininer/tokio-rustls", branch = "futures-rustls", optional = true } +tower-service = "=0.3.0-alpha.2" +futures-tokio-compat = { git = "https://github.com/nemo157/futures-tokio-compat/" } [dev-dependencies] gotham_derive = "0.5.0-dev" diff --git a/gotham/src/handler/assets/mod.rs b/gotham/src/handler/assets/mod.rs index e38d8a173..3d3cc9fa3 100644 --- a/gotham/src/handler/assets/mod.rs +++ b/gotham/src/handler/assets/mod.rs @@ -8,7 +8,9 @@ mod accepted_encoding; use crate::error::Result; use bytes::{BufMut, BytesMut}; -use futures::{stream, try_ready, Future, Stream}; +use futures::prelude::*; +use futures::ready; +use futures::task::Poll; use http; use httpdate::parse_http_date; use hyper::header::*; @@ -31,6 +33,7 @@ use std::fs::Metadata; use std::io; use std::iter::FromIterator; use std::path::{Component, Path, PathBuf}; +use std::pin::Pin; use std::time::UNIX_EPOCH; /// Represents a handler for any files under a directory. @@ -176,7 +179,7 @@ impl NewHandler for DirHandler { } impl Handler for DirHandler { - fn handle(self, state: State) -> Box { + fn handle(self, state: State) -> Pin> { let path = { let mut base_path = self.options.path; let file_path = PathBuf::from_iter(&FilePathExtractor::borrow_from(&state).parts); @@ -194,59 +197,62 @@ impl Handler for DirHandler { } impl Handler for FileHandler { - fn handle(self, state: State) -> Box { + fn handle(self, state: State) -> Pin> { create_file_response(self.options, state) } } // Creates the `HandlerFuture` response based on the given `FileOptions`. -fn create_file_response(options: FileOptions, state: State) -> Box { +fn create_file_response(options: FileOptions, state: State) -> Pin> { let mime_type = mime_for_path(&options.path); let headers = HeaderMap::borrow_from(&state).clone(); let (path, encoding) = check_compressed_options(&options, &headers); - let response_future = - File::open(path) - .and_then(File::metadata) - .and_then(move |(file, meta)| { - if not_modified(&meta, &headers) { - return Ok(http::Response::builder() - .status(StatusCode::NOT_MODIFIED) - .body(Body::empty()) - .unwrap()); - } - let len = meta.len(); - let buf_size = optimal_buf_size(&meta); - - let stream = file_stream(file, buf_size, len); - let body = Body::wrap_stream(stream); - let mut response = http::Response::builder(); - response.status(StatusCode::OK); - response.header(CONTENT_LENGTH, len); - response.header(CONTENT_TYPE, mime_type.as_ref()); - response.header(CACHE_CONTROL, options.cache_control); - - if let Some(etag) = entity_tag(&meta) { - response.header(ETAG, etag); - } - if let Some(content_encoding) = encoding { - response.header(CONTENT_ENCODING, content_encoding); - } - - Ok(response.body(body).unwrap()) - }); - Box::new(response_future.then(|result| match result { - Ok(response) => Ok((state, response)), - Err(err) => { - let status = match err.kind() { - io::ErrorKind::NotFound => StatusCode::NOT_FOUND, - io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN, - _ => StatusCode::INTERNAL_SERVER_ERROR, - }; - Err((state, err.into_handler_error().with_status(status))) + let response_future = File::open(path).and_then(|file| { + async move { + let meta = file.metadata().await?; + if not_modified(&meta, &headers) { + return Ok(http::Response::builder() + .status(StatusCode::NOT_MODIFIED) + .body(Body::empty()) + .unwrap()); + } + let len = meta.len(); + let buf_size = optimal_buf_size(&meta); + + let stream = file_stream(file, buf_size, len); + let body = Body::wrap_stream(stream.into_stream()); + let mut response = http::Response::builder(); + response.status(StatusCode::OK); + response.header(CONTENT_LENGTH, len); + response.header(CONTENT_TYPE, mime_type.as_ref()); + response.header(CACHE_CONTROL, options.cache_control); + + if let Some(etag) = entity_tag(&meta) { + response.header(ETAG, etag); + } + if let Some(content_encoding) = encoding { + response.header(CONTENT_ENCODING, content_encoding); + } + + Ok(response.body(body).unwrap()) } - })) + }); + + response_future + .map(|result| match result { + Ok(response) => Ok((state, response)), + Err(err) => { + let status = match err.kind() { + io::ErrorKind::NotFound => StatusCode::NOT_FOUND, + io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + Err((state, err.into_handler_error().with_status(status))) + } + }) + .boxed() } // Checks for existence of compressed files if `FileOptions` and @@ -370,23 +376,28 @@ fn file_stream( mut f: File, buf_size: usize, mut len: u64, -) -> impl Stream + Send { - let mut buf = BytesMut::new(); - stream::poll_fn(move || { +) -> impl TryStream + Send { + let mut buf = BytesMut::with_capacity(buf_size); + stream::poll_fn(move |cx| { if len == 0 { - return Ok(None.into()); + return Poll::Ready(None); } if buf.remaining_mut() < buf_size { buf.reserve(buf_size); } - let n = try_ready!(f.read_buf(&mut buf).map_err(|err| { + + let read = Pin::new(&mut f).poll_read_buf(cx, &mut buf); + let n = ready!(read).map_err(|err| { debug!("file read error: {}", err); err - })) as u64; + })? as u64; if n == 0 { debug!("file read found EOF before expected length"); - return Ok(None.into()); + return Poll::Ready(Some(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "file read found EOF before expected length", + )))); } let mut chunk = buf.take().freeze(); @@ -397,7 +408,7 @@ fn file_stream( len -= n; } - Ok(Some(Chunk::from(chunk)).into()) + Poll::Ready(Some(Ok(Chunk::from(chunk)))) }) } diff --git a/gotham/src/handler/error.rs b/gotham/src/handler/error.rs index 1834e936d..f4113ec0b 100644 --- a/gotham/src/handler/error.rs +++ b/gotham/src/handler/error.rs @@ -25,14 +25,15 @@ pub struct HandlerError { /// # extern crate futures; /// # /// # use std::fs::File; +/// # use std::pin::Pin; /// # use gotham::state::State; /// # use gotham::handler::{IntoHandlerError, HandlerFuture}; -/// # use futures::future; +/// # use futures::prelude::*; /// # /// # #[allow(dead_code)] -/// fn my_handler(state: State) -> Box { +/// fn my_handler(state: State) -> Pin> { /// match File::open("config.toml") { -/// Err(e) => Box::new(future::err((state, e.into_handler_error()))), +/// Err(e) => future::err((state, e.into_handler_error())).boxed(), /// Ok(_) => // Create and return a response /// # unimplemented!(), /// } @@ -95,13 +96,15 @@ impl HandlerError { /// # extern crate hyper; /// # extern crate futures; /// # - /// # use futures::future; + /// # use std::pin::Pin; + /// # + /// # use futures::prelude::*; /// # use hyper::StatusCode; /// # use gotham::state::State; /// # use gotham::handler::{IntoHandlerError, HandlerFuture}; /// # use gotham::test::TestServer; /// # - /// fn handler(state: State) -> Box { + /// fn handler(state: State) -> Pin> { /// // It's OK if this is bogus, we just need something to convert into a `HandlerError`. /// let io_error = std::io::Error::last_os_error(); /// @@ -109,7 +112,7 @@ impl HandlerError { /// .into_handler_error() /// .with_status(StatusCode::IM_A_TEAPOT); /// - /// Box::new(future::err((state, handler_error))) + /// future::err((state, handler_error)).boxed() /// } /// /// # fn main() { diff --git a/gotham/src/handler/mod.rs b/gotham/src/handler/mod.rs index 4d451c97c..5ff0a2c7e 100644 --- a/gotham/src/handler/mod.rs +++ b/gotham/src/handler/mod.rs @@ -5,9 +5,10 @@ //! `Handler` trait for some examples of valid handlers. use std::borrow::Cow; use std::panic::RefUnwindSafe; +use std::pin::Pin; use bytes::Bytes; -use futures::{future, Future}; +use futures::prelude::*; use hyper::{Body, Chunk, Response, StatusCode}; use mime::{self, Mime}; @@ -27,7 +28,7 @@ pub use self::error::{HandlerError, IntoHandlerError}; /// When the `Future` resolves to an error, the `(State, HandlerError)` value is used to generate /// an appropriate HTTP error response. pub type HandlerFuture = - dyn Future), Error = (State, HandlerError)> + Send; + dyn Future), (State, HandlerError)>> + Send; /// A `Handler` is an asynchronous function, taking a `State` value which represents the request /// and related runtime state, and returns a future which resolves to a response. @@ -72,11 +73,13 @@ pub type HandlerFuture = /// # extern crate gotham; /// # extern crate hyper; /// # +/// # use std::pin::Pin; +/// # /// # use gotham::handler::{Handler, HandlerFuture}; /// # use gotham::state::State; /// # /// # fn main() { -/// fn async_handler(_state: State) -> Box { +/// fn async_handler(_state: State) -> Pin> { /// // Implementation elided. /// # unimplemented!() /// } @@ -95,15 +98,17 @@ pub type HandlerFuture = /// # extern crate hyper; /// # extern crate futures; /// # -/// # use gotham::handler::{HandlerFuture, NewHandler}; +/// # use gotham::handler::{NewHandler, IntoHandlerFuture}; +/// # use gotham::helpers::http::response::create_empty_response; /// # use gotham::state::State; -/// # use futures::future; +/// # use hyper::StatusCode; /// # /// # fn main() { /// let new_handler = || { -/// let handler = |_state: State| { +/// let handler = |state: State| { /// // Implementation elided. -/// # Box::new(future::empty()) as Box +/// # let res = create_empty_response(&state, StatusCode::OK); +/// # (state, res).into_handler_future() /// }; /// Ok(handler) /// }; @@ -122,6 +127,8 @@ pub type HandlerFuture = /// # extern crate gotham; /// # extern crate hyper; /// # +/// # use std::pin::Pin; +/// # /// # use gotham::handler::{Handler, HandlerFuture, NewHandler}; /// # use gotham::state::State; /// # use gotham::error::*; @@ -139,7 +146,7 @@ pub type HandlerFuture = /// } /// /// impl Handler for MyCustomHandler { -/// fn handle(self, _state: State) -> Box { +/// fn handle(self, _state: State) -> Pin> { /// // Implementation elided. /// # unimplemented!() /// } @@ -151,7 +158,7 @@ pub type HandlerFuture = /// ``` pub trait Handler: Send { /// Handles the request, returning a boxed future which resolves to a response. - fn handle(self, state: State) -> Box; + fn handle(self, state: State) -> Pin>; } impl Handler for F @@ -159,7 +166,7 @@ where F: FnOnce(State) -> R + Send, R: IntoHandlerFuture, { - fn handle(self, state: State) -> Box { + fn handle(self, state: State) -> Pin> { self(state).into_handler_future() } } @@ -178,6 +185,8 @@ where /// # extern crate gotham; /// # extern crate hyper; /// # +/// # use std::pin::Pin; +/// # /// # use gotham::handler::{Handler, HandlerFuture, NewHandler}; /// # use gotham::state::State; /// # use gotham::error::*; @@ -195,7 +204,7 @@ where /// } /// /// impl Handler for MyCustomHandler { -/// fn handle(self, _state: State) -> Box { +/// fn handle(self, _state: State) -> Pin> { /// // Implementation elided. /// # unimplemented!() /// } @@ -212,6 +221,8 @@ where /// # extern crate gotham; /// # extern crate hyper; /// # +/// # use std::pin::Pin; +/// # /// # use gotham::handler::{Handler, HandlerFuture, NewHandler}; /// # use gotham::state::State; /// # use gotham::error::*; @@ -231,7 +242,7 @@ where /// struct MyHandler; /// /// impl Handler for MyHandler { -/// fn handle(self, _state: State) -> Box { +/// fn handle(self, _state: State) -> Pin> { /// // Implementation elided. /// # unimplemented!() /// } @@ -267,22 +278,22 @@ where /// bound via the generic function implementation. pub trait IntoHandlerFuture { /// Converts this value into a boxed future resolving to a state and response. - fn into_handler_future(self) -> Box; + fn into_handler_future(self) -> Pin>; } impl IntoHandlerFuture for (State, T) where T: IntoResponse, { - fn into_handler_future(self) -> Box { + fn into_handler_future(self) -> Pin> { let (state, t) = self; let response = t.into_response(&state); - Box::new(future::ok((state, response))) + future::ok((state, response)).boxed() } } -impl IntoHandlerFuture for Box { - fn into_handler_future(self) -> Box { +impl IntoHandlerFuture for Pin> { + fn into_handler_future(self) -> Pin> { self } } diff --git a/gotham/src/lib.rs b/gotham/src/lib.rs index be1311a52..817aacda9 100644 --- a/gotham/src/lib.rs +++ b/gotham/src/lib.rs @@ -45,14 +45,14 @@ pub mod plain; #[cfg(feature = "rustls")] pub mod tls; -use futures::{Future, Stream}; +use futures::prelude::*; use hyper::server::conn::Http; use std::net::ToSocketAddrs; use std::sync::Arc; use tokio::executor; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime::{self, Runtime}; -use tokio_io::{AsyncRead, AsyncWrite}; use crate::{handler::NewHandler, service::GothamService}; @@ -68,7 +68,7 @@ fn new_runtime(threads: usize) -> Runtime { .unwrap() } -fn tcp_listener(addr: A) -> TcpListener +fn tcp_listener(addr: A) -> impl Future> where A: ToSocketAddrs + 'static, { @@ -78,7 +78,7 @@ where .next() .expect("unable to resolve listener address"); - TcpListener::bind(&addr).expect("unable to open TCP listener") + TcpListener::bind(addr) } /// Returns a `Future` used to spawn a Gotham application. @@ -91,11 +91,11 @@ pub fn bind_server( listener: TcpListener, new_handler: NH, mut wrap: Wrap, -) -> impl Future +) -> impl Future> where NH: NewHandler + 'static, - F: Future + Send + 'static, - Wrapped: AsyncRead + AsyncWrite + Send + 'static, + F: Future> + Unpin + Send + 'static, + Wrapped: Unpin + AsyncRead + AsyncWrite + Send + 'static, Wrap: FnMut(TcpStream) -> F, { let protocol = Arc::new(Http::new()); @@ -104,7 +104,7 @@ where listener .incoming() .map_err(|e| panic!("socket error = {:?}", e)) - .for_each(move |socket| { + .try_for_each(move |socket| { let addr = socket.peer_addr().unwrap(); let service = gotham_service.connect(addr); let accepted_protocol = protocol.clone(); @@ -121,6 +121,6 @@ where executor::spawn(handler); - Ok(()) + future::ok(()) }) } diff --git a/gotham/src/middleware/chain.rs b/gotham/src/middleware/chain.rs index 3d17e0c68..337b970c9 100644 --- a/gotham/src/middleware/chain.rs +++ b/gotham/src/middleware/chain.rs @@ -4,6 +4,7 @@ use log::trace; use std::io; use std::panic::RefUnwindSafe; +use std::pin::Pin; use crate::handler::HandlerFuture; use crate::middleware::{Middleware, NewMiddleware}; @@ -58,15 +59,15 @@ unsafe impl NewMiddlewareChain for () { #[doc(hidden)] pub unsafe trait MiddlewareChain: Sized { /// Recursive function for processing middleware and chaining to the given function. - fn call(self, state: State, f: F) -> Box + fn call(self, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static; + F: FnOnce(State) -> Pin> + Send + 'static; } unsafe impl MiddlewareChain for () { - fn call(self, state: State, f: F) -> Box + fn call(self, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static, + F: FnOnce(State) -> Pin> + Send + 'static, { // At the last item in the `MiddlewareChain`, the function is invoked to serve the // request. `f` is the nested function of all `Middleware` and the `Handler`. @@ -83,9 +84,9 @@ where T: Middleware + Send + 'static, U: MiddlewareChain, { - fn call(self, state: State, f: F) -> Box + fn call(self, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static, + F: FnOnce(State) -> Pin> + Send + 'static, { let (m, p) = self; // Construct the function from the inside, out. Starting with a function which calls the diff --git a/gotham/src/middleware/cookie/mod.rs b/gotham/src/middleware/cookie/mod.rs index b3dcf7f35..6a43a53b2 100644 --- a/gotham/src/middleware/cookie/mod.rs +++ b/gotham/src/middleware/cookie/mod.rs @@ -1,5 +1,6 @@ //! Defines a cookie parsing middleware to be attach cookies on requests. use std::io; +use std::pin::Pin; use cookie::{Cookie, CookieJar}; use hyper::header::{HeaderMap, HeaderValue, COOKIE}; @@ -36,9 +37,9 @@ impl CookieParser { /// `Middleware` trait implementation. impl Middleware for CookieParser { /// Attaches a set of parsed cookies to the request state. - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { let cookies = { CookieParser::from_state(&state) }; state.put(cookies); diff --git a/gotham/src/middleware/logger/mod.rs b/gotham/src/middleware/logger/mod.rs index 37c9824cf..e96946441 100644 --- a/gotham/src/middleware/logger/mod.rs +++ b/gotham/src/middleware/logger/mod.rs @@ -5,11 +5,12 @@ //! [Common Log Format](https://en.wikipedia.org/wiki/Common_Log_Format) (CLF). //! //! There is also a `SimpleLogger` which emits only basic request logs. -use futures::{future, Future}; +use futures::prelude::*; use hyper::{header::CONTENT_LENGTH, Method, Uri, Version}; use log::Level; use log::{log, log_enabled}; use std::io; +use std::pin::Pin; use crate::handler::HandlerFuture; use crate::helpers::timing::Timer; @@ -49,9 +50,9 @@ impl NewMiddleware for RequestLogger { /// Implementing `gotham::middleware::Middleware` allows us to hook into the request chain /// in order to correctly log out after a request has executed. impl Middleware for RequestLogger { - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { // skip everything if logging is disabled if !log_enabled!(self.level) { @@ -103,7 +104,7 @@ impl Middleware for RequestLogger { }); // box it up - Box::new(f) + f.boxed() } } @@ -139,9 +140,9 @@ impl NewMiddleware for SimpleLogger { /// Implementing `gotham::middleware::Middleware` allows us to hook into the request chain /// in order to correctly log out after a request has executed. impl Middleware for SimpleLogger { - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { // skip everything if logging is disabled if !log_enabled!(self.level) { @@ -165,6 +166,6 @@ impl Middleware for SimpleLogger { future::ok((state, response)) }); - Box::new(f) + f.boxed() } } diff --git a/gotham/src/middleware/mod.rs b/gotham/src/middleware/mod.rs index 92cb814dd..78797b24c 100644 --- a/gotham/src/middleware/mod.rs +++ b/gotham/src/middleware/mod.rs @@ -3,6 +3,7 @@ use std::io; use std::panic::RefUnwindSafe; +use std::pin::Pin; use crate::handler::HandlerFuture; use crate::state::State; @@ -34,6 +35,8 @@ pub mod timer; /// # extern crate gotham_derive; /// # extern crate hyper; /// # +/// # use std::pin::Pin; +/// # /// # use hyper::{Body, Response, StatusCode}; /// # use gotham::handler::HandlerFuture; /// # use gotham::middleware::Middleware; @@ -47,8 +50,8 @@ pub mod timer; /// struct NoopMiddleware; /// /// impl Middleware for NoopMiddleware { -/// fn call(self, state: State, chain: Chain) -> Box -/// where Chain: FnOnce(State) -> Box + Send + 'static +/// fn call(self, state: State, chain: Chain) -> Pin> +/// where Chain: FnOnce(State) -> Pin> + Send + 'static /// { /// chain(state) /// } @@ -83,6 +86,8 @@ pub mod timer; /// # extern crate gotham_derive; /// # extern crate hyper; /// # +/// # use std::pin::Pin; +/// # /// # use hyper::{Response, StatusCode}; /// # use gotham::handler::HandlerFuture; /// # use gotham::middleware::Middleware; @@ -101,8 +106,8 @@ pub mod timer; /// } /// /// impl Middleware for MiddlewareWithStateData { -/// fn call(self, mut state: State, chain: Chain) -> Box -/// where Chain: FnOnce(State) -> Box + Send + 'static +/// fn call(self, mut state: State, chain: Chain) -> Pin> +/// where Chain: FnOnce(State) -> Pin> + Send + 'static /// { /// state.put(MiddlewareStateData { i: 10 }); /// chain(state) @@ -145,7 +150,9 @@ pub mod timer; /// # extern crate hyper; /// # extern crate futures; /// # -/// # use futures::Future; +/// # use std::pin::Pin; +/// # +/// # use futures::prelude::*; /// # use hyper::{Body, Response, StatusCode}; /// # use hyper::header::WARNING; /// # use gotham::handler::HandlerFuture; @@ -160,16 +167,15 @@ pub mod timer; /// struct MiddlewareAddingResponseHeader; /// /// impl Middleware for MiddlewareAddingResponseHeader { -/// fn call(self, state: State, chain: Chain) -> Box -/// where Chain: FnOnce(State) -> Box + Send + 'static +/// fn call(self, state: State, chain: Chain) -> Pin> +/// where Chain: FnOnce(State) -> Pin> + Send + 'static /// { -/// let f = chain(state) -/// .map(|(state, mut response)| { +/// chain(state) +/// .map_ok(|(state, mut response)| { /// response.headers_mut().insert(WARNING, "299 example.com Deprecated".parse().unwrap()); /// (state, response) -/// }); -/// -/// Box::new(f) +/// }) +/// .boxed() /// } /// } /// # @@ -208,8 +214,10 @@ pub mod timer; /// # extern crate hyper; /// # extern crate futures; /// # +/// # use std::pin::Pin; +/// # /// # use hyper::{Body, Response, Method, StatusCode}; -/// # use futures::future; +/// # use futures::prelude::*; /// # use gotham::helpers::http::response::create_empty_response; /// # use gotham::handler::HandlerFuture; /// # use gotham::middleware::Middleware; @@ -223,14 +231,14 @@ pub mod timer; /// struct ConditionalMiddleware; /// /// impl Middleware for ConditionalMiddleware { -/// fn call(self, state: State, chain: Chain) -> Box -/// where Chain: FnOnce(State) -> Box + Send + 'static +/// fn call(self, state: State, chain: Chain) -> Pin> +/// where Chain: FnOnce(State) -> Pin> + Send + 'static /// { /// if *Method::borrow_from(&state) == Method::GET { /// chain(state) /// } else { /// let response = create_empty_response(&state, StatusCode::METHOD_NOT_ALLOWED); -/// Box::new(future::ok((state, response))) +/// future::ok((state, response)).boxed() /// } /// } /// } @@ -269,7 +277,9 @@ pub mod timer; /// # extern crate hyper; /// # extern crate futures; /// # -/// # use futures::{future, Future}; +/// # use std::pin::Pin; +/// # +/// # use futures::prelude::*; /// # use hyper::{Body, Response, StatusCode}; /// # use gotham::handler::HandlerFuture; /// # use gotham::middleware::Middleware; @@ -283,13 +293,13 @@ pub mod timer; /// struct AsyncMiddleware; /// /// impl Middleware for AsyncMiddleware { -/// fn call(self, state: State, chain: Chain) -> Box -/// where Chain: FnOnce(State) -> Box + Send + 'static +/// fn call(self, state: State, chain: Chain) -> Pin> +/// where Chain: FnOnce(State) -> Pin> + Send + 'static /// { /// // This could be any asynchronous action. `future::lazy(_)` defers a function /// // until the next cycle of tokio's event loop. -/// let f = future::lazy(|| future::ok(())); -/// Box::new(f.and_then(move |_| chain(state))) +/// let f = future::lazy(|_| Ok(())); +/// f.and_then(move |_| chain(state)).boxed() /// } /// } /// # @@ -322,9 +332,9 @@ pub trait Middleware { /// * Not modify any request components added to `State` by Gotham. /// * Avoid modifying parts of the `State` that don't strictly need to be modified to perform /// its function. - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized; } @@ -338,6 +348,8 @@ pub trait Middleware { /// # extern crate gotham; /// # /// # use std::io; +/// # use std::pin::Pin; +/// # /// # use gotham::middleware::{NewMiddleware, Middleware}; /// # use gotham::handler::HandlerFuture; /// # use gotham::pipeline::new_pipeline; @@ -356,8 +368,8 @@ pub trait Middleware { /// } /// # /// # impl Middleware for MyMiddleware { -/// # fn call(self, _state: State, _chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + 'static +/// # fn call(self, _state: State, _chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + 'static /// # { /// # unimplemented!() /// # } diff --git a/gotham/src/middleware/security/mod.rs b/gotham/src/middleware/security/mod.rs index 87a6389f7..f898f532a 100644 --- a/gotham/src/middleware/security/mod.rs +++ b/gotham/src/middleware/security/mod.rs @@ -15,7 +15,9 @@ use crate::handler::HandlerFuture; use crate::middleware::{Middleware, NewMiddleware}; use crate::state::State; -use futures::{future, Future}; +use futures::prelude::*; +use std::pin::Pin; + use hyper::header::{HeaderValue, X_CONTENT_TYPE_OPTIONS, X_FRAME_OPTIONS, X_XSS_PROTECTION}; use std::io; @@ -34,9 +36,9 @@ pub struct SecurityMiddleware; /// `Middleware` trait implementation. impl Middleware for SecurityMiddleware { /// Attaches security headers to the response. - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { let f = chain(state).and_then(|(state, mut response)| { { @@ -49,7 +51,7 @@ impl Middleware for SecurityMiddleware { future::ok((state, response)) }); - Box::new(f) + f.boxed() } } diff --git a/gotham/src/middleware/session/backend/memory.rs b/gotham/src/middleware/session/backend/memory.rs index 95be9d9ee..6b51273c8 100644 --- a/gotham/src/middleware/session/backend/memory.rs +++ b/gotham/src/middleware/session/backend/memory.rs @@ -1,8 +1,9 @@ +use std::pin::Pin; use std::sync::{Arc, Mutex, PoisonError, Weak}; use std::time::{Duration, Instant}; use std::{io, thread}; -use futures::future; +use futures::prelude::*; use linked_hash_map::LinkedHashMap; use log::trace; @@ -93,14 +94,14 @@ impl Backend for MemoryBackend { } } - fn read_session(&self, identifier: SessionIdentifier) -> Box { + fn read_session(&self, identifier: SessionIdentifier) -> Pin> { match self.storage.lock() { Ok(mut storage) => match storage.get_refresh(&identifier.value) { Some(&mut (ref mut instant, ref value)) => { *instant = Instant::now(); - Box::new(future::ok(Some(value.clone()))) + future::ok(Some(value.clone())).boxed() } - None => Box::new(future::ok(None)), + None => future::ok(None).boxed(), }, Err(PoisonError { .. }) => { unreachable!("session memory backend lock poisoned, HashMap panicked?") @@ -192,7 +193,6 @@ fn cleanup_once( mod tests { use super::*; - use futures::Future; use rand; #[test] @@ -233,13 +233,14 @@ mod tests { .persist_session(identifier.clone(), &bytes[..]) .expect("failed to persist"); - let received = new_backend - .new_backend() - .expect("can't create backend for read") - .read_session(identifier.clone()) - .wait() - .expect("no response from backend") - .expect("session data missing"); + let received = futures::executor::block_on( + new_backend + .new_backend() + .expect("can't create backend for read") + .read_session(identifier.clone()), + ) + .expect("no response from backend") + .expect("session data missing"); assert_eq!(bytes, received); } @@ -281,9 +282,7 @@ mod tests { ); } - backend - .read_session(identifier.clone()) - .wait() + futures::executor::block_on(backend.read_session(identifier.clone())) .expect("failed to read session"); { diff --git a/gotham/src/middleware/session/backend/mod.rs b/gotham/src/middleware/session/backend/mod.rs index 57666519f..24b41e71e 100644 --- a/gotham/src/middleware/session/backend/mod.rs +++ b/gotham/src/middleware/session/backend/mod.rs @@ -2,8 +2,9 @@ pub(super) mod memory; use std::io; use std::panic::RefUnwindSafe; +use std::pin::Pin; -use futures::Future; +use futures::prelude::*; use crate::middleware::session::{SessionError, SessionIdentifier}; @@ -17,7 +18,7 @@ pub trait NewBackend: Sync + Clone + RefUnwindSafe { } /// Type alias for the trait objects returned by `Backend`. -pub type SessionFuture = dyn Future>, Error = SessionError> + Send; +pub type SessionFuture = dyn Future>, SessionError>> + Send; /// A `Backend` receives session data and stores it, and recalls the session data subsequently. /// @@ -36,7 +37,7 @@ pub trait Backend: Send { /// The returned future will resolve to an `Option>` on success, where a value of /// `None` indicates that the session is not available for use and a new session should be /// established. - fn read_session(&self, identifier: SessionIdentifier) -> Box; + fn read_session(&self, identifier: SessionIdentifier) -> Pin>; /// Drops a session from the underlying storage. fn drop_session(&self, identifier: SessionIdentifier) -> Result<(), SessionError>; diff --git a/gotham/src/middleware/session/mod.rs b/gotham/src/middleware/session/mod.rs index 7d2194b6c..ac81adb60 100644 --- a/gotham/src/middleware/session/mod.rs +++ b/gotham/src/middleware/session/mod.rs @@ -4,15 +4,13 @@ use std::io; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::panic::RefUnwindSafe; +use std::pin::Pin; use std::sync::{Arc, Mutex, PoisonError}; use base64; use bincode; use cookie::{Cookie, CookieJar}; -use futures::{ - future::{self, FutureResult}, - Future, -}; +use futures::prelude::*; use hyper::header::SET_COOKIE; use hyper::{Body, Response, StatusCode}; use log::{error, trace, warn}; @@ -199,8 +197,7 @@ impl SessionCookieConfig { /// # /// # use std::sync::Arc; /// # use std::time::Duration; -/// # use futures::future; -/// # use gotham::handler::HandlerFuture; +/// # use futures::prelude::*; /// # use gotham::state::{State, FromState}; /// # use gotham::middleware::{NewMiddleware, Middleware}; /// # use gotham::middleware::session::{SessionData, NewSessionMiddleware, Backend, MemoryBackend, @@ -249,7 +246,7 @@ impl SessionCookieConfig { /// # /// # let handler = move |state| { /// # let m = nm.new_middleware().unwrap(); -/// # let chain = |state| Box::new(future::ok(my_handler(state))) as Box; +/// # let chain = |state| future::ok(my_handler(state)).boxed(); /// # /// # m.call(state, chain) /// # }; @@ -799,9 +796,9 @@ where B: Backend + Send + 'static, T: Default + Serialize + for<'de> Deserialize<'de> + Send + 'static, { - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized, { // cookies might have been parsed already by middleware @@ -824,14 +821,12 @@ where id.value ); - let f = self - .backend + self.backend .read_session(id.clone()) .then(move |r| self.load_session_into_state(state, id, r)) - .and_then(chain) - .and_then(persist_session::); - - Box::new(f) + .and_then(move |state| chain(state)) + .and_then(persist_session::) + .boxed() } None => { trace!( @@ -839,12 +834,11 @@ where state::request_id(&state), ); - let f = self - .new_session(state) + self.new_session(state) + .boxed() .and_then(chain) - .and_then(persist_session::); - - Box::new(f) + .and_then(persist_session::) + .boxed() } } } @@ -871,7 +865,7 @@ where fn persist_session( (mut state, mut response): (State, Response), -) -> FutureResult<(State, Response), (State, HandlerError)> +) -> impl Future), (State, HandlerError)>> where T: Default + Serialize + for<'de> Deserialize<'de> + Send + 'static, { @@ -882,7 +876,7 @@ where state::request_id(&state) ); reset_cookie(&mut response, session_drop_data); - return future::ok((state, response)); + return future::ok((state, response)).right_future(); } None => { trace!( @@ -899,12 +893,14 @@ where } match session_data.state { - SessionDataState::Dirty => write_session(state, response, session_data), - SessionDataState::Clean => future::ok((state, response)), + SessionDataState::Dirty => { + write_session(state, response, session_data).left_future() + } + SessionDataState::Clean => future::ok((state, response)).right_future(), } } // Session was discarded with `SessionData::discard`, or otherwise removed - None => future::ok((state, response)), + None => future::ok((state, response)).right_future(), } } @@ -939,7 +935,7 @@ fn write_session( state: State, response: Response, session_data: SessionData, -) -> future::FutureResult<(State, Response), (State, HandlerError)> +) -> impl Future), (State, HandlerError)>> where T: Default + Serialize + for<'de> Deserialize<'de> + Send + 'static, { @@ -993,7 +989,7 @@ where mut state: State, identifier: SessionIdentifier, result: Result>, SessionError>, - ) -> future::FutureResult { + ) -> impl Future> { match result { Ok(v) => { trace!( @@ -1026,7 +1022,10 @@ where } } - fn new_session(self, mut state: State) -> future::FutureResult { + fn new_session( + self, + mut state: State, + ) -> impl Future> { let session_data = SessionData::::new(self); trace!( @@ -1188,13 +1187,14 @@ mod tests { session_data.val += 1; } - Box::new(future::ok(( + future::ok(( state, Response::builder() .status(StatusCode::ACCEPTED) .body(Body::empty()) .unwrap(), - ))) as Box + )) + .boxed() }; let mut state = State::new(); @@ -1203,8 +1203,8 @@ mod tests { headers.insert(COOKIE, cookie.to_string().parse().unwrap()); state.put(headers); - let r: Box = m.call(state, handler); - match r.wait() { + let r = m.call(state, handler); + match futures::executor::block_on(r) { Ok(_) => { let guard = received.lock().unwrap(); if let Some(value) = *guard { @@ -1217,7 +1217,9 @@ mod tests { } let m = nm.new_middleware().unwrap(); - let bytes = m.backend.read_session(identifier).wait().unwrap().unwrap(); + let bytes = futures::executor::block_on(m.backend.read_session(identifier)) + .unwrap() + .unwrap(); let updated = bincode::deserialize::(&bytes[..]).unwrap(); assert_eq!(updated.val, session.val + 1); diff --git a/gotham/src/middleware/state/mod.rs b/gotham/src/middleware/state/mod.rs index c26c2f437..d3cde454d 100644 --- a/gotham/src/middleware/state/mod.rs +++ b/gotham/src/middleware/state/mod.rs @@ -9,6 +9,7 @@ use crate::middleware::{Middleware, NewMiddleware}; use crate::state::{State, StateData}; use std::io; use std::panic::RefUnwindSafe; +use std::pin::Pin; /// Middleware binding for generic types to enable easy shared state. /// @@ -45,9 +46,9 @@ where /// Attaches the inner generic value to the request state. /// /// This will enable the `Handler` to borrow the value directly from the state. - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { state.put(self.t); chain(state) diff --git a/gotham/src/middleware/timer/mod.rs b/gotham/src/middleware/timer/mod.rs index 29ddc9fcc..e47689f8d 100644 --- a/gotham/src/middleware/timer/mod.rs +++ b/gotham/src/middleware/timer/mod.rs @@ -4,7 +4,9 @@ use crate::helpers::http::header::X_RUNTIME_DURATION; use crate::helpers::timing::Timer; use crate::middleware::{Middleware, NewMiddleware}; use crate::state::State; -use futures::{future, Future}; +use futures::prelude::*; +use std::pin::Pin; + use std::io; /// Middleware binding to attach request execution times inside headers. @@ -17,9 +19,9 @@ pub struct RequestTimer; /// `Middleware` trait implementation. impl Middleware for RequestTimer { /// Attaches the request execution time to the response headers. - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { // start the timer let timer = Timer::new(); @@ -35,7 +37,7 @@ impl Middleware for RequestTimer { future::ok((state, response)) }); - Box::new(f) + f.boxed() } } diff --git a/gotham/src/pipeline/chain.rs b/gotham/src/pipeline/chain.rs index 9fe352b04..4dbb29929 100644 --- a/gotham/src/pipeline/chain.rs +++ b/gotham/src/pipeline/chain.rs @@ -2,9 +2,10 @@ //! dispatcher for a route. use borrow_bag::{Handle, Lookup}; -use futures::future; +use futures::prelude::*; use log::trace; use std::panic::RefUnwindSafe; +use std::pin::Pin; use crate::handler::{HandlerFuture, IntoHandlerError}; use crate::middleware::chain::NewMiddlewareChain; @@ -28,9 +29,9 @@ use crate::state::{request_id, State}; pub trait PipelineHandleChain

: RefUnwindSafe { /// Invokes this part of the `PipelineHandleChain`, with requests being passed through to `f` /// once all `Middleware` in the `Pipeline` have passed the request through. - fn call(&self, pipelines: &PipelineSet

, state: State, f: F) -> Box + fn call(&self, pipelines: &PipelineSet

, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static; + F: FnOnce(State) -> Pin> + Send + 'static; } /// Part of a `PipelineHandleChain` which references a `Pipeline` and continues with a tail element. @@ -42,16 +43,16 @@ where P: Lookup, N>, N: RefUnwindSafe, { - fn call(&self, pipelines: &PipelineSet

, state: State, f: F) -> Box + fn call(&self, pipelines: &PipelineSet

, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static, + F: FnOnce(State) -> Pin> + Send + 'static, { let (handle, ref chain) = *self; match pipelines.borrow(handle).construct() { Ok(p) => chain.call(pipelines, state, move |state| p.call(state, f)), Err(e) => { trace!("[{}] error borrowing pipeline", request_id(&state)); - Box::new(future::err((state, e.into_handler_error()))) + future::err((state, e.into_handler_error())).boxed() } } } @@ -59,9 +60,9 @@ where /// The marker for the end of a `PipelineHandleChain`. impl

PipelineHandleChain

for () { - fn call(&self, _: &PipelineSet

, state: State, f: F) -> Box + fn call(&self, _: &PipelineSet

, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static, + F: FnOnce(State) -> Pin> + Send + 'static, { trace!("[{}] start pipeline", request_id(&state)); f(state) diff --git a/gotham/src/pipeline/mod.rs b/gotham/src/pipeline/mod.rs index 555398461..ff1df1d3d 100644 --- a/gotham/src/pipeline/mod.rs +++ b/gotham/src/pipeline/mod.rs @@ -6,6 +6,7 @@ pub mod single; use log::trace; use std::io; +use std::pin::Pin; use crate::handler::HandlerFuture; use crate::middleware::chain::{MiddlewareChain, NewMiddlewareChain}; @@ -28,6 +29,8 @@ use crate::state::{request_id, State}; /// # extern crate hyper; /// # extern crate mime; /// # +/// # use std::pin::Pin; +/// # /// # use gotham::helpers::http::response::create_response; /// # use gotham::state::State; /// # use gotham::handler::HandlerFuture; @@ -49,8 +52,8 @@ use crate::state::{request_id, State}; /// impl Middleware for MiddlewareOne { /// // Implementation elided. /// // Appends `1` to `MiddlewareData.vec` -/// # fn call(self, mut state: State, chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + Send + 'static +/// # fn call(self, mut state: State, chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + Send + 'static /// # { /// # state.put(MiddlewareData { vec: vec![1] }); /// # chain(state) @@ -63,8 +66,8 @@ use crate::state::{request_id, State}; /// impl Middleware for MiddlewareTwo { /// // Implementation elided. /// // Appends `2` to `MiddlewareData.vec` -/// # fn call(self, mut state: State, chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + Send + 'static +/// # fn call(self, mut state: State, chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + Send + 'static /// # { /// # state.borrow_mut::().vec.push(2); /// # chain(state) @@ -77,8 +80,8 @@ use crate::state::{request_id, State}; /// impl Middleware for MiddlewareThree { /// // Implementation elided. /// // Appends `3` to `MiddlewareData.vec` -/// # fn call(self, mut state: State, chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + Send + 'static +/// # fn call(self, mut state: State, chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + Send + 'static /// # { /// # state.borrow_mut::().vec.push(3); /// # chain(state) @@ -152,9 +155,9 @@ where { /// Serves a request using this `PipelineInstance`. Requests that pass through all `Middleware` /// will be served with the `f` function. - fn call(self, state: State, f: F) -> Box + fn call(self, state: State, f: F) -> Pin> where - F: FnOnce(State) -> Box + Send + 'static, + F: FnOnce(State) -> Pin> + Send + 'static, { trace!("[{}] calling middleware", request_id(&state)); self.chain.call(state, f) @@ -188,6 +191,8 @@ where /// # #[macro_use] /// # extern crate gotham_derive; /// # +/// # use std::pin::Pin; +/// # /// # use gotham::state::State; /// # use gotham::handler::HandlerFuture; /// # use gotham::middleware::Middleware; @@ -203,24 +208,24 @@ where /// # struct MiddlewareThree; /// # /// # impl Middleware for MiddlewareOne { -/// # fn call(self, state: State, chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + Send + 'static +/// # fn call(self, state: State, chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + Send + 'static /// # { /// # chain(state) /// # } /// # } /// # /// # impl Middleware for MiddlewareTwo { -/// # fn call(self, state: State, chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + Send + 'static +/// # fn call(self, state: State, chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + Send + 'static /// # { /// # chain(state) /// # } /// # } /// # /// # impl Middleware for MiddlewareThree { -/// # fn call(self, state: State, chain: Chain) -> Box -/// # where Chain: FnOnce(State) -> Box + Send + 'static +/// # fn call(self, state: State, chain: Chain) -> Pin> +/// # where Chain: FnOnce(State) -> Pin> + Send + 'static /// # { /// # chain(state) /// # } @@ -287,7 +292,7 @@ where mod tests { use super::*; - use futures::future; + use futures::prelude::*; use hyper::{Body, Response, StatusCode}; use crate::handler::{Handler, IntoHandlerError}; @@ -320,9 +325,9 @@ mod tests { } impl Middleware for Number { - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized, { state.put(self.clone()); @@ -345,9 +350,9 @@ mod tests { } impl Middleware for Addition { - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized, { state.borrow_mut::().value += self.value; @@ -368,9 +373,9 @@ mod tests { } impl Middleware for Multiplication { - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + 'static, + Chain: FnOnce(State) -> Pin> + 'static, Self: Sized, { state.borrow_mut::().value *= self.value; @@ -393,7 +398,7 @@ mod tests { Ok(move |state| match pipeline.construct() { Ok(p) => p.call(state, |state| handler.handle(state)), - Err(e) => Box::new(future::err((state, e.into_handler_error()))), + Err(e) => future::err((state, e.into_handler_error())).boxed(), }) }) .unwrap(); diff --git a/gotham/src/plain.rs b/gotham/src/plain.rs index 80acd2e8d..1e54251c1 100644 --- a/gotham/src/plain.rs +++ b/gotham/src/plain.rs @@ -1,4 +1,4 @@ -use futures::{Future, IntoFuture}; +use futures::prelude::*; use log::info; use std::net::ToSocketAddrs; use tokio::runtime::TaskExecutor; @@ -25,7 +25,7 @@ where { let runtime = new_runtime(threads); start_on_executor(addr, new_handler, runtime.executor()); - runtime.shutdown_on_idle().wait().unwrap(); + runtime.shutdown_on_idle(); } /// Starts a Gotham application with a designated backing `TaskExecutor`. @@ -44,19 +44,23 @@ where /// This is used internally, but exposed in case the developer intends on doing any /// manual wiring that isn't supported by the Gotham API. It's unlikely that this will /// be required in most use cases; it's mainly exposed for shutdown handling. -pub fn init_server(addr: A, new_handler: NH) -> impl Future +pub fn init_server(addr: A, new_handler: NH) -> impl Future where NH: NewHandler + 'static, A: ToSocketAddrs + 'static, { - let listener = tcp_listener(addr); - let addr = listener.local_addr().unwrap(); + tcp_listener(addr) + .map_err(|_| ()) + .and_then(|listener| { + let addr = listener.local_addr().unwrap(); - info!( - target: "gotham::start", - " Gotham listening on http://{}", - addr - ); + info!( + target: "gotham::start", + " Gotham listening on http://{}", + addr + ); - bind_server(listener, new_handler, |tcp| Ok(tcp).into_future()) + bind_server(listener, new_handler, future::ok) + }) + .map(|_| ()) // Ignore the result } diff --git a/gotham/src/plain/test.rs b/gotham/src/plain/test.rs index 587df5029..7828d82dd 100644 --- a/gotham/src/plain/test.rs +++ b/gotham/src/plain/test.rs @@ -3,20 +3,23 @@ //! See the `TestServer` type for example usage. use std::net::{self, IpAddr, SocketAddr}; +use std::panic::UnwindSafe; +use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use failure; +use failure::ResultExt; use log::info; -use futures::{Future, IntoFuture}; +use futures::prelude::*; use hyper::client::{ connect::{Connect, Connected, Destination}, Client, }; use tokio::net::TcpListener; use tokio::runtime::Runtime; -use tokio::timer::Delay; +use tokio::timer::{delay, Delay}; use tokio::net::TcpStream; @@ -72,18 +75,29 @@ impl Clone for TestServer { impl test::Server for TestServer { fn request_expiry(&self) -> Delay { - Delay::new(Instant::now() + Duration::from_secs(self.data.timeout)) + delay(Instant::now() + Duration::from_secs(self.data.timeout)) } fn run_future(&self, future: F) -> Result where - F: Send + 'static + Future, + F: Send + 'static + Future>, R: Send + 'static, E: failure::Fail, { - let (tx, rx) = futures::sync::oneshot::channel(); - self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - rx.wait().unwrap().map_err(Into::into) + let (tx, rx) = futures::channel::oneshot::channel(); + self.spawn( + future + .then(move |r| future::ready(tx.send(r).map_err(|_| unreachable!()))) + .then(|_| future::ready(())), // ignore the result for spawn + ); + + self.data + .runtime + .write() + .expect("unable to acquire write lock") + .block_on(rx) + .unwrap() + .map_err(Into::into) } } @@ -93,7 +107,10 @@ impl TestServer { /// for each connection. /// /// Timeout will be set to 10 seconds. - pub fn new(new_handler: NH) -> Result { + pub fn new(new_handler: NH) -> Result + where + NH::Instance: UnwindSafe, + { TestServer::with_timeout(new_handler, 10) } @@ -101,13 +118,19 @@ impl TestServer { pub fn with_timeout( new_handler: NH, timeout: u64, - ) -> Result { - let mut runtime = Runtime::new()?; - let listener = TcpListener::bind(&"127.0.0.1:0".parse()?)?; + ) -> Result + where + NH::Instance: UnwindSafe, + { + let runtime = Runtime::new()?; + // TODO: Fix this into an async flow + let listener = runtime.block_on(TcpListener::bind( + "127.0.0.1:0".parse::().compat()?, + ))?; let addr = listener.local_addr()?; - let service_stream = super::bind_server(listener, new_handler, |tcp| Ok(tcp).into_future()); - runtime.spawn(service_stream); + let service_stream = super::bind_server(listener, new_handler, future::ok); + runtime.spawn(service_stream.then(|_| future::ready(()))); // Ignore the result let data = TestServerData { addr, @@ -132,7 +155,7 @@ impl TestServer { /// tests. pub fn spawn(&self, fut: F) where - F: Future + Send + 'static, + F: Future + Send + 'static, { self.data .runtime @@ -172,6 +195,7 @@ impl TestServer { /// `TestConnect` represents the connection between a test client and the `TestServer` instance /// that created it. This type should never be used directly. +#[derive(Clone)] pub struct TestConnect { pub(crate) addr: SocketAddr, } @@ -179,16 +203,18 @@ pub struct TestConnect { impl Connect for TestConnect { type Transport = TcpStream; type Error = CompatError; - type Future = - Box + Send + Sync>; - + type Future = Pin< + Box< + dyn Future> + + Send, + >, + >; fn connect(&self, _dst: Destination) -> Self::Future { - Box::new( - TcpStream::connect(&self.addr) - .inspect(|s| info!("Client TcpStream connected: {:?}", s)) - .map(|s| (s, Connected::new())) - .map_err(|e| Error::from(e).compat()), - ) + TcpStream::connect(self.addr) + .inspect(|s| info!("Client TcpStream connected: {:?}", s)) + .map_ok(|s| (s, Connected::new())) + .map_err(|e| Error::from(e).compat()) + .boxed() } } @@ -205,7 +231,6 @@ mod tests { use crate::handler::{Handler, HandlerFuture, IntoHandlerError, NewHandler}; use crate::helpers::http::response::create_response; use crate::state::{client_addr, FromState, State}; - use futures::{future, Stream}; use http::header::CONTENT_TYPE; use log::info; @@ -215,7 +240,7 @@ mod tests { } impl Handler for TestHandler { - fn handle(self, state: State) -> Box { + fn handle(self, state: State) -> Pin> { let path = Uri::borrow_from(&state).path().to_owned(); match path.as_str() { "/" => { @@ -225,11 +250,17 @@ mod tests { .body(self.response.clone().into()) .unwrap(); - Box::new(future::ok((state, response))) + future::ok((state, response)).boxed() } "/timeout" => { + // TODO: What is this supposed to return? It previously returned nothing which isn't a timeout + let response = Response::builder() + .status(StatusCode::REQUEST_TIMEOUT) + .body(Body::default()) + .unwrap(); + info!("TestHandler responding to /timeout"); - Box::new(future::empty()) + future::ok((state, response)).boxed() } "/myaddr" => { info!("TestHandler responding to /myaddr"); @@ -238,7 +269,7 @@ mod tests { .body(format!("{}", client_addr(&state).unwrap()).into()) .unwrap(); - Box::new(future::ok((state, response))) + future::ok((state, response)).boxed() } _ => unreachable!(), } @@ -334,9 +365,9 @@ mod tests { #[test] fn async_echo() { - fn handler(mut state: State) -> Box { - let f = Body::take_from(&mut state) - .concat2() + fn handler(mut state: State) -> Pin> { + Body::take_from(&mut state) + .try_concat() .then(move |full_body| match full_body { Ok(body) => { let resp_data = body.to_vec(); @@ -346,9 +377,8 @@ mod tests { } Err(e) => future::err((state, e.into_handler_error())), - }); - - Box::new(f) + }) + .boxed() } let server = TestServer::new(|| Ok(handler)).unwrap(); diff --git a/gotham/src/router/builder/draw.rs b/gotham/src/router/builder/draw.rs index 712503227..e86745e0d 100644 --- a/gotham/src/router/builder/draw.rs +++ b/gotham/src/router/builder/draw.rs @@ -950,8 +950,9 @@ where #[cfg(test)] mod tests { use std::io; + use std::pin::Pin; - use futures::future; + use futures::prelude::*; use hyper::{Body, Response, StatusCode}; use crate::handler::HandlerFuture; @@ -975,9 +976,9 @@ mod tests { } impl Middleware for QuickExitMiddleware { - fn call(self, state: State, _chain: Chain) -> Box + fn call(self, state: State, _chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + 'static, + Chain: FnOnce(State) -> Pin> + 'static, { let f = future::ok(( state, @@ -987,7 +988,7 @@ mod tests { .unwrap(), )); - Box::new(f) + f.boxed() } } diff --git a/gotham/src/router/builder/mod.rs b/gotham/src/router/builder/mod.rs index b79815e28..30ca1b69b 100644 --- a/gotham/src/router/builder/mod.rs +++ b/gotham/src/router/builder/mod.rs @@ -316,7 +316,7 @@ where mod tests { use super::*; - use futures::{Future, Stream}; + use futures::prelude::*; use hyper::service::Service; use hyper::{Body, Request, Response, StatusCode}; use serde_derive::Deserialize; @@ -538,7 +538,7 @@ mod tests { let call = move |req| { let mut service = new_service.connect("127.0.0.1:10000".parse().unwrap()); - service.call(req).wait().unwrap() + futures::executor::block_on(service.call(req)).unwrap() }; let response = call(Request::get("/").body(Body::empty()).unwrap()); @@ -549,7 +549,9 @@ mod tests { let response = call(Request::get("/hello/world").body(Body::empty()).unwrap()); assert_eq!(response.status(), StatusCode::OK); - let response_bytes = response.into_body().concat2().wait().unwrap().to_vec(); + let response_bytes = futures::executor::block_on(response.into_body().try_concat()) + .unwrap() + .to_vec(); assert_eq!(&String::from_utf8(response_bytes).unwrap(), "Hello, world!"); let response = call( @@ -558,17 +560,23 @@ mod tests { .unwrap(), ); assert_eq!(response.status(), StatusCode::OK); - let response_bytes = response.into_body().concat2().wait().unwrap().to_vec(); + let response_bytes = futures::executor::block_on(response.into_body().try_concat()) + .unwrap() + .to_vec(); assert_eq!(&String::from_utf8(response_bytes).unwrap(), "Globbed"); let response = call(Request::get("/delegated/b").body(Body::empty()).unwrap()); assert_eq!(response.status(), StatusCode::OK); - let response_bytes = response.into_body().concat2().wait().unwrap().to_vec(); + let response_bytes = futures::executor::block_on(response.into_body().try_concat()) + .unwrap() + .to_vec(); assert_eq!(&String::from_utf8(response_bytes).unwrap(), "Delegated"); let response = call(Request::get("/goodbye/world").body(Body::empty()).unwrap()); assert_eq!(response.status(), StatusCode::OK); - let response_bytes = response.into_body().concat2().wait().unwrap().to_vec(); + let response_bytes = futures::executor::block_on(response.into_body().try_concat()) + .unwrap() + .to_vec(); assert_eq!( &String::from_utf8(response_bytes).unwrap(), "Goodbye, world!" @@ -589,7 +597,9 @@ mod tests { let response = call(Request::get("/add?x=16&y=71").body(Body::empty()).unwrap()); assert_eq!(response.status(), StatusCode::OK); - let response_bytes = response.into_body().concat2().wait().unwrap().to_vec(); + let response_bytes = futures::executor::block_on(response.into_body().try_concat()) + .unwrap() + .to_vec(); assert_eq!(&String::from_utf8(response_bytes).unwrap(), "16 + 71 = 87"); let response = call(Request::post("/resource").body(Body::empty()).unwrap()); @@ -603,7 +613,9 @@ mod tests { let response = call(Request::get("/resource").body(Body::empty()).unwrap()); assert_eq!(response.status(), StatusCode::OK); - let response_bytes = response.into_body().concat2().wait().unwrap().to_vec(); + let response_bytes = futures::executor::block_on(response.into_body().try_concat()) + .unwrap() + .to_vec(); assert_eq!(&response_bytes[..], b"It's a resource."); let response = call( diff --git a/gotham/src/router/builder/single.rs b/gotham/src/router/builder/single.rs index bcdc63281..8f9f3ba03 100644 --- a/gotham/src/router/builder/single.rs +++ b/gotham/src/router/builder/single.rs @@ -116,8 +116,10 @@ pub trait DefineSingleRoute { /// # extern crate hyper; /// # extern crate futures; /// # + /// # use std::pin::Pin; + /// # /// # use hyper::{Body, Response, StatusCode}; - /// # use futures::future; + /// # use futures::prelude::*; /// # use gotham::handler::{Handler, HandlerFuture, NewHandler}; /// # use gotham::state::State; /// # use gotham::router::Router; @@ -140,10 +142,10 @@ pub trait DefineSingleRoute { /// } /// /// impl Handler for MyHandler { - /// fn handle(self, state: State) -> Box { + /// fn handle(self, state: State) -> Pin> { /// // Handler implementation elided. /// # let response = Response::builder().status(StatusCode::ACCEPTED).body(Body::empty()).unwrap(); - /// # Box::new(future::ok((state, response))) + /// # future::ok((state, response)).boxed() /// } /// } /// # diff --git a/gotham/src/router/mod.rs b/gotham/src/router/mod.rs index c79ccb312..ec1120c58 100644 --- a/gotham/src/router/mod.rs +++ b/gotham/src/router/mod.rs @@ -6,9 +6,11 @@ pub mod response; pub mod route; pub mod tree; +use std::pin::Pin; use std::sync::Arc; -use futures::{future, Future}; +use futures::prelude::*; + use hyper::header::ALLOW; use hyper::{Body, Response, StatusCode}; use log::{error, trace}; @@ -67,7 +69,7 @@ impl NewHandler for Router { impl Handler for Router { /// Handles the `Request` by determining the correct `Route` from the internal `Tree`, storing /// any path related variables in `State` and dispatching to the associated `Handler`. - fn handle(self, mut state: State) -> Box { + fn handle(self, mut state: State) -> Pin> { trace!("[{}] starting", request_id(&state)); let future = match state.try_take::() { @@ -99,19 +101,19 @@ impl Handler for Router { ); } } - Box::new(future::ok((state, res))) + future::ok((state, res)).boxed() } } } else { trace!("[{}] did not find routable node", request_id(&state)); let res = create_empty_response(&state, StatusCode::NOT_FOUND); - Box::new(future::ok((state, res))) + future::ok((state, res)).boxed() } } None => { trace!("[{}] invalid request path segments", request_id(&state)); let res = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR); - Box::new(future::ok((state, res))) + future::ok((state, res)).boxed() } }; @@ -142,7 +144,7 @@ impl Router { mut state: State, params: SegmentMapping<'a>, route: &Box + Send + Sync>, - ) -> Box { + ) -> Pin> { match route.extract_request_path(&mut state, params) { Ok(()) => { trace!("[{}] extracted request path", request_id(&state)); @@ -158,7 +160,7 @@ impl Router { let mut res = Response::new(Body::empty()); route.extend_response_on_query_string_error(&mut state, &mut res); - Box::new(future::ok((state, res))) + future::ok((state, res)).boxed() } } } @@ -169,14 +171,14 @@ impl Router { ); let mut res = Response::new(Body::empty()); route.extend_response_on_path_error(&mut state, &mut res); - Box::new(future::ok((state, res))) + future::ok((state, res)).boxed() } } } - fn finalize_response(&self, result: Box) -> Box { + fn finalize_response(&self, result: Pin>) -> Pin> { let response_finalizer = self.data.response_finalizer.clone(); - let f = result + result .or_else(|(state, err)| { trace!( "[{}] converting error into http response \ @@ -190,9 +192,8 @@ impl Router { .and_then(move |(state, res)| { trace!("[{}] handler complete", request_id(&state)); response_finalizer.finalize(state, res) - }); - - Box::new(f) + }) + .boxed() } } @@ -233,7 +234,7 @@ mod tests { state.put(HeaderMap::new()); set_request_id(&mut state); - r.handle(state).wait() + futures::executor::block_on(r.handle(state)) } #[test] @@ -251,7 +252,7 @@ mod tests { state.put(HeaderMap::new()); set_request_id(&mut state); - match router.handle(state).wait() { + match futures::executor::block_on(router.handle(state)) { Ok((_state, res)) => { assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); } diff --git a/gotham/src/router/response/finalizer.rs b/gotham/src/router/response/finalizer.rs index bfee55a4c..535e9f5f6 100644 --- a/gotham/src/router/response/finalizer.rs +++ b/gotham/src/router/response/finalizer.rs @@ -2,9 +2,10 @@ //! and internal extenders have completed. use std::collections::HashMap; +use std::pin::Pin; use std::sync::Arc; -use futures::future; +use futures::prelude::*; use hyper::{Body, Response, StatusCode}; use log::trace; @@ -63,7 +64,7 @@ impl ResponseFinalizerBuilder { impl ResponseFinalizer { /// Finalize the `Response` if a `ResponseFinalizer` has been supplied for the /// status code assigned to the `Response`. - pub fn finalize(&self, mut state: State, mut res: Response) -> Box { + pub fn finalize(&self, mut state: State, mut res: Response) -> Pin> { match self.data.get(&res.status()) { Some(extender) => { trace!( @@ -82,6 +83,6 @@ impl ResponseFinalizer { } } - Box::new(future::ok((state, res))) + future::ok((state, res)).boxed() } } diff --git a/gotham/src/router/route/dispatch.rs b/gotham/src/router/route/dispatch.rs index 305e61e26..fe213f315 100644 --- a/gotham/src/router/route/dispatch.rs +++ b/gotham/src/router/route/dispatch.rs @@ -1,8 +1,9 @@ //! Defines the route `Dispatcher` and supporting types. -use futures::future; +use futures::prelude::*; use log::trace; use std::panic::RefUnwindSafe; +use std::pin::Pin; use crate::handler::{Handler, HandlerFuture, IntoHandlerError, NewHandler}; use crate::pipeline::chain::PipelineHandleChain; @@ -12,7 +13,7 @@ use crate::state::{request_id, State}; /// Used by `Router` to dispatch requests via pipelines and finally into the configured `Handler`. pub trait Dispatcher: RefUnwindSafe { /// Dispatches a request via pipelines and `Handler` represented by this `Dispatcher`. - fn dispatch(&self, state: State) -> Box; + fn dispatch(&self, state: State) -> Pin>; } /// Default implementation of the `Dispatcher` trait. @@ -57,7 +58,7 @@ where C: PipelineHandleChain

, P: RefUnwindSafe, { - fn dispatch(&self, state: State) -> Box { + fn dispatch(&self, state: State) -> Pin> { match self.new_handler.new_handler() { Ok(h) => { trace!("[{}] cloning handler", request_id(&state)); @@ -66,7 +67,7 @@ where } Err(e) => { trace!("[{}] error cloning handler", request_id(&state)); - Box::new(future::err((state, e.compat().into_handler_error()))) + future::err((state, e.compat().into_handler_error())).boxed() } } } @@ -111,9 +112,9 @@ mod tests { } impl Middleware for Number { - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized, { state.put(self.clone()); @@ -136,9 +137,9 @@ mod tests { } impl Middleware for Addition { - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized, { state.borrow_mut::().value += self.value; @@ -159,9 +160,9 @@ mod tests { } impl Middleware for Multiplication { - fn call(self, mut state: State, chain: Chain) -> Box + fn call(self, mut state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box + Send + 'static, + Chain: FnOnce(State) -> Pin> + Send + 'static, Self: Sized, { state.borrow_mut::().value *= self.value; diff --git a/gotham/src/router/route/mod.rs b/gotham/src/router/route/mod.rs index 0b2d1844c..ee0bc3e9a 100644 --- a/gotham/src/router/route/mod.rs +++ b/gotham/src/router/route/mod.rs @@ -9,6 +9,7 @@ pub mod matcher; use std::marker::PhantomData; use std::panic::RefUnwindSafe; +use std::pin::Pin; use hyper::{Body, Response, Uri}; use log::debug; @@ -81,7 +82,7 @@ pub trait Route: RefUnwindSafe { /// Dispatches the request to this `Route`, which will execute the pipelines and the handler /// assigned to the `Route. - fn dispatch(&self, state: State) -> Box; + fn dispatch(&self, state: State) -> Pin>; } /// Returned in the `Err` variant from `extract_query_string` or `extract_request_path`, this @@ -165,7 +166,7 @@ where self.delegation } - fn dispatch(&self, state: State) -> Box { + fn dispatch(&self, state: State) -> Pin> { self.dispatcher.dispatch(state) } @@ -220,7 +221,7 @@ where mod tests { use super::*; - use futures::Async; + use futures::prelude::*; use hyper::{HeaderMap, Method, StatusCode, Uri}; use std::str::FromStr; @@ -252,12 +253,10 @@ mod tests { state.put(Method::GET); set_request_id(&mut state); - match route.dispatch(state).poll() { - Ok(Async::Ready((_state, response))) => { - assert_eq!(response.status(), StatusCode::ACCEPTED) - } - Ok(Async::NotReady) => panic!("expected future to be completed already"), - Err((_state, e)) => panic!("error polling future: {}", e), + match route.dispatch(state).now_or_never() { + Some(Ok((_state, response))) => assert_eq!(response.status(), StatusCode::ACCEPTED), + Some(Err((_state, e))) => panic!("error polling future: {}", e), + None => panic!("expected future to be completed already"), } } @@ -286,12 +285,10 @@ mod tests { state.put(RequestPathSegments::new("/")); set_request_id(&mut state); - match route.dispatch(state).poll() { - Ok(Async::Ready((_state, response))) => { - assert_eq!(response.status(), StatusCode::ACCEPTED) - } - Ok(Async::NotReady) => panic!("expected future to be completed already"), - Err((_state, e)) => panic!("error polling future: {}", e), + match route.dispatch(state).now_or_never() { + Some(Ok((_state, response))) => assert_eq!(response.status(), StatusCode::ACCEPTED), + Some(Err((_state, e))) => panic!("error polling future: {}", e), + None => panic!("expected future to be completed already"), } } } diff --git a/gotham/src/service/mod.rs b/gotham/src/service/mod.rs index 654daf721..9d649469b 100644 --- a/gotham/src/service/mod.rs +++ b/gotham/src/service/mod.rs @@ -3,18 +3,21 @@ use std::net::SocketAddr; use std::panic::AssertUnwindSafe; +use std::pin::Pin; use std::sync::Arc; use std::thread; use failure; -use futures::Future; +use futures::prelude::*; +use futures::task::{self, Poll}; use http::request; -use hyper::service::Service; use hyper::{Body, Request, Response}; use log::debug; +use tower_service::Service as TowerService; use crate::handler::NewHandler; + use crate::helpers::http::request::path::RequestPathSegments; use crate::state::client_addr::put_client_addr; use crate::state::{set_request_id, State}; @@ -58,16 +61,22 @@ where client_addr: SocketAddr, } -impl Service for ConnectedGothamService +impl TowerService> for ConnectedGothamService where T: NewHandler, { - type ReqBody = Body; // required by hyper::server::conn::Http::serve_connection() - type ResBody = Body; // has to impl Payload... + type Response = Response; type Error = failure::Compat; // :Into> - type Future = Box, Error = Self::Error> + Send>; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut task::Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { let mut state = State::new(); put_client_addr(&mut state, self.client_addr); @@ -129,7 +138,7 @@ mod tests { let f = service .connect("127.0.0.1:10000".parse().unwrap()) .call(req); - let response = f.wait().unwrap(); + let response = futures::executor::block_on(f).unwrap(); assert_eq!(response.status(), StatusCode::ACCEPTED); } @@ -147,7 +156,7 @@ mod tests { let f = service .connect("127.0.0.1:10000".parse().unwrap()) .call(req); - let response = f.wait().unwrap(); + let response = futures::executor::block_on(f).unwrap(); assert_eq!(response.status(), StatusCode::ACCEPTED); } } diff --git a/gotham/src/service/trap.rs b/gotham/src/service/trap.rs index 2f3d1b4d2..776950a6f 100644 --- a/gotham/src/service/trap.rs +++ b/gotham/src/service/trap.rs @@ -1,14 +1,14 @@ //! Defines functionality for processing a request and trapping errors and panics in response //! generation. -use std::any::Any; use std::error::Error; -use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::{io, mem}; +use std::panic::catch_unwind; +use std::panic::AssertUnwindSafe; +use std::pin::Pin; use failure; -use futures::future::{self, Future, FutureResult, IntoFuture}; -use futures::Async; +use futures::prelude::*; + use hyper::{Body, Response, StatusCode}; use log::error; @@ -26,42 +26,47 @@ type CompatError = failure::Compat; pub(super) fn call_handler<'a, T>( t: &T, state: AssertUnwindSafe, -) -> Box, Error = CompatError> + Send + 'a> +) -> Pin, CompatError>> + Send + 'a>> where T: NewHandler + 'a, { - let res = catch_unwind(move || { - // Hyper doesn't allow us to present an affine-typed `Handler` interface directly. We have - // to emulate the promise given by hyper's documentation, by creating a `Handler` value and - // immediately consuming it. - t.new_handler() - .into_future() - .map_err(failure::Error::compat) - .and_then(move |handler| { - let AssertUnwindSafe(state) = state; - - handler.handle(state).then(move |result| match result { - Ok((_state, res)) => future::ok(res), - Err((state, err)) => finalize_error_response(state, err), - }) - }) - }); + // Need to consume the NewHandler eagerly (vs lazy) since its borrowed + // The rest of the processing occurs in a future + match catch_unwind(move || t.new_handler()) { + Ok(handler) => { + let res = future::ready(handler) + .map_err(failure::Error::compat) + .and_then(move |handler| { + let AssertUnwindSafe(state) = state; + + handler.handle(state).then(move |result| match result { + Ok((_state, res)) => { + future::ok::<_, CompatError>(res).err_into().left_future() + } + Err((state, err)) => finalize_error_response(state, err) + .err_into() + .right_future(), + }) + }); - if let Ok(f) = res { - return Box::new( - UnwindSafeFuture::new(f) + AssertUnwindSafe(res) .catch_unwind() - .then(finalize_catch_unwind_response), // must be Future - ); + .then(|unwind_result| match unwind_result { + Ok(result) => finalize_catch_unwind_response(result).left_future(), + Err(_) => finalize_panic_response().right_future(), + }) + .left_future() + } + // Pannicked creating the handler from NewHandler + Err(_) => finalize_panic_response().right_future(), } - - Box::new(finalize_panic_response()) + .boxed() } fn finalize_error_response( state: State, err: HandlerError, -) -> FutureResult, CompatError> { +) -> impl Future, CompatError>> { { // HandlerError::source() is far more interesting for logging, but the // API doesn't guarantee its presence (even though it always is). @@ -79,7 +84,7 @@ fn finalize_error_response( future::ok(err.into_response(&state)) } -fn finalize_panic_response() -> FutureResult, CompatError> { +fn finalize_panic_response() -> impl Future, CompatError>> { error!("[PANIC][A panic occurred while invoking the handler]"); future::ok( @@ -91,78 +96,19 @@ fn finalize_panic_response() -> FutureResult, CompatError> { } fn finalize_catch_unwind_response( - result: Result, CompatError>, Box>, -) -> FutureResult, CompatError> { - let response = result - .unwrap_or_else(|_| { - let e = io::Error::new( - io::ErrorKind::Other, - "Attempting to poll the future caused a panic", - ); - - Err(failure::Error::from(e).compat()) - }) - .unwrap_or_else(|_| { - error!("[PANIC][A panic occurred while polling the future]"); - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::default()) - .unwrap() - }); + result: Result, CompatError>, +) -> impl Future, CompatError>> { + let response = result.unwrap_or_else(|_| { + error!("[PANIC][A panic occurred while polling the future]"); + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::default()) + .unwrap() + }); future::ok(response) } -/// Wraps a future to ensure that a panic does not escape and terminate the event loop. -enum UnwindSafeFuture -where - F: Future + Send, -{ - /// The future is available for polling. - Available(AssertUnwindSafe), - - /// The future has been poisoned because a previous call to `poll` caused a panic. - Poisoned, -} - -impl Future for UnwindSafeFuture -where - F: Future + Send, -{ - type Item = F::Item; - type Error = CompatError; - - fn poll(&mut self) -> Result, CompatError> { - // Mark as poisoned in case `f.poll()` panics below. - match mem::replace(self, UnwindSafeFuture::Poisoned) { - UnwindSafeFuture::Available(mut f) => { - let r = f.poll(); - // Replace with the original value again, now that the potential panic has not - // occurred. This allows for a poll to occur next time. - *self = UnwindSafeFuture::Available(f); - r - } - UnwindSafeFuture::Poisoned => { - let e = io::Error::new( - io::ErrorKind::Other, - "Poisoned future due to previous panic", - ); - - Err(failure::Error::from(e).compat()) - } - } - } -} - -impl UnwindSafeFuture -where - F: Future + Send, -{ - fn new(f: F) -> UnwindSafeFuture { - UnwindSafeFuture::Available(AssertUnwindSafe(f)) - } -} - #[cfg(test)] mod tests { use super::*; @@ -171,6 +117,7 @@ mod tests { use hyper::{HeaderMap, Method, StatusCode}; + use crate::error::Result; use crate::handler::{HandlerFuture, IntoHandlerError}; use crate::helpers::http::response::create_empty_response; use crate::state::set_request_id; @@ -190,7 +137,7 @@ mod tests { set_request_id(&mut state); let r = call_handler(&new_handler, AssertUnwindSafe(state)); - let response = r.wait().unwrap(); + let response = futures::executor::block_on(r).unwrap(); assert_eq!(response.status(), StatusCode::ACCEPTED); } @@ -198,16 +145,16 @@ mod tests { fn async_success_repeat_poll() { let new_handler = || { Ok(|state| { - let f = future::lazy(move || { + let f = future::lazy(move |_| { let res = create_empty_response(&state, StatusCode::ACCEPTED); - future::ok((state, res)) + Ok((state, res)) }); - let f = future::lazy(move || f); - let f = future::lazy(move || f); - let f = future::lazy(move || f); + let f = f.map(|v| v); + let f = f.map(|v| v); + let f = f.map(|v| v); - Box::new(f) as Box + f.boxed() }) }; @@ -217,7 +164,7 @@ mod tests { set_request_id(&mut state); let r = call_handler(&new_handler, AssertUnwindSafe(state)); - let response = r.wait().unwrap(); + let response = futures::executor::block_on(r).unwrap(); assert_eq!(response.status(), StatusCode::ACCEPTED); } @@ -225,10 +172,7 @@ mod tests { fn error() { let new_handler = || { Ok(|state| { - Box::new(future::err(( - state, - io::Error::last_os_error().into_handler_error(), - ))) as Box + future::err((state, io::Error::last_os_error().into_handler_error())).boxed() }) }; @@ -238,7 +182,7 @@ mod tests { set_request_id(&mut state); let r = call_handler(&new_handler, AssertUnwindSafe(state)); - let response = r.wait().unwrap(); + let response = futures::executor::block_on(r).unwrap(); assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); } @@ -246,7 +190,7 @@ mod tests { fn panic() { let new_handler = || { Ok(|_| { - let val: Option> = None; + let val: Option>> = None; val.expect("test panic") }) }; @@ -257,18 +201,13 @@ mod tests { set_request_id(&mut state); let r = call_handler(&new_handler, AssertUnwindSafe(state)); - let response = r.wait().unwrap(); + let response = futures::executor::block_on(r).unwrap(); assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); } #[test] fn async_panic() { - let new_handler = || { - Ok(|_| { - let val: Option> = None; - Box::new(future::lazy(move || val.expect("test panic"))) as Box - }) - }; + let new_handler = || Ok(|_| future::lazy(move |_| panic!("test panic")).boxed()); let mut state = State::new(); state.put(HeaderMap::new()); @@ -276,7 +215,7 @@ mod tests { set_request_id(&mut state); let r = call_handler(&new_handler, AssertUnwindSafe(state)); - let response = r.wait().unwrap(); + let response = futures::executor::block_on(r).unwrap(); assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); } @@ -284,12 +223,13 @@ mod tests { fn async_panic_repeat_poll() { let new_handler = || { Ok(|_| { - let val: Option> = None; - let f = future::lazy(move || val.expect("test panic")); - let f = future::lazy(move || f); - let f = future::lazy(move || f); - let f = future::lazy(move || f); - Box::new(f) as Box + let f = future::lazy(move |_| panic!("test panic")); + + let f = f.map(|v| v); + let f = f.map(|v| v); + let f = f.map(|v| v); + + f.boxed() }) }; @@ -299,7 +239,35 @@ mod tests { set_request_id(&mut state); let r = call_handler(&new_handler, AssertUnwindSafe(state)); - let response = r.wait().unwrap(); + let response = futures::executor::block_on(r).unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + } + + #[test] + fn new_handler_panic() { + struct PanicNewHandler; + impl NewHandler for PanicNewHandler { + type Instance = Self; + + fn new_handler(&self) -> Result { + panic!("Pannicked creating a new handler"); + } + } + + impl Handler for PanicNewHandler { + fn handle(self, _state: State) -> Pin> { + unreachable!(); + } + } + + let mut state = State::new(); + state.put(HeaderMap::new()); + state.put(Method::GET); + set_request_id(&mut state); + + let new_handler = PanicNewHandler {}; + let r = call_handler(&new_handler, AssertUnwindSafe(state)); + let response = futures::executor::block_on(r).unwrap(); assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); } } diff --git a/gotham/src/test.rs b/gotham/src/test.rs index 1ac873bbd..6389709fc 100644 --- a/gotham/src/test.rs +++ b/gotham/src/test.rs @@ -5,10 +5,12 @@ use std::fmt; use std::ops::{Deref, DerefMut}; use failure::format_err; +use failure::ResultExt; -use futures::{future, Future, Stream}; +use futures::prelude::*; use http::HttpTryFrom; -use hyper::client::{connect::Connect, Client}; +use hyper::client::connect::Connect; +use hyper::client::Client; use hyper::header::CONTENT_TYPE; use hyper::{Body, Method, Response, Uri}; use log::warn; @@ -31,7 +33,7 @@ pub trait Server: Clone { /// Runs a Future until it resolves. fn run_future(&self, future: F) -> Result where - F: Send + 'static + Future, + F: Send + 'static + Future>, R: Send + 'static, E: failure::Fail; @@ -42,30 +44,28 @@ pub trait Server: Clone { /// /// If the future came from a different instance of `Server`, the event loop will run until /// the timeout is triggered. - fn run_request(&self, f: F) -> Result + fn run_request(&self, f: F) -> Result where - F: Future + Send + 'static, + F: TryFuture + Unpin + Send + 'static, F::Error: failure::Fail + Sized, - F::Item: Send, + F::Ok: Send, { - let might_expire = self.run_future(f.select2(self.request_expiry()).map_err(|either| { - let e: failure::Error = match either { - future::Either::A((req_err, _)) => { - warn!("run_request request error: {:?}", req_err); - req_err.into() - } - future::Either::B((times_up, _)) => { - warn!("run_request timed out"); - times_up.into() - } - }; - e.compat() - }))?; - - match might_expire { - future::Either::A((item, _)) => Ok(item), - future::Either::B(_) => Err(failure::err_msg("timed out")), - } + self.run_future( + // Race the timeout against the request future + future::try_select(f, self.request_expiry().then(future::ok::<_, F::Error>)) + // Map an error in either (though it can only occur in the request future) + .map_err(|either| either.factor_first().0.into()) + // Finally, map the Ok(Either) (left = request, right = timeout) to Ok/Err + .and_then(|might_expire| { + future::ready(match might_expire { + future::Either::Left((item, _)) => Ok(item), + future::Either::Right(_) => Err(failure::err_msg("timed out")), + }) + }) + .into_future() + // Finally, make the fail error compatible + .map(|result| result.compat()), + ) } } @@ -73,8 +73,8 @@ impl BodyReader for T { fn read_body(&mut self, response: Response) -> Result> { let f = response .into_body() - .concat2() - .map(|chunk| chunk.into_iter().collect()); + .try_concat() + .map_ok(|chunk| chunk.into_iter().collect()); self.run_future(f) } } diff --git a/gotham/src/test/request.rs b/gotham/src/test/request.rs index 5d0ef8e2f..f84a38411 100644 --- a/gotham/src/test/request.rs +++ b/gotham/src/test/request.rs @@ -2,12 +2,12 @@ use std::ops::Deref; use std::ops::DerefMut; use http::HttpTryFrom; +use hyper::client::connect::Connect; use hyper::header::{HeaderValue, IntoHeaderName}; use hyper::{Body, Method, Request, Uri}; use super::Server; use super::{TestClient, TestResponse}; -use hyper::client::connect::Connect; use crate::error::*; diff --git a/gotham/src/tls.rs b/gotham/src/tls.rs index 84a00c42a..5738d755c 100644 --- a/gotham/src/tls.rs +++ b/gotham/src/tls.rs @@ -1,10 +1,10 @@ -use futures::Future; +use futures::prelude::*; +use futures_rustls::{rustls, TlsAcceptor}; use log::{error, info}; use std::net::ToSocketAddrs; use std::sync::Arc; use tokio::net::TcpListener; use tokio::runtime::TaskExecutor; -use tokio_rustls::{rustls, TlsAcceptor}; use super::{bind_server, new_runtime, tcp_listener}; @@ -33,7 +33,7 @@ pub fn start_with_num_threads( { let runtime = new_runtime(threads); start_on_executor(addr, new_handler, tls_config, runtime.executor()); - runtime.shutdown_on_idle().wait().unwrap(); + runtime.shutdown_on_idle(); } /// Starts a Gotham application with a designated backing `TaskExecutor`. @@ -60,36 +60,41 @@ pub fn init_server( addr: A, new_handler: NH, tls_config: rustls::ServerConfig, -) -> impl Future +) -> impl Future where NH: NewHandler + 'static, A: ToSocketAddrs + 'static, { - let listener = tcp_listener(addr); - let addr = listener.local_addr().unwrap(); + tcp_listener(addr) + .map_err(|_| ()) + .and_then(|listener| { + let addr = listener.local_addr().unwrap(); - info!( - target: "gotham::start", - " Gotham listening on http://{}", - addr - ); + info!( + target: "gotham::start", + " Gotham listening on http://{}", + addr + ); - bind_server_rustls(listener, new_handler, tls_config) + bind_server_rustls(listener, new_handler, tls_config).map_err(|_| ()) + }) + .then(|_| future::ready(())) // Ignore the result } fn bind_server_rustls( listener: TcpListener, new_handler: NH, tls_config: rustls::ServerConfig, -) -> impl Future +) -> impl Future> where NH: NewHandler + 'static, { let tls = TlsAcceptor::from(Arc::new(tls_config)); bind_server(listener, new_handler, move |socket| { - tls.accept(socket).map_err(|e| { - error!(target: "gotham::tls", "TLS handshake error: {:?}", e); - () - }) + tls.accept(futures_tokio_compat::Compat::new(socket)) + .map_ok(futures_tokio_compat::Compat::new) + .map_err(|e| { + error!(target: "gotham::tls", "TLS handshake error: {:?}", e); + }) }) } diff --git a/gotham/src/tls/test.rs b/gotham/src/tls/test.rs index 5e4a6a595..376eabd1d 100644 --- a/gotham/src/tls/test.rs +++ b/gotham/src/tls/test.rs @@ -4,31 +4,33 @@ use std::io::BufReader; use std::net::{self, IpAddr, SocketAddr}; +use std::panic::UnwindSafe; +use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use failure; +use failure::Fail; use log::info; -use futures::Future; +use futures::prelude::*; use hyper::client::{ connect::{Connect, Connected, Destination}, Client, }; use tokio::net::TcpListener; use tokio::runtime::Runtime; -use tokio::timer::Delay; +use tokio::timer::{delay, Delay}; use tokio::net::TcpStream; -use tokio_rustls::{ +use futures_rustls::{ rustls::{ self, internal::pemfile::{certs, pkcs8_private_keys}, - ClientSession, NoClientAuth, + NoClientAuth, }, webpki::DNSNameRef, - TlsConnector, TlsStream, + TlsConnector, }; use crate::handler::NewHandler; @@ -83,18 +85,29 @@ impl Clone for TestServer { impl test::Server for TestServer { fn request_expiry(&self) -> Delay { - Delay::new(Instant::now() + Duration::from_secs(self.data.timeout)) + delay(Instant::now() + Duration::from_secs(self.data.timeout)) } fn run_future(&self, future: F) -> Result where - F: Send + 'static + Future, + F: Send + 'static + Future>, R: Send + 'static, E: failure::Fail, { - let (tx, rx) = futures::sync::oneshot::channel(); - self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - rx.wait().unwrap().map_err(Into::into) + let (tx, rx) = futures::channel::oneshot::channel(); + self.spawn( + future + .then(move |r| future::ready(tx.send(r).map_err(|_| unreachable!()))) + .then(|_| future::ready(())), // ignore the result for spawn + ); + + self.data + .runtime + .write() + .expect("unable to acquire write lock") + .block_on(rx) + .unwrap() + .map_err(Into::into) } } @@ -104,7 +117,10 @@ impl TestServer { /// for each connection. /// /// Timeout will be set to 10 seconds. - pub fn new(new_handler: NH) -> Result { + pub fn new(new_handler: NH) -> Result + where + NH::Instance: UnwindSafe, + { TestServer::with_timeout(new_handler, 10) } @@ -112,9 +128,17 @@ impl TestServer { pub fn with_timeout( new_handler: NH, timeout: u64, - ) -> Result { - let mut runtime = Runtime::new()?; - let listener = TcpListener::bind(&"127.0.0.1:0".parse()?)?; + ) -> Result + where + NH::Instance: UnwindSafe, + { + let runtime = Runtime::new()?; + // TODO: Fix this into an async flow + let listener = runtime.block_on(TcpListener::bind( + "127.0.0.1:0" + .parse::() + .map_err(|e| e.compat())?, + ))?; let addr = listener.local_addr()?; let mut cfg = rustls::ServerConfig::new(NoClientAuth::new()); @@ -125,7 +149,7 @@ impl TestServer { cfg.set_single_cert(certs, keys.remove(0))?; let service_stream = super::bind_server_rustls(listener, new_handler, cfg); - runtime.spawn(service_stream); + runtime.spawn(service_stream.then(|_| future::ready(()))); // Ignore the result let data = TestServerData { addr, @@ -150,7 +174,7 @@ impl TestServer { /// tests. pub fn spawn(&self, fut: F) where - F: Future + Send + 'static, + F: Future + Send + 'static, { self.data .runtime @@ -194,32 +218,39 @@ impl TestServer { /// `TestConnect` represents the connection between a test client and the `TestServer` instance /// that created it. This type should never be used directly. +#[derive(Clone)] pub struct TestConnect { pub(crate) addr: SocketAddr, config: Arc, } impl Connect for TestConnect { - type Transport = TlsStream; + type Transport = futures_tokio_compat::Compat< + futures_rustls::client::TlsStream>, + >; type Error = CompatError; - type Future = - Box + Send + Sync>; - + type Future = Pin< + Box< + dyn Future> + + Send, + >, + >; fn connect(&self, dst: Destination) -> Self::Future { let tls = TlsConnector::from(self.config.clone()); - Box::new( - TcpStream::connect(&self.addr) - .and_then(move |stream| { - let domain = DNSNameRef::try_from_ascii_str(dst.host()).unwrap(); - tls.connect(domain, stream) - }) - .inspect(|s| info!("Client TcpStream connected: {:?}", s)) - .map(|s| (s, Connected::new())) - .map_err(|e| { - info!("TLS TestClient error: {:?}", e); - Error::from(e).compat() - }), - ) + + TcpStream::connect(self.addr) + .and_then(move |stream: TcpStream| { + let domain = DNSNameRef::try_from_ascii_str(dst.host()).unwrap(); + tls.connect(domain, futures_tokio_compat::Compat::new(stream)) + .map_ok(futures_tokio_compat::Compat::new) + }) + .inspect(|s| info!("Client TcpStream connected: {:?}", s)) + .map_ok(|s| (s, Connected::new())) + .map_err(|e| { + info!("TLS TestClient error: {:?}", e); + Error::from(e).compat() + }) + .boxed() } } @@ -236,7 +267,6 @@ mod tests { use crate::handler::{Handler, HandlerFuture, IntoHandlerError, NewHandler}; use crate::helpers::http::response::create_response; use crate::state::{client_addr, FromState, State}; - use futures::{future, Stream}; use http::header::CONTENT_TYPE; use log::info; @@ -246,7 +276,7 @@ mod tests { } impl Handler for TestHandler { - fn handle(self, state: State) -> Box { + fn handle(self, state: State) -> Pin> { let path = Uri::borrow_from(&state).path().to_owned(); match path.as_str() { "/" => { @@ -256,12 +286,19 @@ mod tests { .body(self.response.clone().into()) .unwrap(); - Box::new(future::ok((state, response))) + future::ok((state, response)).boxed() } "/timeout" => { + // TODO: What is this supposed to return? It previously returned nothing which isn't a timeout + let response = Response::builder() + .status(StatusCode::REQUEST_TIMEOUT) + .body(Body::default()) + .unwrap(); + info!("TestHandler responding to /timeout"); - Box::new(future::empty()) + future::ok((state, response)).boxed() } + "/myaddr" => { info!("TestHandler responding to /myaddr"); let response = Response::builder() @@ -269,7 +306,7 @@ mod tests { .body(format!("{}", client_addr(&state).unwrap()).into()) .unwrap(); - Box::new(future::ok((state, response))) + future::ok((state, response)).boxed() } _ => unreachable!(), } @@ -366,21 +403,26 @@ mod tests { #[test] fn async_echo() { - fn handler(mut state: State) -> Box { - let f = Body::take_from(&mut state) - .concat2() - .then(move |full_body| match full_body { - Ok(body) => { - let resp_data = body.to_vec(); - let res = - create_response(&state, StatusCode::OK, mime::TEXT_PLAIN, resp_data); - future::ok((state, res)) - } - - Err(e) => future::err((state, e.into_handler_error())), - }); - - Box::new(f) + fn handler(mut state: State) -> Pin> { + let f = + Body::take_from(&mut state) + .try_concat() + .then(move |full_body| match full_body { + Ok(body) => { + let resp_data = body.to_vec(); + let res = create_response( + &state, + StatusCode::OK, + mime::TEXT_PLAIN, + resp_data, + ); + future::ok((state, res)) + } + + Err(e) => future::err((state, e.into_handler_error())), + }); + + f.boxed() } let server = TestServer::new(|| Ok(handler)).unwrap(); diff --git a/middleware/diesel/Cargo.toml b/middleware/diesel/Cargo.toml index 668d950c4..ff5f3b76a 100644 --- a/middleware/diesel/Cargo.toml +++ b/middleware/diesel/Cargo.toml @@ -12,7 +12,7 @@ categories = ["web-programming::http-server"] keywords = ["http", "async", "web", "gotham", "diesel"] [dependencies] -futures = "0.1" +futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] } gotham = "0.5.0-dev" gotham_derive = "0.5.0-dev" diesel = { version = "1.3", features = ["r2d2"] } @@ -23,5 +23,5 @@ log = "0.4" [dev-dependencies] diesel = { version = "1", features = ["sqlite"] } mime = "0.3" -hyper = "0.12" -tokio = "0.1" +hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] } +tokio = "=0.2.0-alpha.6" diff --git a/middleware/jwt/Cargo.toml b/middleware/jwt/Cargo.toml index c49457eae..d801cd7ad 100644 --- a/middleware/jwt/Cargo.toml +++ b/middleware/jwt/Cargo.toml @@ -15,11 +15,11 @@ license = "MIT/Apache-2.0" edition = "2018" [dependencies] -futures = "0.1" +futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] } gotham = "0.5.0-dev" gotham_derive = "0.5.0-dev" serde = "1.0" serde_derive = "1.0" -hyper = "0.12" +hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] } jsonwebtoken = "6.0" log = "0.4" diff --git a/middleware/template/Cargo.toml b/middleware/template/Cargo.toml index fd5982a79..46dd9dbf4 100644 --- a/middleware/template/Cargo.toml +++ b/middleware/template/Cargo.toml @@ -10,7 +10,7 @@ publish = false [dependencies] log = "0.4" -futures = "0.1" +futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] } # Middlewares should reference the semantic versions of Gotham that are they compatible with # and not use relative references such as shown here. diff --git a/middleware/template/src/lib.rs b/middleware/template/src/lib.rs index 0ab0cf70e..ff1c76645 100644 --- a/middleware/template/src/lib.rs +++ b/middleware/template/src/lib.rs @@ -16,8 +16,9 @@ extern crate log; //extern crate gotham_derive; use std::io; +use std::pin::Pin; -use futures::{future, Future}; +use futures::prelude::*; use gotham::handler::HandlerFuture; use gotham::middleware::{Middleware, NewMiddleware}; @@ -45,9 +46,9 @@ impl NewMiddleware for MyMiddleware { } impl Middleware for MyMiddleware { - fn call(self, state: State, chain: Chain) -> Box + fn call(self, state: State, chain: Chain) -> Pin> where - Chain: FnOnce(State) -> Box, + Chain: FnOnce(State) -> Pin>, { debug!("[{}] pre chain", request_id(&state)); // Do things prior to passing the request on to other middleware and the eventual Handler @@ -55,16 +56,17 @@ impl Middleware for MyMiddleware { // For example store something in State // state.put(MyData { my_value: "abcdefg".to_owned() }); - let f = chain(state).and_then(move |(state, response)| { - { - debug!("[{}] post chain", request_id(&state)); - // Do things once a response has come back - // .. - // For example get our data back from State - // let data = state.borrow::().unwrap(); - } - future::ok((state, response)) - }); - Box::new(f) + chain(state) + .and_then(move |(state, response)| { + { + debug!("[{}] post chain", request_id(&state)); + // Do things once a response has come back + // .. + // For example get our data back from State + // let data = state.borrow::().unwrap(); + } + future::ok((state, response)) + }) + .boxed() } }