diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs index ad953e49b68e..9334e1839f9e 100644 --- a/arrow/src/json/reader.rs +++ b/arrow/src/json/reader.rs @@ -41,8 +41,7 @@ //! let mut json = json::Reader::new( //! BufReader::new(file), //! Arc::new(schema), -//! 1024, -//! Default::default() +//! json::reader::DecoderOptions::new(), //! ); //! //! let batch = json.next().unwrap().unwrap(); @@ -561,7 +560,7 @@ where /// /// # Examples /// ``` -/// use arrow::json::reader::{Decoder, ValueIter, infer_json_schema}; +/// use arrow::json::reader::{Decoder, DecoderOptions, ValueIter, infer_json_schema}; /// use std::fs::File; /// use std::io::{BufReader, Seek, SeekFrom}; /// use std::sync::Arc; @@ -569,8 +568,9 @@ where /// let mut reader = /// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap()); /// let inferred_schema = infer_json_schema(&mut reader, None).unwrap(); -/// let batch_size = 1024; -/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default()); +/// let options = DecoderOptions::new() +/// .with_batch_size(1024); +/// let decoder = Decoder::new(Arc::new(inferred_schema), options); /// /// // seek back to start so that the original file is usable again /// reader.seek(SeekFrom::Start(0)).unwrap(); @@ -583,35 +583,68 @@ where pub struct Decoder { /// Explicit schema for the JSON file schema: SchemaRef, - /// Batch size (number of records to load each time) - batch_size: usize, /// This is a collection of options for json decoder - doptions: DecoderOptions, + options: DecoderOptions, } -#[derive(Default, Debug)] +#[derive(Debug)] pub struct DecoderOptions { + /// Batch size (number of records to load each time), defaults to 1024 records + batch_size: usize, /// Optional projection for which columns to load (case-sensitive names) projection: Option>, /// optional HashMap of column names to its format string format_strings: Option>, } +impl Default for DecoderOptions { + fn default() -> Self { + Self { + batch_size: 1024, + projection: None, + format_strings: None, + } + } +} + +impl DecoderOptions { + pub fn new() -> Self { + Default::default() + } + + /// Set the batch size (number of records to load at one time) + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + + /// Set the reader's column projection + pub fn with_projection(mut self, projection: Vec) -> Self { + self.projection = Some(projection); + self + } + + /// Set the decoder's format Strings param + pub fn with_format_strings( + mut self, + format_strings: HashMap, + ) -> Self { + self.format_strings = Some(format_strings); + self + } +} + impl Decoder { /// Create a new JSON decoder from any value that implements the `Iterator>` /// trait. - pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self { - Self { - schema, - batch_size, - doptions, - } + pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self { + Self { schema, options } } /// Returns the schema of the reader, useful for getting the schema without reading /// record batches pub fn schema(&self) -> SchemaRef { - match &self.doptions.projection { + match &self.options.projection { Some(projection) => { let fields = self.schema.fields(); let projected_fields: Vec = fields @@ -636,9 +669,10 @@ impl Decoder { where I: Iterator>, { - let mut rows: Vec = Vec::with_capacity(self.batch_size); + let batch_size = self.options.batch_size; + let mut rows: Vec = Vec::with_capacity(batch_size); - for value in value_iter.by_ref().take(self.batch_size) { + for value in value_iter.by_ref().take(batch_size) { let v = value?; match v { Value::Object(_) => rows.push(v), @@ -656,7 +690,7 @@ impl Decoder { } let rows = &rows[..]; - let projection = self.doptions.projection.clone().unwrap_or_default(); + let projection = self.options.projection.clone().unwrap_or_default(); let arrays = self.build_struct_array(rows, self.schema.fields(), &projection); let projected_fields: Vec = if projection.is_empty() { @@ -934,7 +968,7 @@ impl Decoder { T::Native: num::NumCast, { let format_string = self - .doptions + .options .format_strings .as_ref() .and_then(|fmts| fmts.get(col_name)); @@ -1556,13 +1590,8 @@ impl Reader { /// /// If reading a `File`, you can customise the Reader, such as to enable schema /// inference, use `ReaderBuilder`. - pub fn new( - reader: R, - schema: SchemaRef, - batch_size: usize, - doptions: DecoderOptions, - ) -> Self { - Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions) + pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self { + Self::from_buf_reader(BufReader::new(reader), schema, options) } /// Create a new JSON Reader from a `BufReader` @@ -1571,12 +1600,11 @@ impl Reader { pub fn from_buf_reader( reader: BufReader, schema: SchemaRef, - batch_size: usize, - doptions: DecoderOptions, + options: DecoderOptions, ) -> Self { Self { reader, - decoder: Decoder::new(schema, batch_size, doptions), + decoder: Decoder::new(schema, options), } } @@ -1595,7 +1623,7 @@ impl Reader { } /// JSON file reader builder -#[derive(Debug)] +#[derive(Debug, Default)] pub struct ReaderBuilder { /// Optional schema for the JSON file /// @@ -1606,26 +1634,8 @@ pub struct ReaderBuilder { /// /// If a number is not provided, all the records are read. max_records: Option, - /// Batch size (number of records to load each time) - /// - /// The default batch size when using the `ReaderBuilder` is 1024 records - batch_size: usize, - /// Optional projection for which columns to load (zero-based column indices) - projection: Option>, - /// optional HashMap of column names to format strings - format_strings: Option>, -} - -impl Default for ReaderBuilder { - fn default() -> Self { - Self { - schema: None, - max_records: None, - batch_size: 1024, - projection: None, - format_strings: None, - } - } + /// Options for json decoder + options: DecoderOptions, } impl ReaderBuilder { @@ -1672,13 +1682,13 @@ impl ReaderBuilder { /// Set the batch size (number of records to load at one time) pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; + self.options = self.options.with_batch_size(batch_size); self } /// Set the reader's column projection pub fn with_projection(mut self, projection: Vec) -> Self { - self.projection = Some(projection); + self.options = self.options.with_projection(projection); self } @@ -1687,7 +1697,7 @@ impl ReaderBuilder { mut self, format_strings: HashMap, ) -> Self { - self.format_strings = Some(format_strings); + self.options = self.options.with_format_strings(format_strings); self } @@ -1707,15 +1717,7 @@ impl ReaderBuilder { )?), }; - Ok(Reader::from_buf_reader( - buf_reader, - schema, - self.batch_size, - DecoderOptions { - projection: self.projection, - format_strings: self.format_strings, - }, - )) + Ok(Reader::from_buf_reader(buf_reader, schema, self.options)) } } @@ -1868,8 +1870,7 @@ mod tests { let mut reader: Reader = Reader::new( File::open("test/data/basic.json").unwrap(), Arc::new(schema.clone()), - 1024, - Default::default(), + DecoderOptions::new(), ); let reader_schema = reader.schema(); assert_eq!(reader_schema, Arc::new(schema)); @@ -1919,11 +1920,7 @@ mod tests { let mut reader: Reader = Reader::new( File::open("test/data/basic.json").unwrap(), schema.clone(), - 1024, - DecoderOptions { - format_strings: Some(fmts), - ..Default::default() - }, + DecoderOptions::new().with_format_strings(fmts), ); let reader_schema = reader.schema(); assert_eq!(reader_schema, schema); @@ -1955,11 +1952,7 @@ mod tests { let mut reader: Reader = Reader::new( File::open("test/data/basic.json").unwrap(), Arc::new(schema), - 1024, - DecoderOptions { - projection: Some(vec!["a".to_string(), "c".to_string()]), - ..Default::default() - }, + DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]), ); let reader_schema = reader.schema(); let expected_schema = Arc::new(Schema::new(vec![ @@ -2126,8 +2119,8 @@ mod tests { file.seek(SeekFrom::Start(0)).unwrap(); let reader = BufReader::new(GzDecoder::new(&file)); - let mut reader = - Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default()); + let options = DecoderOptions::new().with_batch_size(64); + let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options); let batch_gz = reader.next().unwrap().unwrap(); for batch in vec![batch, batch_gz] { @@ -3199,7 +3192,7 @@ mod tests { true, )]); - let decoder = Decoder::new(Arc::new(schema), 1024, Default::default()); + let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new()); let batch = decoder .next_batch( &mut vec![ @@ -3234,7 +3227,7 @@ mod tests { true, )]); - let decoder = Decoder::new(Arc::new(schema), 1024, Default::default()); + let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new()); let batch = decoder .next_batch( // NOTE: total struct element count needs to be greater than @@ -3263,7 +3256,7 @@ mod tests { #[test] fn test_json_read_binary_structs() { let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]); - let decoder = Decoder::new(Arc::new(schema), 1024, Default::default()); + let decoder = Decoder::new(Arc::new(schema), DecoderOptions::new()); let batch = decoder .next_batch( &mut vec![