Skip to content

Commit

Permalink
generic decode fn
Browse files Browse the repository at this point in the history
  • Loading branch information
alce committed Dec 24, 2019
1 parent 1d6aac8 commit b5c473d
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 40 deletions.
2 changes: 1 addition & 1 deletion tonic/benches/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Decoder for MockDecoder {
type Item = Bytes;
type Error = Status;

fn decode(&mut self, buf: &mut dyn Buf) -> Result<Option<Self::Item>, Self::Error> {
fn decode<B: Buf>(&mut self, buf: &mut B) -> Result<Option<Self::Item>, Self::Error> {
let mut out = vec![0; self.message_size];
buf.copy_to_slice(&mut out);
Ok(Some(Bytes::from(out)))
Expand Down
4 changes: 2 additions & 2 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<T> Grpc<T> {
request: Request<M1>,
path: PathAndQuery,
codec: C,
) -> Result<Response<Streaming<M2>>, Status>
) -> Result<Response<Streaming<M2, C::Decoder>>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
Expand All @@ -124,7 +124,7 @@ impl<T> Grpc<T> {
request: Request<S>,
path: PathAndQuery,
mut codec: C,
) -> Result<Response<Streaming<M2>>, Status>
) -> Result<Response<Streaming<M2, C::Decoder>>, Status>
where
T: GrpcService<BoxBody>,
T::ResponseBody: Body + HttpBody + Send + 'static,
Expand Down
57 changes: 36 additions & 21 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@ const BUFFER_SIZE: usize = 8 * 1024;
///
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
/// to fetch the message stream and trailing metadata
pub struct Streaming<T> {
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + Sync + 'static>,
pub struct Streaming<T, D>
where
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
decoder: D,
body: BoxBody,
state: State,
direction: Direction,
buf: BytesMut,
trailers: Option<MetadataMap>,
}

impl<T> Unpin for Streaming<T> {}
impl<T, D> Unpin for Streaming<T, D> where
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static
{
}

#[derive(Debug)]
enum State {
Expand All @@ -42,43 +48,42 @@ enum Direction {
EmptyResponse,
}

impl<T> Streaming<T> {
pub(crate) fn new_response<B, D>(decoder: D, body: B, status_code: StatusCode) -> Self
impl<T, D> Streaming<T, D>
where
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
pub(crate) fn new_response<B>(decoder: D, body: B, status_code: StatusCode) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::Response(status_code))
}

pub(crate) fn new_empty<B, D>(decoder: D, body: B) -> Self
pub(crate) fn new_empty<B>(decoder: D, body: B) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::EmptyResponse)
}

#[doc(hidden)]
pub fn new_request<B, D>(decoder: D, body: B) -> Self
pub fn new_request<B>(decoder: D, body: B) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self::new(decoder, body, Direction::Request)
}

fn new<B, D>(decoder: D, body: B, direction: Direction) -> Self
fn new<B>(decoder: D, body: B, direction: Direction) -> Self
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error>,
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
Self {
decoder: Box::new(decoder),
decoder,
body: BoxBody::map_from(body),
state: State::ReadHeader,
direction,
Expand All @@ -88,13 +93,17 @@ impl<T> Streaming<T> {
}
}

impl<T> Streaming<T> {
impl<T, D> Streaming<T, D>
where
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
/// Fetch the next message from this stream.
/// ```rust
/// # use tonic::{Streaming, Status};
/// # use tonic::{Streaming, Status, codec::Decoder};
/// # use std::fmt::Debug;
/// # async fn next_message_ex<T>(mut request: Streaming<T>) -> Result<(), Status>
/// # where T: Debug
/// # async fn next_message_ex<T, D>(mut request: Streaming<T, D>) -> Result<(), Status>
/// # where T: Debug,
/// # D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
/// # {
/// if let Some(next_message) = request.message().await? {
/// println!("{:?}", next_message);
Expand Down Expand Up @@ -204,7 +213,10 @@ impl<T> Streaming<T> {
}
}

impl<T> Stream for Streaming<T> {
impl<T, D> Stream for Streaming<T, D>
where
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
type Item = Result<T, Status>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down Expand Up @@ -277,11 +289,14 @@ impl<T> Stream for Streaming<T> {
}
}

impl<T> fmt::Debug for Streaming<T> {
impl<T, D> fmt::Debug for Streaming<T, D>
where
D: Decoder<Item = T, Error = Status> + Send + Sync + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Streaming").finish()
}
}

#[cfg(test)]
static_assertions::assert_impl_all!(Streaming<()>: Send, Sync);
//#[cfg(test)]
//static_assertions::assert_impl_all!(Streaming<(), ()>: Send, Sync);
2 changes: 1 addition & 1 deletion tonic/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub trait Decoder {
type Error: From<io::Error>;

/// Attempts to decode a frame from the provided buffer of bytes.
fn decode(&mut self, src: &mut dyn Buf) -> Result<Option<Self::Item>, Self::Error>;
fn decode<B: Buf>(&mut self, src: &mut B) -> Result<Option<Self::Item>, Self::Error>;
}

