Skip to content

Commit

Permalink
extract statistics read for struct array in parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Jul 12, 2024
1 parent 4d04a6e commit 15cd7c5
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 49 deletions.
213 changes: 173 additions & 40 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::{
new_empty_array, new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float16Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, StructArray,
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
Expand Down Expand Up @@ -989,6 +989,11 @@ macro_rules! get_data_page_statistics {
}
}

fn find_parquet_idx(parquet_schema: &SchemaDescriptor, root_idx: usize) -> Option<usize> {
(0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)
}

/// Lookups up the parquet column by name
///
/// Returns the parquet column index and the corresponding arrow field
Expand All @@ -998,20 +1003,31 @@ pub(crate) fn parquet_column<'a>(
name: &str,
) -> Option<(usize, &'a FieldRef)> {
let (root_idx, field) = arrow_schema.fields.find(name)?;
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
return None;
if !field.data_type().is_nested() {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
return Some((parquet_idx, field));
}
// Nested field
match field.data_type() {
DataType::Struct(_) => {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
_ => {
if field.data_type().is_nested() {
// Nested fields are not supported and require non-trivial logic
// to correctly walk the parquet schema accounting for the
// logical type rules - <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
//
// For example a ListArray could correspond to anything from 1 to 3 levels
// in the parquet schema
None
} else {
let parquet_idx = find_parquet_idx(parquet_schema, root_idx)?;
Some((parquet_idx, field))
}
}
}

// This could be made more efficient (#TBD)
let parquet_idx = (0..parquet_schema.columns().len())
.find(|x| parquet_schema.get_column_root_idx(*x) == root_idx)?;
Some((parquet_idx, field))
}

/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an
Expand Down Expand Up @@ -1234,7 +1250,87 @@ impl<'a> StatisticsConverter<'a> {
arrow_field,
})
}
/// recursively get the corresponding statistics for all the column data, used for
/// DataType::Struct
pub(crate) fn get_statistics_min_max_recursive(
metadata: &[&RowGroupMetaData],
index: &mut usize,
is_min: bool,
data_type: &DataType,
) -> Result<ArrayRef> {
match data_type.is_nested() {
false => {
let iterator = metadata.iter().map(|meta| {
let stat = meta.column(*index).statistics();
stat
});
let stat = if is_min {
min_statistics(data_type, iterator)
} else {
max_statistics(data_type, iterator)
};
*index += 1;
stat
}
true => {
if let DataType::Struct(fields) = data_type {
let field_arrays: Vec<_> = fields
.iter()
.map(|field| {
let array = Self::get_statistics_min_max_recursive(
metadata,
index,
is_min,
field.data_type(),
)?;
Ok((field.clone(), array))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(StructArray::from(field_arrays)) as ArrayRef)
} else {
plan_err!("unsupported nested data type for extracting statistics")
}
}
}
}
/// recursively get the corresponding statistics for all the column data, used for
/// DataType::Struct
pub(crate) fn get_null_counts_recursive(
metadata: &[&RowGroupMetaData],
index: usize,
data_type: &DataType,
) -> Vec<u64> {
if let DataType::Struct(fields) = data_type {
let num_row_groups = metadata.len();
let mut null_counts = vec![0; num_row_groups];

fields.iter().for_each(|field| {
let field_null_counts = Self::get_null_counts_recursive(
metadata,
index + 1,
field.data_type(),
);
null_counts
.iter_mut()
.zip(field_null_counts)
.for_each(|(acc, count)| {
*acc += count;
});
});

null_counts
} else {
metadata
.iter()
.map(|meta| {
meta.column(index)
.statistics()
.map(|s| s.null_count())
.unwrap_or(0)
})
.collect()
}
}
/// Extract the minimum values from row group statistics in [`RowGroupMetaData`]
///
/// # Return Value
Expand Down Expand Up @@ -1284,11 +1380,22 @@ impl<'a> StatisticsConverter<'a> {
let Some(parquet_index) = self.parquet_index else {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
min_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// In a Rowgroup, parquet for nested struct members,
// each one is also stored in the Column of RowGroupMetadata in order.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(
&group_vec, &mut 0, true, data_type,
)
}
_ => min_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
Expand All @@ -1304,10 +1411,22 @@ impl<'a> StatisticsConverter<'a> {
return Ok(self.make_null_array(data_type, metadatas));
};

let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
max_statistics(data_type, iter)
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};
match data_type {
// In a Rowgroup, parquet for nested struct members,
// each one is also stored in the Column of RowGroupMetadata in order.
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
Self::get_statistics_min_max_recursive(
&group_vec, &mut 0, false, data_type,
)
}
_ => max_statistics(data_type, create_iterator(metadatas, parquet_index)),
}
}

