From eae28204462afeeff7cc4b06dbfb652eed1ad206 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Wed, 27 Sep 2023 16:37:39 +0100
Subject: [PATCH 1/7] Facilitate parallel parquet writing

---
 parquet/src/arrow/arrow_writer/levels.rs | 434 +++++++++++++----------
 parquet/src/arrow/arrow_writer/mod.rs    | 154 ++++----
 2 files changed, 326 insertions(+), 262 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index 48615dc3d599..4a0bd551e1f9 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -42,19 +42,20 @@
 use crate::errors::{ParquetError, Result};
 use arrow_array::cast::AsArray;
-use arrow_array::{Array, ArrayRef, FixedSizeListArray, OffsetSizeTrait, StructArray};
-use arrow_buffer::NullBuffer;
+use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow_buffer::{NullBuffer, OffsetBuffer};
 use arrow_schema::{DataType, Field};
 use std::ops::Range;
+use std::sync::Arc;
 
-/// Performs a depth-first scan of the children of `array`, constructing [`LevelInfo`]
+/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
 /// for each leaf column encountered
 pub(crate) fn calculate_array_levels(
     array: &ArrayRef,
     field: &Field,
-) -> Result<Vec<LevelInfo>> {
-    let mut builder = LevelInfoBuilder::try_new(field, Default::default())?;
-    builder.write(array, 0..array.len());
+) -> Result<Vec<ArrayLevels>> {
+    let mut builder = LevelInfoBuilder::try_new(field, Default::default(), array)?;
+    builder.write(0..array.len());
     Ok(builder.finish())
 }
 
@@ -102,31 +103,57 @@ struct LevelContext {
     def_level: i16,
 }
 
-/// A helper to construct [`LevelInfo`] from a potentially nested [`Field`]
+/// A helper to construct [`ArrayLevels`] from a potentially nested [`Field`]
 enum LevelInfoBuilder {
     /// A primitive, leaf array
-    Primitive(LevelInfo),
-    /// A list array, contains the [`LevelInfoBuilder`] of the child and
-    /// the [`LevelContext`] of this list
-    List(Box<LevelInfoBuilder>, LevelContext),
-    /// A list array, contains the [`LevelInfoBuilder`] of its children and
-    /// the [`LevelContext`] of this struct array
-    Struct(Vec<LevelInfoBuilder>, LevelContext),
+    Primitive(ArrayLevels),
+    /// A list array
+    List(
+        Box<LevelInfoBuilder>, // Child Values
+        LevelContext,          // Context
+        OffsetBuffer<i32>,     // Offsets
+        Option<NullBuffer>,    // Nulls
+    ),
+    /// A large list array
+    LargeList(
+        Box<LevelInfoBuilder>, // Child Values
+        LevelContext,          // Context
+        OffsetBuffer<i64>,     // Offsets
+        Option<NullBuffer>,    // Nulls
+    ),
+    /// A fixed size list array
+    FixedSizeList(
+        Box<LevelInfoBuilder>, // Values
+        LevelContext,          // Context
+        usize,                 // List Size
+        Option<NullBuffer>,    // Nulls
+    ),
+    /// A struct array
+    Struct(Vec<LevelInfoBuilder>, LevelContext, Option<NullBuffer>),
 }
 
 impl LevelInfoBuilder {
     /// Create a new [`LevelInfoBuilder`] for the given [`Field`] and parent [`LevelContext`]
-    fn try_new(field: &Field, parent_ctx: LevelContext) -> Result<Self> {
-        match field.data_type() {
-            d if is_leaf(d) => Ok(Self::Primitive(LevelInfo::new(
-                parent_ctx,
-                field.is_nullable(),
-            ))),
-            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => Ok(Self::Primitive(
-                LevelInfo::new(parent_ctx, field.is_nullable()),
-            )),
+    fn try_new(
+        field: &Field,
+        parent_ctx: LevelContext,
+        array: &ArrayRef,
+    ) -> Result<Self> {
+        assert_eq!(field.data_type(), array.data_type());
+        let is_nullable = field.is_nullable();
+
+        match array.data_type() {
+            d if is_leaf(d) => {
+                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
+                Ok(Self::Primitive(levels))
+            }
+            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
+                let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
+                Ok(Self::Primitive(levels))
+            }
             DataType::Struct(children) => {
-                let def_level = match field.is_nullable() {
+                let array = array.as_struct();
+                let def_level = match is_nullable {
                     true => parent_ctx.def_level + 1,
                     false => parent_ctx.def_level,
                 };
@@ -138,16 +165,17 @@ impl LevelInfoBuilder {
                 let children = children
                     .iter()
-                    .map(|f| Self::try_new(f, ctx))
+                    .zip(array.columns())
+                    .map(|(f, a)| Self::try_new(f, ctx, a))
                     .collect::<Result<Vec<_>>>()?;
 
-                Ok(Self::Struct(children, ctx))
+                Ok(Self::Struct(children, ctx, array.nulls().cloned()))
             }
 
             DataType::List(child)
             | DataType::LargeList(child)
             | DataType::Map(child, _)
             | DataType::FixedSizeList(child, _) => {
-                let def_level = match field.is_nullable() {
+                let def_level = match is_nullable {
                     true => parent_ctx.def_level + 2,
                     false => parent_ctx.def_level + 1,
                 };
@@ -157,79 +185,70 @@ impl LevelInfoBuilder {
                     def_level,
                 };
 
-                let child = Self::try_new(child.as_ref(), ctx)?;
-                Ok(Self::List(Box::new(child), ctx))
+                Ok(match field.data_type() {
+                    DataType::List(_) => {
+                        let list = array.as_list();
+                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
+                        let offsets = list.offsets().clone();
+                        Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
+                    }
+                    DataType::LargeList(_) => {
+                        let list = array.as_list();
+                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
+                        let offsets = list.offsets().clone();
+                        let nulls = list.nulls().cloned();
+                        Self::LargeList(Box::new(child), ctx, offsets, nulls)
+                    }
+                    DataType::Map(_, _) => {
+                        let map = array.as_map();
+                        let entries = Arc::new(map.entries().clone()) as ArrayRef;
+                        let child = Self::try_new(child.as_ref(), ctx, &entries)?;
+                        let offsets = map.offsets().clone();
+                        Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
+                    }
+                    DataType::FixedSizeList(_, size) => {
+                        let list = array.as_fixed_size_list();
+                        let child = Self::try_new(child.as_ref(), ctx, list.values())?;
+                        let nulls = list.nulls().cloned();
+                        Self::FixedSizeList(Box::new(child), ctx, *size as _, nulls)
+                    }
+                    _ => unreachable!(),
+                })
             }
             d => Err(nyi_err!("Datatype {} is not yet supported", d)),
         }
     }
 
-    /// Finish this [`LevelInfoBuilder`] returning the [`LevelInfo`] for the leaf columns
+    /// Finish this [`LevelInfoBuilder`] returning the [`ArrayLevels`] for the leaf columns
     /// as enumerated by a depth-first search
-    fn finish(self) -> Vec<LevelInfo> {
+    fn finish(self) -> Vec<ArrayLevels> {
         match self {
             LevelInfoBuilder::Primitive(v) => vec![v],
-            LevelInfoBuilder::List(v, _) => v.finish(),
-            LevelInfoBuilder::Struct(v, _) => {
+            LevelInfoBuilder::List(v, _, _, _)
+            | LevelInfoBuilder::LargeList(v, _, _, _)
+            | LevelInfoBuilder::FixedSizeList(v, _, _, _) => v.finish(),
+            LevelInfoBuilder::Struct(v, _, _) => {
                 v.into_iter().flat_map(|l| l.finish()).collect()
             }
         }
     }
 
     /// Given an `array`, write the level data for the elements in `range`
-    fn write(&mut self, array: &dyn Array, range: Range<usize>) {
-        match array.data_type() {
-            d if is_leaf(d) => self.write_leaf(array, range),
-            DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
-                self.write_leaf(array, range)
-            }
-            DataType::Struct(_) => {
-                let array = array.as_struct();
-                self.write_struct(array, range)
-            }
-            DataType::List(_) => {
-                let array = array.as_list::<i32>();
-                self.write_list(
-                    array.value_offsets(),
-                    array.nulls(),
-                    array.values(),
-                    range,
-                )
+    fn write(&mut self, range: Range<usize>) {
+        match self {
+            LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
+            LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
+                Self::write_list(child, ctx,
offsets, nulls.as_ref(), range) } - DataType::LargeList(_) => { - let array = array.as_list::(); - self.write_list( - array.value_offsets(), - array.nulls(), - array.values(), - range, - ) + LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => { + Self::write_list(child, ctx, offsets, nulls.as_ref(), range) } - DataType::Map(_, _) => { - let array = array.as_map(); - // A Map is just as ListArray with a StructArray child, we therefore - // treat it as such to avoid code duplication - self.write_list( - array.value_offsets(), - array.nulls(), - array.entries(), - range, - ) + LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => { + Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range) } - &DataType::FixedSizeList(_, size) => { - let array = array - .as_any() - .downcast_ref::() - .expect("unable to get fixed-size list array"); - - self.write_fixed_size_list( - size as usize, - array.nulls(), - array.values(), - range, - ) + LevelInfoBuilder::Struct(children, ctx, nulls) => { + Self::write_struct(children, ctx, nulls.as_ref(), range) } - _ => unreachable!(), } } @@ -237,22 +256,17 @@ impl LevelInfoBuilder { /// /// Note: MapArrays are `ListArray` under the hood and so are dispatched to this method fn write_list( - &mut self, + child: &mut LevelInfoBuilder, + ctx: &LevelContext, offsets: &[O], nulls: Option<&NullBuffer>, - values: &dyn Array, range: Range, ) { - let (child, ctx) = match self { - Self::List(child, ctx) => (child, ctx), - _ => unreachable!(), - }; - let offsets = &offsets[range.start..range.end + 1]; let write_non_null_slice = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { - child.write(values, start_idx..end_idx); + child.write(start_idx..end_idx); child.visit_leaves(|leaf| { let rep_levels = leaf.rep_levels.as_mut().unwrap(); let mut rev = rep_levels.iter_mut().rev(); @@ -324,12 +338,12 @@ impl LevelInfoBuilder { } /// Write `range` elements from StructArray `array` - fn write_struct(&mut self, array: &StructArray, range: Range) { - let (children, ctx) = match self { - Self::Struct(children, ctx) => (children, ctx), - _ => unreachable!(), - }; - + fn write_struct( + children: &mut [LevelInfoBuilder], + ctx: &LevelContext, + nulls: Option<&NullBuffer>, + range: Range, + ) { let write_null = |children: &mut [LevelInfoBuilder], range: Range| { for child in children { child.visit_leaves(|info| { @@ -346,12 +360,12 @@ impl LevelInfoBuilder { }; let write_non_null = |children: &mut [LevelInfoBuilder], range: Range| { - for (child_array, child) in array.columns().iter().zip(children) { - child.write(child_array, range.clone()) + for child in children { + child.write(range.clone()) } }; - match array.nulls() { + match nulls { Some(validity) => { let mut last_non_null_idx = None; let mut last_null_idx = None; @@ -388,22 +402,17 @@ impl LevelInfoBuilder { /// Write `range` elements from FixedSizeListArray with child data `values` and null bitmap `nulls`. 
fn write_fixed_size_list( - &mut self, + child: &mut LevelInfoBuilder, + ctx: &LevelContext, fixed_size: usize, nulls: Option<&NullBuffer>, - values: &dyn Array, range: Range, ) { - let (child, ctx) = match self { - Self::List(child, ctx) => (child, ctx), - _ => unreachable!(), - }; - let write_non_null = |child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| { let values_start = start_idx * fixed_size; let values_end = end_idx * fixed_size; - child.write(values, values_start..values_end); + child.write(values_start..values_end); child.visit_leaves(|leaf| { let rep_levels = leaf.rep_levels.as_mut().unwrap(); @@ -481,12 +490,7 @@ impl LevelInfoBuilder { } /// Write a primitive array, as defined by [`is_leaf`] - fn write_leaf(&mut self, array: &dyn Array, range: Range) { - let info = match self { - Self::Primitive(info) => info, - _ => unreachable!(), - }; - + fn write_leaf(info: &mut ArrayLevels, range: Range) { let len = range.end - range.start; match &mut info.def_levels { @@ -494,7 +498,7 @@ impl LevelInfoBuilder { def_levels.reserve(len); info.non_null_indices.reserve(len); - match array.logical_nulls() { + match info.array.logical_nulls() { Some(nulls) => { // TODO: Faster bitmask iteration (#1757) for i in range { @@ -523,11 +527,13 @@ impl LevelInfoBuilder { } /// Visits all children of this node in depth first order - fn visit_leaves(&mut self, visit: impl Fn(&mut LevelInfo) + Copy) { + fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) { match self { LevelInfoBuilder::Primitive(info) => visit(info), - LevelInfoBuilder::List(c, _) => c.visit_leaves(visit), - LevelInfoBuilder::Struct(children, _) => { + LevelInfoBuilder::List(c, _, _, _) + | LevelInfoBuilder::LargeList(c, _, _, _) + | LevelInfoBuilder::FixedSizeList(c, _, _, _) => c.visit_leaves(visit), + LevelInfoBuilder::Struct(children, _, _) => { for c in children { c.visit_leaves(visit) } @@ -537,8 +543,8 @@ impl LevelInfoBuilder { } /// The data necessary to write a primitive Arrow array to parquet, taking into account /// any non-primitive parents it may have in the arrow representation -#[derive(Debug, Eq, PartialEq, Clone)] -pub(crate) struct LevelInfo { +#[derive(Debug, Clone)] +pub(crate) struct ArrayLevels { /// Array's definition levels /// /// Present if `max_def_level != 0` @@ -558,10 +564,25 @@ pub(crate) struct LevelInfo { /// The maximum repetition for this leaf column max_rep_level: i16, + + /// The arrow array + array: ArrayRef, } -impl LevelInfo { - fn new(ctx: LevelContext, is_nullable: bool) -> Self { +impl PartialEq for ArrayLevels { + fn eq(&self, other: &Self) -> bool { + self.def_levels == other.def_levels + && self.rep_levels == other.rep_levels + && self.non_null_indices == other.non_null_indices + && self.max_def_level == other.max_def_level + && self.max_rep_level == other.max_rep_level + && self.array.as_ref() == other.array.as_ref() + } +} +impl Eq for ArrayLevels {} + +impl ArrayLevels { + fn new(ctx: LevelContext, is_nullable: bool, array: ArrayRef) -> Self { let max_rep_level = ctx.rep_level; let max_def_level = match is_nullable { true => ctx.def_level + 1, @@ -574,9 +595,14 @@ impl LevelInfo { non_null_indices: vec![], max_def_level, max_rep_level, + array, } } + pub fn array(&self) -> &ArrayRef { + &self.array + } + pub fn def_levels(&self) -> Option<&[i16]> { self.def_levels.as_deref() } @@ -597,6 +623,7 @@ mod tests { use std::sync::Arc; use arrow_array::builder::*; + use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; use arrow_array::*; use 
arrow_buffer::{Buffer, ToByteSlice}; @@ -622,7 +649,7 @@ mod tests { let inner_list = ArrayDataBuilder::new(inner_type) .len(4) .add_buffer(offsets) - .add_child_data(primitives.into_data()) + .add_child_data(primitives.to_data()) .build() .unwrap(); @@ -638,12 +665,13 @@ mod tests { let levels = calculate_array_levels(&outer_list, &outer_field).unwrap(); assert_eq!(levels.len(), 1); - let expected = LevelInfo { + let expected = ArrayLevels { def_levels: Some(vec![2; 10]), rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]), non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9], max_def_level: 2, max_rep_level: 2, + array: Arc::new(primitives), }; assert_eq!(&levels[0], &expected); } @@ -657,12 +685,13 @@ mod tests { let levels = calculate_array_levels(&array, &field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: None, rep_levels: None, non_null_indices: (0..10).collect(), max_def_level: 0, max_rep_level: 0, + array, }; assert_eq!(&levels[0], &expected_levels); } @@ -682,12 +711,13 @@ mod tests { let levels = calculate_array_levels(&array, &field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![1, 0, 1, 1, 0]), rep_levels: None, non_null_indices: vec![0, 2, 3], max_def_level: 1, max_rep_level: 0, + array, }; assert_eq!(&levels[0], &expected_levels); } @@ -706,7 +736,7 @@ mod tests { let list = ArrayDataBuilder::new(list_type.clone()) .len(5) .add_buffer(offsets) - .add_child_data(leaf_array.into_data()) + .add_child_data(leaf_array.to_data()) .build() .unwrap(); let list = make_array(list); @@ -715,12 +745,13 @@ mod tests { let levels = calculate_array_levels(&list, &list_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![1; 5]), rep_levels: Some(vec![0; 5]), non_null_indices: (0..5).collect(), max_def_level: 1, max_rep_level: 1, + array: Arc::new(leaf_array), }; assert_eq!(&levels[0], &expected_levels); @@ -737,7 +768,7 @@ mod tests { let list = ArrayDataBuilder::new(list_type.clone()) .len(5) .add_buffer(offsets) - .add_child_data(leaf_array.into_data()) + .add_child_data(leaf_array.to_data()) .null_bit_buffer(Some(Buffer::from([0b00011101]))) .build() .unwrap(); @@ -747,12 +778,13 @@ mod tests { let levels = calculate_array_levels(&list, &list_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]), rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]), non_null_indices: (0..11).collect(), max_def_level: 2, max_rep_level: 1, + array: Arc::new(leaf_array), }; assert_eq!(&levels[0], &expected_levels); } @@ -778,7 +810,7 @@ mod tests { let list_type = DataType::List(Arc::new(leaf_field)); let list = ArrayData::builder(list_type.clone()) .len(5) - .add_child_data(leaf.into_data()) + .add_child_data(leaf.to_data()) .add_buffer(Buffer::from_iter([0_i32, 2, 2, 4, 8, 11])) .build() .unwrap(); @@ -795,12 +827,13 @@ mod tests { let levels = calculate_array_levels(&array, &struct_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]), rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]), non_null_indices: (4..11).collect(), max_def_level: 3, max_rep_level: 1, + array: Arc::new(leaf), }; 
assert_eq!(&levels[0], &expected_levels); @@ -820,7 +853,7 @@ mod tests { let offsets = Buffer::from_iter([0_i32, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22]); let l1 = ArrayData::builder(l1_type.clone()) .len(11) - .add_child_data(leaf.into_data()) + .add_child_data(leaf.to_data()) .add_buffer(offsets) .build() .unwrap(); @@ -840,7 +873,7 @@ mod tests { let levels = calculate_array_levels(&l2, &l2_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![ 5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, ]), @@ -850,6 +883,7 @@ mod tests { non_null_indices: (0..22).collect(), max_def_level: 5, max_rep_level: 2, + array: Arc::new(leaf), }; assert_eq!(&levels[0], &expected_levels); @@ -871,7 +905,7 @@ mod tests { let list = ArrayData::builder(list_type.clone()) .len(4) .add_buffer(Buffer::from_iter(0_i32..5)) - .add_child_data(leaf.into_data()) + .add_child_data(leaf.to_data()) .build() .unwrap(); let list = make_array(list); @@ -880,12 +914,13 @@ mod tests { let levels = calculate_array_levels(&list, &list_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![1; 4]), rep_levels: Some(vec![0; 4]), non_null_indices: (0..4).collect(), max_def_level: 1, max_rep_level: 1, + array: Arc::new(leaf), }; assert_eq!(&levels[0], &expected_levels); @@ -898,7 +933,7 @@ mod tests { .len(4) .add_buffer(Buffer::from_iter([0_i32, 0, 3, 5, 7])) .null_bit_buffer(Some(Buffer::from([0b00001110]))) - .add_child_data(leaf.into_data()) + .add_child_data(leaf.to_data()) .build() .unwrap(); let list = make_array(list); @@ -911,12 +946,13 @@ mod tests { let levels = calculate_array_levels(&array, &struct_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]), rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]), non_null_indices: (0..7).collect(), max_def_level: 3, max_rep_level: 1, + array: Arc::new(leaf), }; assert_eq!(&levels[0], &expected_levels); @@ -933,7 +969,7 @@ mod tests { let list_1 = ArrayData::builder(list_1_type.clone()) .len(7) .add_buffer(Buffer::from_iter([0_i32, 1, 3, 3, 6, 10, 10, 15])) - .add_child_data(leaf.into_data()) + .add_child_data(leaf.to_data()) .build() .unwrap(); @@ -958,12 +994,13 @@ mod tests { let levels = calculate_array_levels(&array, &struct_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5]), rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2]), non_null_indices: (0..15).collect(), max_def_level: 5, max_rep_level: 2, + array: Arc::new(leaf), }; assert_eq!(&levels[0], &expected_levels); } @@ -980,9 +1017,10 @@ mod tests { // - {a: {b: {c: 6}}} let c = Int32Array::from_iter([Some(1), None, Some(3), None, Some(5), Some(6)]); + let leaf = Arc::new(c) as ArrayRef; let c_field = Arc::new(Field::new("c", DataType::Int32, true)); let b = StructArray::from(( - (vec![(c_field, Arc::new(c) as ArrayRef)]), + (vec![(c_field, leaf.clone())]), Buffer::from([0b00110111]), )); @@ -998,12 +1036,13 @@ mod tests { let levels = calculate_array_levels(&a_array, &a_field).unwrap(); assert_eq!(levels.len(), 1); - let expected_levels = LevelInfo { + let expected_levels = ArrayLevels { def_levels: Some(vec![3, 2, 3, 1, 0, 3]), rep_levels: None, 
non_null_indices: vec![0, 2, 5], max_def_level: 3, max_rep_level: 0, + array: leaf, }; assert_eq!(&levels[0], &expected_levels); } @@ -1020,7 +1059,7 @@ mod tests { .len(5) .add_buffer(a_value_offsets) .null_bit_buffer(Some(Buffer::from(vec![0b00011011]))) - .add_child_data(a_values.into_data()) + .add_child_data(a_values.to_data()) .build() .unwrap(); @@ -1029,21 +1068,21 @@ mod tests { let a = ListArray::from(a_list_data); let item_field = Field::new("item", a_list_type, true); - let mut builder = - LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); - builder.write(&a, 2..4); + let mut builder = levels(&item_field, a); + builder.write(2..4); let levels = builder.finish(); assert_eq!(levels.len(), 1); let list_level = levels.get(0).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![0, 3, 3, 3]), rep_levels: Some(vec![0, 0, 1, 1]), non_null_indices: vec![3, 4, 5], max_def_level: 3, max_rep_level: 1, + array: Arc::new(a_values), }; assert_eq!(list_level, &expected_level); } @@ -1100,19 +1139,19 @@ mod tests { let g = ListArray::from(g_list_data); let e = StructArray::from(vec![ - (struct_field_f, Arc::new(f) as ArrayRef), + (struct_field_f, Arc::new(f.clone()) as ArrayRef), (struct_field_g, Arc::new(g) as ArrayRef), ]); let c = StructArray::from(vec![ - (struct_field_d, Arc::new(d) as ArrayRef), + (struct_field_d, Arc::new(d.clone()) as ArrayRef), (struct_field_e, Arc::new(e) as ArrayRef), ]); // build a record batch let batch = RecordBatch::try_new( Arc::new(schema), - vec![Arc::new(a), Arc::new(b), Arc::new(c)], + vec![Arc::new(a.clone()), Arc::new(b.clone()), Arc::new(c)], ) .unwrap(); @@ -1132,48 +1171,52 @@ mod tests { // test "a" levels let list_level = levels.get(0).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: None, rep_levels: None, non_null_indices: vec![0, 1, 2, 3, 4], max_def_level: 0, max_rep_level: 0, + array: Arc::new(a), }; assert_eq!(list_level, &expected_level); // test "b" levels let list_level = levels.get(1).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![1, 0, 0, 1, 1]), rep_levels: None, non_null_indices: vec![0, 3, 4], max_def_level: 1, max_rep_level: 0, + array: Arc::new(b), }; assert_eq!(list_level, &expected_level); // test "d" levels let list_level = levels.get(2).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![1, 1, 1, 2, 1]), rep_levels: None, non_null_indices: vec![3], max_def_level: 2, max_rep_level: 0, + array: Arc::new(d), }; assert_eq!(list_level, &expected_level); // test "f" levels let list_level = levels.get(3).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![3, 2, 3, 2, 3]), rep_levels: None, non_null_indices: vec![0, 2, 4], max_def_level: 3, max_rep_level: 0, + array: Arc::new(f), }; assert_eq!(list_level, &expected_level); } @@ -1270,27 +1313,31 @@ mod tests { }); assert_eq!(levels.len(), 2); + let map = batch.column(0).as_map(); + // test key levels let list_level = levels.get(0).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![1; 7]), rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]), non_null_indices: vec![0, 1, 2, 3, 4, 5, 6], max_def_level: 1, max_rep_level: 1, + array: map.keys().clone(), }; assert_eq!(list_level, &expected_level); // test values levels let list_level = levels.get(1).unwrap(); - let expected_level = 
LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]), rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]), non_null_indices: vec![0, 1, 2, 4, 6], max_def_level: 2, max_rep_level: 1, + array: map.values().clone(), }; assert_eq!(list_level, &expected_level); } @@ -1358,7 +1405,8 @@ mod tests { let array = Arc::new(list_builder.finish()); - let values_len = array.values().len(); + let values = array.values().as_struct().column(0).clone(); + let values_len = values.len(); assert_eq!(values_len, 5); let schema = Arc::new(Schema::new(vec![list_field])); @@ -1368,12 +1416,13 @@ mod tests { let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap(); let list_level = &levels[0]; - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]), rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]), non_null_indices: vec![0, 4], max_def_level: 4, max_rep_level: 1, + array: values, }; assert_eq!(list_level, &expected_level); @@ -1391,6 +1440,7 @@ mod tests { None, // Masked by struct array None, ]); + let values = inner.values().clone(); // This test assumes that nulls don't take up space assert_eq!(inner.values().len(), 7); @@ -1406,12 +1456,13 @@ mod tests { assert_eq!(levels.len(), 1); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]), rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]), non_null_indices: vec![0, 1, 5, 6], max_def_level: 4, max_rep_level: 1, + array: values, }; assert_eq!(&levels[0], &expected_level); @@ -1422,14 +1473,16 @@ mod tests { // Test the null mask of a struct array and the null mask of a list array // masking out non-null elements of their children - let a1 = Arc::new(ListArray::from_iter_primitive::(vec![ + let a1 = ListArray::from_iter_primitive::(vec![ Some(vec![None]), // Masked by list array Some(vec![]), // Masked by list array Some(vec![Some(3), None]), Some(vec![Some(4), Some(5), None, Some(6)]), // Masked by struct array None, None, - ])) as ArrayRef; + ]); + let a1_values = a1.values().clone(); + let a1 = Arc::new(a1) as ArrayRef; let a2 = Arc::new(Int32Array::from_iter(vec![ Some(1), // Masked by list array @@ -1439,6 +1492,7 @@ mod tests { Some(5), None, ])) as ArrayRef; + let a2_values = a2.clone(); let field_a1 = Arc::new(Field::new("list", a1.data_type().clone(), true)); let field_a2 = Arc::new(Field::new("integers", a2.data_type().clone(), true)); @@ -1486,22 +1540,24 @@ mod tests { assert_eq!(levels.len(), 2); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]), rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]), non_null_indices: vec![1], max_def_level: 6, max_rep_level: 2, + array: a1_values, }; assert_eq!(&levels[0], &expected_level); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]), rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]), non_null_indices: vec![4], max_def_level: 4, max_rep_level: 1, + array: a2_values, }; assert_eq!(&levels[1], &expected_level); @@ -1522,23 +1578,24 @@ mod tests { builder.values().append_slice(&[9, 10]); builder.append(false); let a = builder.finish(); + let values = a.values().clone(); let item_field = Field::new("item", a.data_type().clone(), true); - let mut builder = - LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); - builder.write(&a, 1..4); + let mut builder = levels(&item_field, a); + builder.write(1..4); let 
levels = builder.finish(); assert_eq!(levels.len(), 1); let list_level = levels.get(0).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![0, 0, 3, 3]), rep_levels: Some(vec![0, 0, 0, 1]), non_null_indices: vec![6, 7], max_def_level: 3, max_rep_level: 1, + array: values, }; assert_eq!(list_level, &expected_level); } @@ -1670,6 +1727,10 @@ mod tests { assert_eq!(array.values().len(), 8); assert_eq!(array.len(), 4); + let struct_values = array.values().as_struct(); + let values_a = struct_values.column(0).clone(); + let values_b = struct_values.column(1).clone(); + let schema = Arc::new(Schema::new(vec![list_field])); let rb = RecordBatch::try_new(schema, vec![array]).unwrap(); @@ -1678,20 +1739,22 @@ mod tests { let b_levels = &levels[1]; // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]] - let expected_a = LevelInfo { + let expected_a = ArrayLevels { def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]), rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]), non_null_indices: vec![0, 7], max_def_level: 4, max_rep_level: 1, + array: values_a, }; // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]] - let expected_b = LevelInfo { + let expected_b = ArrayLevels { def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]), rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]), non_null_indices: vec![0, 6, 7], max_def_level: 3, max_rep_level: 1, + array: values_b, }; assert_eq!(a_levels, &expected_a); @@ -1704,24 +1767,25 @@ mod tests { builder.append(true); builder.append(false); builder.append(true); - let a = builder.finish(); + let array = builder.finish(); + let values = array.values().clone(); - let item_field = Field::new("item", a.data_type().clone(), true); - let mut builder = - LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); - builder.write(&a, 0..3); + let item_field = Field::new("item", array.data_type().clone(), true); + let mut builder = levels(&item_field, array); + builder.write(0..3); let levels = builder.finish(); assert_eq!(levels.len(), 1); let list_level = levels.get(0).unwrap(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![1, 0, 1]), rep_levels: Some(vec![0, 0, 0]), non_null_indices: vec![], max_def_level: 3, max_rep_level: 1, + array: values, }; assert_eq!(list_level, &expected_level); } @@ -1744,19 +1808,20 @@ mod tests { builder.values().append_null(); builder.append(false); let a = builder.finish(); + let values = a.values().as_list::().values().clone(); let item_field = Field::new("item", a.data_type().clone(), true); - let mut builder = - LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); - builder.write(&a, 0..4); + let mut builder = levels(&item_field, a); + builder.write(0..4); let levels = builder.finish(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]), rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]), non_null_indices: vec![0, 2, 3, 4, 5], max_def_level: 5, max_rep_level: 2, + array: values, }; assert_eq!(levels[0], expected_level); @@ -1777,17 +1842,22 @@ mod tests { let item_field = Field::new("item", dict.data_type().clone(), true); - let mut builder = - LevelInfoBuilder::try_new(&item_field, Default::default()).unwrap(); - builder.write(&dict, 0..4); + let mut builder = levels(&item_field, dict.clone()); + builder.write(0..4); let levels = builder.finish(); - let expected_level = LevelInfo { + let expected_level = ArrayLevels { def_levels: Some(vec![0, 0, 1, 1]), 
rep_levels: None, non_null_indices: vec![2, 3], max_def_level: 1, max_rep_level: 0, + array: Arc::new(dict), }; assert_eq!(levels[0], expected_level); } + + fn levels(field: &Field, array: T) -> LevelInfoBuilder { + let v = Arc::new(array) as ArrayRef; + LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap() + } } diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 2e170738f1a8..02c72b6c979d 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -28,8 +28,10 @@ use thrift::protocol::{TCompactOutputProtocol, TSerializable}; use arrow_array::cast::AsArray; use arrow_array::types::*; -use arrow_array::{Array, FixedSizeListArray, RecordBatch, RecordBatchWriter}; -use arrow_schema::{ArrowError, DataType as ArrowDataType, IntervalUnit, SchemaRef}; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchWriter}; +use arrow_schema::{ + ArrowError, DataType as ArrowDataType, Field, IntervalUnit, SchemaRef, +}; use super::schema::{ add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema, @@ -49,7 +51,7 @@ use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; use crate::file::writer::SerializedFileWriter; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; -use levels::{calculate_array_levels, LevelInfo}; +use levels::{calculate_array_levels, ArrayLevels}; mod byte_array; mod levels; @@ -150,7 +152,7 @@ impl ArrowWriter { Some(in_progress) => in_progress .writers .iter() - .map(|(_, x)| x.get_estimated_total_bytes() as usize) + .map(|x| x.get_estimated_total_bytes()) .sum(), None => 0, } @@ -274,7 +276,7 @@ impl ChunkReader for ArrowColumnChunk { } } -/// A [`Read`] for an iterator of [`Bytes`] +/// A [`Read`] for [`ArrowColumnChunk`] pub struct ArrowColumnChunkReader(Peekable>); impl Read for ArrowColumnChunkReader { @@ -347,31 +349,68 @@ impl PageWriter for ArrowPageWriter { } } -/// Encodes a leaf column to [`ArrowPageWriter`] -enum ArrowColumnWriter { +/// A leaf column that can be encoded by [`ArrowColumnWriter`] +pub struct ArrowLeafColumn(ArrayLevels); + +/// Computes the [`ArrowLeafColumn`] for a given potentially nested [`ArrayRef`] +pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result> { + let levels = calculate_array_levels(array, field)?; + Ok(levels.into_iter().map(ArrowLeafColumn).collect()) +} + +/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`] +pub struct ArrowColumnWriter { + writer: ArrowColumnWriterImpl, + chunk: SharedColumnChunk, +} + +enum ArrowColumnWriterImpl { ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>), Column(ColumnWriter<'static>), } impl ArrowColumnWriter { + /// Write an [`ArrowLeafColumn`] + pub fn write(&mut self, col: &ArrowLeafColumn) -> Result<()> { + match &mut self.writer { + ArrowColumnWriterImpl::Column(c) => { + write_leaf(c, &col.0)?; + } + ArrowColumnWriterImpl::ByteArray(c) => { + write_primitive(c, col.0.array().as_ref(), &col.0)?; + } + } + Ok(()) + } + + /// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`] + pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> { + let result = match self.writer { + ArrowColumnWriterImpl::ByteArray(c) => c.close()?, + ArrowColumnWriterImpl::Column(c) => c.close()?, + }; + let chunk = Arc::try_unwrap(self.chunk).ok().unwrap(); + Ok((chunk.into_inner().unwrap(), result)) + } + /// Returns the estimated total bytes for this column writer - fn get_estimated_total_bytes(&self) -> 
u64 { - match self { - ArrowColumnWriter::ByteArray(c) => c.get_estimated_total_bytes(), - ArrowColumnWriter::Column(c) => c.get_estimated_total_bytes(), + pub fn get_estimated_total_bytes(&self) -> usize { + match &self.writer { + ArrowColumnWriterImpl::ByteArray(c) => c.get_estimated_total_bytes() as _, + ArrowColumnWriterImpl::Column(c) => c.get_estimated_total_bytes() as _, } } } /// Encodes [`RecordBatch`] to a parquet row group -pub struct ArrowRowGroupWriter { - writers: Vec<(SharedColumnChunk, ArrowColumnWriter)>, +struct ArrowRowGroupWriter { + writers: Vec, schema: SchemaRef, buffered_rows: usize, } impl ArrowRowGroupWriter { - pub fn new( + fn new( parquet: &SchemaDescriptor, props: &WriterPropertiesPtr, arrow: &SchemaRef, @@ -388,51 +427,50 @@ impl ArrowRowGroupWriter { }) } - pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + fn write(&mut self, batch: &RecordBatch) -> Result<()> { self.buffered_rows += batch.num_rows(); - let mut writers = self.writers.iter_mut().map(|(_, x)| x); - for (array, field) in batch.columns().iter().zip(&self.schema.fields) { - let mut levels = calculate_array_levels(array, field)?.into_iter(); - write_leaves(&mut writers, &mut levels, array.as_ref())?; + let mut writers = self.writers.iter_mut(); + for (field, column) in self.schema.fields().iter().zip(batch.columns()) { + for leaf in compute_leaves(field.as_ref(), column)? { + writers.next().unwrap().write(&leaf)? + } } Ok(()) } - pub fn close(self) -> Result> { + fn close(self) -> Result> { self.writers .into_iter() - .map(|(chunk, writer)| { - let close_result = match writer { - ArrowColumnWriter::ByteArray(c) => c.close()?, - ArrowColumnWriter::Column(c) => c.close()?, - }; - - let chunk = Arc::try_unwrap(chunk).ok().unwrap().into_inner().unwrap(); - Ok((chunk, close_result)) - }) + .map(|writer| writer.close()) .collect() } } -/// Get an [`ArrowColumnWriter`] along with a reference to its [`SharedColumnChunk`] +/// Gets the [`ArrowColumnWriter`] for the given `data_type` fn get_arrow_column_writer( data_type: &ArrowDataType, props: &WriterPropertiesPtr, leaves: &mut Iter<'_, ColumnDescPtr>, - out: &mut Vec<(SharedColumnChunk, ArrowColumnWriter)>, + out: &mut Vec, ) -> Result<()> { let col = |desc: &ColumnDescPtr| { let page_writer = Box::::default(); let chunk = page_writer.buffer.clone(); let writer = get_column_writer(desc.clone(), props.clone(), page_writer); - (chunk, ArrowColumnWriter::Column(writer)) + ArrowColumnWriter { + chunk, + writer: ArrowColumnWriterImpl::Column(writer), + } }; let bytes = |desc: &ColumnDescPtr| { let page_writer = Box::::default(); let chunk = page_writer.buffer.clone(); let writer = GenericColumnWriter::new(desc.clone(), props.clone(), page_writer); - (chunk, ArrowColumnWriter::ByteArray(writer)) + ArrowColumnWriter { + chunk, + writer: ArrowColumnWriterImpl::ByteArray(writer), + } }; match data_type { @@ -478,52 +516,8 @@ fn get_arrow_column_writer( Ok(()) } -/// Write the leaves of `array` in depth-first order to `writers` with `levels` -fn write_leaves<'a, W>( - writers: &mut W, - levels: &mut IntoIter, - array: &(dyn Array + 'static), -) -> Result<()> -where - W: Iterator, -{ - match array.data_type() { - ArrowDataType::List(_) => { - write_leaves(writers, levels, array.as_list::().values().as_ref())? - } - ArrowDataType::LargeList(_) => { - write_leaves(writers, levels, array.as_list::().values().as_ref())? 
- } - ArrowDataType::FixedSizeList(_, _) => { - let array = array.as_any().downcast_ref::().unwrap(); - write_leaves(writers, levels, array.values().as_ref())? - } - ArrowDataType::Struct(_) => { - for column in array.as_struct().columns() { - write_leaves(writers, levels, column.as_ref())? - } - } - ArrowDataType::Map(_, _) => { - let map = array.as_map(); - write_leaves(writers, levels, map.keys().as_ref())?; - write_leaves(writers, levels, map.values().as_ref())? - } - _ => { - let levels = levels.next().unwrap(); - match writers.next().unwrap() { - ArrowColumnWriter::Column(c) => write_leaf(c, array, levels)?, - ArrowColumnWriter::ByteArray(c) => write_primitive(c, array, levels)?, - }; - } - } - Ok(()) -} - -fn write_leaf( - writer: &mut ColumnWriter<'_>, - column: &dyn Array, - levels: LevelInfo, -) -> Result { +fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result { + let column = levels.array().as_ref(); let indices = levels.non_null_indices(); match writer { ColumnWriter::Int32ColumnWriter(ref mut typed) => { @@ -678,7 +672,7 @@ fn write_leaf( fn write_primitive( writer: &mut GenericColumnWriter, values: &E::Values, - levels: LevelInfo, + levels: &ArrayLevels, ) -> Result { writer.write_batch_internal( values, From 9ce932306c17a3f320c786c6e80047ccc88dcbab Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 27 Sep 2023 17:12:23 +0100 Subject: [PATCH 2/7] Revert OnCloseRowGroup Send --- parquet/src/file/writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 859a0aa1f902..cafb1761352d 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -115,8 +115,7 @@ pub type OnCloseRowGroup<'a> = Box< Vec>, Vec>, ) -> Result<()> - + 'a - + Send, + + 'a, >; // ---------------------------------------------------------------------- From 9c07441a65feb91e83e11586763a9e49ca0c33c0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 27 Sep 2023 18:27:46 +0100 Subject: [PATCH 3/7] Add example --- parquet/src/arrow/arrow_writer/mod.rs | 99 +++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 02c72b6c979d..3722d2c11265 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -248,7 +248,7 @@ impl RecordBatchWriter for ArrowWriter { } } -/// A list of [`Bytes`] comprising a single column chunk +/// A single column chunk produced by [`ArrowColumnWriter`] #[derive(Default)] pub struct ArrowColumnChunk { length: usize, @@ -352,13 +352,88 @@ impl PageWriter for ArrowPageWriter { /// A leaf column that can be encoded by [`ArrowColumnWriter`] pub struct ArrowLeafColumn(ArrayLevels); -/// Computes the [`ArrowLeafColumn`] for a given potentially nested [`ArrayRef`] +/// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`] pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result> { let levels = calculate_array_levels(array, field)?; Ok(levels.into_iter().map(ArrowLeafColumn).collect()) } /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`] +/// +/// Note: This is a low-level interface for applications that require fine-grained control +/// of encoding, see [`ArrowWriter`] for a higher-level interface +/// +/// ``` +/// // The arrow schema +/// # use std::sync::Arc; +/// # use arrow_array::*; +/// # use arrow_schema::*; +/// # use parquet::arrow::arrow_to_parquet_schema; +/// # 
use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers}; +/// # use parquet::file::properties::WriterProperties; +/// # use parquet::file::writer::SerializedFileWriter; +/// # +/// let schema = Arc::new(Schema::new(vec![ +/// Field::new("i32", DataType::Int32, false), +/// Field::new("f32", DataType::Float32, false), +/// ])); +/// +/// // Compute the parquet schema +/// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap(); +/// let props = Arc::new(WriterProperties::default()); +/// +/// // Create writers for each of the leaf columns +/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap(); +/// +/// // Spawn a worker thread for each column +/// // This is for demonstration purposes, a thread-pool e.g. rayon or tokio, would be better +/// let mut workers: Vec<_> = col_writers +/// .into_iter() +/// .map(|mut col_writer| { +/// let (send, recv) = std::sync::mpsc::channel::(); +/// let handle = std::thread::spawn(move || { +/// for col in recv { +/// col_writer.write(&col)?; +/// } +/// col_writer.close() +/// }); +/// (handle, send) +/// }) +/// .collect(); +/// +/// // Create parquet writer +/// let root_schema = parquet_schema.root_schema_ptr(); +/// let mut out = Vec::with_capacity(1024); // This could be a File +/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone()).unwrap(); +/// +/// // Start row group +/// let mut row_group = writer.next_row_group().unwrap(); +/// +/// // Columns to encode +/// let to_write = vec![ +/// Arc::new(Int32Array::from_iter_values([1, 2, 3])) as _, +/// Arc::new(Float32Array::from_iter_values([1., 45., -1.])) as _, +/// ]; +/// +/// // Spawn work to encode columns +/// let mut worker_iter = workers.iter_mut(); +/// for (a, f) in to_write.iter().zip(&schema.fields) { +/// for c in compute_leaves(f, a).unwrap() { +/// worker_iter.next().unwrap().1.send(c).unwrap(); +/// } +/// } +/// +/// // Finish up parallel column encoding +/// for (handle, send) in workers { +/// drop(send); // Drop send side to signal termination +/// let (chunk, result) = handle.join().unwrap().unwrap(); +/// row_group.append_column(&chunk, result).unwrap(); +/// } +/// row_group.close().unwrap(); +/// +/// let metadata = writer.close().unwrap(); +/// assert_eq!(metadata.num_rows, 3); +/// ``` pub struct ArrowColumnWriter { writer: ArrowColumnWriterImpl, chunk: SharedColumnChunk, @@ -415,11 +490,7 @@ impl ArrowRowGroupWriter { props: &WriterPropertiesPtr, arrow: &SchemaRef, ) -> Result { - let mut writers = Vec::with_capacity(arrow.fields.len()); - let mut leaves = parquet.columns().iter(); - for field in &arrow.fields { - get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?; - } + let writers = get_column_writers(parquet, props, arrow)?; Ok(Self { writers, schema: arrow.clone(), @@ -446,6 +517,20 @@ impl ArrowRowGroupWriter { } } +/// Returns the [`ArrowColumnWriter`] for a given schema +pub fn get_column_writers( + parquet: &SchemaDescriptor, + props: &WriterPropertiesPtr, + arrow: &SchemaRef, +) -> Result> { + let mut writers = Vec::with_capacity(arrow.fields.len()); + let mut leaves = parquet.columns().iter(); + for field in &arrow.fields { + get_arrow_column_writer(field.data_type(), props, &mut leaves, &mut writers)?; + } + Ok(writers) +} + /// Gets the [`ArrowColumnWriter`] for the given `data_type` fn get_arrow_column_writer( data_type: &ArrowDataType, From a4c17a902c11c4371eef4bd95d73a7bc8f951238 Mon Sep 17 00:00:00 2001 From: Raphael 
Taylor-Davies Date: Fri, 29 Sep 2023 10:09:32 +0100 Subject: [PATCH 4/7] Review feedback --- parquet/src/arrow/arrow_writer/mod.rs | 71 +++++++++++++++++++-------- 1 file changed, 51 insertions(+), 20 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 3722d2c11265..9471ad2d18ce 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -18,7 +18,6 @@ //! Contains writer which writes arrow data into parquet data. use bytes::Bytes; -use std::fmt::Debug; use std::io::{Read, Write}; use std::iter::Peekable; use std::slice::Iter; @@ -49,7 +48,7 @@ use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnChunkMetaData, KeyValue, RowGroupMetaDataPtr}; use crate::file::properties::{WriterProperties, WriterPropertiesPtr}; use crate::file::reader::{ChunkReader, Length}; -use crate::file::writer::SerializedFileWriter; +use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter}; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; use levels::{calculate_array_levels, ArrayLevels}; @@ -99,7 +98,7 @@ pub struct ArrowWriter { max_row_group_size: usize, } -impl Debug for ArrowWriter { +impl std::fmt::Debug for ArrowWriter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let buffered_memory = self.in_progress_size(); f.debug_struct("ArrowWriter") @@ -210,8 +209,8 @@ impl ArrowWriter { }; let mut row_group_writer = self.writer.next_row_group()?; - for (chunk, close) in in_progress.close()? { - row_group_writer.append_column(&chunk, close)?; + for chunk in in_progress.close()? { + chunk.append_to_row_group(&mut row_group_writer)?; } row_group_writer.close()?; Ok(()) @@ -250,18 +249,18 @@ impl RecordBatchWriter for ArrowWriter { /// A single column chunk produced by [`ArrowColumnWriter`] #[derive(Default)] -pub struct ArrowColumnChunk { +struct ArrowColumnChunkData { length: usize, data: Vec, } -impl Length for ArrowColumnChunk { +impl Length for ArrowColumnChunkData { fn len(&self) -> u64 { self.length as _ } } -impl ChunkReader for ArrowColumnChunk { +impl ChunkReader for ArrowColumnChunkData { type T = ArrowColumnChunkReader; fn get_read(&self, start: u64) -> Result { @@ -276,8 +275,8 @@ impl ChunkReader for ArrowColumnChunk { } } -/// A [`Read`] for [`ArrowColumnChunk`] -pub struct ArrowColumnChunkReader(Peekable>); +/// A [`Read`] for [`ArrowColumnChunkData`] +struct ArrowColumnChunkReader(Peekable>); impl Read for ArrowColumnChunkReader { fn read(&mut self, out: &mut [u8]) -> std::io::Result { @@ -299,11 +298,11 @@ impl Read for ArrowColumnChunkReader { } } -/// A shared [`ArrowColumnChunk`] +/// A shared [`ArrowColumnChunkData`] /// /// This allows it to be owned by [`ArrowPageWriter`] whilst allowing access via /// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows -type SharedColumnChunk = Arc>; +type SharedColumnChunk = Arc>; #[derive(Default)] struct ArrowPageWriter { @@ -350,6 +349,7 @@ impl PageWriter for ArrowPageWriter { } /// A leaf column that can be encoded by [`ArrowColumnWriter`] +#[derive(Debug)] pub struct ArrowLeafColumn(ArrayLevels); /// Computes the [`ArrowLeafColumn`] for a potentially nested [`ArrayRef`] @@ -358,7 +358,31 @@ pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result) -> std::fmt::Result { + f.debug_struct("ArrowColumnChunk") + .field("length", &self.data.length) + .finish_non_exhaustive() + } +} + +impl ArrowColumnChunk { + /// Calls 
[`SerializedRowGroupWriter::append_column`] with this column's data + pub fn append_to_row_group( + self, + writer: &mut SerializedRowGroupWriter<'_, W>, + ) -> Result<()> { + writer.append_column(&self.data, self.close) + } +} + +/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunkData`] /// /// Note: This is a low-level interface for applications that require fine-grained control /// of encoding, see [`ArrowWriter`] for a higher-level interface @@ -426,8 +450,8 @@ pub fn compute_leaves(field: &Field, array: &ArrayRef) -> Result) -> std::fmt::Result { + f.debug_struct("ArrowColumnWriter").finish_non_exhaustive() + } +} + enum ArrowColumnWriterImpl { ByteArray(GenericColumnWriter<'static, ByteArrayEncoder>), Column(ColumnWriter<'static>), @@ -458,14 +488,15 @@ impl ArrowColumnWriter { Ok(()) } - /// Close this column returning the [`ArrowColumnChunk`] and [`ColumnCloseResult`] - pub fn close(self) -> Result<(ArrowColumnChunk, ColumnCloseResult)> { - let result = match self.writer { + /// Close this column returning the written [`ArrowColumnChunk`] + pub fn close(self) -> Result { + let close = match self.writer { ArrowColumnWriterImpl::ByteArray(c) => c.close()?, ArrowColumnWriterImpl::Column(c) => c.close()?, }; let chunk = Arc::try_unwrap(self.chunk).ok().unwrap(); - Ok((chunk.into_inner().unwrap(), result)) + let data = chunk.into_inner().unwrap(); + Ok(ArrowColumnChunk { data, close }) } /// Returns the estimated total bytes for this column writer @@ -509,7 +540,7 @@ impl ArrowRowGroupWriter { Ok(()) } - fn close(self) -> Result> { + fn close(self) -> Result> { self.writers .into_iter() .map(|writer| writer.close()) From 44c6713087df3f3eb645be94370d3ad353abd881 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 29 Sep 2023 10:10:30 +0100 Subject: [PATCH 5/7] Fix doc --- parquet/src/arrow/arrow_writer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 9471ad2d18ce..3ea7c6a2d3bf 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -382,7 +382,7 @@ impl ArrowColumnChunk { } } -/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunkData`] +/// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`] /// /// Note: This is a low-level interface for applications that require fine-grained control /// of encoding, see [`ArrowWriter`] for a higher-level interface From 10ab137a7da4936f7e35bd93d17d6f4b9fe94044 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 29 Sep 2023 10:11:43 +0100 Subject: [PATCH 6/7] Further review feedback --- parquet/src/arrow/arrow_writer/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 3ea7c6a2d3bf..43e29d86e8ae 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -441,9 +441,9 @@ impl ArrowColumnChunk { /// /// // Spawn work to encode columns /// let mut worker_iter = workers.iter_mut(); -/// for (a, f) in to_write.iter().zip(&schema.fields) { -/// for c in compute_leaves(f, a).unwrap() { -/// worker_iter.next().unwrap().1.send(c).unwrap(); +/// for (arr, field) in to_write.iter().zip(&schema.fields) { +/// for leaves in compute_leaves(field, arr).unwrap() { +/// worker_iter.next().unwrap().1.send(leaves).unwrap(); /// } /// } /// From 174c555a44d739a9087d676db97977f9f6ccacae Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 29 Sep 
2023 10:18:13 +0100
Subject: [PATCH 7/7] More docs

---
 parquet/src/arrow/arrow_writer/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 43e29d86e8ae..5dae81d4711c 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -55,7 +55,7 @@ use levels::{calculate_array_levels, ArrayLevels};
 mod byte_array;
 mod levels;
 
-/// Arrow writer
+/// Encodes [`RecordBatch`] to parquet
 ///
 /// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
 /// to the same row group, up to `max_row_group_size` rows. Any remaining rows will be
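
For contrast with the low-level `ArrowColumnWriter` example added in patch 3, here is a minimal sketch of the high-level `ArrowWriter` path whose documentation patch 7 touches. The buffer, column name, and values are illustrative assumptions, not taken from the patch series:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use parquet::arrow::ArrowWriter;

// Build a single-column batch; try_from_iter derives the schema from the arrays
let col = Arc::new(Int32Array::from_iter_values([1, 2, 3])) as ArrayRef;
let batch = RecordBatch::try_from_iter([("i32", col)]).unwrap();

// ArrowWriter encodes on the calling thread, buffering rows until
// `max_row_group_size` is reached and then flushing a row group
let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
writer.write(&batch).unwrap();

// Closing flushes any buffered rows as a final, possibly smaller row group
let metadata = writer.close().unwrap();
assert_eq!(metadata.num_rows, 3);
```

After this series, `ArrowRowGroupWriter` routes this same path through `get_column_writers`, `compute_leaves`, and `ArrowColumnWriter::write`, so the serial interface and the parallel one demonstrated in patch 3 share a single encoding implementation.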