diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 332b893aa4bc..69ebce6950f5 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -47,6 +47,8 @@ pub struct ArrowWriter { /// /// The schema is used to verify that each record batch written has the correct schema arrow_schema: SchemaRef, + /// The maximum number of rows to write to each row group + max_row_group_size: usize, } impl ArrowWriter { @@ -65,6 +67,8 @@ impl ArrowWriter { let mut props = props.unwrap_or_else(|| WriterProperties::builder().build()); add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props); + let max_row_group_size = props.max_row_group_size(); + let file_writer = SerializedFileWriter::new( writer.try_clone()?, schema.root_schema_ptr(), @@ -74,12 +78,17 @@ impl ArrowWriter { Ok(Self { writer: file_writer, arrow_schema, + max_row_group_size, }) } /// Write a RecordBatch to writer /// - /// *NOTE:* The writer currently does not support all Arrow data types + /// The writer will slice the `batch` into row groups of at most `max_row_group_size` rows, + /// but if a batch has left-over rows less than the row group size, + /// the last row group will have fewer records. + /// This is currently a limitation because we close the row group + /// instead of keeping it open for the next batch. 
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { // validate batch schema against writer's supplied schema if self.arrow_schema != batch.schema() { @@ -87,17 +96,31 @@ impl ArrowWriter { "Record batch schema does not match writer schema".to_string(), )); } - // compute the definition and repetition levels of the batch - let batch_level = LevelInfo::new_from_batch(batch); - let mut row_group_writer = self.writer.next_row_group()?; - for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { - let mut levels = batch_level.calculate_array_levels(array, field); - // Reverse levels as we pop() them when writing arrays - levels.reverse(); - write_leaves(&mut row_group_writer, array, &mut levels)?; + // Track the number of rows being written in the batch. + // We currently do not have a way of slicing nested arrays, thus we + // track this manually. + let num_rows = batch.num_rows(); + let batches = (num_rows + self.max_row_group_size - 1) / self.max_row_group_size; + let min_batch = num_rows.min(self.max_row_group_size); + for batch_index in 0..batches { + // Determine the offset and length of arrays + let offset = batch_index * min_batch; + let length = (num_rows - offset).min(self.max_row_group_size); + + // Compute the definition and repetition levels of the batch + let batch_level = LevelInfo::new(offset, length); + let mut row_group_writer = self.writer.next_row_group()?; + for (array, field) in batch.columns().iter().zip(batch.schema().fields()) { + let mut levels = batch_level.calculate_array_levels(array, field); + // Reverse levels as we pop() them when writing arrays + levels.reverse(); + write_leaves(&mut row_group_writer, array, &mut levels)?; + } + + self.writer.close_row_group(row_group_writer)?; } - self.writer.close_row_group(row_group_writer) + Ok(()) } /// Close and finalize the underlying Parquet writer @@ -209,16 +232,19 @@ fn write_leaf( levels: LevelInfo, ) -> Result { let indices = levels.filter_array_indices(); + // 
Slice array according to computed offset and length + let column = column.slice(levels.offset, levels.length); let written = match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { let values = match column.data_type() { ArrowDataType::Date64 => { // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32 let array = if let ArrowDataType::Date64 = column.data_type() { - let array = arrow::compute::cast(column, &ArrowDataType::Date32)?; + let array = + arrow::compute::cast(&column, &ArrowDataType::Date32)?; arrow::compute::cast(&array, &ArrowDataType::Int32)? } else { - arrow::compute::cast(column, &ArrowDataType::Int32)? + arrow::compute::cast(&column, &ArrowDataType::Int32)? }; let array = array .as_any() @@ -240,7 +266,7 @@ fn write_leaf( get_numeric_array_slice::(&array, &indices) } _ => { - let array = arrow::compute::cast(column, &ArrowDataType::Int32)?; + let array = arrow::compute::cast(&column, &ArrowDataType::Int32)?; let array = array .as_any() .downcast_ref::() @@ -288,7 +314,7 @@ fn write_leaf( get_numeric_array_slice::(&array, &indices) } _ => { - let array = arrow::compute::cast(column, &ArrowDataType::Int64)?; + let array = arrow::compute::cast(&column, &ArrowDataType::Int64)?; let array = array .as_any() .downcast_ref::() @@ -866,7 +892,17 @@ mod tests { ) .unwrap(); - roundtrip("test_arrow_writer_complex.parquet", batch); + roundtrip( + "test_arrow_writer_complex.parquet", + batch.clone(), + Some(SMALL_SIZE / 2), + ); + + roundtrip( + "test_arrow_writer_complex_small_batch.parquet", + batch, + Some(SMALL_SIZE / 3), + ); } #[test] @@ -904,7 +940,11 @@ mod tests { RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]) .unwrap(); - roundtrip("test_arrow_writer_complex_mixed.parquet", batch); + roundtrip( + "test_arrow_writer_complex_mixed.parquet", + batch, + Some(SMALL_SIZE / 2), + ); } #[test] @@ -936,7 +976,11 @@ mod tests { // build a racord batch let batch = 
RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); - roundtrip("test_arrow_writer_2_level_struct.parquet", batch); + roundtrip( + "test_arrow_writer_2_level_struct.parquet", + batch, + Some(SMALL_SIZE / 2), + ); } #[test] @@ -966,7 +1010,11 @@ mod tests { // build a racord batch let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); - roundtrip("test_arrow_writer_2_level_struct_non_null.parquet", batch); + roundtrip( + "test_arrow_writer_2_level_struct_non_null.parquet", + batch, + Some(SMALL_SIZE / 2), + ); } #[test] @@ -998,18 +1046,30 @@ mod tests { // build a racord batch let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); - roundtrip("test_arrow_writer_2_level_struct_mixed_null.parquet", batch); + roundtrip( + "test_arrow_writer_2_level_struct_mixed_null.parquet", + batch, + Some(SMALL_SIZE / 2), + ); } - const SMALL_SIZE: usize = 4; + const SMALL_SIZE: usize = 7; - fn roundtrip(filename: &str, expected_batch: RecordBatch) -> File { + fn roundtrip( + filename: &str, + expected_batch: RecordBatch, + max_row_group_size: Option, + ) -> File { let file = get_temp_file(filename, &[]); let mut writer = ArrowWriter::try_new( file.try_clone().unwrap(), expected_batch.schema(), - None, + max_row_group_size.map(|size| { + WriterProperties::builder() + .set_max_row_group_size(size) + .build() + }), ) .expect("Unable to write file"); writer.write(&expected_batch).unwrap(); @@ -1037,7 +1097,12 @@ mod tests { file } - fn one_column_roundtrip(filename: &str, values: ArrayRef, nullable: bool) -> File { + fn one_column_roundtrip( + filename: &str, + values: ArrayRef, + nullable: bool, + max_row_group_size: Option, + ) -> File { let schema = Schema::new(vec![Field::new( "col", values.data_type().clone(), @@ -1046,7 +1111,7 @@ mod tests { let expected_batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); - roundtrip(filename, expected_batch) + roundtrip(filename, expected_batch, 
max_row_group_size) } fn values_required(iter: I, filename: &str) @@ -1056,7 +1121,7 @@ mod tests { { let raw_values: Vec<_> = iter.into_iter().collect(); let values = Arc::new(A::from(raw_values)); - one_column_roundtrip(filename, values, false); + one_column_roundtrip(filename, values, false, Some(SMALL_SIZE / 2)); } fn values_optional(iter: I, filename: &str) @@ -1070,7 +1135,7 @@ mod tests { .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) }) .collect(); let optional_values = Arc::new(A::from(optional_raw_values)); - one_column_roundtrip(filename, optional_values, true); + one_column_roundtrip(filename, optional_values, true, Some(SMALL_SIZE / 2)); } fn required_and_optional(iter: I, filename: &str) @@ -1085,12 +1150,17 @@ mod tests { #[test] fn all_null_primitive_single_column() { let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE])); - one_column_roundtrip("all_null_primitive_single_column", values, true); + one_column_roundtrip( + "all_null_primitive_single_column", + values, + true, + Some(SMALL_SIZE / 2), + ); } #[test] fn null_single_column() { let values = Arc::new(NullArray::new(SMALL_SIZE)); - one_column_roundtrip("null_single_column", values, true); + one_column_roundtrip("null_single_column", values, true, Some(SMALL_SIZE / 2)); // null arrays are always nullable, a test with non-nullable nulls fails } @@ -1176,7 +1246,7 @@ mod tests { let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None)); - one_column_roundtrip("timestamp_second_single_column", values, false); + one_column_roundtrip("timestamp_second_single_column", values, false, Some(3)); } #[test] @@ -1184,7 +1254,12 @@ mod tests { let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None)); - one_column_roundtrip("timestamp_millisecond_single_column", values, false); + one_column_roundtrip( + "timestamp_millisecond_single_column", + 
values, + false, + Some(SMALL_SIZE / 2 + 1), + ); } #[test] @@ -1192,7 +1267,12 @@ mod tests { let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None)); - one_column_roundtrip("timestamp_microsecond_single_column", values, false); + one_column_roundtrip( + "timestamp_microsecond_single_column", + values, + false, + Some(SMALL_SIZE / 2 + 2), + ); } #[test] @@ -1200,7 +1280,12 @@ mod tests { let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect(); let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None)); - one_column_roundtrip("timestamp_nanosecond_single_column", values, false); + one_column_roundtrip( + "timestamp_nanosecond_single_column", + values, + false, + Some(SMALL_SIZE / 2), + ); } #[test] @@ -1336,7 +1421,12 @@ mod tests { builder.append_value(b"1112").unwrap(); let array = Arc::new(builder.finish()); - one_column_roundtrip("fixed_size_binary_single_column", array, true); + one_column_roundtrip( + "fixed_size_binary_single_column", + array, + true, + Some(SMALL_SIZE / 2), + ); } #[test] @@ -1379,7 +1469,7 @@ mod tests { let a = ListArray::from(a_list_data); let values = Arc::new(a); - one_column_roundtrip("list_single_column", values, true); + one_column_roundtrip("list_single_column", values, true, Some(SMALL_SIZE / 2)); } #[test] @@ -1404,7 +1494,12 @@ mod tests { let a = LargeListArray::from(a_list_data); let values = Arc::new(a); - one_column_roundtrip("large_list_single_column", values, true); + one_column_roundtrip( + "large_list_single_column", + values, + true, + Some(SMALL_SIZE / 2), + ); } #[test] @@ -1414,7 +1509,7 @@ mod tests { let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]); let values = Arc::new(s); - one_column_roundtrip("struct_single_column", values, false); + one_column_roundtrip("struct_single_column", values, false, Some(SMALL_SIZE / 2)); } #[test] @@ -1440,6 +1535,7 @@ mod tests { roundtrip( 
"test_arrow_writer_string_dictionary.parquet", expected_batch, + Some(SMALL_SIZE / 2), ); } @@ -1470,6 +1566,7 @@ mod tests { roundtrip( "test_arrow_writer_primitive_dictionary.parquet", expected_batch, + Some(SMALL_SIZE / 2), ); } @@ -1496,6 +1593,7 @@ mod tests { roundtrip( "test_arrow_writer_string_dictionary_unsigned_index.parquet", expected_batch, + Some(SMALL_SIZE / 2), ); } @@ -1511,7 +1609,7 @@ mod tests { u32::MAX - 1, u32::MAX, ])); - let file = one_column_roundtrip("u32_min_max_single_column", values, false); + let file = one_column_roundtrip("u32_min_max_single_column", values, false, None); // check statistics are valid let reader = SerializedFileReader::new(file).unwrap(); @@ -1542,7 +1640,7 @@ mod tests { u64::MAX - 1, u64::MAX, ])); - let file = one_column_roundtrip("u64_min_max_single_column", values, false); + let file = one_column_roundtrip("u64_min_max_single_column", values, false, None); // check statistics are valid let reader = SerializedFileReader::new(file).unwrap(); @@ -1565,7 +1663,7 @@ mod tests { fn statistics_null_counts_only_nulls() { // check that null-count statistics for "only NULL"-columns are correct let values = Arc::new(UInt64Array::from(vec![None, None])); - let file = one_column_roundtrip("null_counts", values, true); + let file = one_column_roundtrip("null_counts", values, true, None); // check statistics are valid let reader = SerializedFileReader::new(file).unwrap(); diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs index be567260e1d8..2e95039c16b9 100644 --- a/parquet/src/arrow/levels.rs +++ b/parquet/src/arrow/levels.rs @@ -42,7 +42,6 @@ use arrow::array::{make_array, ArrayRef, StructArray}; use arrow::datatypes::{DataType, Field}; -use arrow::record_batch::RecordBatch; /// Keeps track of the level information per array that is needed to write an Arrow array to Parquet. 
/// @@ -67,6 +66,10 @@ pub(crate) struct LevelInfo { pub max_definition: i16, /// The type of array represented by this level info pub level_type: LevelType, + /// The offset of the current level's array + pub offset: usize, + /// The length of the current level's array + pub length: usize, } /// LevelType defines the type of level, and whether it is nullable or not @@ -91,22 +94,23 @@ impl LevelType { } impl LevelInfo { - /// Create a new [LevelInfo] from a record batch. + /// Create a new [LevelInfo] by filling `length` slots, and setting an initial offset. /// /// This is a convenience function to populate the starting point of the traversal. - pub(crate) fn new_from_batch(batch: &RecordBatch) -> Self { - let num_rows = batch.num_rows(); + pub(crate) fn new(offset: usize, length: usize) -> Self { Self { // a batch has no definition level yet - definition: vec![0; num_rows], + definition: vec![0; length], // a batch has no repetition as it is not a list repetition: None, // a batch has sequential offsets, should be num_rows + 1 - array_offsets: (0..=(num_rows as i64)).collect(), + array_offsets: (0..=(length as i64)).collect(), // all values at a batch-level are non-null - array_mask: vec![true; num_rows], + array_mask: vec![true; length], max_definition: 0, level_type: LevelType::Root, + offset, + length, } } @@ -127,16 +131,19 @@ impl LevelInfo { array: &ArrayRef, field: &Field, ) -> Vec { - let (array_offsets, array_mask) = Self::get_array_offsets_and_masks(array); + let (array_offsets, array_mask) = + Self::get_array_offsets_and_masks(array, self.offset, self.length); match array.data_type() { DataType::Null => vec![Self { definition: self.definition.clone(), repetition: self.repetition.clone(), - array_offsets: self.array_offsets.clone(), + array_offsets, array_mask, max_definition: self.max_definition.max(1), // Null type is always nullable level_type: LevelType::Primitive(true), + offset: self.offset, + length: self.length, }], DataType::Boolean | 
DataType::Int8 @@ -171,6 +178,8 @@ impl LevelInfo { )] } DataType::List(list_field) | DataType::LargeList(list_field) => { + let child_offset = array_offsets[0] as usize; + let child_len = *array_offsets.last().unwrap() as usize; // Calculate the list level let list_level = self.calculate_child_levels( array_offsets, @@ -182,8 +191,11 @@ impl LevelInfo { let array_data = array.data(); let child_data = array_data.child_data().get(0).unwrap(); let child_array = make_array(child_data.clone()); - let (child_offsets, child_mask) = - Self::get_array_offsets_and_masks(&child_array); + let (child_offsets, child_mask) = Self::get_array_offsets_and_masks( + &child_array, + child_offset, + child_len - child_offset, + ); match child_array.data_type() { // TODO: The behaviour of a > is untested @@ -364,14 +376,15 @@ impl LevelInfo { if parent_len == 0 { // If the parent length is 0, there won't be a slot for the child - let index = start + nulls_seen; + let index = start + nulls_seen - self.offset; definition.push(self.definition[index]); repetition.push(0); merged_array_mask.push(self.array_mask[index]); nulls_seen += 1; } else { (start..end).for_each(|parent_index| { - let index = parent_index + nulls_seen; + let index = parent_index + nulls_seen - self.offset; + let parent_index = parent_index - self.offset; // parent is either defined at this level, or earlier let parent_def = self.definition[index]; @@ -418,6 +431,9 @@ impl LevelInfo { debug_assert_eq!(definition.len(), merged_array_mask.len()); + let offset = *array_offsets.first().unwrap() as usize; + let length = *array_offsets.last().unwrap() as usize - offset; + Self { definition, repetition: Some(repetition), @@ -425,6 +441,8 @@ impl LevelInfo { array_mask: merged_array_mask, max_definition, level_type, + offset: offset + self.offset, + length, } } (LevelType::List(_), _) => { @@ -442,7 +460,7 @@ impl LevelInfo { let parent_len = end - start; if parent_len == 0 { - let index = start + nulls_seen; + let index = start 
+ nulls_seen - self.offset; definition.push(self.definition[index]); repetition.push(reps[index]); merged_array_mask.push(self.array_mask[index]); @@ -450,8 +468,8 @@ impl LevelInfo { } else { // iterate through the array, adjusting child definitions for nulls (start..end).for_each(|child_index| { - let index = child_index + nulls_seen; - let child_mask = array_mask[child_index]; + let index = child_index + nulls_seen - self.offset; + let child_mask = array_mask[child_index - self.offset]; let parent_mask = self.array_mask[index]; let parent_def = self.definition[index]; @@ -470,6 +488,9 @@ impl LevelInfo { debug_assert_eq!(definition.len(), merged_array_mask.len()); + let offset = *array_offsets.first().unwrap() as usize; + let length = *array_offsets.last().unwrap() as usize - offset; + Self { definition, repetition: Some(repetition), @@ -477,6 +498,8 @@ impl LevelInfo { array_mask: merged_array_mask, max_definition, level_type, + offset: offset + self.offset, + length, } } (_, LevelType::List(is_nullable)) => { @@ -539,6 +562,9 @@ impl LevelInfo { debug_assert_eq!(definition.len(), merged_array_mask.len()); + let offset = *array_offsets.first().unwrap() as usize; + let length = *array_offsets.last().unwrap() as usize - offset; + Self { definition, repetition: Some(repetition), @@ -546,6 +572,8 @@ impl LevelInfo { array_mask: merged_array_mask, max_definition, level_type, + offset, + length, } } (_, _) => { @@ -583,6 +611,9 @@ impl LevelInfo { array_mask: merged_array_mask, max_definition, level_type, + // Inherit parent offset and length + offset: self.offset, + length: self.length, } } } @@ -592,7 +623,11 @@ impl LevelInfo { /// - Primitive, binary and struct arrays' offsets will be a sequence, masks obtained /// from validity bitmap /// - List array offsets will be the value offsets, masks are computed from offsets - fn get_array_offsets_and_masks(array: &ArrayRef) -> (Vec, Vec) { + fn get_array_offsets_and_masks( + array: &ArrayRef, + offset: usize, + len: 
usize, + ) -> (Vec, Vec) { match array.data_type() { DataType::Null | DataType::Boolean @@ -622,10 +657,10 @@ impl LevelInfo { | DataType::Dictionary(_, _) | DataType::Decimal(_, _) => { let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), - None => vec![true; array.len()], + Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), + None => vec![true; len], }; - ((0..=(array.len() as i64)).collect(), array_mask) + ((0..=(len as i64)).collect(), array_mask) } DataType::List(_) => { let data = array.data(); @@ -633,31 +668,37 @@ impl LevelInfo { let offsets = offsets .to_vec() .into_iter() + .skip(offset) + .take(len + 1) .map(|v| v as i64) .collect::>(); let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), - None => vec![true; array.len()], + Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), + None => vec![true; len], }; (offsets, array_mask) } DataType::LargeList(_) => { - let offsets = - unsafe { array.data().buffers()[0].typed_data::() }.to_vec(); + let offsets = unsafe { array.data().buffers()[0].typed_data::() } + .iter() + .skip(offset) + .take(len + 1) + .copied() + .collect(); let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), - None => vec![true; array.len()], + Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), + None => vec![true; len], }; (offsets, array_mask) } DataType::FixedSizeBinary(value_len) => { let array_mask = match array.data().null_buffer() { - Some(buf) => get_bool_array_slice(buf, array.offset(), array.len()), - None => vec![true; array.len()], + Some(buf) => get_bool_array_slice(buf, array.offset() + offset, len), + None => vec![true; len], }; let value_len = *value_len as i64; ( - (0..=(array.len() as i64)).map(|v| v * value_len).collect(), + (0..=(len as i64)).map(|v| v * 
value_len).collect(), array_mask, ) } @@ -722,20 +763,14 @@ fn get_bool_array_slice( #[cfg(test)] mod tests { - use std::sync::Arc; + use super::*; - use arrow::{ - array::ListArray, - array::{Array, ArrayData, Int32Array}, - buffer::Buffer, - datatypes::Schema, - }; - use arrow::{ - array::{Float32Array, Float64Array, Int16Array}, - datatypes::ToByteSlice, - }; + use std::sync::Arc; - use super::*; + use arrow::array::*; + use arrow::buffer::Buffer; + use arrow::datatypes::{Schema, ToByteSlice}; + use arrow::record_batch::RecordBatch; #[test] fn test_calculate_array_levels_twitter_example() { @@ -748,6 +783,8 @@ mod tests { array_mask: vec![true, true], // both lists defined max_definition: 0, level_type: LevelType::Root, + offset: 0, + length: 2, }; // offset into array, each level1 has 2 values let array_offsets = vec![0, 2, 4]; @@ -767,6 +804,8 @@ mod tests { array_mask: vec![true, true, true, true], max_definition: 1, level_type: LevelType::List(false), + offset: 0, + length: 4, }; // the separate asserts make it easier to see what's failing assert_eq!(&levels.definition, &expected_levels.definition); @@ -794,6 +833,8 @@ mod tests { array_mask: vec![true; 10], max_definition: 2, level_type: LevelType::List(false), + offset: 0, + length: 10, }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); @@ -814,6 +855,8 @@ mod tests { array_mask: vec![true; 10], max_definition: 0, level_type: LevelType::Root, + offset: 0, + length: 10, }; let array_offsets: Vec = (0..=10).collect(); let array_mask = vec![true; 10]; @@ -830,6 +873,8 @@ mod tests { array_mask, max_definition: 1, level_type: LevelType::Primitive(false), + offset: 0, + length: 10, }; assert_eq!(&levels, &expected_levels); } @@ -844,6 +889,8 @@ mod tests { array_mask: vec![true, true, true, true, true], max_definition: 0, level_type: LevelType::Root, + offset: 0, + length: 5, }; let array_offsets: Vec = (0..=5).collect(); let array_mask = 
vec![true, false, true, true, false]; @@ -860,6 +907,8 @@ mod tests { array_mask, max_definition: 1, level_type: LevelType::Primitive(true), + offset: 0, + length: 5, }; assert_eq!(&levels, &expected_levels); } @@ -875,6 +924,8 @@ mod tests { array_mask: vec![true, true, true, true, true], max_definition: 0, level_type: LevelType::Root, + offset: 0, + length: 5, }; let array_offsets = vec![0, 2, 2, 4, 8, 11]; let array_mask = vec![true, false, true, true, true]; @@ -905,6 +956,8 @@ mod tests { ], max_definition: 1, level_type: LevelType::List(true), + offset: 0, + length: 11, // the child has 11 slots }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); @@ -936,6 +989,8 @@ mod tests { array_mask: vec![false, true, false, true, true], max_definition: 1, level_type: LevelType::Struct(true), + offset: 0, + length: 5, }; let array_offsets = vec![0, 2, 2, 4, 8, 11]; let array_mask = vec![true, false, true, true, true]; @@ -960,6 +1015,8 @@ mod tests { ], max_definition: 2, level_type: LevelType::List(true), + offset: 0, + length: 11, }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); @@ -1017,6 +1074,8 @@ mod tests { ], max_definition: 4, level_type: LevelType::List(true), + offset: 0, + length: 22, }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); @@ -1042,6 +1101,8 @@ mod tests { array_mask: vec![true, true, true, true], max_definition: 1, level_type: LevelType::Struct(true), + offset: 0, + length: 4, }; // 0: null ([], but mask is false, so it's not just an empty list) // 1: [1, 2, 3] @@ -1066,6 +1127,8 @@ mod tests { array_mask: vec![false, true, true, true, true, true, true, true], max_definition: 2, level_type: LevelType::List(true), + offset: 0, + length: 8, }; assert_eq!(&levels.definition, &expected_levels.definition); 
assert_eq!(&levels.repetition, &expected_levels.repetition); @@ -1113,6 +1176,8 @@ mod tests { array_offsets, max_definition: 4, level_type: LevelType::List(true), + offset: 0, + length: 16, }; assert_eq!(&levels.definition, &expected_levels.definition); assert_eq!(&levels.repetition, &expected_levels.repetition); @@ -1140,6 +1205,8 @@ mod tests { array_mask: vec![true, true, true, true, false, true], max_definition: 1, level_type: LevelType::Struct(true), + offset: 0, + length: 6, }; // b's offset and mask let b_offsets: Vec = (0..=6).collect(); @@ -1152,6 +1219,8 @@ mod tests { array_mask: vec![true, true, true, false, false, true], max_definition: 2, level_type: LevelType::Struct(true), + offset: 0, + length: 6, }; let b_levels = a_levels.calculate_child_levels( b_offsets.clone(), @@ -1171,6 +1240,8 @@ mod tests { array_mask: vec![true, false, true, false, false, true], max_definition: 3, level_type: LevelType::Struct(true), + offset: 0, + length: 6, }; let c_levels = b_levels.calculate_child_levels(c_offsets, c_mask, LevelType::Struct(true)); @@ -1203,15 +1274,17 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); let expected_batch_level = LevelInfo { - definition: vec![0; 5], + definition: vec![0; 2], repetition: None, - array_offsets: (0..=5).collect(), - array_mask: vec![true, true, true, true, true], + array_offsets: (0..=2).collect(), + array_mask: vec![true, true], max_definition: 0, level_type: LevelType::Root, + offset: 2, + length: 2, }; - let batch_level = LevelInfo::new_from_batch(&batch); + let batch_level = LevelInfo::new(2, 2); assert_eq!(&batch_level, &expected_batch_level); // calculate the list's level @@ -1229,14 +1302,14 @@ mod tests { let list_level = levels.get(0).unwrap(); let expected_level = LevelInfo { - definition: vec![3, 3, 3, 0, 3, 3, 3, 3, 3, 3, 3], - repetition: Some(vec![0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 1]), - array_offsets: vec![0, 1, 3, 3, 6, 10], - array_mask: vec![ - true, true, true, 
false, true, true, true, true, true, true, true, - ], + definition: vec![0, 3, 3, 3], + repetition: Some(vec![0, 0, 1, 1]), + array_offsets: vec![3, 3, 6], + array_mask: vec![false, true, true, true], max_definition: 3, level_type: LevelType::Primitive(true), + offset: 3, + length: 3, }; assert_eq!(&list_level.definition, &expected_level.definition); assert_eq!(&list_level.repetition, &expected_level.repetition); @@ -1320,9 +1393,11 @@ mod tests { array_mask: vec![true, true, true, true, true], max_definition: 0, level_type: LevelType::Root, + offset: 0, + length: 5, }; - let batch_level = LevelInfo::new_from_batch(&batch); + let batch_level = LevelInfo::new(0, 5); assert_eq!(&batch_level, &expected_batch_level); // calculate the list's level @@ -1347,6 +1422,8 @@ mod tests { array_mask: vec![true, true, true, true, true], max_definition: 1, level_type: LevelType::Primitive(false), + offset: 0, + length: 5, }; assert_eq!(list_level, &expected_level); @@ -1360,6 +1437,8 @@ mod tests { array_mask: vec![true, false, false, true, true], max_definition: 1, level_type: LevelType::Primitive(true), + offset: 0, + length: 5, }; assert_eq!(list_level, &expected_level); @@ -1373,6 +1452,8 @@ mod tests { array_mask: vec![false, false, false, true, false], max_definition: 2, level_type: LevelType::Primitive(true), + offset: 0, + length: 5, }; assert_eq!(list_level, &expected_level); @@ -1386,6 +1467,8 @@ mod tests { array_mask: vec![true, false, true, false, true], max_definition: 3, level_type: LevelType::Primitive(true), + offset: 0, + length: 5, }; assert_eq!(list_level, &expected_level); } @@ -1399,6 +1482,8 @@ mod tests { array_mask: vec![true, true, true, false, true, true, true], max_definition: 3, level_type: LevelType::Primitive(true), + offset: 0, + length: 6, }; let expected = vec![0, 1, 2, 3, 4, 5]; @@ -1427,7 +1512,7 @@ mod tests { RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]) .unwrap(); - let batch_level = 
LevelInfo::new_from_batch(&batch); + let batch_level = LevelInfo::new(0, batch.num_rows()); let struct_null_level = batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0)); @@ -1451,7 +1536,7 @@ mod tests { RecordBatch::try_new(Arc::new(schema), vec![Arc::new(some_nested_object)]) .unwrap(); - let batch_level = LevelInfo::new_from_batch(&batch); + let batch_level = LevelInfo::new(0, batch.num_rows()); let struct_non_null_level = batch_level.calculate_array_levels(batch.column(0), batch.schema().field(0)); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index b0b25f9b9527..0d0cbef0b9ea 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -290,6 +290,7 @@ impl WriterPropertiesBuilder { /// Sets max size for a row group. pub fn set_max_row_group_size(mut self, value: usize) -> Self { + assert!(value > 0, "Cannot have a 0 max row group size"); self.max_row_group_size = value; self }