From c8a4c7c91deb3f854250cc9535e69e3f2a8395e3 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Tue, 15 Oct 2019 17:31:32 -0400 Subject: [PATCH 01/28] Add blobservice example for smoke testing --- tonic-examples/Cargo.toml | 10 ++++ tonic-examples/build.rs | 1 + .../proto/blobservice/blobservice.proto | 29 +++++++++++ tonic-examples/src/blobservice/client.rs | 32 ++++++++++++ tonic-examples/src/blobservice/server.rs | 49 +++++++++++++++++++ 5 files changed, 121 insertions(+) create mode 100644 tonic-examples/proto/blobservice/blobservice.proto create mode 100644 tonic-examples/src/blobservice/client.rs create mode 100644 tonic-examples/src/blobservice/server.rs diff --git a/tonic-examples/Cargo.toml b/tonic-examples/Cargo.toml index d79c681be..53fc9fbe7 100644 --- a/tonic-examples/Cargo.toml +++ b/tonic-examples/Cargo.toml @@ -44,6 +44,14 @@ path = "src/tls/client.rs" name = "tls-server" path = "src/tls/server.rs" +[[bin]] +name = "blob-client" +path = "src/blobservice/client.rs" + +[[bin]] +name = "blob-server" +path = "src/blobservice/server.rs" + [dependencies] tonic = { path = "../tonic", features = ["rustls"] } bytes = "0.4" @@ -54,6 +62,8 @@ futures-preview = { version = "=0.3.0-alpha.19", default-features = false, featu async-stream = "0.1.2" http = "0.1" tower = "=0.3.0-alpha.2" +env_logger = "*" +log = "*" # Required for routeguide serde = { version = "1.0", features = ["derive"] } diff --git a/tonic-examples/build.rs b/tonic-examples/build.rs index 4649bee22..d592f7227 100644 --- a/tonic-examples/build.rs +++ b/tonic-examples/build.rs @@ -2,4 +2,5 @@ fn main() { tonic_build::compile_protos("proto/helloworld/helloworld.proto").unwrap(); tonic_build::compile_protos("proto/routeguide/route_guide.proto").unwrap(); tonic_build::compile_protos("proto/echo/echo.proto").unwrap(); + tonic_build::compile_protos("proto/blobservice/blobservice.proto").unwrap(); } diff --git a/tonic-examples/proto/blobservice/blobservice.proto b/tonic-examples/proto/blobservice/blobservice.proto new file mode 100644 index 000000000..469a28fe6 --- /dev/null +++ b/tonic-examples/proto/blobservice/blobservice.proto @@ -0,0 +1,29 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package blobservice; + +service Blobber { + rpc GetBytes (BlobRequest) returns (stream BlobResponse) {} +} + +message BlobRequest { + uint64 nbytes = 1; +} + +message BlobResponse { + bytes bytes = 1; +} diff --git a/tonic-examples/src/blobservice/client.rs b/tonic-examples/src/blobservice/client.rs new file mode 100644 index 000000000..bb15e61c3 --- /dev/null +++ b/tonic-examples/src/blobservice/client.rs @@ -0,0 +1,32 @@ +use env_logger; +use futures::TryStreamExt; +use log::{debug, error}; + +pub mod blobservice { + tonic::include_proto!("blobservice"); +} + +use blobservice::{client::BlobberClient, BlobRequest}; + +#[tokio::main(single_thread)] +async fn main() -> Result<(), Box> { + env_logger::init(); + let mut client = BlobberClient::connect("http://[::1]:50051")?; + let nbytes = std::env::args().nth(1).unwrap().parse::().unwrap(); + let request = tonic::Request::new(BlobRequest { nbytes }); + + let response = client.get_bytes(request).await?; + let mut inner = response.into_inner(); + let mut i = 0_u64; + while let Some(_) = inner.try_next().await.map_err(|e| { + error!("i={}", i); + error!("message={}", e.message()); + e + })? { + if i % 1000 == 0 { + debug!("request # {}", i); + } + i += 1; + } + Ok(()) +} diff --git a/tonic-examples/src/blobservice/server.rs b/tonic-examples/src/blobservice/server.rs new file mode 100644 index 000000000..79716d789 --- /dev/null +++ b/tonic-examples/src/blobservice/server.rs @@ -0,0 +1,49 @@ +use futures::Stream; +use std::convert::TryInto; +use std::iter::FromIterator; +use std::pin::Pin; +use tonic::{transport::Server, Request, Response, Status}; + +pub mod blobservice { + tonic::include_proto!("blobservice"); +} + +use blobservice::{ + server::{Blobber, BlobberServer}, + BlobRequest, BlobResponse, +}; + +#[derive(Default)] +pub struct SimpleBlobber; + +#[tonic::async_trait] +impl Blobber for SimpleBlobber { + type GetBytesStream = + Pin> + Send + 'static>>; + + async fn get_bytes( + &self, + request: Request, + ) -> Result, Status> { + let message = request.into_inner(); + let bytes = + Vec::from_iter(std::iter::repeat(254u8).take(message.nbytes.try_into().unwrap())); + let response = futures::stream::iter((0..).map(move |_| { + Ok(blobservice::BlobResponse { + bytes: bytes.clone(), + }) + })); + + Ok(Response::new(Box::pin(response) as Self::GetBytesStream)) + } +} + +#[tokio::main(single_thread)] +async fn main() -> Result<(), Box> { + let address = "[::1]:50051".parse().unwrap(); + Server::builder() + .serve(address, BlobberServer::new(SimpleBlobber::default())) + .await?; + + Ok(()) +} From 00c415eebb6046245fb621b0fdcb6b59917243ea Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:06:03 -0400 Subject: [PATCH 02/28] Remove publicity from example --- tonic-examples/src/blobservice/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tonic-examples/src/blobservice/server.rs b/tonic-examples/src/blobservice/server.rs index 79716d789..16a26a8e0 100644 --- a/tonic-examples/src/blobservice/server.rs +++ b/tonic-examples/src/blobservice/server.rs @@ -4,7 +4,7 @@ use std::iter::FromIterator; use std::pin::Pin; use tonic::{transport::Server, Request, Response, Status}; -pub mod blobservice { +mod blobservice { tonic::include_proto!("blobservice"); } @@ -14,7 +14,7 @@ use blobservice::{ }; #[derive(Default)] -pub struct SimpleBlobber; +struct SimpleBlobber; #[tonic::async_trait] impl Blobber for SimpleBlobber { From bb8acae5093502a3a372bdf08a96421b1a1e0e12 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:10:41 -0400 Subject: [PATCH 03/28] Make empty struct more concise --- tonic/src/body.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 41930edab..8d62f6bb3 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -194,9 +194,7 @@ impl fmt::Debug for BoxBody { } #[derive(Debug, Default)] -struct EmptyBody { - _p: (), -} +struct EmptyBody; impl HttpBody for EmptyBody { type Data = BytesBuf; From 88abfdf4d250f7e38c956db6c11c1336746c459d Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:11:12 -0400 Subject: [PATCH 04/28] Expose HEADER_SIZE and BUFFER_SIZE as crate public to avoid duping constants --- tonic/src/codec/mod.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index c32c729af..81437197d 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -3,8 +3,17 @@ //! This module contains the generic `Codec` trait and a protobuf codec //! based on prost. +// The mininum buffer size for codec data +pub(crate) const BUFFER_SIZE: usize = 8 * 1024; + +// The number of bytes in the tonic header: +// 1 * u8 for compression + +// 1 * u32 for body length +pub(crate) const HEADER_SIZE: usize = 5; + mod decode; mod encode; + #[cfg(feature = "prost")] mod prost; From 893f99061da7958df32ca6e149200b66e74900f9 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:11:34 -0400 Subject: [PATCH 05/28] Make sure our encoded buffer has the correct lower bound in size --- tonic/src/codec/prost.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index 495dc11f5..e726e3efe 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -1,4 +1,5 @@ use super::{Codec, Decoder, Encoder}; +use crate::codec::HEADER_SIZE; use crate::{Code, Status}; use bytes::{BufMut, BytesMut}; use prost::Message; @@ -54,6 +55,8 @@ impl Encoder for ProstEncoder { buf.reserve(len); } + assert!(buf.len() >= len + HEADER_SIZE); + item.encode(buf) .map_err(|_| unreachable!("Message only errors if not enough space")) } From fec5d0912f760c7d7ca94a362e9f3b2730ccd916 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:11:59 -0400 Subject: [PATCH 06/28] Use crate-public constants in encode.rs --- tonic/src/codec/encode.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 7d91b4b94..76a12e8bd 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -1,4 +1,8 @@ -use crate::{body::BytesBuf, Code, Status}; +use crate::{ + body::BytesBuf, + codec::{BUFFER_SIZE, HEADER_SIZE}, + Code, Status, +}; use bytes::{BufMut, BytesMut, IntoBuf}; use futures_core::{Stream, TryStream}; use futures_util::{ready, StreamExt, TryStreamExt}; @@ -9,8 +13,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio_codec::Encoder; -const BUFFER_SIZE: usize = 8 * 1024; - pub(crate) fn encode_server( encoder: T, source: U, @@ -47,22 +49,22 @@ where loop { match source.next().await { Some(Ok(item)) => { - buf.reserve(5); + buf.reserve(HEADER_SIZE); unsafe { - buf.advance_mut(5); + buf.advance_mut(HEADER_SIZE); } encoder.encode(item, &mut buf).map_err(drop).unwrap(); // now that we know length, we can write the header - let len = buf.len() - 5; + let len = buf.len() - HEADER_SIZE; assert!(len <= std::u32::MAX as usize); { - let mut cursor = std::io::Cursor::new(&mut buf[..5]); + let mut cursor = std::io::Cursor::new(&mut buf[..HEADER_SIZE]); cursor.put_u8(0); // byte must be 0, reserve doesn't auto-zero cursor.put_u32_be(len as u32); } - yield Ok(buf.split_to(len + 5).freeze().into_buf()); + yield Ok(buf.split_to(len + HEADER_SIZE).freeze().into_buf()); }, Some(Err(status)) => yield Err(status), None => break, From ce2ae98ba3438c91d31ba1ccfc48e95dcf9ef091 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:13:21 -0400 Subject: [PATCH 07/28] Use crate-public constants in decode.rs --- tonic/src/codec/decode.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 413afdb45..4186f3483 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -1,5 +1,10 @@ use super::Decoder; -use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; +use crate::{ + body::BoxBody, + codec::{BUFFER_SIZE, HEADER_SIZE}, + metadata::MetadataMap, + Code, Status, +}; use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; use futures_core::Stream; use futures_util::{future, ready}; @@ -12,8 +17,6 @@ use std::{ }; use tracing::{debug, trace}; -const BUFFER_SIZE: usize = 8 * 1024; - /// Streaming requests and responses. /// /// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface @@ -157,7 +160,9 @@ impl Streaming { let mut buf = (&self.buf[..]).into_buf(); if let State::ReadHeader = self.state { - if buf.remaining() < 5 { + // we have less than HEADER_SIZE bytes to read so return early (triggering further + // reading from the body) + if buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -192,7 +197,7 @@ impl Streaming { } // advance past the header - self.buf.advance(5); + self.buf.advance(HEADER_SIZE); match self.decoder.decode(&mut self.buf) { Ok(Some(msg)) => { From a60b3ede876120240dc2382ecc5eeaa57764ec22 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:14:15 -0400 Subject: [PATCH 08/28] Rename fields for clarity in later usage --- tonic/src/codec/decode.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 4186f3483..deae84c1e 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -35,7 +35,10 @@ impl Unpin for Streaming {} #[derive(Debug)] enum State { ReadHeader, - ReadBody { compression: bool, len: usize }, + ReadBody { + is_compressed: bool, + message_length: usize, + }, } #[derive(Debug)] @@ -183,16 +186,24 @@ impl Streaming { )); } }; - let len = buf.get_u32_be() as usize; + // consume the length of the message + let message_length = buf.get_u32_be() as usize; + + // now it's time to read the body self.state = State::ReadBody { - compression: is_compressed, - len, + is_compressed, + message_length, } } - if let State::ReadBody { len, .. } = &self.state { - if buf.remaining() < *len { + /////////////////////////// + // read the message body // + /////////////////////////// + if let State::ReadBody { message_length, .. } = self.state { + // if we haven't read the entire message in then we need to wait for more data + let bytes_left_to_decode = buf.remaining(); + if bytes_left_to_decode < message_length { return Ok(None); } From c92bd82003dd70d1badeb8444a67f3072297c9b7 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:15:01 -0400 Subject: [PATCH 09/28] Ensure that if we do not have enough bytes for header + message that we continue reading --- tonic/src/codec/decode.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index deae84c1e..a1e56e9db 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -207,6 +207,18 @@ impl Streaming { return Ok(None); } + let bytes_allocd_for_decoding = self.buf.len(); + // It's possible to read enough data to decode the body, but not enough to decode the + // body _and_ the tonic header. If that's the case, then read more data. + let min_bytes_needed_to_decode = message_length + HEADER_SIZE; + if bytes_allocd_for_decoding < min_bytes_needed_to_decode { + return Ok(None); + } + + // self.buf must always contain at least the length of the message number of bytes + + // the number of bytes in the tonic header + assert!(bytes_allocd_for_decoding >= min_bytes_needed_to_decode); + // advance past the header self.buf.advance(HEADER_SIZE); From 0733aecfce299e795dd7457257647218a3319e76 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:18:10 -0400 Subject: [PATCH 10/28] Add comments describing the flow of data --- tonic/src/codec/decode.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index a1e56e9db..ad4affa57 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -160,8 +160,10 @@ impl Streaming { } fn decode_chunk(&mut self) -> Result, Status> { + // pull out the bytes containing the message let mut buf = (&self.buf[..]).into_buf(); + // read the tonic header if let State::ReadHeader = self.state { // we have less than HEADER_SIZE bytes to read so return early (triggering further // reading from the body) @@ -169,6 +171,7 @@ impl Streaming { return Ok(None); } + // FIXME: compression isn't supported yet, so just consume the first byte let is_compressed = match buf.get_u8() { 0 => false, 1 => { @@ -197,9 +200,7 @@ impl Streaming { } } - /////////////////////////// - // read the message body // - /////////////////////////// + // read the message if let State::ReadBody { message_length, .. } = self.state { // if we haven't read the entire message in then we need to wait for more data let bytes_left_to_decode = buf.remaining(); @@ -276,6 +277,8 @@ impl Stream for Streaming { self.buf.put(data); } else { + // otherwise, ensure that there are no remaining bytes in self.buf + // // FIXME: improve buf usage. let buf1 = (&self.buf[..]).into_buf(); if buf1.has_remaining() { From 13277a1abb6815b9e2296314de82732edad809f6 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:18:29 -0400 Subject: [PATCH 11/28] No need for match --- tonic/src/codec/decode.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index ad4affa57..91925ba09 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -247,9 +247,10 @@ impl Stream for Streaming { // FIXME: implement the ability to poll trailers when we _know_ that // the consumer of this stream will only poll for the first message. // This means we skip the poll_trailers step. - match self.decode_chunk()? { - Some(item) => return Poll::Ready(Some(Ok(item))), - None => (), + + // if we're able to decode a chunk then the future is complete + if let Some(item) = self.decode_chunk()? { + return Poll::Ready(Some(Ok(item))); } let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { From a491622235f1cfdef185f9349f29c0e597530c37 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:18:47 -0400 Subject: [PATCH 12/28] Rename variables for clarity --- tonic/src/codec/decode.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 91925ba09..031236c43 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -253,8 +253,9 @@ impl Stream for Streaming { return Poll::Ready(Some(Ok(item))); } - let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { - Some(Ok(d)) => Some(d), + // otherwise wait for more data from the request body + let body_chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { + Some(Ok(data)) => Some(data), Some(Err(e)) => { let err: crate::Error = e.into(); debug!("decoder inner stream error: {:?}", err); @@ -265,18 +266,17 @@ impl Stream for Streaming { None => None, }; - if let Some(data) = chunk { - if data.remaining() > self.buf.remaining_mut() { - let amt = if data.remaining() > BUFFER_SIZE { - data.remaining() - } else { - BUFFER_SIZE - }; - + // if we received some data from the body, ensure that self.buf has room and put data + // into it + if let Some(body_data) = body_chunk { + let bytes_left_to_decode = body_data.remaining(); + let bytes_left_for_decoding = self.buf.remaining_mut(); + if bytes_left_to_decode > bytes_left_for_decoding { + let amt = bytes_left_to_decode.max(BUFFER_SIZE); self.buf.reserve(amt); } - self.buf.put(data); + self.buf.put(body_data); } else { // otherwise, ensure that there are no remaining bytes in self.buf // From 36164a5f382370846c004996f45fc1713f229fe2 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:20:55 -0400 Subject: [PATCH 13/28] Relax the assertion in encoding, since it is not actually required --- tonic/src/codec/prost.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tonic/src/codec/prost.rs b/tonic/src/codec/prost.rs index e726e3efe..495dc11f5 100644 --- a/tonic/src/codec/prost.rs +++ b/tonic/src/codec/prost.rs @@ -1,5 +1,4 @@ use super::{Codec, Decoder, Encoder}; -use crate::codec::HEADER_SIZE; use crate::{Code, Status}; use bytes::{BufMut, BytesMut}; use prost::Message; @@ -55,8 +54,6 @@ impl Encoder for ProstEncoder { buf.reserve(len); } - assert!(buf.len() >= len + HEADER_SIZE); - item.encode(buf) .map_err(|_| unreachable!("Message only errors if not enough space")) } From 2ddd3af52ea2af187812c5ef0d4fb65a1ac7191b Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:21:05 -0400 Subject: [PATCH 14/28] Move blobservice back to multiple threads --- tonic-examples/src/blobservice/client.rs | 2 +- tonic-examples/src/blobservice/server.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tonic-examples/src/blobservice/client.rs b/tonic-examples/src/blobservice/client.rs index bb15e61c3..8aaa70faf 100644 --- a/tonic-examples/src/blobservice/client.rs +++ b/tonic-examples/src/blobservice/client.rs @@ -8,7 +8,7 @@ pub mod blobservice { use blobservice::{client::BlobberClient, BlobRequest}; -#[tokio::main(single_thread)] +#[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); let mut client = BlobberClient::connect("http://[::1]:50051")?; diff --git a/tonic-examples/src/blobservice/server.rs b/tonic-examples/src/blobservice/server.rs index 16a26a8e0..9b28332ec 100644 --- a/tonic-examples/src/blobservice/server.rs +++ b/tonic-examples/src/blobservice/server.rs @@ -38,7 +38,7 @@ impl Blobber for SimpleBlobber { } } -#[tokio::main(single_thread)] +#[tokio::main] async fn main() -> Result<(), Box> { let address = "[::1]:50051".parse().unwrap(); Server::builder() From 306bae4596583a9dce90aebdec336aedf9c0f7b6 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 10:49:00 -0400 Subject: [PATCH 15/28] Remove logging dependencies --- tonic-examples/Cargo.toml | 2 -- tonic-examples/src/blobservice/client.rs | 10 ++-------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/tonic-examples/Cargo.toml b/tonic-examples/Cargo.toml index 53fc9fbe7..1ed07a5e4 100644 --- a/tonic-examples/Cargo.toml +++ b/tonic-examples/Cargo.toml @@ -62,8 +62,6 @@ futures-preview = { version = "=0.3.0-alpha.19", default-features = false, featu async-stream = "0.1.2" http = "0.1" tower = "=0.3.0-alpha.2" -env_logger = "*" -log = "*" # Required for routeguide serde = { version = "1.0", features = ["derive"] } diff --git a/tonic-examples/src/blobservice/client.rs b/tonic-examples/src/blobservice/client.rs index 8aaa70faf..c17e3913f 100644 --- a/tonic-examples/src/blobservice/client.rs +++ b/tonic-examples/src/blobservice/client.rs @@ -1,6 +1,4 @@ -use env_logger; use futures::TryStreamExt; -use log::{debug, error}; pub mod blobservice { tonic::include_proto!("blobservice"); @@ -10,7 +8,6 @@ use blobservice::{client::BlobberClient, BlobRequest}; #[tokio::main] async fn main() -> Result<(), Box> { - env_logger::init(); let mut client = BlobberClient::connect("http://[::1]:50051")?; let nbytes = std::env::args().nth(1).unwrap().parse::().unwrap(); let request = tonic::Request::new(BlobRequest { nbytes }); @@ -19,13 +16,10 @@ async fn main() -> Result<(), Box> { let mut inner = response.into_inner(); let mut i = 0_u64; while let Some(_) = inner.try_next().await.map_err(|e| { - error!("i={}", i); - error!("message={}", e.message()); + println!("i={}", i); + println!("message={}", e.message()); e })? { - if i % 1000 == 0 { - debug!("request # {}", i); - } i += 1; } Ok(()) From fc8500d927d8cb2174b7d6b525cdfff147ad70fa Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 11:05:15 -0400 Subject: [PATCH 16/28] Give an example of how the bug can happen in a comment --- tonic/src/codec/decode.rs | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 031236c43..eb41b936e 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -165,8 +165,8 @@ impl Streaming { // read the tonic header if let State::ReadHeader = self.state { - // we have less than HEADER_SIZE bytes to read so return early (triggering further - // reading from the body) + // if we don't have enough data from the body to decode the tonic header then read more + // data if buf.remaining() < HEADER_SIZE { return Ok(None); } @@ -190,17 +190,17 @@ impl Streaming { } }; - // consume the length of the message + // consume the length of the message from the tonic header let message_length = buf.get_u32_be() as usize; - // now it's time to read the body + // time to read the message body self.state = State::ReadBody { is_compressed, message_length, } } - // read the message + // read the message body if let State::ReadBody { message_length, .. } = self.state { // if we haven't read the entire message in then we need to wait for more data let bytes_left_to_decode = buf.remaining(); @@ -208,9 +208,33 @@ impl Streaming { return Ok(None); } + // It's possible to read enough data to appear that we could decode the body, when in + // fact the tonic + a _partial_ message body has been decoded. + // + // Example: + // + // Msg { + // bytes: Vec + // } + // + // # Encode: + // 1. Encode 10000 bytes from an instance of `Msg` + // 2. Total bytes needed for decoding a message is: + // 5 bytes for tonic's header + // + 10000 data bytes + // + 2 bytes to encode the number 10000 + // + 1 tag byte + // ------------------------------------ + // 10008 + // + // # Decode: + // 1. Partial read of 10005 bytes from HTTP2 stream + // 2. We've read enough bytes for the message, but we don't have the entire message + // because the first 5 bytes are tonic's header + // + // + // If that's the case we need to wait for more data. let bytes_allocd_for_decoding = self.buf.len(); - // It's possible to read enough data to decode the body, but not enough to decode the - // body _and_ the tonic header. If that's the case, then read more data. let min_bytes_needed_to_decode = message_length + HEADER_SIZE; if bytes_allocd_for_decoding < min_bytes_needed_to_decode { return Ok(None); From 6aecefde303f96c27648efea85fb2c7b9497c600 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 13:25:12 -0400 Subject: [PATCH 17/28] Use message_len for maximum rustiness --- tonic/src/codec/decode.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index eb41b936e..6a60ab2ca 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -37,7 +37,7 @@ enum State { ReadHeader, ReadBody { is_compressed: bool, - message_length: usize, + message_len: usize, }, } @@ -191,20 +191,20 @@ impl Streaming { }; // consume the length of the message from the tonic header - let message_length = buf.get_u32_be() as usize; + let message_len = buf.get_u32_be() as usize; // time to read the message body self.state = State::ReadBody { is_compressed, - message_length, + message_len, } } // read the message body - if let State::ReadBody { message_length, .. } = self.state { + if let State::ReadBody { message_len, .. } = self.state { // if we haven't read the entire message in then we need to wait for more data let bytes_left_to_decode = buf.remaining(); - if bytes_left_to_decode < message_length { + if bytes_left_to_decode < message_len { return Ok(None); } @@ -235,7 +235,7 @@ impl Streaming { // // If that's the case we need to wait for more data. let bytes_allocd_for_decoding = self.buf.len(); - let min_bytes_needed_to_decode = message_length + HEADER_SIZE; + let min_bytes_needed_to_decode = message_len + HEADER_SIZE; if bytes_allocd_for_decoding < min_bytes_needed_to_decode { return Ok(None); } From 7fd4d7da17e18595692ddcb041fee0e8dda0a13b Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 13:27:00 -0400 Subject: [PATCH 18/28] Use message instead of try_next --- tonic-examples/src/blobservice/client.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tonic-examples/src/blobservice/client.rs b/tonic-examples/src/blobservice/client.rs index c17e3913f..e95bb1a14 100644 --- a/tonic-examples/src/blobservice/client.rs +++ b/tonic-examples/src/blobservice/client.rs @@ -1,5 +1,3 @@ -use futures::TryStreamExt; - pub mod blobservice { tonic::include_proto!("blobservice"); } @@ -15,7 +13,7 @@ async fn main() -> Result<(), Box> { let response = client.get_bytes(request).await?; let mut inner = response.into_inner(); let mut i = 0_u64; - while let Some(_) = inner.try_next().await.map_err(|e| { + while let Some(_) = inner.message().await.map_err(|e| { println!("i={}", i); println!("message={}", e.message()); e From 0090592acab949b5d304c8cb5eb1bb9a71dc88ef Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 14:27:47 -0400 Subject: [PATCH 19/28] Remove license --- tonic-examples/proto/blobservice/blobservice.proto | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tonic-examples/proto/blobservice/blobservice.proto b/tonic-examples/proto/blobservice/blobservice.proto index 469a28fe6..4c350e66e 100644 --- a/tonic-examples/proto/blobservice/blobservice.proto +++ b/tonic-examples/proto/blobservice/blobservice.proto @@ -1,17 +1,3 @@ -// Copyright 2015 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - syntax = "proto3"; package blobservice; From 0b46a6fc9843d256ea91f47726777d38ced49004 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 15:13:33 -0400 Subject: [PATCH 20/28] Remove blobservice --- tonic-examples/Cargo.toml | 8 --- tonic-examples/build.rs | 1 - .../proto/blobservice/blobservice.proto | 15 ------ tonic-examples/src/blobservice/client.rs | 24 --------- tonic-examples/src/blobservice/server.rs | 49 ------------------- 5 files changed, 97 deletions(-) delete mode 100644 tonic-examples/proto/blobservice/blobservice.proto delete mode 100644 tonic-examples/src/blobservice/client.rs delete mode 100644 tonic-examples/src/blobservice/server.rs diff --git a/tonic-examples/Cargo.toml b/tonic-examples/Cargo.toml index 1ed07a5e4..d79c681be 100644 --- a/tonic-examples/Cargo.toml +++ b/tonic-examples/Cargo.toml @@ -44,14 +44,6 @@ path = "src/tls/client.rs" name = "tls-server" path = "src/tls/server.rs" -[[bin]] -name = "blob-client" -path = "src/blobservice/client.rs" - -[[bin]] -name = "blob-server" -path = "src/blobservice/server.rs" - [dependencies] tonic = { path = "../tonic", features = ["rustls"] } bytes = "0.4" diff --git a/tonic-examples/build.rs b/tonic-examples/build.rs index d592f7227..4649bee22 100644 --- a/tonic-examples/build.rs +++ b/tonic-examples/build.rs @@ -2,5 +2,4 @@ fn main() { tonic_build::compile_protos("proto/helloworld/helloworld.proto").unwrap(); tonic_build::compile_protos("proto/routeguide/route_guide.proto").unwrap(); tonic_build::compile_protos("proto/echo/echo.proto").unwrap(); - tonic_build::compile_protos("proto/blobservice/blobservice.proto").unwrap(); } diff --git a/tonic-examples/proto/blobservice/blobservice.proto b/tonic-examples/proto/blobservice/blobservice.proto deleted file mode 100644 index 4c350e66e..000000000 --- a/tonic-examples/proto/blobservice/blobservice.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package blobservice; - -service Blobber { - rpc GetBytes (BlobRequest) returns (stream BlobResponse) {} -} - -message BlobRequest { - uint64 nbytes = 1; -} - -message BlobResponse { - bytes bytes = 1; -} diff --git a/tonic-examples/src/blobservice/client.rs b/tonic-examples/src/blobservice/client.rs deleted file mode 100644 index e95bb1a14..000000000 --- a/tonic-examples/src/blobservice/client.rs +++ /dev/null @@ -1,24 +0,0 @@ -pub mod blobservice { - tonic::include_proto!("blobservice"); -} - -use blobservice::{client::BlobberClient, BlobRequest}; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let mut client = BlobberClient::connect("http://[::1]:50051")?; - let nbytes = std::env::args().nth(1).unwrap().parse::().unwrap(); - let request = tonic::Request::new(BlobRequest { nbytes }); - - let response = client.get_bytes(request).await?; - let mut inner = response.into_inner(); - let mut i = 0_u64; - while let Some(_) = inner.message().await.map_err(|e| { - println!("i={}", i); - println!("message={}", e.message()); - e - })? { - i += 1; - } - Ok(()) -} diff --git a/tonic-examples/src/blobservice/server.rs b/tonic-examples/src/blobservice/server.rs deleted file mode 100644 index 9b28332ec..000000000 --- a/tonic-examples/src/blobservice/server.rs +++ /dev/null @@ -1,49 +0,0 @@ -use futures::Stream; -use std::convert::TryInto; -use std::iter::FromIterator; -use std::pin::Pin; -use tonic::{transport::Server, Request, Response, Status}; - -mod blobservice { - tonic::include_proto!("blobservice"); -} - -use blobservice::{ - server::{Blobber, BlobberServer}, - BlobRequest, BlobResponse, -}; - -#[derive(Default)] -struct SimpleBlobber; - -#[tonic::async_trait] -impl Blobber for SimpleBlobber { - type GetBytesStream = - Pin> + Send + 'static>>; - - async fn get_bytes( - &self, - request: Request, - ) -> Result, Status> { - let message = request.into_inner(); - let bytes = - Vec::from_iter(std::iter::repeat(254u8).take(message.nbytes.try_into().unwrap())); - let response = futures::stream::iter((0..).map(move |_| { - Ok(blobservice::BlobResponse { - bytes: bytes.clone(), - }) - })); - - Ok(Response::new(Box::pin(response) as Self::GetBytesStream)) - } -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let address = "[::1]:50051".parse().unwrap(); - Server::builder() - .serve(address, BlobberServer::new(SimpleBlobber::default())) - .await?; - - Ok(()) -} From e79db3020244850b91b32f083632a9f8170734e2 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 15:15:34 -0400 Subject: [PATCH 21/28] Revert changes to decode.rs --- tonic/src/codec/decode.rs | 106 +++++++++----------------------------- 1 file changed, 25 insertions(+), 81 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 6a60ab2ca..413afdb45 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -1,10 +1,5 @@ use super::Decoder; -use crate::{ - body::BoxBody, - codec::{BUFFER_SIZE, HEADER_SIZE}, - metadata::MetadataMap, - Code, Status, -}; +use crate::{body::BoxBody, metadata::MetadataMap, Code, Status}; use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; use futures_core::Stream; use futures_util::{future, ready}; @@ -17,6 +12,8 @@ use std::{ }; use tracing::{debug, trace}; +const BUFFER_SIZE: usize = 8 * 1024; + /// Streaming requests and responses. /// /// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface @@ -35,10 +32,7 @@ impl Unpin for Streaming {} #[derive(Debug)] enum State { ReadHeader, - ReadBody { - is_compressed: bool, - message_len: usize, - }, + ReadBody { compression: bool, len: usize }, } #[derive(Debug)] @@ -160,18 +154,13 @@ impl Streaming { } fn decode_chunk(&mut self) -> Result, Status> { - // pull out the bytes containing the message let mut buf = (&self.buf[..]).into_buf(); - // read the tonic header if let State::ReadHeader = self.state { - // if we don't have enough data from the body to decode the tonic header then read more - // data - if buf.remaining() < HEADER_SIZE { + if buf.remaining() < 5 { return Ok(None); } - // FIXME: compression isn't supported yet, so just consume the first byte let is_compressed = match buf.get_u8() { 0 => false, 1 => { @@ -189,63 +178,21 @@ impl Streaming { )); } }; + let len = buf.get_u32_be() as usize; - // consume the length of the message from the tonic header - let message_len = buf.get_u32_be() as usize; - - // time to read the message body self.state = State::ReadBody { - is_compressed, - message_len, + compression: is_compressed, + len, } } - // read the message body - if let State::ReadBody { message_len, .. } = self.state { - // if we haven't read the entire message in then we need to wait for more data - let bytes_left_to_decode = buf.remaining(); - if bytes_left_to_decode < message_len { - return Ok(None); - } - - // It's possible to read enough data to appear that we could decode the body, when in - // fact the tonic + a _partial_ message body has been decoded. - // - // Example: - // - // Msg { - // bytes: Vec - // } - // - // # Encode: - // 1. Encode 10000 bytes from an instance of `Msg` - // 2. Total bytes needed for decoding a message is: - // 5 bytes for tonic's header - // + 10000 data bytes - // + 2 bytes to encode the number 10000 - // + 1 tag byte - // ------------------------------------ - // 10008 - // - // # Decode: - // 1. Partial read of 10005 bytes from HTTP2 stream - // 2. We've read enough bytes for the message, but we don't have the entire message - // because the first 5 bytes are tonic's header - // - // - // If that's the case we need to wait for more data. - let bytes_allocd_for_decoding = self.buf.len(); - let min_bytes_needed_to_decode = message_len + HEADER_SIZE; - if bytes_allocd_for_decoding < min_bytes_needed_to_decode { + if let State::ReadBody { len, .. } = &self.state { + if buf.remaining() < *len { return Ok(None); } - // self.buf must always contain at least the length of the message number of bytes + - // the number of bytes in the tonic header - assert!(bytes_allocd_for_decoding >= min_bytes_needed_to_decode); - // advance past the header - self.buf.advance(HEADER_SIZE); + self.buf.advance(5); match self.decoder.decode(&mut self.buf) { Ok(Some(msg)) => { @@ -271,15 +218,13 @@ impl Stream for Streaming { // FIXME: implement the ability to poll trailers when we _know_ that // the consumer of this stream will only poll for the first message. // This means we skip the poll_trailers step. - - // if we're able to decode a chunk then the future is complete - if let Some(item) = self.decode_chunk()? { - return Poll::Ready(Some(Ok(item))); + match self.decode_chunk()? { + Some(item) => return Poll::Ready(Some(Ok(item))), + None => (), } - // otherwise wait for more data from the request body - let body_chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { - Some(Ok(data)) => Some(data), + let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) { + Some(Ok(d)) => Some(d), Some(Err(e)) => { let err: crate::Error = e.into(); debug!("decoder inner stream error: {:?}", err); @@ -290,20 +235,19 @@ impl Stream for Streaming { None => None, }; - // if we received some data from the body, ensure that self.buf has room and put data - // into it - if let Some(body_data) = body_chunk { - let bytes_left_to_decode = body_data.remaining(); - let bytes_left_for_decoding = self.buf.remaining_mut(); - if bytes_left_to_decode > bytes_left_for_decoding { - let amt = bytes_left_to_decode.max(BUFFER_SIZE); + if let Some(data) = chunk { + if data.remaining() > self.buf.remaining_mut() { + let amt = if data.remaining() > BUFFER_SIZE { + data.remaining() + } else { + BUFFER_SIZE + }; + self.buf.reserve(amt); } - self.buf.put(body_data); + self.buf.put(data); } else { - // otherwise, ensure that there are no remaining bytes in self.buf - // // FIXME: improve buf usage. let buf1 = (&self.buf[..]).into_buf(); if buf1.has_remaining() { From f0fb03eca1fe105978bb0fb9095b180cd7391b34 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 15:20:19 -0400 Subject: [PATCH 22/28] Simplify the check --- tonic/src/codec/decode.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 413afdb45..153a9b6da 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -187,7 +187,9 @@ impl Streaming { } if let State::ReadBody { len, .. } = &self.state { - if buf.remaining() < *len { + // if we haven't read enough of the message then return and keep + // reading + if buf.remaining() < *len || self.buf.len() < *len + 5 { return Ok(None); } From 36c68718622c78f6d4ab815e657cb4cf967f8e88 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 15:22:52 -0400 Subject: [PATCH 23/28] Revert body change --- tonic/src/body.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tonic/src/body.rs b/tonic/src/body.rs index 8d62f6bb3..41930edab 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -194,7 +194,9 @@ impl fmt::Debug for BoxBody { } #[derive(Debug, Default)] -struct EmptyBody; +struct EmptyBody { + _p: (), +} impl HttpBody for EmptyBody { type Data = BytesBuf; From 3abd90bab0128572c498748b097a827c5b884c21 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 15:23:54 -0400 Subject: [PATCH 24/28] Revert refactor --- tonic/src/codec/encode.rs | 18 ++++++++---------- tonic/src/codec/mod.rs | 9 --------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 76a12e8bd..7d91b4b94 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -1,8 +1,4 @@ -use crate::{ - body::BytesBuf, - codec::{BUFFER_SIZE, HEADER_SIZE}, - Code, Status, -}; +use crate::{body::BytesBuf, Code, Status}; use bytes::{BufMut, BytesMut, IntoBuf}; use futures_core::{Stream, TryStream}; use futures_util::{ready, StreamExt, TryStreamExt}; @@ -13,6 +9,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio_codec::Encoder; +const BUFFER_SIZE: usize = 8 * 1024; + pub(crate) fn encode_server( encoder: T, source: U, @@ -49,22 +47,22 @@ where loop { match source.next().await { Some(Ok(item)) => { - buf.reserve(HEADER_SIZE); + buf.reserve(5); unsafe { - buf.advance_mut(HEADER_SIZE); + buf.advance_mut(5); } encoder.encode(item, &mut buf).map_err(drop).unwrap(); // now that we know length, we can write the header - let len = buf.len() - HEADER_SIZE; + let len = buf.len() - 5; assert!(len <= std::u32::MAX as usize); { - let mut cursor = std::io::Cursor::new(&mut buf[..HEADER_SIZE]); + let mut cursor = std::io::Cursor::new(&mut buf[..5]); cursor.put_u8(0); // byte must be 0, reserve doesn't auto-zero cursor.put_u32_be(len as u32); } - yield Ok(buf.split_to(len + HEADER_SIZE).freeze().into_buf()); + yield Ok(buf.split_to(len + 5).freeze().into_buf()); }, Some(Err(status)) => yield Err(status), None => break, diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index 81437197d..c32c729af 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -3,17 +3,8 @@ //! This module contains the generic `Codec` trait and a protobuf codec //! based on prost. -// The mininum buffer size for codec data -pub(crate) const BUFFER_SIZE: usize = 8 * 1024; - -// The number of bytes in the tonic header: -// 1 * u8 for compression + -// 1 * u32 for body length -pub(crate) const HEADER_SIZE: usize = 5; - mod decode; mod encode; - #[cfg(feature = "prost")] mod prost; From c7381a3ad2fd3d0e3e405c091f120ec00cd12571 Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 15:59:25 -0400 Subject: [PATCH 25/28] Fix decode test to actually run --- tonic/src/codec/tests.rs | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/tonic/src/codec/tests.rs b/tonic/src/codec/tests.rs index a61d063ab..524db41d7 100644 --- a/tonic/src/codec/tests.rs +++ b/tonic/src/codec/tests.rs @@ -23,7 +23,8 @@ struct Msg { async fn decode() { let decoder = ProstDecoder::::default(); - let data = Vec::from(&[0u8; 1024][..]); + let data = Vec::from(&[0u8; 10000][..]); + let data_len = data.len(); let msg = Msg { data }; let mut buf = BytesMut::new(); @@ -34,11 +35,21 @@ async fn decode() { buf.put_u32_be(len as u32); msg.encode(&mut buf).unwrap(); - let body = MockBody(buf.freeze(), 0, 100); + let upper = 100; + let body = MockBody { + data: buf.freeze(), + lower: 0, + upper: upper, + }; let mut stream = Streaming::new_request(decoder, body); - while let Some(_) = stream.message().await.unwrap() {} + let mut i = 0usize; + while let Some(msg) = stream.message().await.unwrap() { + assert_eq!(msg.data.len(), data_len); + i += 1; + } + assert_eq!(i, upper); } #[tokio::test] @@ -61,7 +72,11 @@ async fn encode() { } #[derive(Debug)] -struct MockBody(Bytes, usize, usize); +struct MockBody { + data: Bytes, + lower: usize, + upper: usize, +} impl Body for MockBody { type Data = Data; @@ -71,9 +86,9 @@ impl Body for MockBody { mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { - if self.1 > self.2 { - self.1 += 1; - let data = Data(self.0.clone().into_buf()); + if self.upper > self.lower { + self.lower += 1; + let data = Data(self.data.clone().into_buf()); Poll::Ready(Some(Ok(data))) } else { Poll::Ready(None) From 2d9c5b8b13be17ae289ff170c5fd231defbf2e8e Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 18:38:18 -0400 Subject: [PATCH 26/28] Checkpoint --- tonic/src/codec/decode.rs | 13 ++++++++++++- tonic/src/codec/tests.rs | 34 ++++++++++++++++++++-------------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 153a9b6da..9b2f7708a 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -189,12 +189,20 @@ impl Streaming { if let State::ReadBody { len, .. } = &self.state { // if we haven't read enough of the message then return and keep // reading - if buf.remaining() < *len || self.buf.len() < *len + 5 { + println!( + "buf.remaining() == {}, *len == {}, self.buf.len() == {}", + buf.remaining(), + *len, + self.buf.len() + ); + if buf.remaining() < *len { + // || self.buf.len() < *len + 5 { return Ok(None); } // advance past the header self.buf.advance(5); + println!("self.buf.len() == {}", self.buf.len()); match self.decoder.decode(&mut self.buf) { Ok(Some(msg)) => { @@ -220,6 +228,8 @@ impl Stream for Streaming { // FIXME: implement the ability to poll trailers when we _know_ that // the consumer of this stream will only poll for the first message. // This means we skip the poll_trailers step. + println!("decoding ..."); + println!("self.buf.len() == {}", self.buf.len()); match self.decode_chunk()? { Some(item) => return Poll::Ready(Some(Ok(item))), None => (), @@ -245,6 +255,7 @@ impl Stream for Streaming { BUFFER_SIZE }; + println!("amt == {}", amt); self.buf.reserve(amt); } diff --git a/tonic/src/codec/tests.rs b/tonic/src/codec/tests.rs index 524db41d7..3cd18c29d 100644 --- a/tonic/src/codec/tests.rs +++ b/tonic/src/codec/tests.rs @@ -30,26 +30,30 @@ async fn decode() { let mut buf = BytesMut::new(); let len = msg.encoded_len(); - buf.reserve(len + 5); - buf.put_u8(0); - buf.put_u32_be(len as u32); - msg.encode(&mut buf).unwrap(); + // encode a few messages + let nmessages = 3; + buf.reserve(nmessages * (len + 5)); + + for _ in 0..nmessages { + buf.put_u8(0); + buf.put_u32_be(len as u32); + msg.encode(&mut buf).unwrap(); + } - let upper = 100; let body = MockBody { data: buf.freeze(), - lower: 0, - upper: upper, + partial_len: 10010, }; let mut stream = Streaming::new_request(decoder, body); let mut i = 0usize; + println!(""); while let Some(msg) = stream.message().await.unwrap() { assert_eq!(msg.data.len(), data_len); i += 1; } - assert_eq!(i, upper); + assert_eq!(i, nmessages); } #[tokio::test] @@ -74,8 +78,7 @@ async fn encode() { #[derive(Debug)] struct MockBody { data: Bytes, - lower: usize, - upper: usize, + partial_len: usize, } impl Body for MockBody { @@ -86,10 +89,13 @@ impl Body for MockBody { mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { - if self.upper > self.lower { - self.lower += 1; - let data = Data(self.data.clone().into_buf()); - Poll::Ready(Some(Ok(data))) + let partial_len = self.partial_len; + let data_len = self.data.len(); + let bytes_to_read = partial_len.min(data_len); + println!(""); + if bytes_to_read > 0 { + let response = self.data.split_to(bytes_to_read).into_buf(); + Poll::Ready(Some(Ok(Data(response)))) } else { Poll::Ready(None) } From bdcac3716aabc5d7398f93b50e96fc502028e99f Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 19:38:48 -0400 Subject: [PATCH 27/28] Modify existing test to send partial bodies --- tonic/src/codec/decode.rs | 13 +--------- tonic/src/codec/tests.rs | 50 ++++++++++++++++++++++++--------------- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 9b2f7708a..153a9b6da 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -189,20 +189,12 @@ impl Streaming { if let State::ReadBody { len, .. } = &self.state { // if we haven't read enough of the message then return and keep // reading - println!( - "buf.remaining() == {}, *len == {}, self.buf.len() == {}", - buf.remaining(), - *len, - self.buf.len() - ); - if buf.remaining() < *len { - // || self.buf.len() < *len + 5 { + if buf.remaining() < *len || self.buf.len() < *len + 5 { return Ok(None); } // advance past the header self.buf.advance(5); - println!("self.buf.len() == {}", self.buf.len()); match self.decoder.decode(&mut self.buf) { Ok(Some(msg)) => { @@ -228,8 +220,6 @@ impl Stream for Streaming { // FIXME: implement the ability to poll trailers when we _know_ that // the consumer of this stream will only poll for the first message. // This means we skip the poll_trailers step. - println!("decoding ..."); - println!("self.buf.len() == {}", self.buf.len()); match self.decode_chunk()? { Some(item) => return Poll::Ready(Some(Ok(item))), None => (), @@ -255,7 +245,6 @@ impl Stream for Streaming { BUFFER_SIZE }; - println!("amt == {}", amt); self.buf.reserve(amt); } diff --git a/tonic/src/codec/tests.rs b/tonic/src/codec/tests.rs index 3cd18c29d..b6e55a589 100644 --- a/tonic/src/codec/tests.rs +++ b/tonic/src/codec/tests.rs @@ -30,30 +30,25 @@ async fn decode() { let mut buf = BytesMut::new(); let len = msg.encoded_len(); - // encode a few messages - let nmessages = 3; - buf.reserve(nmessages * (len + 5)); - - for _ in 0..nmessages { - buf.put_u8(0); - buf.put_u32_be(len as u32); - msg.encode(&mut buf).unwrap(); - } + buf.reserve(len + 5); + buf.put_u8(0); + buf.put_u32_be(len as u32); + msg.encode(&mut buf).unwrap(); let body = MockBody { data: buf.freeze(), - partial_len: 10010, + partial_len: 10005, + count: 0, }; let mut stream = Streaming::new_request(decoder, body); let mut i = 0usize; - println!(""); while let Some(msg) = stream.message().await.unwrap() { assert_eq!(msg.data.len(), data_len); i += 1; } - assert_eq!(i, nmessages); + assert_eq!(i, 1); } #[tokio::test] @@ -78,7 +73,12 @@ async fn encode() { #[derive(Debug)] struct MockBody { data: Bytes, + + // the size of the partial message to send partial_len: usize, + + // the number of times we've sent + count: usize, } impl Body for MockBody { @@ -87,15 +87,27 @@ impl Body for MockBody { fn poll_data( mut self: Pin<&mut Self>, - _cx: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll>> { - let partial_len = self.partial_len; + // every other call to poll_data returns data + let should_send = self.count % 2 == 0; let data_len = self.data.len(); - let bytes_to_read = partial_len.min(data_len); - println!(""); - if bytes_to_read > 0 { - let response = self.data.split_to(bytes_to_read).into_buf(); - Poll::Ready(Some(Ok(Data(response)))) + let partial_len = self.partial_len; + let count = self.count; + if data_len > 0 { + let result = if should_send { + let response = self + .data + .split_to(if count == 0 { partial_len } else { data_len }) + .into_buf(); + Poll::Ready(Some(Ok(Data(response)))) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + }; + // make some fake progress + self.count += 1; + result } else { Poll::Ready(None) } From b5e5ea79c1a657f01af422203c015f84701a352e Mon Sep 17 00:00:00 2001 From: Phillip Cloud Date: Mon, 21 Oct 2019 19:39:56 -0400 Subject: [PATCH 28/28] Shorten syntax --- tonic/src/codec/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tonic/src/codec/tests.rs b/tonic/src/codec/tests.rs index b6e55a589..ac2ca8c2e 100644 --- a/tonic/src/codec/tests.rs +++ b/tonic/src/codec/tests.rs @@ -23,7 +23,7 @@ struct Msg { async fn decode() { let decoder = ProstDecoder::::default(); - let data = Vec::from(&[0u8; 10000][..]); + let data = vec![0u8; 10000]; let data_len = data.len(); let msg = Msg { data };