/// Trait of helper objects to write out messages as bytes.
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<U: Message + Default> Decoder for ProstDecoder<U> {
type Item = U;
type Error = Status;

fn decode(&mut self, buf: &mut dyn Buf) -> Result<Option<Self::Item>, Self::Error> {
fn decode<B: Buf>(&mut self, buf: &mut B) -> Result<Option<Self::Item>, Self::Error> {
let mut cursor = std::io::Cursor::new(buf.bytes());

let item = Message::decode(&mut cursor)
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/codec/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Decoder for MockDecoder {
type Item = Vec<u8>;
type Error = Status;

fn decode(&mut self, buf: &mut dyn Buf) -> Result<Option<Self::Item>, Self::Error> {
fn decode<B: Buf>(&mut self, buf: &mut B) -> Result<Option<Self::Item>, Self::Error> {
let mut out = vec![0; buf.remaining()];
buf.copy_to_slice(&mut out);
Ok(Some(out))
Expand Down
6 changes: 3 additions & 3 deletions tonic/src/server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ where
req: http::Request<B>,
) -> http::Response<BoxBody>
where
S: ClientStreamingService<T::Decode, Response = T::Encode>,
S: ClientStreamingService<T::Decode, T::Decoder, Response = T::Encode>,
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error> + Send + 'static,
{
Expand All @@ -111,7 +111,7 @@ where
req: http::Request<B>,
) -> http::Response<BoxBody>
where
S: StreamingService<T::Decode, Response = T::Encode> + Send,
S: StreamingService<T::Decode, T::Decoder, Response = T::Encode> + Send,
S::ResponseStream: Send + Sync + 'static,
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error> + Send,
Expand Down Expand Up @@ -151,7 +151,7 @@ where
fn map_request_streaming<B>(
&mut self,
request: http::Request<B>,
) -> Request<Streaming<T::Decode>>
) -> Request<Streaming<T::Decode, T::Decoder>>
where
B: Body + Send + Sync + 'static,
B::Error: Into<crate::Error> + Send,
Expand Down
29 changes: 19 additions & 10 deletions tonic/src/server/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::codec::Decoder;
use crate::{Request, Response, Status, Streaming};
use futures_core::Stream;
use std::future::Future;
Expand Down Expand Up @@ -66,25 +67,29 @@ where
///
/// Existing tower_service::Service implementations with the correct form will
/// automatically implement `ClientStreamingService`.
pub trait ClientStreamingService<R> {
pub trait ClientStreamingService<R, D>
where
D: Decoder<Item = R, Error = Status> + Send + Sync + 'static,
{
/// Protobuf response message type
type Response;

/// Response future
type Future: Future<Output = Result<Response<Self::Response>, Status>>;

/// Call the service
fn call(&mut self, request: Request<Streaming<R>>) -> Self::Future;
fn call(&mut self, request: Request<Streaming<R, D>>) -> Self::Future;
}

impl<T, M1, M2> ClientStreamingService<M1> for T
impl<T, D, M1, M2> ClientStreamingService<M1, D> for T
where
T: Service<Request<Streaming<M1>>, Response = Response<M2>, Error = crate::Status>,
D: Decoder<Item = M1, Error = Status> + Send + Sync + 'static,
T: Service<Request<Streaming<M1, D>>, Response = Response<M2>, Error = crate::Status>,
{
type Response = M2;
type Future = T::Future;

fn call(&mut self, request: Request<Streaming<M1>>) -> Self::Future {
fn call(&mut self, request: Request<Streaming<M1, D>>) -> Self::Future {
Service::call(self, request)
}
}
Expand All @@ -93,7 +98,10 @@ where
///
/// Existing tower_service::Service implementations with the correct form will
/// automatically implement `StreamingService`.
pub trait StreamingService<R> {
pub trait StreamingService<R, D>
where
D: Decoder<Item = R, Error = Status> + Send + Sync + 'static,
{
/// Protobuf response message type
type Response;

Expand All @@ -104,19 +112,20 @@ pub trait StreamingService<R> {
type Future: Future<Output = Result<Response<Self::ResponseStream>, Status>>;

/// Call the service
fn call(&mut self, request: Request<Streaming<R>>) -> Self::Future;
fn call(&mut self, request: Request<Streaming<R, D>>) -> Self::Future;
}

impl<T, S, M1, M2> StreamingService<M1> for T
impl<T, S, D, M1, M2> StreamingService<M1, D> for T
where
T: Service<Request<Streaming<M1>>, Response = Response<S>, Error = crate::Status>,
T: Service<Request<Streaming<M1, D>>, Response = Response<S>, Error = crate::Status>,
S: Stream<Item = Result<M2, crate::Status>>,
D: Decoder<Item = M1, Error = Status> + Send + Sync + 'static,
{
type Response = M2;
type ResponseStream = S;
type Future = T::Future;

fn call(&mut self, request: Request<Streaming<M1>>) -> Self::Future {
fn call(&mut self, request: Request<Streaming<M1, D>>) -> Self::Future {
Service::call(self, request)
}
}

0 comments on commit b5c473d

Please sign in to comment.