Consolidate JSON reader options
alamb committed Apr 12, 2022
1 parent 68038f5 commit ac9c647
Showing 1 changed file with 73 additions and 80 deletions.
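
This change folds the standalone batch_size constructor argument into DecoderOptions, alongside the existing projection and format_strings fields, so Decoder::new, Reader::new and Reader::from_buf_reader each take a single options value. A minimal sketch of the new construction path, based on the API shown in the diff below (the file path and the column names/types for test/data/basic.json are assumed for illustration):

use std::fs::File;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::json::reader::{DecoderOptions, Reader};

fn main() {
    // Explicit schema; the column names and types here are assumptions for illustration.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("c", DataType::Boolean, true),
    ]));

    // Batch size, projection and format strings now all live on DecoderOptions.
    let options = DecoderOptions::new()
        .with_batch_size(64)
        .with_projection(vec!["a".to_string(), "c".to_string()]);

    let mut reader = Reader::new(
        File::open("test/data/basic.json").unwrap(),
        schema,
        options,
    );
    let batch = reader.next().unwrap().unwrap();
    println!("read {} rows", batch.num_rows());
}

Future reader options can then be added to DecoderOptions without touching every constructor signature.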
153 changes: 73 additions & 80 deletions arrow/src/json/reader.rs
@@ -41,8 +41,7 @@
 //! let mut json = json::Reader::new(
 //!     BufReader::new(file),
 //!     Arc::new(schema),
-//!     1024,
-//!     Default::default()
+//!     json::reader::DecoderOptions::new(),
 //! );
 //!
 //! let batch = json.next().unwrap().unwrap();
@@ -561,16 +560,17 @@ where
 ///
 /// # Examples
 /// ```
-/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema};
+/// use arrow::json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema};
 /// use std::fs::File;
 /// use std::io::{BufReader, Seek, SeekFrom};
 /// use std::sync::Arc;
 ///
 /// let mut reader =
 ///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
 /// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
-/// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
+/// let options = DecoderOptions::new()
+///     .with_batch_size(1024);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), options);
 ///
 /// // seek back to start so that the original file is usable again
 /// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -583,35 +583,68 @@ where
 pub struct Decoder {
     /// Explicit schema for the JSON file
     schema: SchemaRef,
-    /// Batch size (number of records to load each time)
-    batch_size: usize,
     /// This is a collection of options for json decoder
-    doptions: DecoderOptions,
+    options: DecoderOptions,
 }
 
-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct DecoderOptions {
+    /// Batch size (number of records to load each time), defaults to 1024 records
+    batch_size: usize,
     /// Optional projection for which columns to load (case-sensitive names)
     projection: Option<Vec<String>>,
     /// optional HashMap of column names to its format string
     format_strings: Option<HashMap<String, String>>,
 }
 
+impl Default for DecoderOptions {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            projection: None,
+            format_strings: None,
+        }
+    }
+}
+
+impl DecoderOptions {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Set the batch size (number of records to load at one time)
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Set the reader's column projection
+    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    /// Set the decoder's format Strings param
+    pub fn with_format_strings(
+        mut self,
+        format_strings: HashMap<String, String>,
+    ) -> Self {
+        self.format_strings = Some(format_strings);
+        self
+    }
+}
+
 impl Decoder {
     /// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
     /// trait.
-    pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
-        Self {
-            schema,
-            batch_size,
-            doptions,
-        }
+    pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self {
+        Self { schema, options }
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
     /// record batches
     pub fn schema(&self) -> SchemaRef {
-        match &self.doptions.projection {
+        match &self.options.projection {
             Some(projection) => {
                 let fields = self.schema.fields();
                 let projected_fields: Vec<Field> = fields
@@ -636,9 +669,10 @@ impl Decoder {
     where
         I: Iterator<Item = Result<Value>>,
     {
-        let mut rows: Vec<Value> = Vec::with_capacity(self.batch_size);
+        let batch_size = self.options.batch_size;
+        let mut rows: Vec<Value> = Vec::with_capacity(batch_size);
 
-        for value in value_iter.by_ref().take(self.batch_size) {
+        for value in value_iter.by_ref().take(batch_size) {
             let v = value?;
             match v {
                 Value::Object(_) => rows.push(v),
@@ -656,7 +690,7 @@ impl Decoder {
         }
 
         let rows = &rows[..];
-        let projection = self.doptions.projection.clone().unwrap_or_default();
+        let projection = self.options.projection.clone().unwrap_or_default();
         let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
 
         let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -934,7 +968,7 @@ impl Decoder {
         T::Native: num::NumCast,
     {
         let format_string = self
-            .doptions
+            .options
             .format_strings
             .as_ref()
             .and_then(|fmts| fmts.get(col_name));
@@ -1556,13 +1590,8 @@ impl<R: Read> Reader<R> {
     ///
     /// If reading a `File`, you can customise the Reader, such as to enable schema
     /// inference, use `ReaderBuilder`.
-    pub fn new(
-        reader: R,
-        schema: SchemaRef,
-        batch_size: usize,
-        doptions: DecoderOptions,
-    ) -> Self {
-        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
+    pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self {
+        Self::from_buf_reader(BufReader::new(reader), schema, options)
     }
 
     /// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1571,12 +1600,11 @@ impl<R: Read> Reader<R> {
     pub fn from_buf_reader(
         reader: BufReader<R>,
         schema: SchemaRef,
-        batch_size: usize,
-        doptions: DecoderOptions,
+        options: DecoderOptions,
     ) -> Self {
         Self {
             reader,
-            decoder: Decoder::new(schema, batch_size, doptions),
+            decoder: Decoder::new(schema, options),
         }
     }
 
@@ -1595,7 +1623,7 @@ impl<R: Read> Reader<R> {
 }
 
 /// JSON file reader builder
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct ReaderBuilder {
     /// Optional schema for the JSON file
     ///
@@ -1606,26 +1634,8 @@ pub struct ReaderBuilder {
     ///
     /// If a number is not provided, all the records are read.
     max_records: Option<usize>,
-    /// Batch size (number of records to load each time)
-    ///
-    /// The default batch size when using the `ReaderBuilder` is 1024 records
-    batch_size: usize,
-    /// Optional projection for which columns to load (zero-based column indices)
-    projection: Option<Vec<String>>,
-    /// optional HashMap of column names to format strings
-    format_strings: Option<HashMap<String, String>>,
-}
-
-impl Default for ReaderBuilder {
-    fn default() -> Self {
-        Self {
-            schema: None,
-            max_records: None,
-            batch_size: 1024,
-            projection: None,
-            format_strings: None,
-        }
-    }
+    /// Options for json decoder
+    options: DecoderOptions,
 }
 
 impl ReaderBuilder {
@@ -1672,13 +1682,13 @@ impl ReaderBuilder {
 
     /// Set the batch size (number of records to load at one time)
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size = batch_size;
+        self.options = self.options.with_batch_size(batch_size);
         self
     }
 
     /// Set the reader's column projection
     pub fn with_projection(mut self, projection: Vec<String>) -> Self {
-        self.projection = Some(projection);
+        self.options = self.options.with_projection(projection);
         self
     }
 
@@ -1687,7 +1697,7 @@ impl ReaderBuilder {
         mut self,
         format_strings: HashMap<String, String>,
     ) -> Self {
-        self.format_strings = Some(format_strings);
+        self.options = self.options.with_format_strings(format_strings);
         self
     }
 
@@ -1707,15 +1717,7 @@ impl ReaderBuilder {
             )?),
         };
 
-        Ok(Reader::from_buf_reader(
-            buf_reader,
-            schema,
-            self.batch_size,
-            DecoderOptions {
-                projection: self.projection,
-                format_strings: self.format_strings,
-            },
-        ))
+        Ok(Reader::from_buf_reader(buf_reader, schema, self.options))
     }
 }
 
@@ -1868,8 +1870,7 @@ mod tests {
         let mut reader: Reader<File> = Reader::new(
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema.clone()),
-            1024,
-            Default::default(),
+            DecoderOptions::new(),
         );
         let reader_schema = reader.schema();
         assert_eq!(reader_schema, Arc::new(schema));
@@ -1919,11 +1920,7 @@ mod tests {
         let mut reader: Reader<File> = Reader::new(
             File::open("test/data/basic.json").unwrap(),
             schema.clone(),
-            1024,
-            DecoderOptions {
-                format_strings: Some(fmts),
-                ..Default::default()
-            },
+            DecoderOptions::new().with_format_strings(fmts),
         );
         let reader_schema = reader.schema();
         assert_eq!(reader_schema, schema);
@@ -1955,11 +1952,7 @@ mod tests {
         let mut reader: Reader<File> = Reader::new(
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema),
-            1024,
-            DecoderOptions {
-                projection: Some(vec!["a".to_string(), "c".to_string()]),
-                ..Default::default()
-            },
+            DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]),
         );
         let reader_schema = reader.schema();
         let expected_schema = Arc::new(Schema::new(vec![
@@ -2126,8 +2119,8 @@ mod tests {
         file.seek(SeekFrom::Start(0)).unwrap();
 
         let reader = BufReader::new(GzDecoder::new(&file));
-        let mut reader =
-            Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
+        let options = DecoderOptions::new().with_batch_size(64);
+        let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options);
         let batch_gz = reader.next().unwrap().unwrap();
 
         for batch in vec![batch, batch_gz] {
@@ -3199,7 +3192,7 @@ mod tests {
             true,
         )]);
 
-        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
         let batch = decoder
             .next_batch(
                 &mut vec![
@@ -3234,7 +3227,7 @@ mod tests {
             true,
         )]);
 
-        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
         let batch = decoder
             .next_batch(
                 // NOTE: total struct element count needs to be greater than
@@ -3263,7 +3256,7 @@ mod tests {
     #[test]
     fn test_json_read_binary_structs() {
         let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
-        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
+        let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new());
         let batch = decoder
             .next_batch(
                 &mut vec![