Skip to content

Commit

Permalink
tonic: Use BoxBody from http-body crate
Browse files Browse the repository at this point in the history
As of `http-body` 0.4.1 its has had a `BoxBody` type similar to
`tonic::body::BoxBody`. It also has `Empty` and `Body::map_{data,err}`.

That means all the custom body things we had in tonic can basically be
replaced with that.

Note that this is a breaking change so we should merge this next time we
decide to ship a breaking release.

The breaking changes are:

- `tonic::body::Body` has been removed. I think its fine for users to
depend directly on `http-body` if they need this trait.
- `tonic::body::BoxBody` is now just a type alias for
`http_body::combinators::BoxBody<Bytes, Status>`. So the methods it
previously had are gone. The replacements are
  - `tonic::body::Body::new` -> `http_body::Body::boxed`
  - `tonic::body::Body::map_from` -> `http_body::Body::map_data` and
  `http_body::Body::map_err` depending on which part you want to map.
  - `tonic::body::Body::empty` -> `http_body::Empty`

Additionally a `Sync` bound has been added to a few methods. I actually
don't think this is a breaking change because the old
`tonic::body::Body` trait had `Sync` as a supertrait meaning the `Sync`
requirement was already there.

Fixes #557
  • Loading branch information
davidpdrsn committed May 8, 2021
1 parent 4a917a3 commit 1f14b9e
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 252 deletions.
4 changes: 2 additions & 2 deletions tonic-build/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ pub fn generate<T: Service>(

impl<T> #service_ident<T>
where T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
T::ResponseBody: Body + Send + Sync + 'static,
T::Error: Into<StdError>,
<T::ResponseBody as HttpBody>::Error: Into<StdError> + Send, {
<T::ResponseBody as Body>::Error: Into<StdError> + Send, {
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
Expand Down
4 changes: 2 additions & 2 deletions tonic-build/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub fn generate<T: Service>(
impl<T, B> Service<http::Request<B>> for #server_service<T>
where
T: #server_trait,
B: HttpBody + Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
Expand All @@ -91,7 +91,7 @@ pub fn generate<T: Service>(
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(tonic::body::BoxBody::empty())
.body(empty_body())
.unwrap())
}),
}
Expand Down
8 changes: 4 additions & 4 deletions tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ pub mod server_reflection_client {
impl<T> ServerReflectionClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
T::ResponseBody: Body + Send + 'static,
T::Error: Into<StdError>,
<T::ResponseBody as HttpBody>::Error: Into<StdError> + Send,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Expand Down Expand Up @@ -254,7 +254,7 @@ pub mod server_reflection_server {
impl<T, B> Service<http::Request<B>> for ServerReflectionServer<T>
where
T: ServerReflection,
B: HttpBody + Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
Expand Down Expand Up @@ -309,7 +309,7 @@ pub mod server_reflection_server {
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(tonic::body::BoxBody::empty())
.body(empty_body())
.unwrap())
}),
}
Expand Down
8 changes: 4 additions & 4 deletions tonic-reflection/tests/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ pub mod server_reflection_client {
impl<T> ServerReflectionClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
T::ResponseBody: Body + Send + 'static,
T::Error: Into<StdError>,
<T::ResponseBody as HttpBody>::Error: Into<StdError> + Send,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Expand Down Expand Up @@ -254,7 +254,7 @@ pub mod server_reflection_server {
impl<T, B> Service<http::Request<B>> for ServerReflectionServer<T>
where
T: ServerReflection,
B: HttpBody + Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
Expand Down Expand Up @@ -309,7 +309,7 @@ pub mod server_reflection_server {
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(tonic::body::BoxBody::empty())
.body(empty_body())
.unwrap())
}),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ pub mod client {
impl<T> GreeterClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
T::ResponseBody: Body + Send + 'static,
T::Error: Into<StdError>,
<T::ResponseBody as HttpBody>::Error: Into<StdError> + Send,
<T::ResponseBody as HttpBody>::Data: Into<bytes::Bytes> + Send,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
<T::ResponseBody as Body>::Data: Into<bytes::Bytes> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Expand Down Expand Up @@ -158,7 +158,7 @@ pub mod server {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.body(tonic::body::BoxBody::empty())
.body(empty_body())
.unwrap())
}),
}
Expand Down
218 changes: 3 additions & 215 deletions tonic/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,218 +1,6 @@
//! HTTP specific body utilities.
//!
//! This module contains traits and helper types to work with http bodies. Most
//! of the types in this module are based around [`http_body::Body`].

use crate::{Error, Status};
use bytes::{Buf, Bytes};
use http_body::Body as HttpBody;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
pub(crate) use crate::codegen::empty_body;

/// A trait alias for [`http_body::Body`].
pub trait Body: sealed::Sealed + Send + Sync {
/// The body data type.
type Data: Buf;
/// The errors produced from the body.
type Error: Into<Error>;

/// Check if the stream is over or not.
///
/// Reference [`http_body::Body::is_end_stream`].
fn is_end_stream(&self) -> bool;

/// Poll for more data from the body.
///
/// Reference [`http_body::Body::poll_data`].
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>>;

/// Poll for the trailing headers.
///
/// Reference [`http_body::Body::poll_trailers`].
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>>;
}

