Skip to content

Commit

Permalink
feat(body): remove "full" constructors from Body (#2958)
Browse files Browse the repository at this point in the history
The constructors of `hyper::Body` from a full bytes are removed. Along
with `Body::empty()`.

BREAKING CHANGE: Use the types from `http-body-util`.

Co-authored-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
seanmonstar and Xuanwo authored Aug 24, 2022
1 parent 952756b commit 9e8fc8f
Show file tree
Hide file tree
Showing 34 changed files with 236 additions and 463 deletions.
5 changes: 0 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,6 @@ name = "state"
path = "examples/state.rs"
required-features = ["full"]

[[example]]
name = "tower_server"
path = "examples/tower_server.rs"
required-features = ["full"]

[[example]]
name = "upgrades"
path = "examples/upgrades.rs"
Expand Down
9 changes: 7 additions & 2 deletions benches/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@

extern crate test;

use std::convert::Infallible;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc;
use std::time::Duration;

use bytes::Bytes;
use http_body_util::Full;
use tokio::net::TcpListener;
use tokio::sync::oneshot;

use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Response};
use hyper::Response;

const PIPELINED_REQUESTS: usize = 16;

Expand Down Expand Up @@ -43,7 +46,9 @@ fn hello_world_16(b: &mut test::Bencher) {
.serve_connection(
stream,
service_fn(|_| async {
Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!")))
Ok::<_, Infallible>(Response::new(Full::new(Bytes::from(
"Hello, World!",
))))
}),
)
.await
Expand Down
7 changes: 4 additions & 3 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc;
use std::time::Duration;

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use http_body_util::{BodyExt, StreamBody};
use http_body_util::{BodyExt, Full, StreamBody};
use tokio::sync::oneshot;

use hyper::server::conn::Http;
Expand Down Expand Up @@ -87,8 +88,8 @@ macro_rules! bench_server {
}};
}

fn body(b: &'static [u8]) -> hyper::Body {
b.into()
fn body(b: &'static [u8]) -> Full<Bytes> {
Full::new(b.into())
}

#[bench]
Expand Down
6 changes: 4 additions & 2 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#![warn(rust_2018_idioms)]
use std::env;

use hyper::{body::HttpBody as _, Body, Request};
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{body::HttpBody as _, Request};
use tokio::io::{self, AsyncWriteExt as _};
use tokio::net::TcpStream;

Expand Down Expand Up @@ -51,7 +53,7 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
let req = Request::builder()
.uri(url)
.header(hyper::header::HOST, authority.as_str())
.body(Body::empty())?;
.body(Empty::<Bytes>::new())?;

let mut res = sender.send_request(req).await?;

Expand Down
5 changes: 3 additions & 2 deletions examples/client_json.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#![deny(warnings)]
#![warn(rust_2018_idioms)]

use hyper::Body;
use bytes::Bytes;
use http_body_util::Empty;
use hyper::{body::Buf, Request};
use serde::Deserialize;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -42,7 +43,7 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
let req = Request::builder()
.uri(url)
.header(hyper::header::HOST, authority.as_str())
.body(Body::empty())?;
.body(Empty::<Bytes>::new())?;

let res = sender.send_request(req).await?;

Expand Down
26 changes: 20 additions & 6 deletions examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::body::HttpBody as _;
use hyper::server::conn::Http;
use hyper::service::service_fn;
Expand All @@ -10,15 +12,15 @@ use tokio::net::TcpListener;

/// This is our service handler. It receives a Request, routes on its
/// path, and returns a Future of a Response.
async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn echo(req: Request<Body>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match (req.method(), req.uri().path()) {
// Serve some instructions at /
(&Method::GET, "/") => Ok(Response::new(Body::from(
(&Method::GET, "/") => Ok(Response::new(full(
"Try POSTing data to /echo such as: `curl localhost:3000/echo -XPOST -d 'hello world'`",
))),

// Simply echo the body back to the client.
(&Method::POST, "/echo") => Ok(Response::new(req.into_body())),
(&Method::POST, "/echo") => Ok(Response::new(req.into_body().boxed())),

// TODO: Fix this, broken in PR #2896
// Convert to uppercase before sending back to client using a stream.
Expand All @@ -43,26 +45,38 @@ async fn echo(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// 64kbs of data.
let max = req.body().size_hint().upper().unwrap_or(u64::MAX);
if max > 1024 * 64 {
let mut resp = Response::new(Body::from("Body too big"));
let mut resp = Response::new(full("Body too big"));
*resp.status_mut() = hyper::StatusCode::PAYLOAD_TOO_LARGE;
return Ok(resp);
}

let whole_body = hyper::body::to_bytes(req.into_body()).await?;

let reversed_body = whole_body.iter().rev().cloned().collect::<Vec<u8>>();
Ok(Response::new(Body::from(reversed_body)))
Ok(Response::new(full(reversed_body)))
}

// Return the 404 Not Found for other routes.
_ => {
let mut not_found = Response::default();
let mut not_found = Response::new(empty());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}

fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
Expand Down
6 changes: 4 additions & 2 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
use std::convert::Infallible;
use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
use tokio::net::TcpListener;

async fn hello(_: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("Hello World!")))
async fn hello(_: Request<Body>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}

#[tokio::main]
Expand Down
23 changes: 19 additions & 4 deletions examples/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::client::conn::Builder;
use hyper::server::conn::Http;
use hyper::service::service_fn;
Expand Down Expand Up @@ -41,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

async fn proxy(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn proxy(req: Request<Body>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
println!("req: {:?}", req);

if Method::CONNECT == req.method() {
Expand Down Expand Up @@ -70,10 +72,10 @@ async fn proxy(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
}
});

Ok(Response::new(Body::empty()))
Ok(Response::new(empty()))
} else {
eprintln!("CONNECT host is not socket addr: {:?}", req.uri());
let mut resp = Response::new(Body::from("CONNECT must be to a socket address"));
let mut resp = Response::new(full("CONNECT must be to a socket address"));
*resp.status_mut() = http::StatusCode::BAD_REQUEST;

Ok(resp)
Expand All @@ -96,14 +98,27 @@ async fn proxy(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
}
});

sender.send_request(req).await
let resp = sender.send_request(req).await?;
Ok(resp.map(|b| b.boxed()))
}
}

fn host_addr(uri: &http::Uri) -> Option<String> {
uri.authority().and_then(|auth| Some(auth.to_string()))
}

fn empty() -> BoxBody<Bytes, hyper::Error> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
.map_err(|never| match never {})
.boxed()
}

// Create a TCP connection to host:port, build a tunnel between the connection and
// the upgraded connection
async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> {
Expand Down
10 changes: 6 additions & 4 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

use std::net::SocketAddr;

use bytes::Bytes;
use futures_util::future::join;
use http_body_util::Full;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Request, Response};
Expand All @@ -12,12 +14,12 @@ use tokio::net::TcpListener;
static INDEX1: &[u8] = b"The 1st service!";
static INDEX2: &[u8] = b"The 2nd service!";

async fn index1(_: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::new(Body::from(INDEX1)))
async fn index1(_: Request<Body>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX1))))
}

