Skip to content

Commit

Permalink
feat(lib): update Tokio, bytes, http, h2, and http-body
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 4, 2019
1 parent 131962c commit cb3f39c
Show file tree
Hide file tree
Showing 51 changed files with 991 additions and 1,311 deletions.
39 changes: 16 additions & 23 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,37 @@ include = [
]

[dependencies]
bytes = "0.4.6"
futures-core = "0.3.1"
futures-channel = "0.3.1"
futures-util = "0.3.1"
http = "0.1.15"
http-body = "=0.2.0-alpha.3"
bytes = "0.5"
futures-core = { version = "0.3", default-features = false }
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.2"
httparse = "1.0"
h2 = "=0.2.0-alpha.3"
iovec = "0.1"
h2 = "0.2"
itoa = "0.4.1"
log = "0.4"
pin-project = "0.4"
time = "0.1"
tower-service = "=0.3.0-alpha.2"
tokio-executor = "=0.2.0-alpha.6"
tokio-io = "=0.2.0-alpha.6"
tokio-sync = "=0.2.0-alpha.6"
tokio = { version = "0.2", features = ["sync"] }
want = "0.3"

# Optional

net2 = { version = "0.2.32", optional = true }
tokio = { version = "=0.2.0-alpha.6", optional = true, default-features = false, features = ["rt-full"] }
tokio-net = { version = "=0.2.0-alpha.6", optional = true, features = ["tcp"] }
tokio-timer = { version = "=0.3.0-alpha.6", optional = true }


[dev-dependencies]
futures-util-a19 = { version = "=0.3.0-alpha.19", package = "futures-util-preview" }
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
matches = "0.1"
num_cpus = "1.0"
pretty_env_logger = "0.3"
spmc = "0.3"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio = "=0.2.0-alpha.6" # using #[tokio::test] attributes
tokio-fs = "=0.2.0-alpha.6"
tokio-test = "=0.2.0-alpha.6"
tokio = { version = "0.2.2", features = ["fs", "macros", "rt-util", "sync", "time", "test-util"] }
tokio-test = "0.2"
url = "1.0"

[features]
Expand All @@ -68,13 +60,13 @@ default = [
]
runtime = [
"tcp",
"tokio",
"tokio/time",
]
tcp = [
"net2",
"tokio-executor/blocking",
"tokio-net",
"tokio-timer",
"tokio/blocking",
"tokio/tcp",
"tokio/time",
]

# unstable features
Expand Down Expand Up @@ -206,3 +198,4 @@ required-features = ["runtime", "unstable-stream"]
name = "server"
path = "tests/server.rs"
required-features = ["runtime"]

10 changes: 7 additions & 3 deletions benches/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@

extern crate test;

use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::runtime::current_thread::Runtime;
use hyper::client::connect::{Destination, HttpConnector};
use hyper::service::Service;
use http::Uri;

#[bench]
fn http_connector(b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let mut listener = rt.block_on(TcpListener::bind("127.0.0.1:0")).expect("bind");
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("rt build");
let mut listener = rt.block_on(TcpListener::bind(&SocketAddr::from(([127, 0, 0, 1], 0)))).expect("bind");
let addr = listener.local_addr().expect("local_addr");
let uri: Uri = format!("http://{}/", addr).parse().expect("uri parse");
let dst = Destination::try_from_uri(uri).expect("destination");
Expand Down
37 changes: 21 additions & 16 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ extern crate test;
use std::net::SocketAddr;

use futures_util::future::join_all;
use tokio::runtime::current_thread::Runtime;

use hyper::{Body, Method, Request, Response, Server};
use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server};
use hyper::client::HttpConnector;

// HTTP1
Expand Down Expand Up @@ -264,8 +263,12 @@ impl Opts {
fn bench(self, b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
// Create a runtime of current thread.
let mut rt = Runtime::new().unwrap();
let exec = rt.handle();
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("rt build");
let exec = rt.handle().clone();

let req_len = self.request_body.map(|b| b.len()).unwrap_or(0) as u64;
let req_len = if self.request_chunks > 0 {
Expand Down Expand Up @@ -297,7 +300,7 @@ impl Opts {
for _ in 0..chunk_cnt {
tx.send_data(chunk.into()).await.expect("send_data");
}
}).expect("body tx spawn");
});
body
} else {
self
Expand Down Expand Up @@ -340,22 +343,24 @@ impl Opts {
}
}

fn spawn_server(rt: &mut Runtime, opts: &Opts) -> SocketAddr {
fn spawn_server(rt: &mut tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
use hyper::service::{make_service_fn, service_fn};
let addr = "127.0.0.1:0".parse().unwrap();

let body = opts.response_body;
let srv = Server::bind(&addr)
.http2_only(opts.http2)
.http2_initial_stream_window_size(opts.http2_stream_window)
.http2_initial_connection_window_size(opts.http2_conn_window)
.serve(make_service_fn( move |_| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let mut req_body = req.into_body();
while let Some(_chunk) = req_body.next().await {}
Ok::<_, hyper::Error>(Response::new(Body::from(body)))
let srv = rt.block_on(async move {
Server::bind(&addr)
.http2_only(opts.http2)
.http2_initial_stream_window_size(opts.http2_stream_window)
.http2_initial_connection_window_size(opts.http2_conn_window)
.serve(make_service_fn( move |_| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let mut req_body = req.into_body();
while let Some(_chunk) = req_body.next().await {}
Ok::<_, hyper::Error>(Response::new(Body::from(body)))
}))
}))
}));
});
let addr = srv.local_addr();
rt.spawn(async {
if let Err(err) = srv.await {
Expand Down
19 changes: 12 additions & 7 deletions benches/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::net::{TcpStream};
use std::sync::mpsc;
use std::time::Duration;

use tokio::runtime::current_thread;
use tokio::sync::oneshot;

use hyper::{Body, Response, Server};
Expand All @@ -31,9 +30,17 @@ fn hello_world(b: &mut test::Bencher) {
Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!")))
}))
});
let srv = Server::bind(&addr)
.http1_pipeline_flush(true)
.serve(make_svc);

