Adds a non-blocking raw binary reader #394

Merged 7 commits on Jul 6, 2022
Changes from 4 commits
1 change: 1 addition & 0 deletions Cargo.toml
@@ -53,6 +53,7 @@ rstest = "0.9"
 walkdir = "2.3"
 test-generator = "0.3"
 pretty-hex = "0.2"
+memmap = "0.7.0"
Contributor Author

🗺️ This has been added as a dev dependency so it can be used in examples/read_all_values.rs.


 [profile.release]
 lto = true
69 changes: 46 additions & 23 deletions examples/read_all_values.rs
@@ -1,34 +1,57 @@
+use ion_rs::binary::non_blocking::raw_binary_reader::RawBinaryBufferReader;
 use ion_rs::raw_reader::RawStreamItem;
 use ion_rs::result::IonResult;
 use ion_rs::StreamReader;
-use ion_rs::{IonDataSource, IonType, RawBinaryReader};
+use ion_rs::RawReader;
+use ion_rs::{IonType, RawBinaryReader};
+use memmap::MmapOptions;
Contributor Author

🗺️ I updated this example program to be able to use either a blocking or non-blocking raw reader depending on the command line arguments passed. I used it as a quick-and-dirty benchmark during development. In its current state, the non-blocking reader is a modest improvement over the blocking one, taking about 5% less time to complete on a ~1.3GB file.

 use std::fs::File;
 use std::process::exit;

 fn main() -> IonResult<()> {
     let args: Vec<String> = std::env::args().collect();
-    let path = args.get(1).unwrap_or_else(|| {
+    let mode = args.get(1).unwrap_or_else(|| {
         eprintln!("USAGE:\n\n {} [Binary Ion file]\n", args.get(0).unwrap());
-        eprintln!("No input file was specified.");
+        eprintln!("No mode was specified.");
         exit(1);
     });
+    let path = args.get(2).unwrap_or_else(|| {
+        eprintln!("USAGE:\n\n {} [Binary Ion file]\n", args.get(0).unwrap());
+        eprintln!("No input file was specified.");
+        exit(2);
+    });
+    let file = File::open(path).unwrap();
+
+    // This example uses `mmap` so we can use either the blocking reader (which reads from an
+    // io::BufRead) or the non-blocking reader (which reads from an AsRef<[u8]>).
+    let mmap = unsafe { MmapOptions::new().map(&file).unwrap() };
+
+    // Treat the mmap as a byte array.
+    let ion_data: &[u8] = &mmap[..];
+
+    if mode == "blocking" {
+        let mut reader = RawBinaryReader::new(ion_data);
+        let number_of_values = read_all_values(&mut reader)?;
+        println!("Blocking: read {} values", number_of_values);
+    } else if mode == "nonblocking" {
+        let mut reader = RawBinaryBufferReader::new(ion_data);
+        let number_of_values = read_all_values(&mut reader)?;
+        println!("Non-blocking: read {} values", number_of_values);
+    } else {
+        eprintln!("Unsupported `mode`: {}.", mode);
+        exit(3);
+    }

-    let file = File::open(path)?;
-    let buf_reader = std::io::BufReader::new(file);
-    let mut cursor = RawBinaryReader::new(buf_reader);
-    let number_of_values = read_all_values(&mut cursor)?;
-    println!("Read {} values", number_of_values);
     Ok(())
 }

 // Visits each value in the stream recursively, reading each scalar into a native Rust type.
 // Prints the total number of values read upon completion.
-fn read_all_values<R: IonDataSource>(cursor: &mut RawBinaryReader<R>) -> IonResult<usize> {
+fn read_all_values<R: RawReader>(reader: &mut R) -> IonResult<usize> {
     use IonType::*;
     use RawStreamItem::{Nothing, Null as NullValue, Value, VersionMarker};
     let mut count: usize = 0;
     loop {
-        match cursor.next()? {
Contributor Author

This name was an artifact of when the RawBinaryReader used to be called a BinaryCursor.

+        match reader.next()? {
             VersionMarker(_major, _minor) => {}
             NullValue(_ion_type) => {
                 count += 1;
@@ -37,39 +60,39 @@ fn read_all_values<R: IonDataSource>(cursor: &mut RawBinaryReader<R>) -> IonResu
             Value(ion_type) => {
                 count += 1;
                 match ion_type {
-                    Struct | List | SExpression => cursor.step_in()?,
+                    Struct | List | SExpression => reader.step_in()?,
                     String => {
-                        let _text = cursor.map_string(|_s| ())?;
+                        let _text = reader.map_string(|_s| ())?;
                     }
                     Symbol => {
-                        let _symbol_id = cursor.read_symbol()?;
+                        let _symbol_id = reader.read_symbol()?;
                     }
                     Integer => {
-                        let _int = cursor.read_i64()?;
+                        let _int = reader.read_i64()?;
                     }
                     Float => {
-                        let _float = cursor.read_f64()?;
+                        let _float = reader.read_f64()?;
                     }
                     Decimal => {
-                        let _decimal = cursor.read_decimal()?;
+                        let _decimal = reader.read_decimal()?;
                     }
                     Timestamp => {
-                        let _timestamp = cursor.read_timestamp()?;
+                        let _timestamp = reader.read_timestamp()?;
                     }
                     Boolean => {
-                        let _boolean = cursor.read_bool()?;
+                        let _boolean = reader.read_bool()?;
                     }
                     Blob => {
-                        let _blob = cursor.map_blob(|_b| ())?;
+                        let _blob = reader.map_blob(|_b| ())?;
                     }
                     Clob => {
-                        let _clob = cursor.map_clob(|_c| ())?;
+                        let _clob = reader.map_clob(|_c| ())?;
                     }
                     Null => {}
                 }
             }
-            Nothing if cursor.depth() > 0 => {
-                cursor.step_out()?;
+            Nothing if reader.depth() > 0 => {
+                reader.step_out()?;
             }
             _ => break,
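The updated example depends on one byte slice being usable by both kinds of reader: `&[u8]` implements `std::io::BufRead` and also `AsRef<[u8]>`. A minimal standard-library sketch of that idea follows. The functions `count_bytes_blocking` and `count_bytes_in_memory` are hypothetical stand-ins for the two readers, not ion_rs APIs, and a `Vec<u8>` stands in for the memory-mapped file.

```rust
use std::io::BufRead;

/// Hypothetical stand-in for a blocking reader: pulls bytes through `BufRead`.
fn count_bytes_blocking<R: BufRead>(mut source: R) -> std::io::Result<usize> {
    let mut total = 0;
    loop {
        // Ask the source for whatever it currently has buffered.
        let available = source.fill_buf()?.len();
        if available == 0 {
            break; // End of input.
        }
        total += available;
        source.consume(available);
    }
    Ok(total)
}

/// Hypothetical stand-in for a non-blocking reader: works on bytes already in memory.
fn count_bytes_in_memory<A: AsRef<[u8]>>(source: A) -> usize {
    source.as_ref().len()
}

fn main() -> std::io::Result<()> {
    // Assumed sample input: the Ion 1.0 binary version marker followed by a one-byte integer.
    let data: Vec<u8> = vec![0xE0, 0x01, 0x00, 0xEA, 0x21, 0x07];
    let bytes: &[u8] = &data[..];

    // The same slice satisfies both trait bounds, so no copy or second open is needed.
    let blocking = count_bytes_blocking(bytes)?;
    let in_memory = count_bytes_in_memory(bytes);
    println!("blocking: {}, in-memory: {}", blocking, in_memory);
    Ok(())
}
```

Because `&[u8]` is `Copy`, the slice can be handed to each function by value, which mirrors how the example passes `ion_data` to either reader constructor.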
8 changes: 8 additions & 0 deletions src/binary/int.rs
@@ -30,6 +30,14 @@ pub struct DecodedInt {
 }

 impl DecodedInt {
+    pub(crate) fn new(value: Integer, is_negative: bool, size_in_bytes: usize) -> Self {
Contributor Author

🗺️ The encoding primitive constructors (VarUInt::new, DecodedInt::new, etc.) are now pub(crate) so that the BinaryBuffer can construct them during parsing.

+        DecodedInt {
+            size_in_bytes,
+            value,
+            is_negative,
+        }
+    }
+
     /// Reads an Int with `length` bytes from the provided data source.
     pub fn read<R: IonDataSource>(data_source: &mut R, length: usize) -> IonResult<DecodedInt> {
         if length == 0 {
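To illustrate why a crate-visible constructor helps, here is a rough sketch of a buffer-based parser that decodes the fields itself and then builds the primitive. The `VarUInt` struct and `read_var_uint` function below are simplified, hypothetical stand-ins and not the crate's actual definitions. The sketch assumes the Ion binary VarUInt layout: seven data bits per byte, with the high bit set on the final byte.

```rust
/// Simplified, hypothetical stand-in for a decoded encoding primitive.
#[derive(Debug, PartialEq)]
struct VarUInt {
    value: u64,
    size_in_bytes: usize,
}

impl VarUInt {
    /// Constructor the parser calls once it has decoded the fields on its own.
    fn new(value: u64, size_in_bytes: usize) -> Self {
        VarUInt { value, size_in_bytes }
    }
}

/// Hypothetical buffer parser: decodes a VarUInt from the front of `bytes`.
/// Returns `None` if the slice ends before the terminating byte (more data is
/// needed) or if the value does not fit in a `u64`.
fn read_var_uint(bytes: &[u8]) -> Option<VarUInt> {
    let mut value: u64 = 0;
    for (index, &byte) in bytes.iter().enumerate() {
        // Shifting left by 7 would drop set bits, so give up instead.
        if value > (u64::MAX >> 7) {
            return None;
        }
        // The low seven bits of each byte carry data.
        value = (value << 7) | u64::from(byte & 0b0111_1111);
        // The high bit marks the final byte of the encoding.
        if byte & 0b1000_0000 != 0 {
            return Some(VarUInt::new(value, index + 1));
        }
    }
    None
}

fn main() {
    // Two-byte encoding followed by an unrelated trailing byte.
    match read_var_uint(&[0x01, 0x80, 0xFF]) {
        Some(decoded) => println!(
            "value = {}, size_in_bytes = {}",
            decoded.value, decoded.size_in_bytes
        ),
        None => println!("incomplete input"),
    }
}
```

Returning `None` on truncated input, rather than blocking for more bytes, is the property a non-blocking reader needs: the caller can append data to the buffer and try again.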
5 changes: 3 additions & 2 deletions src/binary/mod.rs
@@ -9,12 +9,13 @@ pub mod decimal;
 mod header;
 pub mod int;
 mod nibbles;
+pub mod non_blocking;
 pub(crate) mod raw_binary_reader;
 pub mod raw_binary_writer;
 pub mod timestamp;
 mod type_code;
 pub mod uint;
-mod var_int;
-mod var_uint;
+pub mod var_int;
+pub mod var_uint;

 pub use type_code::IonTypeCode;
2 changes: 1 addition & 1 deletion src/binary/nibbles.rs
@@ -5,7 +5,7 @@ const MAX_NIBBLE_VALUE: u8 = 15;
 const NIBBLE_SIZE_IN_BITS: u8 = 4;

 /// Given a byte, will return a tuple containing the values of its left and right nibbles.
-pub(crate) fn nibbles_from_byte(byte: u8) -> (u8, u8) {
+pub(crate) const fn nibbles_from_byte(byte: u8) -> (u8, u8) {
Contributor Author

🗺️ This function was made const so it could be called in compile-time computations. In particular, this enabled a pre-computed table of type descriptors to be constructed at compile time instead of during each reader's initialization; I'll call that out when it appears further down in the diff.

     let left = byte >> NIBBLE_SIZE_IN_BITS;
     let right = byte & 0b1111;
     (left, right)
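As a rough illustration of what making `nibbles_from_byte` a `const fn` allows, the sketch below builds a 256-entry lookup table at compile time. The `Descriptor` struct, `build_table` function, and `DESCRIPTORS` static are hypothetical names chosen for this sketch; the table in the PR itself appears further down in the diff and may be shaped differently.

```rust
const NIBBLE_SIZE_IN_BITS: u8 = 4;

/// Given a byte, returns a tuple containing its left and right nibbles.
const fn nibbles_from_byte(byte: u8) -> (u8, u8) {
    let left = byte >> NIBBLE_SIZE_IN_BITS;
    let right = byte & 0b1111;
    (left, right)
}

/// Hypothetical table entry: the two nibbles of a type descriptor byte.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Descriptor {
    type_code: u8,
    length_code: u8,
}

/// Builds one entry for every possible byte value.
const fn build_table() -> [Descriptor; 256] {
    let mut table = [Descriptor { type_code: 0, length_code: 0 }; 256];
    let mut byte = 0usize;
    // `for` loops are not available in `const fn`, so a `while` loop is used.
    while byte < 256 {
        let (type_code, length_code) = nibbles_from_byte(byte as u8);
        table[byte] = Descriptor { type_code, length_code };
        byte += 1;
    }
    table
}

/// Evaluated by the compiler, so no reader pays for it at start-up.
static DESCRIPTORS: [Descriptor; 256] = build_table();

fn main() {
    let descriptor = DESCRIPTORS[0x21];
    println!(
        "type code {}, length code {}",
        descriptor.type_code, descriptor.length_code
    );
}
```

With the table in a `static`, looking up a descriptor is a single array index rather than a pair of bit operations repeated for every value read.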