/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
Expand All @@ -1317,18 +1436,33 @@ impl<'a> StatisticsConverter<'a> {
where
I: IntoIterator<Item = &'a RowGroupMetaData>,
{
let data_type = self.arrow_field.data_type();

let Some(parquet_index) = self.parquet_index else {
let num_row_groups = metadatas.into_iter().count();
return Ok(UInt64Array::from_iter(
std::iter::repeat(None).take(num_row_groups),
));
};
let create_iterator = |metadatas: I, parquet_index: usize| {
metadatas
.into_iter()
.map(move |x| x.column(parquet_index).statistics())
};

let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
match data_type {
DataType::Struct(_) => {
let group_vec: Vec<&RowGroupMetaData> = metadatas.into_iter().collect();
let null_counts =
Self::get_null_counts_recursive(&group_vec, 0, data_type);
Ok(UInt64Array::from_iter(null_counts))
}
_ => {
let null_counts = create_iterator(metadatas, parquet_index)
.map(|s| s.map(|s| s.null_count()));
Ok(UInt64Array::from_iter(null_counts))
}
}
}

/// Extract the minimum values from Data Page statistics.
Expand Down Expand Up @@ -1541,10 +1675,10 @@ mod test {
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{i256, Date32Type, Date64Type};
use arrow_array::{
new_empty_array, new_null_array, Array, BinaryArray, BooleanArray, Date32Array,
Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch,
StringArray, StructArray, TimestampNanosecondArray,
new_empty_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
StructArray, TimestampNanosecondArray,
};
use arrow_schema::{Field, SchemaRef};
use bytes::Bytes;
Expand Down Expand Up @@ -1988,7 +2122,7 @@ mod test {

#[test]
fn roundtrip_struct() {
let mut test = Test {
let test = Test {
input: struct_array(vec![
// row group 1
(Some(true), Some(1)),
Expand All @@ -2005,22 +2139,18 @@ mod test {
]),
expected_min: struct_array(vec![
(Some(true), Some(1)),
(Some(true), Some(0)),
(Some(false), Some(0)),
(None, None),
]),

expected_max: struct_array(vec![
(Some(true), Some(3)),
(Some(true), Some(0)),
(Some(true), Some(5)),
(None, None),
]),
};
// Due to https://github.com/apache/datafusion/issues/8334,
// statistics for struct arrays are not supported
test.expected_min =
new_null_array(test.input.data_type(), test.expected_min.len());
test.expected_max =
new_null_array(test.input.data_type(), test.expected_min.len());
test.run()
}

Expand Down Expand Up @@ -2376,7 +2506,10 @@ mod test {
let row_groups = metadata.row_groups();

for field in schema.fields() {
if field.data_type().is_nested() {
let data_type = field.data_type();
if field.data_type().is_nested()
&& !matches!(data_type, &DataType::Struct(_))
{
let lookup = parquet_column(parquet_schema, &schema, field.name());
assert_eq!(lookup, None);
continue;
Expand Down
Loading

0 comments on commit 15cd7c5

Please sign in to comment.