Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write backwards compatible row group statistics (#3526) #3527

Merged
merged 4 commits into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ pub enum SortOrder {
UNDEFINED,
}

impl SortOrder {
/// Returns `true` if this sort order is [`Self::SIGNED`], and `false` for
/// every other variant.
pub fn is_signed(&self) -> bool {
matches!(self, Self::SIGNED)
}
}

/// Column order that specifies what method was used to aggregate min/max values for
/// statistics.
///
Expand Down
21 changes: 19 additions & 2 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::Statistics;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::file::{
metadata::ColumnChunkMetaData,
properties::{WriterProperties, WriterPropertiesPtr, WriterVersion},
Expand Down Expand Up @@ -817,13 +817,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.set_dictionary_page_offset(dict_page_offset);

if self.statistics_enabled != EnabledStatistics::None {
let statistics = Statistics::new(
let statistics = ValueStatistics::<E::T>::new(
self.column_metrics.min_column_value.clone(),
self.column_metrics.max_column_value.clone(),
self.column_metrics.column_distinct_count,
self.column_metrics.num_column_nulls,
false,
);
let statistics = statistics
tustvold marked this conversation as resolved.
Show resolved Hide resolved
.with_backwards_compatible_min_max(self.descr.sort_order().is_signed())
.into();
builder = builder.set_statistics(statistics);
}

Expand Down Expand Up @@ -1893,6 +1896,7 @@ mod tests {
fn test_bool_statistics() {
let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
assert!(stats.has_min_max_set());
assert!(!stats.is_min_max_backwards_compatible());
tustvold marked this conversation as resolved.
Show resolved Hide resolved
if let Statistics::Boolean(stats) = stats {
assert_eq!(stats.min(), &false);
assert_eq!(stats.max(), &true);
Expand All @@ -1905,6 +1909,7 @@ mod tests {
fn test_int32_statistics() {
let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Int32(stats) = stats {
assert_eq!(stats.min(), &-2);
assert_eq!(stats.max(), &3);
Expand All @@ -1917,6 +1922,7 @@ mod tests {
fn test_int64_statistics() {
let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Int64(stats) = stats {
assert_eq!(stats.min(), &-2);
assert_eq!(stats.max(), &3);
Expand All @@ -1938,6 +1944,7 @@ mod tests {

let stats = statistics_roundtrip::<Int96Type>(&input);
assert!(stats.has_min_max_set());
assert!(!stats.is_min_max_backwards_compatible());
if let Statistics::Int96(stats) = stats {
assert_eq!(stats.min(), &Int96::from(vec![0, 20, 30]));
assert_eq!(stats.max(), &Int96::from(vec![3, 20, 10]));
Expand All @@ -1950,6 +1957,7 @@ mod tests {
fn test_float_statistics() {
let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = stats {
assert_eq!(stats.min(), &-2.0);
assert_eq!(stats.max(), &3.0);
Expand All @@ -1962,6 +1970,7 @@ mod tests {
fn test_double_statistics() {
let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Double(stats) = stats {
assert_eq!(stats.min(), &-2.0);
assert_eq!(stats.max(), &3.0);
Expand All @@ -1978,6 +1987,7 @@ mod tests {
.collect::<Vec<ByteArray>>();

let stats = statistics_roundtrip::<ByteArrayType>(&input);
assert!(!stats.is_min_max_backwards_compatible());
assert!(stats.has_min_max_set());
if let Statistics::ByteArray(stats) = stats {
assert_eq!(stats.min(), &ByteArray::from("aaw"));
Expand All @@ -1999,6 +2009,7 @@ mod tests {

let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
assert!(stats.has_min_max_set());
assert!(!stats.is_min_max_backwards_compatible());
if let Statistics::FixedLenByteArray(stats) = stats {
let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
assert_eq!(stats.min(), &expected_min);
Expand All @@ -2013,6 +2024,7 @@ mod tests {
fn test_float_statistics_nan_middle() {
let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = stats {
assert_eq!(stats.min(), &1.0);
assert_eq!(stats.max(), &2.0);
Expand All @@ -2025,6 +2037,7 @@ mod tests {
fn test_float_statistics_nan_start() {
let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = stats {
assert_eq!(stats.min(), &1.0);
assert_eq!(stats.max(), &2.0);
Expand All @@ -2037,13 +2050,15 @@ mod tests {
fn test_float_statistics_nan_only() {
    // A column holding nothing but NaN must not report a min/max, while the
    // statistics are still typed as Float and flagged as backwards compatible.
    let all_nan = [f32::NAN, f32::NAN];
    let nan_stats = statistics_roundtrip::<FloatType>(&all_nan);
    assert!(!nan_stats.has_min_max_set());
    assert!(nan_stats.is_min_max_backwards_compatible());
    assert!(matches!(nan_stats, Statistics::Float(_)));
}

#[test]
fn test_double_statistics_nan_middle() {
let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Double(stats) = stats {
assert_eq!(stats.min(), &1.0);
assert_eq!(stats.max(), &2.0);
Expand All @@ -2056,6 +2071,7 @@ mod tests {
fn test_double_statistics_nan_start() {
let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
assert!(stats.has_min_max_set());
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Double(stats) = stats {
assert_eq!(stats.min(), &1.0);
assert_eq!(stats.max(), &2.0);
Expand All @@ -2069,6 +2085,7 @@ mod tests {
let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
assert!(!stats.has_min_max_set());
assert!(matches!(stats, Statistics::Double(_)));
assert!(stats.is_min_max_backwards_compatible());
}

#[test]
Expand Down
46 changes: 38 additions & 8 deletions parquet/src/file/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,12 @@ pub fn to_thrift(stats: Option<&Statistics>) -> Option<TStatistics> {
(None, None)
};

if stats.is_min_max_deprecated() {
thrift_stats.min = min;
thrift_stats.max = max;
} else {
if stats.is_min_max_backwards_compatible() {
tustvold marked this conversation as resolved.
Show resolved Hide resolved
thrift_stats.min = min.clone();
thrift_stats.max = max.clone();
}

if !stats.is_min_max_deprecated() {
thrift_stats.min_value = min;
thrift_stats.max_value = max;
}
Expand Down Expand Up @@ -329,6 +331,12 @@ impl Statistics {
statistics_enum_func![self, is_min_max_deprecated]
}

/// Returns `true` if these statistics are backwards compatible with the
/// deprecated `min` and `max` fields, i.e. whether those fields should also
/// be written
pub fn is_min_max_backwards_compatible(&self) -> bool {
// NOTE(review): `statistics_enum_func!` is defined outside this view; it
// presumably forwards the call to the typed statistics held by each variant.
statistics_enum_func![self, is_min_max_backwards_compatible]
}

/// Returns optional value of number of distinct values occurring.
/// When it is `None`, the value should be ignored.
pub fn distinct_count(&self) -> Option<u64> {
Expand Down Expand Up @@ -405,7 +413,13 @@ pub struct ValueStatistics<T> {
// Distinct count could be omitted in some cases
distinct_count: Option<u64>,
null_count: u64,

/// If `true`, populate the deprecated `min` and `max` fields instead of
/// `min_value` and `max_value`
is_min_max_deprecated: bool,

/// If `true`, always write the deprecated `min` and `max` fields
is_min_max_backwards_compatible: bool,
}

impl<T: ParquetValueType> ValueStatistics<T> {
Expand All @@ -423,6 +437,15 @@ impl<T: ParquetValueType> ValueStatistics<T> {
distinct_count,
null_count,
is_min_max_deprecated,
is_min_max_backwards_compatible: is_min_max_deprecated,
}
}

/// Set whether to write the deprecated `min` and `max` fields
tustvold marked this conversation as resolved.
Show resolved Hide resolved
pub fn with_backwards_compatible_min_max(self, backwards_compatible: bool) -> Self {
Self {
is_min_max_backwards_compatible: backwards_compatible,
..self
}
}

Expand Down Expand Up @@ -478,6 +501,12 @@ impl<T: ParquetValueType> ValueStatistics<T> {
fn is_min_max_deprecated(&self) -> bool {
// Plain accessor: hands back the stored `is_min_max_deprecated` flag unchanged.
self.is_min_max_deprecated
}

/// Returns `true` if these statistics are backwards compatible with the
/// deprecated `min` and `max` fields, i.e. whether those fields should also
/// be written
pub fn is_min_max_backwards_compatible(&self) -> bool {
// Defaults to the `is_min_max_deprecated` value given to `new`, unless
// overridden through `with_backwards_compatible_min_max`.
self.is_min_max_backwards_compatible
}
}

impl<T: ParquetValueType> fmt::Display for ValueStatistics<T> {
Expand Down Expand Up @@ -509,12 +538,13 @@ impl<T: ParquetValueType> fmt::Debug for ValueStatistics<T> {
write!(
f,
"{{min: {:?}, max: {:?}, distinct_count: {:?}, null_count: {}, \
min_max_deprecated: {}}}",
min_max_deprecated: {}, min_max_backwards_compatible: {}}}",
self.min,
self.max,
self.distinct_count,
self.null_count,
self.is_min_max_deprecated
self.is_min_max_deprecated,
self.is_min_max_backwards_compatible
)
}
}
Expand Down Expand Up @@ -569,14 +599,14 @@ mod tests {
assert_eq!(
format!("{:?}", stats),
"Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 12, \
min_max_deprecated: true})"
min_max_deprecated: true, min_max_backwards_compatible: true})"
);

let stats = Statistics::int32(None, None, None, 7, false);
assert_eq!(
format!("{:?}", stats),
"Int32({min: None, max: None, distinct_count: None, null_count: 7, \
min_max_deprecated: false})"
min_max_deprecated: false, min_max_backwards_compatible: false})"
)
}

Expand Down
56 changes: 56 additions & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ mod tests {
};
use crate::format::SortingColumn;
use crate::record::{Row, RowAccessor};
use crate::schema::parser::parse_message_type;
use crate::schema::types::{ColumnDescriptor, ColumnPath};
use crate::util::memory::ByteBufferPtr;

Expand Down Expand Up @@ -1428,4 +1429,59 @@ mod tests {
test_kv_metadata(Some(vec![kv1]), Some(vec![]));
test_kv_metadata(None, Some(vec![]));
}

#[test]
fn test_backwards_compatible_statistics() {
    // Three INT32 columns that differ only in their logical annotation:
    // a decimal, a signed integer and an unsigned integer.
    let message_type = "
message test_schema {
REQUIRED INT32 decimal1 (DECIMAL(8,2));
REQUIRED INT32 i32 (INTEGER(32,true));
REQUIRED INT32 u32 (INTEGER(32,false));
}
";

    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let props = Arc::new(WriterProperties::builder().build());
    let mut file_writer = SerializedFileWriter::new(vec![], schema, props).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();

    // Write the same three values into every column.
    for _ in 0..3 {
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 2, 3], None, None)
            .unwrap();
        column_writer.close().unwrap();
    }
    let metadata = row_group_writer.close().unwrap();
    file_writer.close().unwrap();

    // Pull the thrift-encoded statistics of each column chunk, in schema order.
    let encoded_stats: Vec<_> = metadata
        .to_thrift()
        .columns
        .into_iter()
        .map(|column| column.meta_data.unwrap().statistics.unwrap())
        .collect();

    let expected_min = 1_i32.to_le_bytes();
    let expected_max = 3_i32.to_le_bytes();

    // decimal, then i32: both the deprecated `min`/`max` fields and the newer
    // `min_value`/`max_value` fields are expected to be populated.
    for s in &encoded_stats[..2] {
        assert_eq!(s.min.as_deref(), Some(expected_min.as_ref()));
        assert_eq!(s.max.as_deref(), Some(expected_max.as_ref()));
        assert_eq!(s.min_value.as_deref(), Some(expected_min.as_ref()));
        assert_eq!(s.max_value.as_deref(), Some(expected_max.as_ref()));
    }

    // u32: only `min_value`/`max_value` are expected; the deprecated fields
    // must stay unset.
    let s = &encoded_stats[2];
    assert_eq!(s.min.as_deref(), None);
    assert_eq!(s.max.as_deref(), None);
    assert_eq!(s.min_value.as_deref(), Some(expected_min.as_ref()));
    assert_eq!(s.max_value.as_deref(), Some(expected_max.as_ref()));
}
}
12 changes: 11 additions & 1 deletion parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use std::{collections::HashMap, convert::From, fmt, sync::Arc};
use crate::format::SchemaElement;

use crate::basic::{
ConvertedType, LogicalType, Repetition, TimeUnit, Type as PhysicalType,
ColumnOrder, ConvertedType, LogicalType, Repetition, SortOrder, TimeUnit,
Type as PhysicalType,
};
use crate::errors::{ParquetError, Result};

Expand Down Expand Up @@ -846,6 +847,15 @@ impl ColumnDescriptor {
_ => panic!("Expected primitive type!"),
}
}

/// Returns the [`SortOrder`] of this column
///
/// The order is obtained from [`ColumnOrder::get_sort_order`] using the
/// column's logical, converted and physical types.
pub fn sort_order(&self) -> SortOrder {
    let logical = self.logical_type();
    let converted = self.converted_type();
    let physical = self.physical_type();
    ColumnOrder::get_sort_order(logical, converted, physical)
}
}

/// A schema descriptor. This encapsulates the top-level schemas for all the columns,
Expand Down