Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(codec): Properly decode partial DATA frames #83

Merged
merged 28 commits into from
Oct 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c8a4c7c
Add blobservice example for smoke testing
Oct 15, 2019
00c415e
Remove publicity from example
Oct 21, 2019
bb8acae
Make empty struct more concise
Oct 21, 2019
88abfdf
Expose HEADER_SIZE and BUFFER_SIZE as crate public to avoid duping co…
Oct 21, 2019
893f990
Make sure our encoded buffer has the correct lower bound in size
Oct 21, 2019
fec5d09
Use crate-public constants in encode.rs
Oct 21, 2019
ce2ae98
Use crate-public constants in decode.rs
Oct 21, 2019
a60b3ed
Rename fields for clarity in later usage
Oct 21, 2019
c92bd82
Ensure that if we do not have enough bytes for header + message that …
Oct 21, 2019
0733aec
Add comments describing the flow of data
Oct 21, 2019
13277a1
No need for match
Oct 21, 2019
a491622
Rename variables for clarity
Oct 21, 2019
36164a5
Relax the assertion in encoding, since it is not actually required
Oct 21, 2019
2ddd3af
Move blobservice back to multiple threads
Oct 21, 2019
306bae4
Remove logging dependencies
Oct 21, 2019
fc8500d
Give an example of how the bug can happen in a comment
Oct 21, 2019
6aecefd
Use message_len for maximum rustiness
Oct 21, 2019
7fd4d7d
Use message instead of try_next
Oct 21, 2019
0090592
Remove license
Oct 21, 2019
0b46a6f
Remove blobservice
Oct 21, 2019
e79db30
Revert changes to decode.rs
Oct 21, 2019
f0fb03e
Simplify the check
Oct 21, 2019
36c6871
Revert body change
Oct 21, 2019
3abd90b
Revert refactor
Oct 21, 2019
c7381a3
Fix decode test to actually run
Oct 21, 2019
2d9c5b8
Checkpoint
Oct 21, 2019
bdcac37
Modify existing test to send partial bodies
Oct 21, 2019
b5e5ea7
Shorten syntax
Oct 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ impl<T> Streaming<T> {
}

if let State::ReadBody { len, .. } = &self.state {
if buf.remaining() < *len {
// if we haven't read enough of the message then return and keep
// reading
if buf.remaining() < *len || self.buf.len() < *len + 5 {
return Ok(None);
}

Expand Down
51 changes: 42 additions & 9 deletions tonic/src/codec/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ struct Msg {
async fn decode() {
let decoder = ProstDecoder::<Msg>::default();

let data = Vec::from(&[0u8; 1024][..]);
let data = vec![0u8; 10000];
let data_len = data.len();
let msg = Msg { data };

let mut buf = BytesMut::new();
Expand All @@ -34,11 +35,20 @@ async fn decode() {
buf.put_u32_be(len as u32);
msg.encode(&mut buf).unwrap();

let body = MockBody(buf.freeze(), 0, 100);
let body = MockBody {
data: buf.freeze(),
partial_len: 10005,
count: 0,
};

let mut stream = Streaming::new_request(decoder, body);

while let Some(_) = stream.message().await.unwrap() {}
let mut i = 0usize;
while let Some(msg) = stream.message().await.unwrap() {
assert_eq!(msg.data.len(), data_len);
i += 1;
}
assert_eq!(i, 1);
}

#[tokio::test]
Expand All @@ -61,20 +71,43 @@ async fn encode() {
}

#[derive(Debug)]
struct MockBody(Bytes, usize, usize);
struct MockBody {
data: Bytes,

// the size of the partial message to send
partial_len: usize,

// the number of times we've sent
count: usize,
}

impl Body for MockBody {
type Data = Data;
type Error = Status;

fn poll_data(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
if self.1 > self.2 {
self.1 += 1;
let data = Data(self.0.clone().into_buf());
Poll::Ready(Some(Ok(data)))
// every other call to poll_data returns data
let should_send = self.count % 2 == 0;
let data_len = self.data.len();
let partial_len = self.partial_len;
let count = self.count;
if data_len > 0 {
let result = if should_send {
let response = self
.data
.split_to(if count == 0 { partial_len } else { data_len })
.into_buf();
Poll::Ready(Some(Ok(Data(response))))
} else {
cx.waker().wake_by_ref();
Poll::Pending
};
// make some fake progress
self.count += 1;
result
} else {
Poll::Ready(None)
}
Expand Down