impl<T> Body for T
where
T: HttpBody + Send + Sync + 'static,
T::Error: Into<Error>,
{
type Data = T::Data;
type Error = T::Error;

fn is_end_stream(&self) -> bool {
HttpBody::is_end_stream(self)
}

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
HttpBody::poll_data(self, cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
HttpBody::poll_trailers(self, cx)
}
}

impl<T> sealed::Sealed for T
where
T: HttpBody,
T::Error: Into<Error>,
{
}

mod sealed {
pub trait Sealed {}
}

/// A type erased http body.
pub struct BoxBody {
inner: Pin<Box<dyn Body<Data = Bytes, Error = Status> + Send + Sync + 'static>>,
}

struct MapBody<B>(B);

impl BoxBody {
/// Create a new `BoxBody` mapping item and error to the default types.
pub fn new<B>(inner: B) -> Self
where
B: Body<Data = Bytes, Error = Status> + Send + Sync + 'static,
{
BoxBody {
inner: Box::pin(inner),
}
}

/// Create a new `BoxBody` mapping item and error to the default types.
pub fn map_from<B>(inner: B) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
{
BoxBody {
inner: Box::pin(MapBody(inner)),
}
}

/// Create a new `BoxBody` that is empty.
pub fn empty() -> Self {
BoxBody {
inner: Box::pin(EmptyBody::default()),
}
}
}

impl HttpBody for BoxBody {
type Data = Bytes;
type Error = Status;

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Body::poll_data(self.inner.as_mut(), cx)
}

fn poll_trailers(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Body::poll_trailers(self.inner.as_mut(), cx)
}
}

impl<B> HttpBody for MapBody<B>
where
B: Body,
B::Error: Into<crate::Error>,
{
type Data = Bytes;
type Error = Status;

fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let v = unsafe {
let me = self.get_unchecked_mut();
Pin::new_unchecked(&mut me.0).poll_data(cx)
};
match futures_util::ready!(v) {
Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.copy_to_bytes(i.remaining())))),
Some(Err(e)) => {
let err = Status::map_error(e.into());
Poll::Ready(Some(Err(err)))
}
None => Poll::Ready(None),
}
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let v = unsafe {
let me = self.get_unchecked_mut();
Pin::new_unchecked(&mut me.0).poll_trailers(cx)
};

let v = futures_util::ready!(v).map_err(|e| Status::from_error(&*e.into()));
Poll::Ready(v)
}
}

impl fmt::Debug for BoxBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BoxBody").finish()
}
}

#[derive(Debug, Default)]
struct EmptyBody {
_p: (),
}

impl HttpBody for EmptyBody {
type Data = Bytes;
type Error = Status;

fn is_end_stream(&self) -> bool {
true
}

fn poll_data(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Poll::Ready(None)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
/// A type erased HTTP body used for tonic services.
pub type BoxBody = http_body::combinators::BoxBody<bytes::Bytes, crate::Status>;
20 changes: 10 additions & 10 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
body::{Body, BoxBody},
body::BoxBody,
client::GrpcService,
codec::{encode_client, Codec, Streaming},
interceptor::Interceptor,
Expand All @@ -11,7 +11,7 @@ use http::{
header::{HeaderValue, CONTENT_TYPE, TE},
uri::{Parts, PathAndQuery, Uri},
};
use http_body::Body as HttpBody;
use http_body::Body;
use std::fmt;

/// A gRPC client dispatcher.
Expand Down Expand Up @@ -71,8 +71,8 @@ impl<T> Grpc<T> {
) -> Result<Response<M2>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
T::ResponseBody: Body + Send + Sync + 'static,
<T::ResponseBody as Body>::Error: Into<crate::Error>,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
Expand All @@ -90,8 +90,8 @@ impl<T> Grpc<T> {
) -> Result<Response<M2>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
T::ResponseBody: Body + Send + Sync + 'static,
<T::ResponseBody as Body>::Error: Into<crate::Error>,
S: Stream<Item = M1> + Send + Sync + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + Sync + 'static,
Expand Down Expand Up @@ -126,8 +126,8 @@ impl<T> Grpc<T> {
) -> Result<Response<Streaming<M2>>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
T::ResponseBody: Body + Send + Sync + 'static,
<T::ResponseBody as Body>::Error: Into<crate::Error>,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + Sync + 'static,
M2: Send + Sync + 'static,
Expand All @@ -145,8 +145,8 @@ impl<T> Grpc<T> {
) -> Result<Response<Streaming<M2>>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
<T::ResponseBody as HttpBody>::Error: Into<crate::Error>,
T::ResponseBody: Body + Send + Sync + 'static,
<T::ResponseBody as Body>::Error: Into<crate::Error>,
S: Stream<Item = M1> + Send + Sync + 'static,
C: Codec<Encode = M1, Decode = M2>,
M1: Send + Sync + 'static,
Expand Down
Loading

0 comments on commit 1f14b9e

Please sign in to comment.