Adds a non-blocking raw binary reader #394
Conversation
The existing `RawBinaryReader` wraps an `io::BufRead` implementation and pulls additional bytes from it as needed. This allows it to read large files in a streaming fashion; however, it also means that the file must be complete. If the data source is still growing (for example: a file that is being appended to), the streaming reader will treat the end of the stream as an unexpected EOF and error out. Likewise, in async contexts where bytes can arrive in separate events, trying to read a chunk of Ion bytes can fail if the slice doesn't contain a complete Ion stream.

This PR adds two new types: a `BinaryBuffer` and a `RawBinaryBufferReader`.

The `BinaryBuffer` is a stack-allocated wrapper around an implementation of `AsRef<[u8]>`. It provides methods to read binary Ion encoding primitives like `VarUInt`, `VarInt`, and `NOP` from the data source. It is very cheap to create and does not modify the data source.

The `RawBinaryBufferReader` provides a full `Reader` interface over a `BinaryBuffer`. If it encounters an unexpected EOF while reading, it returns an `IonError::Incomplete`. Unlike the original `RawBinaryReader`, however, it is guaranteed to still be in a good state afterwards. If more data becomes available later on, it can be appended to the buffer and the `RawBinaryBufferReader` can try parsing it again.

Finally, a new configuration for the `ion-tests` integration suite has been added that exercises the `Reader` interface in the `RawBinaryBufferReader`, demonstrating that it is spec-compliant.

This PR duplicates a fair amount of functionality that is available elsewhere in the crate. Follow-on PRs will consolidate these implementations; it is expected that the blocking raw binary reader will be re-implemented as a wrapper around the non-blocking reader, making calls to `read` whenever an `Incomplete` is encountered.
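The append-and-retry contract described above can be sketched in miniature. This is a simplified illustration with hypothetical names, not the actual ion-rs API: a read that runs out of data returns `Incomplete` without disturbing the reader's position, so the caller can append the missing bytes and try the same read again.

```rust
#[derive(Debug, PartialEq)]
enum IonError {
    Incomplete,
}

struct BufferReader {
    data: Vec<u8>,
    position: usize,
}

impl BufferReader {
    fn new(data: Vec<u8>) -> Self {
        BufferReader { data, position: 0 }
    }

    /// Appends newly arrived bytes; only possible because the backing store is a `Vec<u8>`.
    fn append_bytes(&mut self, bytes: &[u8]) {
        self.data.extend_from_slice(bytes);
    }

    /// Reads a binary Ion `VarUInt` (7 bits of magnitude per byte; the high bit
    /// marks the final byte). On success the position advances; on `Incomplete`
    /// the reader's state is left untouched so the read can be retried later.
    fn read_var_uint(&mut self) -> Result<u64, IonError> {
        let mut magnitude: u64 = 0;
        for (i, &byte) in self.data[self.position..].iter().enumerate() {
            magnitude = (magnitude << 7) | (byte & 0x7F) as u64;
            if byte & 0x80 != 0 {
                // Terminal byte found; commit the consumed bytes.
                self.position += i + 1;
                return Ok(magnitude);
            }
        }
        // Ran out of bytes mid-value; state is unchanged, the caller may retry.
        Err(IonError::Incomplete)
    }
}
```

A caller in an async or tailing-file context would loop on `Incomplete`, feeding newly arrived bytes to `append_bytes` between attempts.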
Codecov Report
@@ Coverage Diff @@
## main #394 +/- ##
==========================================
- Coverage 88.76% 88.44% -0.33%
==========================================
Files 80 82 +2
Lines 12622 14027 +1405
==========================================
+ Hits 11204 12406 +1202
- Misses 1418 1621 +203
🗺️ PR tour
@@ -53,6 +53,7 @@ rstest = "0.9"
walkdir = "2.3"
test-generator = "0.3"
pretty-hex = "0.2"
memmap = "0.7.0"
🗺️ This has been added as a dev dependency so it can be used in `examples/read_all_values.rs`.
use ion_rs::{IonDataSource, IonType, RawBinaryReader};
use ion_rs::RawReader;
use ion_rs::{IonType, RawBinaryReader};
use memmap::MmapOptions;
🗺️ I updated this example program to be able to use either a blocking or non-blocking raw reader depending on the command line arguments passed. I used it as a quick-and-dirty benchmark during development. In its current state, the non-blocking reader is a modest improvement over the blocking one, taking about 5% less time to complete on a ~1.3GB file.
use IonType::*;
use RawStreamItem::{Nothing, Null as NullValue, Value, VersionMarker};
let mut count: usize = 0;
loop {
    match cursor.next()? {
This name was an artifact of when the `RawBinaryReader` used to be called a `BinaryCursor`.
@@ -30,6 +30,14 @@ pub struct DecodedInt {
}

impl DecodedInt {
    pub(crate) fn new(value: Integer, is_negative: bool, size_in_bytes: usize) -> Self {
🗺️ The encoding primitive constructors (`VarUInt::new`, `DecodedInt::new`, etc.) are now `pub(crate)` to allow them to be constructed by the `BinaryBuffer` during parsing.
@@ -5,7 +5,7 @@ const MAX_NIBBLE_VALUE: u8 = 15;
const NIBBLE_SIZE_IN_BITS: u8 = 4;

/// Given a byte, will return a tuple containing the values of its left and right nibbles.
pub(crate) fn nibbles_from_byte(byte: u8) -> (u8, u8) {
pub(crate) const fn nibbles_from_byte(byte: u8) -> (u8, u8) {
🗺️ This function was made `const` so it could be called in compile-time computations. In particular, this enabled a pre-computed table of type descriptors to be constructed at compile time instead of during each reader's initialization; I'll call that out when it appears further down in the diff.
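The technique can be sketched in standalone Rust. This is a toy version under stated assumptions: the real ion-rs table holds type descriptors, while this one just stores the decoded nibble pairs; the `NIBBLE_TABLE` name is illustrative.

```rust
// Because the helper is `const`, it can be evaluated at compile time.
const fn nibbles_from_byte(byte: u8) -> (u8, u8) {
    (byte >> 4, byte & 0x0F)
}

// Built once, during compilation, for all 256 possible descriptor bytes --
// no work is done at reader initialization.
const NIBBLE_TABLE: [(u8, u8); 256] = {
    let mut table = [(0u8, 0u8); 256];
    let mut i = 0;
    while i < 256 {
        table[i] = nibbles_from_byte(i as u8);
        i += 1;
    }
    table
};
```

At runtime, decoding a type descriptor becomes a single array index instead of a pair of shifts and masks per byte.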
fn big_integer_from_big_uint(value: BigUint) -> Integer {
    Integer::BigInt(BigInt::from(value))
}
🗺️ These helper functions are a (very) cold path for reading integers. I've marked them `#[inline(never)]` to minimize their footprint.
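The pattern can be sketched without the `num-bigint` dependency. Names here are illustrative stand-ins, not the actual ion-rs internals; the point is that the rarely taken promotion path is kept out of the hot function's generated code.

```rust
#[derive(Debug, PartialEq)]
enum Integer {
    I64(i64),
    // The real crate promotes to a BigInt here; a raw-bytes placeholder
    // keeps this sketch dependency-free.
    Big(Vec<u8>),
}

// Cold path: only reached when a value won't fit in an i64. `inline(never)`
// keeps its allocation code from being duplicated into every caller.
#[inline(never)]
fn big_integer_from_bytes(bytes: &[u8]) -> Integer {
    Integer::Big(bytes.to_vec())
}

// Hot path: stays small because the cold helper is never inlined into it.
fn integer_from_bytes(bytes: &[u8]) -> Integer {
    if bytes.len() <= 8 {
        let mut value: i64 = 0;
        for &b in bytes {
            value = (value << 8) | b as i64;
        }
        Integer::I64(value)
    } else {
        big_integer_from_bytes(bytes)
    }
}
```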
@@ -21,17 +20,6 @@ impl<R: RawReader> Iterator for NativeElementIterator<R> {
    }
}

impl ElementReader for NativeElementReader {
🗺️ This `impl` block was moved further down in the same file.
    }
}

pub struct NonBlockingNativeElementReader;
🗺️ This type is used by our `ion-tests` integration to demonstrate spec compliance. It is not especially useful in other contexts.
@@ -763,3 +762,144 @@ mod native_element_tests {
        non_equivs(NativeElementApi, file_name)
    }
}
🗺️ I added an `ion-tests` configuration for the non-blocking reader.
/// When the wrapped type is a `Vec<u8>`, data can be appended to the buffer between read
/// operations.
#[derive(Debug, PartialEq)]
pub(crate) struct BinaryBuffer<A: AsRef<[u8]>> {
🗺️ This type allows you to read Ion encoding primitives from a `&[u8]`, `Vec<u8>`, `bytes::Buf`, or any other container of bytes.

There are many code blocks that are basically copy/pasted from the blocking implementation (for example: reading `VarUInt`, `VarInt`, etc.). I intend to do a lot of consolidation in a follow-on PR.
I initially tried to use some existing byte buffer implementations from the Rust ecosystem for this, but needed some particular functionality:
- I needed the buffer to track how many bytes it had consumed over its lifespan.
- I wanted control over allocations.
- I needed some types of the buffer to offer methods for adding data but not others.
It was simpler to just write my own.
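The three requirements above can be sketched in a minimal form. This is a simplified illustration of the design, not the actual ion-rs type (which has a much richer API): it tracks lifetime consumption, is generic over any byte container, and only offers appending when the container is a growable `Vec<u8>`.

```rust
#[derive(Debug, PartialEq)]
struct BinaryBuffer<A: AsRef<[u8]>> {
    data: A,
    start: usize,
    total_consumed: usize,
}

impl<A: AsRef<[u8]>> BinaryBuffer<A> {
    fn new(data: A) -> Self {
        BinaryBuffer { data, start: 0, total_consumed: 0 }
    }

    /// The bytes that have not yet been consumed.
    fn bytes(&self) -> &[u8] {
        &self.data.as_ref()[self.start..]
    }

    /// Marks `n` bytes as consumed. The count is tracked over the
    /// buffer's whole lifespan, not just the current read.
    fn consume(&mut self, n: usize) {
        self.start += n;
        self.total_consumed += n;
    }

    fn total_consumed(&self) -> usize {
        self.total_consumed
    }
}

// Appending is only offered when the backing store can grow; a
// `BinaryBuffer<&[u8]>` has no such method.
impl BinaryBuffer<Vec<u8>> {
    fn append_bytes(&mut self, bytes: &[u8]) {
        self.data.extend_from_slice(bytes);
    }
}
```

Restricting `append_bytes` to the `Vec<u8>` impl is what the third bullet describes: some instantiations of the buffer expose mutation methods and others simply don't have them.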
/// Type, offset, and length information about the serialized value over which the
/// NonBlockingRawBinaryReader is currently positioned.
#[derive(Clone, Copy, Debug, PartialEq)]
struct EncodedValue {
🗺️ Each time the binary reader advances, it will populate a new `EncodedValue` that tracks the stream offsets of the serialized value's components.
// The `Skipping(n)` state means that we ran out of data on a previous call to `next()`.
// The only way to fix it is to add more input bytes to the buffer using the `append_bytes`
// or `read_from` methods. Adding input bytes to the buffer is only supported
// when the generic type `A` is `Vec<u8>`.
🗺️ There's a tradeoff lurking here. Moving to the `Skipping(n)` state means that we leave the reader in no man's land; no operation will succeed until more data has been added.

We could instead have the reader simply return `IonError::Incomplete` without changing its state. However:

- This would mean that the only time the reader could advance is when the entire current value is in the buffer. Given that some real-world use cases work with multi-GB values, this feels like an onerous requirement. Having a `Skipping(n)` state allows the reader to incrementally skip over very large values.
- If a call to `next()` (which in turn calls `advance_to_next_item()`) fails as `Incomplete`, I don't think users would expect to be able to perform more read operations on the value they were on before calling `next()`.

If a use case for the alternative behavior surfaces down the road, I could see adding a config option for this (`disable_incremental_advancement`?).
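The incremental-skip behavior can be sketched as a small state machine. This is an illustrative model with hypothetical names, not the actual ion-rs reader: `skip` consumes whatever bytes are available now and records the remainder owed, so a value larger than the buffer can be skipped across several append-and-retry cycles.

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum ReaderState {
    Ready,
    Skipping(usize), // bytes still to be skipped once more data arrives
}

#[derive(Debug, PartialEq)]
enum IonError {
    Incomplete,
}

struct Skipper {
    buffer: Vec<u8>,
    state: ReaderState,
}

impl Skipper {
    fn append_bytes(&mut self, bytes: &[u8]) {
        self.buffer.extend_from_slice(bytes);
    }

    /// Skips `bytes_to_skip` bytes, consuming partial data if necessary.
    fn skip(&mut self, bytes_to_skip: usize) -> Result<(), IonError> {
        let available = self.buffer.len();
        if available >= bytes_to_skip {
            self.buffer.drain(..bytes_to_skip);
            self.state = ReaderState::Ready;
            Ok(())
        } else {
            // Consume everything we have and record how much is still owed.
            self.buffer.clear();
            self.state = ReaderState::Skipping(bytes_to_skip - available);
            Err(IonError::Incomplete)
        }
    }

    /// Retries a pending skip after more data has been appended.
    fn resume(&mut self) -> Result<(), IonError> {
        match self.state {
            ReaderState::Skipping(remaining) => self.skip(remaining),
            ReaderState::Ready => Ok(()),
        }
    }
}
```

Note how `skip` makes forward progress (dropping the available bytes) even when it fails; that is the behavior the alternative design would give up.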
    Err(incomplete_data_error_raw(label, offset))
}

pub fn incomplete_data_error_raw(label: &'static str, offset: usize) -> IonError {
Why not `#[inline(never)]` this raw error function as well, similar to the other raw error functions?
Great question! The other error methods each allocate a `String` via `format!`, which causes their assembly to include code for allocation, formatting, and deallocation. `incomplete_data_error_raw` holds a `&'static str`, so it doesn't need to allocate; its assembly is pretty small for now.
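The distinction can be sketched in plain Rust. These are simplified stand-ins for the actual ion-rs error types: the formatted variant's constructor pulls in allocation and formatting machinery (and so benefits from `#[inline(never)]`), while the incomplete-data variant only moves a `&'static str` and an offset into the enum, so it compiles to almost nothing and inlines freely.

```rust
#[derive(Debug, PartialEq)]
enum IonError {
    Decoding { description: String },
    Incomplete { label: &'static str, offset: usize },
}

// Comparatively large: allocates and formats a String, so we keep it
// out of callers' bodies.
#[inline(never)]
fn decoding_error_raw(description: &str) -> IonError {
    IonError::Decoding {
        description: format!("decoding error: {}", description),
    }
}

// Tiny: no allocation, just two words moved into the enum. Inlining it
// costs little at each call site.
fn incomplete_data_error_raw(label: &'static str, offset: usize) -> IonError {
    IonError::Incomplete { label, offset }
}
```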
Here's an assembly dump for `decoding_error_raw` (which is behind `#[inline(never)]`):

In contrast, here's the assembly for `incomplete_data_error_raw`, which usually gets inlined:

I'm using `cargo asm` to generate these.
}

if !terminated {
    // panic!("incomplete (unterm) varint @ pos {}: {:x?}", self.total_consumed(), self.bytes());
debug comment?
// panic!("incomplete (unterm) varint @ pos {}: {:x?}", self.total_consumed(), self.bytes());
Good eye, removed.
self.consume(encoded_size_in_bytes);
Ok(VarInt::new(
    magnitude * sign,
    !is_positive,
Suggestion: everywhere we use `!is_positive` with `VarInt::new`, why not just declare `is_negative` above and use it as is? The only reason I suggest this is that I had to go back and look at the definition of `VarInt::new` to make sure the statement above is correct.
Done!
These comments describe changes I introduced in the latest commit. If you haven't looked at earlier commits, you can probably ignore them and just read the latest version.
let next_byte = self.bytes()[0];
let next_byte = self.data.as_ref()[self.start];
🗺️ This change sidesteps the process of building a subslice via `bytes()`. It's a small optimization, but peeking at the next type descriptor might be the single hottest path in the codebase.
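A toy illustration of the micro-optimization (with a hypothetical `Buffer` type, not the ion-rs one): both forms return the same byte, but the direct index skips materializing the remaining-bytes subslice on the hot path.

```rust
struct Buffer {
    data: Vec<u8>,
    start: usize,
}

impl Buffer {
    /// Builds a subslice (pointer + length) over the unread bytes.
    fn bytes(&self) -> &[u8] {
        &self.data[self.start..]
    }

    /// Peek via the subslice: constructs the slice, then indexes it.
    fn peek_via_subslice(&self) -> u8 {
        self.bytes()[0]
    }

    /// Peek directly: a single bounds-checked index into the storage.
    fn peek_direct(&self) -> u8 {
        self.data[self.start]
    }
}
```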
self.field_id_length as usize
    + self.annotations_header_length as usize
    + self.header_length()
    + self.value_length()
This calculation happens when stepping into a container (to know where the end of the container will be), skipping a value (to know where the end of the value will be), and getting the slice of bytes that represent the body of the value. Caching it in the `EncodedValue` proved to be worth the space tradeoff.
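The caching decision can be sketched as follows. This is a simplified model (field and method names follow the quoted diff, but the constructor shape is hypothetical): the sum of the four component lengths is computed once when the value is parsed and stored, so stepping in, skipping, and slicing all reuse it.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct EncodedValue {
    field_id_length: u8,
    annotations_header_length: u8,
    header_length: u8,
    value_length: usize,
    total_length: usize, // cached sum of the four components above
}

impl EncodedValue {
    fn new(
        field_id_length: u8,
        annotations_header_length: u8,
        header_length: u8,
        value_length: usize,
    ) -> Self {
        // Computed once here rather than on every lookup.
        let total_length = field_id_length as usize
            + annotations_header_length as usize
            + header_length as usize
            + value_length;
        EncodedValue {
            field_id_length,
            annotations_header_length,
            header_length,
            value_length,
            total_length,
        }
    }

    /// Stepping into a container, skipping a value, and slicing the value
    /// body all read this cached field instead of re-summing.
    fn total_length(&self) -> usize {
        self.total_length
    }
}
```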
} else {
    self.buffer.consume(bytes_available);
    self.state = Skipping(bytes_to_skip - bytes_available);
    incomplete_data_error("ahead to next item", self.buffer.total_consumed())
🗺️ Looking at this method with fresh eyes, I noticed the different states weren't that different after all.
if self.is_eof() || self.is_at_end_of_container() {
    return Ok(RawStreamItem::Nothing);
🗺️ This change hoisted all of the branching involved in `is_eof()` and `is_at_end_of_container()` into the `consume_nop_padding()` method, keeping this method smaller.
@@ -1147,8 +1114,7 @@ impl<'a, A: AsRef<[u8]>> TxReader<'a, A> {
    self.encoded_value.header = header;
    // Record the *absolute* offset of the type descriptor -- its offset from the beginning of
    // the stream.
    self.encoded_value.header_offset =
        self.main_buffer.total_consumed() + self.tx_buffer.total_consumed();
    self.encoded_value.header_offset = self.tx_buffer.total_consumed();
🗺️ Now that `tx_buffer` holds the original buffer's `total_consumed` count, this calculation gets simplified.
total_consumed: 0,
total_consumed: self.total_consumed,
🗺️ Previously, the transaction buffer would track its own total consumption. However, I realized this wasn't terribly useful in its existing role. Having it copy the original buffer's `total_consumed` means that if it encounters errors, the error position that it reports will be the absolute offset in the Ion stream, which is what we want.

Incidentally, this piece of information was the only reason we stored a reference to the original buffer in the `TxReader`, so that's been removed as well.
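The idea can be sketched with a simplified buffer (illustrative types, not the ion-rs ones): when a transaction buffer is created, it inherits the parent's lifetime `total_consumed` count instead of starting from zero, so any offset it reports is already absolute within the stream.

```rust
struct BinaryBuffer {
    data: Vec<u8>,
    start: usize,
    total_consumed: usize,
}

impl BinaryBuffer {
    fn consume(&mut self, n: usize) {
        self.start += n;
        self.total_consumed += n;
    }

    /// Creates a child buffer over the unread bytes that copies the
    /// parent's absolute consumption count rather than starting at zero.
    fn transaction(&self) -> BinaryBuffer {
        BinaryBuffer {
            data: self.data[self.start..].to_vec(),
            start: 0,
            total_consumed: self.total_consumed,
        }
    }
}
```

With this, error positions computed from the transaction's `total_consumed` need no reference back to the parent buffer.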
state: &'a mut ReaderState,
main_buffer: &'a BinaryBuffer<A>,
🗺️ Now that the `tx_buffer` below has the original buffer's `total_consumed`, this field has been removed.
Co-authored-by: Khushboo <68757952+desaikd@users.noreply.github.com>
Fixes #4, related to #351.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.