test: add more tests for statistics reading #10592

Merged · 2 commits · May 21, 2024
291 changes: 276 additions & 15 deletions datafusion/core/tests/parquet/arrow_statistics.rs
@@ -22,8 +22,8 @@ use std::fs::File;
use std::sync::Arc;

use arrow_array::{
-    make_array, Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array,
-    RecordBatch, UInt64Array,
+    make_array, Array, ArrayRef, Decimal128Array, FixedSizeBinaryArray, Float64Array,
+    Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, UInt64Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::physical_plan::parquet::{
@@ -624,20 +624,281 @@ async fn test_dates_64_diff_rg_sizes() {
.run("date64");
}
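
// NOTE (added for readability, not part of this PR): the `Test` harness used
// by the new tests below is defined earlier in this file, outside the visible
// diff. A hypothetical reconstruction of its shape:
//
// struct Test {
//     reader: ParquetRecordBatchReaderBuilder<File>, // assumed reader type
//     expected_min: ArrayRef,
//     expected_max: ArrayRef,
//     expected_null_counts: UInt64Array,
//     expected_row_counts: UInt64Array,
// }
//
// impl Test {
//     /// Read the row-group statistics for `column_name` and assert that the
//     /// extracted min/max/null-count/row-count arrays equal the expected ones.
//     fn run(self, column_name: &str) { /* defined above the diff */ }
// }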

// BUG:
// https://github.com/apache/datafusion/issues/10604
#[tokio::test]
async fn test_uint() {
let row_per_group = 4;

// This creates a parquet file with 4 columns named "u8", "u16", "u32", "u64"
// "u8" --> UInt8Array
// "u16" --> UInt16Array
// "u32" --> UInt32Array
// "u64" --> UInt64Array

// The file is written from 4 record batches of 5 rows each; with a row-group size of 4, the 20 rows are split into 5 row groups
let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;

// u8
// BUG: should return UInt8Array but returns Int32Array
Test {
reader,
expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt8Array
expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt8Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
}
.run("u8");

// u16
// BUG: should return UInt16Array but returns Int32Array
let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
Test {
reader,
expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt16Array
expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt16Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
}
.run("u16");

// u32
// BUG: should return UInt32Array but returns Int32Array
let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
Test {
reader,
expected_min: Arc::new(Int32Array::from(vec![0, 1, 4, 7, 251])), // should be UInt32Array
expected_max: Arc::new(Int32Array::from(vec![3, 4, 6, 250, 254])), // should be UInt32Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
}
.run("u32");

// u64
// BUG: should return UInt64Array but returns Int64Array
let reader = parquet_file_many_columns(Scenario::UInt, row_per_group).await;
Test {
reader,
expected_min: Arc::new(Int64Array::from(vec![0, 1, 4, 7, 251])), // should be UInt64Array
expected_max: Arc::new(Int64Array::from(vec![3, 4, 6, 250, 254])), // should be UInt64Array
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]),
}
.run("u64");
}
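
// A hypothetical workaround sketch (added for illustration, not part of this
// PR): until https://github.com/apache/datafusion/issues/10604 is fixed, a
// caller that knows a column's declared type could coerce the signed
// statistics arrays back to it with arrow's `cast` kernel. `coerce_stats` is
// an illustrative name, not an API of this crate.
#[allow(dead_code)]
fn coerce_stats(
    stats: ArrayRef,
    declared: &DataType,
) -> Result<ArrayRef, arrow::error::ArrowError> {
    if stats.data_type() == declared {
        Ok(stats)
    } else {
        // e.g. Int32 statistics read for a "u8" column are cast to UInt8
        arrow::compute::cast(stats.as_ref(), declared)
    }
}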

#[tokio::test]
async fn test_int32_range() {
let row_per_group = 5;
// This creates a parquet file of 1 column "i"
// file has 2 record batches, each has 2 rows. They will be saved into one row group
let reader = parquet_file_many_columns(Scenario::Int32Range, row_per_group).await;

Test {
reader,
expected_min: Arc::new(Int32Array::from(vec![0])),
expected_max: Arc::new(Int32Array::from(vec![300000])),
expected_null_counts: UInt64Array::from(vec![0]),
expected_row_counts: UInt64Array::from(vec![4]),
}
.run("i");
}

// BUG: statistics should be returned as UInt32Array but come back as Int32Array
// https://github.com/apache/datafusion/issues/10604
#[tokio::test]
async fn test_uint32_range() {
let row_per_group = 5;
// This creates a parquet file of 1 column "u"
// file has 2 record batches, each has 2 rows. They will be saved into one row group
let reader = parquet_file_many_columns(Scenario::UInt32Range, row_per_group).await;

Test {
reader,
expected_min: Arc::new(Int32Array::from(vec![0])), // should be UInt32Array
expected_max: Arc::new(Int32Array::from(vec![300000])), // should be UInt32Array
expected_null_counts: UInt64Array::from(vec![0]),
expected_row_counts: UInt64Array::from(vec![4]),
}
.run("u");
}

#[tokio::test]
async fn test_float64() {
let row_per_group = 5;
// This creates a parquet file of 1 column "f"
// file has 4 record batches, each has 5 rows. They will be saved into 4 row groups
let reader = parquet_file_many_columns(Scenario::Float64, row_per_group).await;

Test {
reader,
expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])),
expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]),
}
.run("f");
}

#[tokio::test]
async fn test_decimal() {
let row_per_group = 5;
// This creates a parquet file of 1 column "decimal_col" with decimal data type and precicion 9, scale 2
// file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
let reader = parquet_file_many_columns(Scenario::Decimal, row_per_group).await;

Contributor (@alamb):
Another important thing to test with decimals is different precision / scales -- maybe we can do this as a different PR

Contributor Author:
@alamb : can you be more specific? In the test below, I had to make sure they have the same precision & scale. What else do we have to test here?

Contributor (@alamb):
I was thinking smaller precisions -- I can't remember but I vaguely remember that spark stores different scale decimals with different underlying datatypes or something

Test {
reader,
expected_min: Arc::new(
Decimal128Array::from(vec![100, -500, 2000])
.with_precision_and_scale(9, 2)
.unwrap(),
),
expected_max: Arc::new(
Decimal128Array::from(vec![600, 600, 6000])
.with_precision_and_scale(9, 2)
.unwrap(),
),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("decimal_col");
}
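
// A sketch of the follow-up test the review above suggests (added for
// illustration, not part of this PR). `Scenario::DecimalSmallPrecision` is a
// hypothetical scenario that would write the same values as Scenario::Decimal
// but with precision 5 instead of 9:
//
// #[tokio::test]
// async fn test_decimal_small_precision() {
//     let row_per_group = 5;
//     let reader =
//         parquet_file_many_columns(Scenario::DecimalSmallPrecision, row_per_group)
//             .await;
//     Test {
//         reader,
//         expected_min: Arc::new(
//             Decimal128Array::from(vec![100, -500, 2000])
//                 .with_precision_and_scale(5, 2)
//                 .unwrap(),
//         ),
//         expected_max: Arc::new(
//             Decimal128Array::from(vec![600, 600, 6000])
//                 .with_precision_and_scale(5, 2)
//                 .unwrap(),
//         ),
//         expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
//         expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
//     }
//     .run("decimal_col");
// }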

// BUG: binary statistics should be returned as BinaryArray but come back as StringArray
// https://github.com/apache/datafusion/issues/10605
#[tokio::test]
async fn test_byte() {
let row_per_group = 5;

// This creates a parquet file with 4 columns:
// "name"
// "service_string"
// "service_binary"
// "service_fixedsize"

// The file has 3 record batches of 5 rows each; they are written into 3 row groups
let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;

// column "name"
Test {
reader,
expected_min: Arc::new(StringArray::from(vec![
"all frontends",
"mixed",
"all backends",
])),
expected_max: Arc::new(StringArray::from(vec![
"all frontends",
"mixed",
"all backends",
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("name");

// column "service_string"
let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
Test {
reader,
expected_min: Arc::new(StringArray::from(vec![
"frontend five",
"backend one",
"backend eight",
])),
expected_max: Arc::new(StringArray::from(vec![
"frontend two",
"frontend six",
"backend six",
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("service_string");

// column "service_binary"
let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
Test {
reader,
expected_min: Arc::new(StringArray::from(vec![
"frontend five",
"backend one",
"backend eight",
])), // Should be BinaryArray

Contributor: 👍

expected_max: Arc::new(StringArray::from(vec![
"frontend two",
"frontend six",
"backend six",
])), // Should be BinaryArray
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("service_binary");

// column "service_fixedsize"
// b"fe1", b"be1", b"be4"
let min_input = vec![vec![102, 101, 49], vec![98, 101, 49], vec![98, 101, 52]];
// b"fe5", b"fe6", b"be8"
let max_input = vec![vec![102, 101, 55], vec![102, 101, 54], vec![98, 101, 56]];
let reader = parquet_file_many_columns(Scenario::ByteArray, row_per_group).await;
Test {
reader,
expected_min: Arc::new(
FixedSizeBinaryArray::try_from_iter(min_input.into_iter()).unwrap(),
),
expected_max: Arc::new(
FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap(),
),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("service_fixedsize");
}
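
// For reference (added for illustration, not part of this PR): once
// https://github.com/apache/datafusion/issues/10605 is fixed, the
// "service_binary" expectations above would presumably be written with
// BinaryArray instead of StringArray, e.g.:
//
// use arrow_array::BinaryArray;
//
// let expected_min: ArrayRef = Arc::new(BinaryArray::from(vec![
//     "frontend five".as_bytes(),
//     "backend one".as_bytes(),
//     "backend eight".as_bytes(),
// ]));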

// PeriodsInColumnNames
#[tokio::test]
async fn test_period_in_column_names() {
let row_per_group = 5;
// This creates a parquet file with 2 columns "name" and "service.name"
// The file has 3 record batches of 5 rows each; they are written into 3 row groups
let reader =
parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;

// column "name"
Test {
reader,
expected_min: Arc::new(StringArray::from(vec![
"HTTP GET / DISPATCH",
"HTTP PUT / DISPATCH",
"HTTP GET / DISPATCH",
])),
expected_max: Arc::new(StringArray::from(vec![
"HTTP GET / DISPATCH",
"HTTP PUT / DISPATCH",
"HTTP GET / DISPATCH",
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("name");

// column "service.name"
let reader =
parquet_file_many_columns(Scenario::PeriodsInColumnNames, row_per_group).await;
Test {
reader,
expected_min: Arc::new(StringArray::from(vec!["frontend", "backend", "backend"])),
expected_max: Arc::new(StringArray::from(vec![
"frontend", "frontend", "backend",
])),
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
}
.run("service.name");
}

// TODO:
// Other data types to test
// `u8`, `u16`, `u32`, and `u64`,
// UInt,
// UInt32Range,
// Float64,
// Decimal,
// DecimalBloomFilterInt32,
// DecimalBloomFilterInt64,
// DecimalLargePrecision,
// DecimalLargePrecisionBloomFilter,
// ByteArray,
// PeriodsInColumnNames,
// WithNullValuesPageLevel,
// WITHOUT Stats

/////// NEGATIVE TESTS ///////
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/mod.rs
@@ -427,7 +427,7 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch {
.unwrap()
}

-/// Return record batch with i8, i16, i32, and i64 sequences
+/// Return record batch with u8, u16, u32, and u64 sequences
///
/// Columns are named
/// "u8" -> UInt8Array