let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("rt build");
let srv = rt.block_on(async move {
Server::bind(&addr)
.http1_pipeline_flush(true)
.serve(make_svc)
});

addr_tx.send(srv.local_addr()).unwrap();

Expand All @@ -42,13 +49,11 @@ fn hello_world(b: &mut test::Bencher) {
until_rx.await.ok();
});

let mut rt = current_thread::Runtime::new().unwrap();
rt.spawn(async {
rt.block_on(async {
if let Err(e) = graceful.await {
panic!("server error: {}", e);
}
});
rt.run().unwrap();
});

addr_rx.recv().unwrap()
Expand Down
18 changes: 12 additions & 6 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::sync::mpsc;
use std::time::Duration;

use futures_util::{stream, StreamExt};
use tokio::runtime::current_thread;
use tokio::sync::oneshot;

use hyper::{Body, Response, Server};
Expand All @@ -33,22 +32,29 @@ macro_rules! bench_server {
)
}))
});
let srv = Server::bind(&addr)
.serve(make_svc);

let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("rt build");

let srv = rt.block_on(async move {
Server::bind(&addr)
.serve(make_svc)
});

addr_tx.send(srv.local_addr()).unwrap();

let graceful = srv
.with_graceful_shutdown(async {
until_rx.await.ok();
});
let mut rt = current_thread::Runtime::new().unwrap();
rt.spawn(async {
rt.block_on(async move {
if let Err(e) = graceful.await {
panic!("server error: {}", e);
}
});
rt.run().unwrap();
});

addr_rx.recv().unwrap()
Expand Down
3 changes: 2 additions & 1 deletion examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::env;
use std::io::{self, Write};

use hyper::Client;
use futures_util::StreamExt;

// A simple type alias so as to DRY.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand All @@ -24,7 +25,7 @@ async fn main() -> Result<()> {
// HTTPS requires picking a TLS implementation, so give a better
// warning if the user tries to request an 'https' URL.
let url = url.parse::<hyper::Uri>().unwrap();
if url.scheme_part().map(|s| s.as_ref()) != Some("http") {
if url.scheme_str() != Some("http") {
println!("This example only works with 'http' URLs.");
return Ok(());
}
Expand Down
9 changes: 6 additions & 3 deletions examples/client_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
extern crate serde_derive;

use hyper::Client;
use futures_util::TryStreamExt;
use futures_util::StreamExt;

// A simple type alias so as to DRY.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
Expand All @@ -27,9 +27,12 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let client = Client::new();

// Fetch the url...
let res = client.get(url).await?;
let mut res = client.get(url).await?;
// asynchronously concatenate chunks of the body
let body = res.into_body().try_concat().await?;
let mut body = Vec::new();
while let Some(chunk) = res.body_mut().next().await {
body.extend_from_slice(&chunk?);
}
// try to parse as json with serde_json
let users = serde_json::from_slice(&body)?;

Expand Down
23 changes: 13 additions & 10 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

use hyper::{Body, Method, Request, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};
use futures_util::TryStreamExt;
use futures_util::{StreamExt, TryStreamExt};

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {

async fn echo(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => {
Expand Down Expand Up @@ -37,13 +36,17 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// So here we do `.await` on the future, waiting on concatenating the full body,
// then afterwards the content can be reversed. Only then can we return a `Response`.
(&Method::POST, "/echo/reversed") => {
let whole_chunk = req.into_body().try_concat().await;

let reversed_chunk = whole_chunk.map(move |chunk| {
chunk.iter().rev().cloned().collect::<Vec<u8>>()

})?;
Ok(Response::new(Body::from(reversed_chunk)))
let mut whole_body = Vec::new();
while let Some(chunk) = req.body_mut().next().await {
whole_body.extend_from_slice(&chunk?);
}

let reversed_body = whole_body
.iter()
.rev()
.cloned()
.collect::<Vec<u8>>();
Ok(Response::new(Body::from(reversed_body)))
}

// Return the 404 Not Found for other routes.
Expand Down
10 changes: 7 additions & 3 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@ use hyper::service::{service_fn, make_service_fn};

use std::collections::HashMap;
use url::form_urlencoded;
use futures_util::TryStreamExt;
use futures_util::StreamExt;

static INDEX: &[u8] = b"<html><body><form action=\"post\" method=\"post\">Name: <input type=\"text\" name=\"name\"><br>Number: <input type=\"text\" name=\"number\"><br><input type=\"submit\"></body></html>";
static MISSING: &[u8] = b"Missing field";
static NOTNUMERIC: &[u8] = b"Number field is not numeric";

// Using service_fn, we can turn this function into a `Service`.
async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn param_example(mut req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/post") => {
Ok(Response::new(INDEX.into()))
},
(&Method::POST, "/post") => {
let b = req.into_body().try_concat().await?;
// Concatenate the body...
let mut b = Vec::new();
while let Some(chunk) = req.body_mut().next().await {
b.extend_from_slice(&chunk?);
}
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
Expand Down
2 changes: 1 addition & 1 deletion examples/send_file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![deny(warnings)]

use tokio::io::AsyncReadExt;
use tokio_fs::File;
use tokio::fs::File;

use hyper::{Body, Method, Result, Request, Response, Server, StatusCode};
use hyper::service::{make_service_fn, service_fn};
Expand Down
Loading

0 comments on commit cb3f39c

Please sign in to comment.