
fix(codec): Properly decode partial DATA frames #83

Merged: 28 commits, merged Oct 22, 2019
Changes from 14 commits
c8a4c7c
Add blobservice example for smoke testing
Oct 15, 2019
00c415e
Remove publicity from example
Oct 21, 2019
bb8acae
Make empty struct more concise
Oct 21, 2019
88abfdf
Expose HEADER_SIZE and BUFFER_SIZE as crate public to avoid duping co…
Oct 21, 2019
893f990
Make sure our encoded buffer has the correct lower bound in size
Oct 21, 2019
fec5d09
Use crate-public constants in encode.rs
Oct 21, 2019
ce2ae98
Use crate-public constants in decode.rs
Oct 21, 2019
a60b3ed
Rename fields for clarity in later usage
Oct 21, 2019
c92bd82
Ensure that if we do not have enough bytes for header + message that …
Oct 21, 2019
0733aec
Add comments describing the flow of data
Oct 21, 2019
13277a1
No need for match
Oct 21, 2019
a491622
Rename variables for clarity
Oct 21, 2019
36164a5
Relax the assertion in encoding, since it is not actually required
Oct 21, 2019
2ddd3af
Move blobservice back to multiple threads
Oct 21, 2019
306bae4
Remove logging dependencies
Oct 21, 2019
fc8500d
Give an example of how the bug can happen in a comment
Oct 21, 2019
6aecefd
Use message_len for maximum rustiness
Oct 21, 2019
7fd4d7d
Use message instead of try_next
Oct 21, 2019
0090592
Remove license
Oct 21, 2019
0b46a6f
Remove blobservice
Oct 21, 2019
e79db30
Revert changes to decode.rs
Oct 21, 2019
f0fb03e
Simplify the check
Oct 21, 2019
36c6871
Revert body change
Oct 21, 2019
3abd90b
Revert refactor
Oct 21, 2019
c7381a3
Fix decode test to actually run
Oct 21, 2019
2d9c5b8
Checkpoint
Oct 21, 2019
bdcac37
Modify existing test to send partial bodies
Oct 21, 2019
b5e5ea7
Shorten syntax
Oct 21, 2019
10 changes: 10 additions & 0 deletions tonic-examples/Cargo.toml
@@ -44,6 +44,14 @@ path = "src/tls/client.rs"
name = "tls-server"
path = "src/tls/server.rs"

[[bin]]
name = "blob-client"
path = "src/blobservice/client.rs"

[[bin]]
name = "blob-server"
path = "src/blobservice/server.rs"

[dependencies]
tonic = { path = "../tonic", features = ["rustls"] }
bytes = "0.4"
@@ -54,6 +62,8 @@ futures-preview = { version = "=0.3.0-alpha.19", default-features = false, featu
async-stream = "0.1.2"
http = "0.1"
tower = "=0.3.0-alpha.2"
env_logger = "*"
log = "*"

# Required for routeguide
serde = { version = "1.0", features = ["derive"] }
1 change: 1 addition & 0 deletions tonic-examples/build.rs
@@ -2,4 +2,5 @@ fn main() {
tonic_build::compile_protos("proto/helloworld/helloworld.proto").unwrap();
tonic_build::compile_protos("proto/routeguide/route_guide.proto").unwrap();
tonic_build::compile_protos("proto/echo/echo.proto").unwrap();
tonic_build::compile_protos("proto/blobservice/blobservice.proto").unwrap();
}
29 changes: 29 additions & 0 deletions tonic-examples/proto/blobservice/blobservice.proto
@@ -0,0 +1,29 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");

This is also not accurate, I think. It's Apache OR MIT dual-licensed.

As an addendum, the right thing to do if license headers are desired is to use SPDX format ones, otherwise the LICENSE in the repo works just fine.

// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package blobservice;

service Blobber {
rpc GetBytes (BlobRequest) returns (stream BlobResponse) {}
}

message BlobRequest {
uint64 nbytes = 1;
}

message BlobResponse {
bytes bytes = 1;
}
32 changes: 32 additions & 0 deletions tonic-examples/src/blobservice/client.rs
@@ -0,0 +1,32 @@
use env_logger;
use futures::TryStreamExt;
use log::{debug, error};

pub mod blobservice {
tonic::include_proto!("blobservice");
}

use blobservice::{client::BlobberClient, BlobRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut client = BlobberClient::connect("http://[::1]:50051")?;
let nbytes = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
let request = tonic::Request::new(BlobRequest { nbytes });

let response = client.get_bytes(request).await?;
let mut inner = response.into_inner();
let mut i = 0_u64;
while let Some(_) = inner.try_next().await.map_err(|e| {
error!("i={}", i);
error!("message={}", e.message());
e
})? {
if i % 1000 == 0 {
debug!("request # {}", i);
}
i += 1;
}
Ok(())
}
49 changes: 49 additions & 0 deletions tonic-examples/src/blobservice/server.rs
@@ -0,0 +1,49 @@
use futures::Stream;
use std::convert::TryInto;
use std::iter::FromIterator;
use std::pin::Pin;
use tonic::{transport::Server, Request, Response, Status};

mod blobservice {
tonic::include_proto!("blobservice");
}

use blobservice::{
server::{Blobber, BlobberServer},
BlobRequest, BlobResponse,
};

#[derive(Default)]
struct SimpleBlobber;

#[tonic::async_trait]
impl Blobber for SimpleBlobber {
type GetBytesStream =
Pin<Box<dyn Stream<Item = Result<BlobResponse, Status>> + Send + 'static>>;

async fn get_bytes(
&self,
request: Request<BlobRequest>,
) -> Result<Response<Self::GetBytesStream>, Status> {
let message = request.into_inner();
let bytes =
Vec::from_iter(std::iter::repeat(254u8).take(message.nbytes.try_into().unwrap()));
let response = futures::stream::iter((0..).map(move |_| {
Ok(blobservice::BlobResponse {
bytes: bytes.clone(),
})
}));

Ok(Response::new(Box::pin(response) as Self::GetBytesStream))
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let address = "[::1]:50051".parse().unwrap();
Server::builder()
.serve(address, BlobberServer::new(SimpleBlobber::default()))
.await?;

Ok(())
}
4 changes: 1 addition & 3 deletions tonic/src/body.rs
@@ -194,9 +194,7 @@ impl fmt::Debug for BoxBody {
}

#[derive(Debug, Default)]
struct EmptyBody {
_p: (),
}
struct EmptyBody;

impl HttpBody for EmptyBody {
type Data = BytesBuf;
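The `EmptyBody` change above swaps a struct with a zero-sized `_p: ()` field for a unit struct. The `_p: ()` pattern matters for public types, where a private field prevents struct-literal construction outside the defining module; `EmptyBody` is module-private, so the guard buys nothing. A small sketch of the distinction (module and type names are illustrative, not from tonic):

```rust
// A private () field blocks struct-literal construction from outside
// the defining module; a unit struct carries no such guard.
mod api {
    #[derive(Debug, Default)]
    pub struct Guarded {
        _p: (), // outside `api`, `Guarded { _p: () }` will not compile
    }

    #[derive(Debug, Default)]
    pub struct Unit; // fine when construction doesn't need to be gated
}

fn main() {
    let guarded = api::Guarded::default(); // only reachable via Default
    let unit = api::Unit;                  // literal construction is fine
    println!("{:?} {:?}", guarded, unit);
}
```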
82 changes: 57 additions & 25 deletions tonic/src/codec/decode.rs
@@ -1,5 +1,10 @@
use super::Decoder;
use crate::{body::BoxBody, metadata::MetadataMap, Code, Status};
use crate::{
body::BoxBody,
codec::{BUFFER_SIZE, HEADER_SIZE},
metadata::MetadataMap,
Code, Status,
};
use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
use futures_core::Stream;
use futures_util::{future, ready};
@@ -12,8 +17,6 @@ use std::{
};
use tracing::{debug, trace};

const BUFFER_SIZE: usize = 8 * 1024;

/// Streaming requests and responses.
///
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
@@ -32,7 +35,10 @@ impl<T> Unpin for Streaming<T> {}
#[derive(Debug)]
enum State {
ReadHeader,
ReadBody { compression: bool, len: usize },
ReadBody {
is_compressed: bool,
message_length: usize,
},
}

#[derive(Debug)]
@@ -154,13 +160,18 @@ impl<T> Streaming<T> {
}

fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
// pull out the bytes containing the message
let mut buf = (&self.buf[..]).into_buf();

// read the tonic header
if let State::ReadHeader = self.state {
if buf.remaining() < 5 {
// we have less than HEADER_SIZE bytes to read so return early (triggering further
// reading from the body)
if buf.remaining() < HEADER_SIZE {
return Ok(None);
}

// FIXME: compression isn't supported yet, so just consume the first byte
let is_compressed = match buf.get_u8() {
0 => false,
1 => {
@@ -178,21 +189,39 @@ impl<T> Streaming<T> {
));
}
};
let len = buf.get_u32_be() as usize;

// consume the length of the message
let message_length = buf.get_u32_be() as usize;

// now it's time to read the body
self.state = State::ReadBody {
compression: is_compressed,
len,
is_compressed,
message_length,
}
}

if let State::ReadBody { len, .. } = &self.state {
if buf.remaining() < *len {
// read the message
if let State::ReadBody { message_length, .. } = self.state {
// if we haven't read the entire message in then we need to wait for more data
let bytes_left_to_decode = buf.remaining();
if bytes_left_to_decode < message_length {
return Ok(None);
}

let bytes_allocd_for_decoding = self.buf.len();
// It's possible to read enough data to decode the body, but not enough to decode the
// body _and_ the tonic header. If that's the case, then read more data.
let min_bytes_needed_to_decode = message_length + HEADER_SIZE;
if bytes_allocd_for_decoding < min_bytes_needed_to_decode {
return Ok(None);
}

// self.buf must always contain at least the length of the message number of bytes +
// the number of bytes in the tonic header
assert!(bytes_allocd_for_decoding >= min_bytes_needed_to_decode);

// advance past the header
self.buf.advance(5);
self.buf.advance(HEADER_SIZE);

match self.decoder.decode(&mut self.buf) {
Ok(Some(msg)) => {
@@ -218,13 +247,15 @@ impl<T> Stream for Streaming<T> {
// FIXME: implement the ability to poll trailers when we _know_ that
// the consumer of this stream will only poll for the first message.
// This means we skip the poll_trailers step.
match self.decode_chunk()? {
Some(item) => return Poll::Ready(Some(Ok(item))),
None => (),

// if we're able to decode a chunk then the future is complete
if let Some(item) = self.decode_chunk()? {
return Poll::Ready(Some(Ok(item)));
}

let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
// otherwise wait for more data from the request body
let body_chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(data)) => Some(data),
Some(Err(e)) => {
let err: crate::Error = e.into();
debug!("decoder inner stream error: {:?}", err);
@@ -235,19 +266,20 @@ impl<T> Stream for Streaming<T> {
None => None,
};

if let Some(data) = chunk {
if data.remaining() > self.buf.remaining_mut() {
let amt = if data.remaining() > BUFFER_SIZE {
data.remaining()
} else {
BUFFER_SIZE
};

// if we received some data from the body, ensure that self.buf has room and put data
// into it
if let Some(body_data) = body_chunk {
let bytes_left_to_decode = body_data.remaining();
let bytes_left_for_decoding = self.buf.remaining_mut();
if bytes_left_to_decode > bytes_left_for_decoding {
let amt = bytes_left_to_decode.max(BUFFER_SIZE);
self.buf.reserve(amt);
}

self.buf.put(data);
self.buf.put(body_data);
} else {
// otherwise, ensure that there are no remaining bytes in self.buf
//
// FIXME: improve buf usage.
let buf1 = (&self.buf[..]).into_buf();
if buf1.has_remaining() {
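The bug this PR fixes is decoding when a DATA frame delivers only part of a length-prefixed gRPC message: the decoder must wait until both the 5-byte header (1 compression byte + big-endian u32 length) and the full body are buffered before decoding. The crux of the check can be sketched with std alone, using illustrative names rather than tonic's actual types:

```rust
// Mirrors tonic's header layout: 1 compression byte + 4 length bytes.
const HEADER_SIZE: usize = 5;

/// Try to decode one framed message from `buf`, consuming it on success.
/// Returns None when more bytes are needed (the partial-frame case).
fn decode_chunk(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    // Not even a full header yet: ask for more data.
    if buf.len() < HEADER_SIZE {
        return None;
    }
    // buf[0] is the compression flag; buf[1..5] is the message length.
    let len = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
    // The crux of the fix: require the header AND the complete body.
    if buf.len() < HEADER_SIZE + len {
        return None;
    }
    let msg = buf[HEADER_SIZE..HEADER_SIZE + len].to_vec();
    buf.drain(..HEADER_SIZE + len);
    Some(msg)
}

fn main() {
    let mut buf = Vec::new();
    // One frame (flag 0, length 3, body "abc") arriving in two chunks.
    buf.extend_from_slice(&[0, 0, 0, 0, 3, b'a']);
    assert_eq!(decode_chunk(&mut buf), None); // partial body: wait
    buf.extend_from_slice(&[b'b', b'c']);
    assert_eq!(decode_chunk(&mut buf), Some(b"abc".to_vec()));
    assert!(buf.is_empty());
}
```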
18 changes: 10 additions & 8 deletions tonic/src/codec/encode.rs
@@ -1,4 +1,8 @@
use crate::{body::BytesBuf, Code, Status};
use crate::{
body::BytesBuf,
codec::{BUFFER_SIZE, HEADER_SIZE},
Code, Status,
};
use bytes::{BufMut, BytesMut, IntoBuf};
use futures_core::{Stream, TryStream};
use futures_util::{ready, StreamExt, TryStreamExt};
@@ -9,8 +13,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_codec::Encoder;

const BUFFER_SIZE: usize = 8 * 1024;

pub(crate) fn encode_server<T, U>(
encoder: T,
source: U,
@@ -47,22 +49,22 @@ where
loop {
match source.next().await {
Some(Ok(item)) => {
buf.reserve(5);
buf.reserve(HEADER_SIZE);
unsafe {
buf.advance_mut(5);
buf.advance_mut(HEADER_SIZE);
}
encoder.encode(item, &mut buf).map_err(drop).unwrap();

// now that we know length, we can write the header
let len = buf.len() - 5;
let len = buf.len() - HEADER_SIZE;
assert!(len <= std::u32::MAX as usize);
{
let mut cursor = std::io::Cursor::new(&mut buf[..5]);
let mut cursor = std::io::Cursor::new(&mut buf[..HEADER_SIZE]);
cursor.put_u8(0); // byte must be 0, reserve doesn't auto-zero
cursor.put_u32_be(len as u32);
}

yield Ok(buf.split_to(len + 5).freeze().into_buf());
yield Ok(buf.split_to(len + HEADER_SIZE).freeze().into_buf());
},
Some(Err(status)) => yield Err(status),
None => break,
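The encode loop above reserves `HEADER_SIZE` bytes, encodes the body, then backfills the header once the length is known. The same placeholder-then-backfill idea can be sketched with a plain `Vec<u8>` (illustrative, not tonic's actual encoder):

```rust
const HEADER_SIZE: usize = 5;

/// Frame a message: write a placeholder header, append the body, then
/// backfill the compression flag and big-endian length.
fn encode_frame(message: &[u8], out: &mut Vec<u8>) {
    let start = out.len();
    out.extend_from_slice(&[0u8; HEADER_SIZE]); // placeholder header
    out.extend_from_slice(message);             // the encoded body
    let len = out.len() - start - HEADER_SIZE;
    assert!(len <= u32::MAX as usize);
    out[start] = 0; // compression flag: 0 = uncompressed
    out[start + 1..start + HEADER_SIZE].copy_from_slice(&(len as u32).to_be_bytes());
}

fn main() {
    let mut out = Vec::new();
    encode_frame(b"abc", &mut out);
    assert_eq!(out, [0, 0, 0, 0, 3, b'a', b'b', b'c']);
}
```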
9 changes: 9 additions & 0 deletions tonic/src/codec/mod.rs
@@ -3,8 +3,17 @@
//! This module contains the generic `Codec` trait and a protobuf codec
//! based on prost.

// The minimum buffer size for codec data
pub(crate) const BUFFER_SIZE: usize = 8 * 1024;

// The number of bytes in the tonic header:
// 1 * u8 for compression +
// 1 * u32 for body length
pub(crate) const HEADER_SIZE: usize = 5;

mod decode;
mod encode;

#[cfg(feature = "prost")]
mod prost;
