Skip to content

Commit

Permalink
Add tests for reading numeric limits in parquet statistics (apache#10642
Browse files Browse the repository at this point in the history
)

* Add numeric limits tests for statistics reading

* Apply suggestions from code review

Co-authored-by: Oleks V <comphead@users.noreply.github.com>

* fix

---------

Co-authored-by: Oleks V <comphead@users.noreply.github.com>
  • Loading branch information
2 people authored and findepi committed Jul 16, 2024
1 parent 00d47e2 commit 77b5893
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 8 deletions.
170 changes: 162 additions & 8 deletions datafusion/core/tests/parquet/arrow_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{Date32Type, Date64Type};
use arrow_array::{
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
Decimal128Array, FixedSizeBinaryArray, Float64Array, Int16Array, Int32Array,
Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
Expand Down Expand Up @@ -189,7 +189,10 @@ impl Test {
.extract(reader.metadata())
.unwrap();

assert_eq!(&min, &expected_min, "Mismatch with expected minimums");
assert_eq!(
&min, &expected_min,
"{column_name}: Mismatch with expected minimums"
);

let max = StatisticsConverter::try_new(
column_name,
Expand All @@ -199,7 +202,10 @@ impl Test {
.unwrap()
.extract(reader.metadata())
.unwrap();
assert_eq!(&max, &expected_max, "Mismatch with expected maximum");
assert_eq!(
&max, &expected_max,
"{column_name}: Mismatch with expected maximum"
);

let null_counts = StatisticsConverter::try_new(
column_name,
Expand All @@ -212,13 +218,15 @@ impl Test {
let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef;
assert_eq!(
&null_counts, &expected_null_counts,
"Mismatch with expected null counts"
"{column_name}: Mismatch with expected null counts. \
Actual: {null_counts:?}. Expected: {expected_null_counts:?}"
);

let row_counts = StatisticsConverter::row_counts(reader.metadata()).unwrap();
assert_eq!(
row_counts, expected_row_counts,
"Mismatch with expected row counts"
"{column_name}: Mismatch with expected row counts. \
Actual: {row_counts:?}. Expected: {expected_row_counts:?}"
);
}

Expand Down Expand Up @@ -802,6 +810,152 @@ async fn test_uint32_range() {
.run();
}

#[tokio::test]
async fn test_numeric_limits_unsigned() {
// file has 7 rows, 2 row groups: one with 5 rows, one with 2 rows.
let reader = TestReader {
scenario: Scenario::NumericLimits,
row_per_group: 5,
};

Test {
reader: reader.build().await,
expected_min: Arc::new(Int8Array::from(vec![i8::MIN, -100])),
expected_max: Arc::new(Int8Array::from(vec![100, i8::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i8",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Int16Array::from(vec![i16::MIN, -100])),
expected_max: Arc::new(Int16Array::from(vec![100, i16::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i16",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Int32Array::from(vec![i32::MIN, -100])),
expected_max: Arc::new(Int32Array::from(vec![100, i32::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i32",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![i64::MIN, -100])),
expected_max: Arc::new(Int64Array::from(vec![100, i64::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i64",
}
.run();
}
#[tokio::test]
async fn test_numeric_limits_signed() {
// file has 7 rows, 2 row groups: one with 5 rows, one with 2 rows.
let reader = TestReader {
scenario: Scenario::NumericLimits,
row_per_group: 5,
};

Test {
reader: reader.build().await,
expected_min: Arc::new(Int8Array::from(vec![i8::MIN, -100])),
expected_max: Arc::new(Int8Array::from(vec![100, i8::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i8",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Int16Array::from(vec![i16::MIN, -100])),
expected_max: Arc::new(Int16Array::from(vec![100, i16::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i16",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Int32Array::from(vec![i32::MIN, -100])),
expected_max: Arc::new(Int32Array::from(vec![100, i32::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i32",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Int64Array::from(vec![i64::MIN, -100])),
expected_max: Arc::new(Int64Array::from(vec![100, i64::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "i64",
}
.run();
}

#[tokio::test]
async fn test_numeric_limits_float() {
// file has 7 rows, 2 row groups: one with 5 rows, one with 2 rows.
let reader = TestReader {
scenario: Scenario::NumericLimits,
row_per_group: 5,
};

Test {
reader: reader.build().await,
expected_min: Arc::new(Float32Array::from(vec![f32::MIN, -100.0])),
expected_max: Arc::new(Float32Array::from(vec![100.0, f32::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "f32",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Float64Array::from(vec![f64::MIN, -100.0])),
expected_max: Arc::new(Float64Array::from(vec![100.0, f64::MAX])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "f64",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Float32Array::from(vec![-1.0, -100.0])),
expected_max: Arc::new(Float32Array::from(vec![100.0, -100.0])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "f32_nan",
}
.run();

Test {
reader: reader.build().await,
expected_min: Arc::new(Float64Array::from(vec![-1.0, -100.0])),
expected_max: Arc::new(Float64Array::from(vec![100.0, -100.0])),
expected_null_counts: UInt64Array::from(vec![0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 2]),
column_name: "f64_nan",
}
.run();
}

#[tokio::test]
async fn test_float64() {
// This creates a parquet file of 1 column "f"
Expand Down Expand Up @@ -914,8 +1068,8 @@ async fn test_byte() {

Test {
reader: reader.build().await,
expected_min: Arc::new(BinaryArray::from(expected_service_binary_min_values)), // Shuld be BinaryArray
expected_max: Arc::new(BinaryArray::from(expected_service_binary_max_values)), // Shuld be BinaryArray
expected_min: Arc::new(BinaryArray::from(expected_service_binary_min_values)),
expected_max: Arc::new(BinaryArray::from(expected_service_binary_max_values)),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
column_name: "service_binary",
Expand Down
39 changes: 39 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ enum Scenario {
Int32Range,
UInt,
UInt32Range,
/// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64
/// -MIN, -100, -1, 0, 1, 100, MAX
NumericLimits,
Float64,
Decimal,
DecimalBloomFilterInt32,
Expand Down Expand Up @@ -710,6 +713,39 @@ fn make_int_batches_with_null(
.unwrap()
}

fn make_numeric_limit_batch() -> RecordBatch {
let i8 = Int8Array::from(vec![i8::MIN, 100, -1, 0, 1, -100, i8::MAX]);
let i16 = Int16Array::from(vec![i16::MIN, 100, -1, 0, 1, -100, i16::MAX]);
let i32 = Int32Array::from(vec![i32::MIN, 100, -1, 0, 1, -100, i32::MAX]);
let i64 = Int64Array::from(vec![i64::MIN, 100, -1, 0, 1, -100, i64::MAX]);
let u8 = UInt8Array::from(vec![u8::MIN, 100, 1, 0, 1, 100, u8::MAX]);
let u16 = UInt16Array::from(vec![u16::MIN, 100, 1, 0, 1, 100, u16::MAX]);
let u32 = UInt32Array::from(vec![u32::MIN, 100, 1, 0, 1, 100, u32::MAX]);
let u64 = UInt64Array::from(vec![u64::MIN, 100, 1, 0, 1, 100, u64::MAX]);
let f32 = Float32Array::from(vec![f32::MIN, 100.0, -1.0, 0.0, 1.0, -100.0, f32::MAX]);
let f64 = Float64Array::from(vec![f64::MIN, 100.0, -1.0, 0.0, 1.0, -100.0, f64::MAX]);
let f32_nan =
Float32Array::from(vec![f32::NAN, 100.0, -1.0, 0.0, 1.0, -100.0, f32::NAN]);
let f64_nan =
Float64Array::from(vec![f64::NAN, 100.0, -1.0, 0.0, 1.0, -100.0, f64::NAN]);

RecordBatch::try_from_iter(vec![
("i8", Arc::new(i8) as _),
("i16", Arc::new(i16) as _),
("i32", Arc::new(i32) as _),
("i64", Arc::new(i64) as _),
("u8", Arc::new(u8) as _),
("u16", Arc::new(u16) as _),
("u32", Arc::new(u32) as _),
("u64", Arc::new(u64) as _),
("f32", Arc::new(f32) as _),
("f64", Arc::new(f64) as _),
("f32_nan", Arc::new(f32_nan) as _),
("f64_nan", Arc::new(f64_nan) as _),
])
.unwrap()
}

fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
match scenario {
Scenario::Boolean => {
Expand Down Expand Up @@ -768,6 +804,9 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
Scenario::UInt32Range => {
vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)]
}
Scenario::NumericLimits => {
vec![make_numeric_limit_batch()]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
Expand Down

0 comments on commit 77b5893

Please sign in to comment.