async fn index2(_: Request<Body>) -> Result<Response<Body>, hyper::Error> {
Ok(Response::new(Body::from(INDEX2)))
async fn index2(_: Request<Body>) -> Result<Response<Full<Bytes>>, hyper::Error> {
Ok(Response::new(Full::new(Bytes::from(INDEX2))))
}

#[tokio::main]
Expand Down
33 changes: 23 additions & 10 deletions examples/params.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
// #![deny(warnings)] // FIXME: https://github.com/rust-lang/rust/issues/62411
#![warn(rust_2018_idioms)]

use bytes::Bytes;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::server::conn::Http;
use hyper::service::service_fn;
use hyper::{Body, Method, Request, Response, StatusCode};
use tokio::net::TcpListener;

use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use url::form_urlencoded;

Expand All @@ -15,9 +18,11 @@ static MISSING: &[u8] = b"Missing field";
static NOTNUMERIC: &[u8] = b"Number field is not numeric";

// Using service_fn, we can turn this function into a `Service`.
async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn param_example(
req: Request<Body>,
) -> Result<Response<BoxBody<Bytes, Infallible>>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(INDEX.into())),
(&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))),
(&Method::POST, "/post") => {
// Concatenate the body...
let b = hyper::body::to_bytes(req).await?;
Expand All @@ -43,7 +48,7 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};
let number = if let Some(n) = params.get("number") {
Expand All @@ -52,13 +57,13 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(NOTNUMERIC.into())
.body(full(NOTNUMERIC))
.unwrap());
}
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};

Expand All @@ -69,15 +74,15 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
// responses such as InternalServiceError may be
// needed here, too.
let body = format!("Hello {}, your number is {}", name, number);
Ok(Response::new(body.into()))
Ok(Response::new(full(body)))
}
(&Method::GET, "/get") => {
let query = if let Some(q) = req.uri().query() {
q
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};
let params = form_urlencoded::parse(query.as_bytes())
Expand All @@ -88,19 +93,27 @@ async fn param_example(req: Request<Body>) -> Result<Response<Body>, hyper::Erro
} else {
return Ok(Response::builder()
.status(StatusCode::UNPROCESSABLE_ENTITY)
.body(MISSING.into())
.body(full(MISSING))
.unwrap());
};
let body = format!("You requested {}", page);
Ok(Response::new(body.into()))
Ok(Response::new(full(body)))
}
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.body(empty())
.unwrap()),
}
}

fn empty() -> BoxBody<Bytes, Infallible> {
Empty::<Bytes>::new().boxed()
}

fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, Infallible> {
Full::new(chunk.into()).boxed()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
pretty_env_logger::init();
Expand Down
Loading

0 comments on commit 9e8fc8f

Please sign in to comment.