diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index 5d1bd6717..e65248908 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -37,9 +37,9 @@ pub fn generate( impl #service_ident where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + Sync + 'static, T::Error: Into, - ::Error: Into + Send, { + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); Self { inner } diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index 8cb4866cc..170c79995 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -69,7 +69,7 @@ pub fn generate( impl Service> for #server_service where T: #server_trait, - B: HttpBody + Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; @@ -91,7 +91,7 @@ pub fn generate( .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } diff --git a/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs b/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs index bdd4cba96..eeb8f2574 100644 --- a/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs +++ b/tonic-reflection/tests/proto/grpc.reflection.v1alpha.rs @@ -166,9 +166,9 @@ pub mod server_reflection_client { impl ServerReflectionClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + 'static, T::Error: Into, - ::Error: Into + Send, + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -254,7 +254,7 @@ pub mod server_reflection_server { impl Service> for ServerReflectionServer where T: ServerReflection, - B: HttpBody + Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; @@ -309,7 +309,7 @@ pub mod server_reflection_server { .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } diff --git a/tonic-reflection/tests/proto/mod.rs b/tonic-reflection/tests/proto/mod.rs index bdd4cba96..eeb8f2574 100644 --- a/tonic-reflection/tests/proto/mod.rs +++ b/tonic-reflection/tests/proto/mod.rs @@ -166,9 +166,9 @@ pub mod server_reflection_client { impl ServerReflectionClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + 'static, T::Error: Into, - ::Error: Into + Send, + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -254,7 +254,7 @@ pub mod server_reflection_server { impl Service> for ServerReflectionServer where T: ServerReflection, - B: HttpBody + Send + Sync + 'static, + B: Body + Send + Sync + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; @@ -309,7 +309,7 @@ pub mod server_reflection_server { .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } diff --git a/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs b/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs index 79afc2b6f..dcfce39d5 100755 --- a/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs +++ b/tonic/benches-disabled/benchmarks/compiled_protos/helloworld.rs @@ -32,10 +32,10 @@ pub mod client { impl GreeterClient where T: tonic::client::GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, + T::ResponseBody: Body + Send + 'static, T::Error: Into, - ::Error: Into + Send, - ::Data: Into + Send, + ::Error: Into + Send, + ::Data: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -158,7 +158,7 @@ pub mod server { Ok(http::Response::builder() .status(200) .header("grpc-status", "12") - .body(tonic::body::BoxBody::empty()) + .body(empty_body()) .unwrap()) }), } diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 7753086ac..0941762ac 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -1,218 +1,6 @@ //! HTTP specific body utilities. -//! -//! This module contains traits and helper types to work with http bodies. Most -//! of the types in this module are based around [`http_body::Body`]. -use crate::{Error, Status}; -use bytes::{Buf, Bytes}; -use http_body::Body as HttpBody; -use std::{ - fmt, - pin::Pin, - task::{Context, Poll}, -}; +pub(crate) use crate::codegen::empty_body; -/// A trait alias for [`http_body::Body`]. -pub trait Body: sealed::Sealed + Send + Sync { - /// The body data type. - type Data: Buf; - /// The errors produced from the body. - type Error: Into; - - /// Check if the stream is over or not. - /// - /// Reference [`http_body::Body::is_end_stream`]. - fn is_end_stream(&self) -> bool; - - /// Poll for more data from the body. - /// - /// Reference [`http_body::Body::poll_data`]. - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>>; - - /// Poll for the trailing headers. - /// - /// Reference [`http_body::Body::poll_trailers`]. - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; -} - -impl Body for T -where - T: HttpBody + Send + Sync + 'static, - T::Error: Into, -{ - type Data = T::Data; - type Error = T::Error; - - fn is_end_stream(&self) -> bool { - HttpBody::is_end_stream(self) - } - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - HttpBody::poll_data(self, cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - HttpBody::poll_trailers(self, cx) - } -} - -impl sealed::Sealed for T -where - T: HttpBody, - T::Error: Into, -{ -} - -mod sealed { - pub trait Sealed {} -} - -/// A type erased http body. -pub struct BoxBody { - inner: Pin + Send + Sync + 'static>>, -} - -struct MapBody(B); - -impl BoxBody { - /// Create a new `BoxBody` mapping item and error to the default types. - pub fn new(inner: B) -> Self - where - B: Body + Send + Sync + 'static, - { - BoxBody { - inner: Box::pin(inner), - } - } - - /// Create a new `BoxBody` mapping item and error to the default types. - pub fn map_from(inner: B) -> Self - where - B: Body + Send + Sync + 'static, - B::Error: Into, - { - BoxBody { - inner: Box::pin(MapBody(inner)), - } - } - - /// Create a new `BoxBody` that is empty. - pub fn empty() -> Self { - BoxBody { - inner: Box::pin(EmptyBody::default()), - } - } -} - -impl HttpBody for BoxBody { - type Data = Bytes; - type Error = Status; - - fn is_end_stream(&self) -> bool { - self.inner.is_end_stream() - } - - fn poll_data( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - Body::poll_data(self.inner.as_mut(), cx) - } - - fn poll_trailers( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Body::poll_trailers(self.inner.as_mut(), cx) - } -} - -impl HttpBody for MapBody -where - B: Body, - B::Error: Into, -{ - type Data = Bytes; - type Error = Status; - - fn is_end_stream(&self) -> bool { - self.0.is_end_stream() - } - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let v = unsafe { - let me = self.get_unchecked_mut(); - Pin::new_unchecked(&mut me.0).poll_data(cx) - }; - match futures_util::ready!(v) { - Some(Ok(mut i)) => Poll::Ready(Some(Ok(i.copy_to_bytes(i.remaining())))), - Some(Err(e)) => { - let err = Status::map_error(e.into()); - Poll::Ready(Some(Err(err))) - } - None => Poll::Ready(None), - } - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let v = unsafe { - let me = self.get_unchecked_mut(); - Pin::new_unchecked(&mut me.0).poll_trailers(cx) - }; - - let v = futures_util::ready!(v).map_err(|e| Status::from_error(&*e.into())); - Poll::Ready(v) - } -} - -impl fmt::Debug for BoxBody { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BoxBody").finish() - } -} - -#[derive(Debug, Default)] -struct EmptyBody { - _p: (), -} - -impl HttpBody for EmptyBody { - type Data = Bytes; - type Error = Status; - - fn is_end_stream(&self) -> bool { - true - } - - fn poll_data( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll>> { - Poll::Ready(None) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - Poll::Ready(Ok(None)) - } -} +/// A type erased HTTP body used for tonic services. +pub type BoxBody = http_body::combinators::BoxBody; diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 2e3c2de85..76143c5b2 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -1,5 +1,5 @@ use crate::{ - body::{Body, BoxBody}, + body::BoxBody, client::GrpcService, codec::{encode_client, Codec, Streaming}, interceptor::Interceptor, @@ -11,7 +11,7 @@ use http::{ header::{HeaderValue, CONTENT_TYPE, TE}, uri::{Parts, PathAndQuery, Uri}, }; -use http_body::Body as HttpBody; +use http_body::Body; use std::fmt; /// A gRPC client dispatcher. @@ -71,8 +71,8 @@ impl Grpc { ) -> Result, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -90,8 +90,8 @@ impl Grpc { ) -> Result, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, S: Stream + Send + Sync + 'static, C: Codec, M1: Send + Sync + 'static, @@ -126,8 +126,8 @@ impl Grpc { ) -> Result>, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -145,8 +145,8 @@ impl Grpc { ) -> Result>, Status> where T: GrpcService, - T::ResponseBody: Body + HttpBody + Send + 'static, - ::Error: Into, + T::ResponseBody: Body + Send + Sync + 'static, + ::Error: Into, S: Stream + Send + Sync + 'static, C: Codec, M1: Send + Sync + 'static, diff --git a/tonic/src/client/service.rs b/tonic/src/client/service.rs index c7511ed4a..8f300c366 100644 --- a/tonic/src/client/service.rs +++ b/tonic/src/client/service.rs @@ -1,5 +1,4 @@ -use crate::body::Body; -use http_body::Body as HttpBody; +use http_body::Body; use std::future::Future; use std::task::{Context, Poll}; use tower_service::Service; @@ -13,7 +12,7 @@ use tower_service::Service; /// [`tower_service`]: https://docs.rs/tower-service pub trait GrpcService { /// Responses body given by the service. - type ResponseBody: Body + HttpBody; + type ResponseBody: Body; /// Errors produced by the service. type Error: Into; /// The future response value. @@ -34,8 +33,8 @@ impl GrpcService for T where T: Service, Response = http::Response>, T::Error: Into, - ResBody: Body + HttpBody, - ::Error: Into, + ResBody: Body, + ::Error: Into, { type ResponseBody = ResBody; type Error = T::Error; diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index dbebfe21e..73f3427af 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -79,7 +79,10 @@ impl Streaming { { Self { decoder: Box::new(decoder), - body: BoxBody::map_from(body), + body: body + .map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) + .map_err(|err| Status::map_error(err.into())) + .boxed(), state: State::ReadHeader, direction, buf: BytesMut::with_capacity(BUFFER_SIZE), diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index cb146929c..5cb4221b4 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -4,14 +4,13 @@ pub use async_trait::async_trait; pub use futures_core; pub use futures_util::future::{ok, poll_fn, Ready}; -pub use http_body::Body as HttpBody; pub use std::future::Future; pub use std::pin::Pin; pub use std::sync::Arc; pub use std::task::{Context, Poll}; pub use tower_service::Service; pub type StdError = Box; -pub use crate::body::Body; +pub use http_body::Body; pub type BoxFuture = self::Pin> + Send + 'static>>; pub type BoxStream = @@ -31,3 +30,7 @@ impl std::fmt::Display for Never { } impl std::error::Error for Never {} + +pub fn empty_body() -> crate::body::BoxBody { + http_body::Empty::new().map_err(|err| match err {}).boxed() +} diff --git a/tonic/src/status.rs b/tonic/src/status.rs index 071e1f9cd..5e58fe9de 100644 --- a/tonic/src/status.rs +++ b/tonic/src/status.rs @@ -527,7 +527,7 @@ impl Status { self.add_header(&mut parts.headers).unwrap(); - http::Response::from_parts(parts, BoxBody::empty()) + http::Response::from_parts(parts, crate::body::empty_body()) } } diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index c2ad4d5ef..e46b5266b 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -691,7 +691,7 @@ impl Service> for Unimplemented { .status(200) .header("grpc-status", "12") .header("content-type", "application/grpc") - .body(BoxBody::empty()) + .body(crate::body::empty_body()) .unwrap(), ) } diff --git a/tonic/src/transport/server/recover_error.rs b/tonic/src/transport/server/recover_error.rs index 9b4ff2e67..2004560c2 100644 --- a/tonic/src/transport/server/recover_error.rs +++ b/tonic/src/transport/server/recover_error.rs @@ -63,7 +63,7 @@ where Ok(res) => Poll::Ready(Ok(res)), Err(err) => { if let Some(status) = Status::try_from_error(&*err) { - let mut res = Response::new(BoxBody::empty()); + let mut res = Response::new(crate::body::empty_body()); status.add_header(res.headers_mut()).unwrap(); Poll::Ready(Ok(res)) } else {