
Commit 68d4b9e

[minor] make parquet prune tests more readable
Ted-Jiang committed Apr 17, 2024
1 parent 4ad4f90 commit 68d4b9e
Showing 3 changed files with 54 additions and 22 deletions.
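
In short, the `Unit` enum used by these tests now carries the row count it configures: `RowGroup(usize)` is the maximum number of rows per row group and `Page(usize)` the maximum number of rows per data page, so each test states the parquet layout it relies on instead of depending on a value hard-coded in the writer properties. A minimal sketch of how call sites read after this change (the scenario and query are illustrative, taken from the tests below):

    // Row-group pruning: the file is written with at most 5 rows per row group.
    let output = ContextWithParquet::new(Scenario::Timestamps, Unit::RowGroup(5))
        .await
        .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
        .await;

    // Page pruning: the file is written with at most 5 rows per data page.
    let output = ContextWithParquet::new(Scenario::Timestamps, Unit::Page(5))
        .await
        .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
        .await;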
datafusion/core/tests/parquet/mod.rs (26 changes: 14 additions & 12 deletions)
@@ -81,8 +81,10 @@ enum Scenario {
 }
 
 enum Unit {
-    RowGroup,
-    Page,
+    // pass max row per row group in parquet writer
+    RowGroup(usize),
+    // pass max row per page in parquet writer
+    Page(usize),
 }
 
 /// Test fixture that has an execution context that has an external
@@ -185,13 +187,13 @@ impl ContextWithParquet {
         mut config: SessionConfig,
     ) -> Self {
         let file = match unit {
-            Unit::RowGroup => {
+            Unit::RowGroup(row_per_group) => {
                 config = config.with_parquet_bloom_filter_pruning(true);
-                make_test_file_rg(scenario).await
+                make_test_file_rg(scenario, row_per_group).await
             }
-            Unit::Page => {
+            Unit::Page(row_per_page) => {
                 config = config.with_parquet_page_index_pruning(true);
-                make_test_file_page(scenario).await
+                make_test_file_page(scenario, row_per_page).await
             }
         };
         let parquet_path = file.path().to_string_lossy();
@@ -880,15 +882,15 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
 }
 
 /// Create a test parquet file with various data types
-async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {
+async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquet_pruning")
         .suffix(".parquet")
         .tempfile()
         .expect("tempfile creation");
 
     let props = WriterProperties::builder()
-        .set_max_row_group_size(5)
+        .set_max_row_group_size(row_per_group)
         .set_bloom_filter_enabled(true)
         .build();
 
@@ -906,17 +908,17 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {
     output_file
 }
 
-async fn make_test_file_page(scenario: Scenario) -> NamedTempFile {
+async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTempFile {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquet_page_pruning")
         .suffix(".parquet")
         .tempfile()
         .expect("tempfile creation");
 
-    // set row count to 5, should get same result as rowGroup
+    // set row count to row_per_page, should get same result as rowGroup
     let props = WriterProperties::builder()
-        .set_data_page_row_count_limit(5)
-        .set_write_batch_size(5)
+        .set_data_page_row_count_limit(row_per_page)
+        .set_write_batch_size(row_per_page)
         .build();
 
     let batches = create_data_batch(scenario);
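
A note on the page-writer settings above: `set_data_page_row_count_limit` is a best-effort limit that the parquet writer only checks between write batches, so the tests also lower `set_write_batch_size` to the same value; otherwise a large default batch could overshoot the page limit. A self-contained sketch of that pairing (not part of this diff, assuming the `parquet` crate's `WriterProperties` builder):

    use parquet::file::properties::WriterProperties;

    fn page_limited_props(row_per_page: usize) -> WriterProperties {
        WriterProperties::builder()
            // Best-effort cap on rows per data page ...
            .set_data_page_row_count_limit(row_per_page)
            // ... which is only enforced at write-batch boundaries, so keep the
            // batch size at or below the page limit.
            .set_write_batch_size(row_per_page)
            .build()
    }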
datafusion/core/tests/parquet/page_pruning.rs (37 changes: 33 additions & 4 deletions)
@@ -241,9 +241,10 @@ async fn test_prune(
     expected_errors: Option<usize>,
     expected_row_pages_pruned: Option<usize>,
     expected_results: usize,
+    row_per_page: usize,
 ) {
     let output: crate::parquet::TestOutput =
-        ContextWithParquet::new(case_data_type, Page)
+        ContextWithParquet::new(case_data_type, Page(row_per_page))
             .await
             .query(sql)
             .await;
@@ -272,6 +273,7 @@ async fn prune_timestamps_nanos() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -289,6 +291,7 @@ async fn prune_timestamps_micros() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -306,6 +309,7 @@ async fn prune_timestamps_millis() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -324,6 +328,7 @@ async fn prune_timestamps_seconds() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -341,6 +346,7 @@ async fn prune_date32() {
         Some(0),
         Some(15),
         1,
+        5,
     )
     .await;
 }
@@ -359,7 +365,7 @@ async fn prune_date64() {
         .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
     let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis()));
 
-    let output = ContextWithParquet::new(Scenario::Dates, Page)
+    let output = ContextWithParquet::new(Scenario::Dates, Page(5))
         .await
         .query_with_expr(col("date64").lt(lit(date)))
         .await;
@@ -642,6 +648,7 @@ async fn prune_f64_lt() {
         Some(0),
         Some(5),
         11,
+        5,
     )
     .await;
     test_prune(
@@ -650,6 +657,7 @@ async fn prune_f64_lt() {
         Some(0),
         Some(5),
         11,
+        5,
     )
     .await;
 }
@@ -664,6 +672,7 @@ async fn prune_f64_scalar_fun_and_gt() {
         Some(0),
         Some(10),
         1,
+        5,
     )
     .await;
 }
@@ -677,6 +686,7 @@ async fn prune_f64_scalar_fun() {
         Some(0),
         Some(0),
         1,
+        5,
     )
     .await;
 }
@@ -690,6 +700,7 @@ async fn prune_f64_complex_expr() {
         Some(0),
         Some(0),
         9,
+        5,
     )
     .await;
 }
@@ -703,6 +714,7 @@ async fn prune_f64_complex_expr_subtract() {
         Some(0),
         Some(0),
         9,
+        5,
     )
     .await;
 }
@@ -718,6 +730,7 @@ async fn prune_decimal_lt() {
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
     // compare with the casted decimal value
@@ -727,6 +740,7 @@ async fn prune_decimal_lt() {
         Some(0),
         Some(5),
         8,
+        5,
     )
     .await;
 
@@ -737,6 +751,7 @@ async fn prune_decimal_lt() {
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
     // compare with the casted decimal value
@@ -746,6 +761,7 @@ async fn prune_decimal_lt() {
         Some(0),
         Some(5),
         8,
+        5,
     )
     .await;
 }
@@ -761,6 +777,7 @@ async fn prune_decimal_eq() {
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
     test_prune(
@@ -769,6 +786,7 @@ async fn prune_decimal_eq() {
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
 
@@ -779,6 +797,7 @@ async fn prune_decimal_eq() {
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
     test_prune(
@@ -787,6 +806,7 @@ async fn prune_decimal_eq() {
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
     test_prune(
@@ -795,6 +815,7 @@ async fn prune_decimal_eq() {
         Some(0),
         Some(10),
         2,
+        5,
     )
     .await;
 }
@@ -810,6 +831,7 @@ async fn prune_decimal_in_list() {
         Some(0),
         Some(5),
         5,
+        5,
     )
     .await;
     test_prune(
@@ -818,6 +840,7 @@ async fn prune_decimal_in_list() {
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
 
@@ -828,6 +851,7 @@ async fn prune_decimal_in_list() {
         Some(0),
         Some(5),
         5,
+        5,
     )
     .await;
     test_prune(
@@ -836,17 +860,18 @@ async fn prune_decimal_in_list() {
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
 }
 
 #[tokio::test]
 async fn without_pushdown_filter() {
-    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page(5)).await;
 
     let output1 = context.query("SELECT * FROM t").await;
 
-    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page(5)).await;
 
     let output2 = context
         .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
@@ -887,6 +912,7 @@ async fn test_pages_with_null_values() {
         // (row_group1, page2), (row_group4, page2)
         Some(10),
         22,
+        5,
     )
     .await;
 
@@ -897,6 +923,7 @@ async fn test_pages_with_null_values() {
         // expect prune (row_group1, page2) and (row_group4, page2) = 10 rows
         Some(10),
         29,
+        5,
     )
     .await;
 
@@ -907,6 +934,7 @@ async fn test_pages_with_null_values() {
         // expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows
         Some(25),
         11,
+        5,
     )
     .await;
 
@@ -918,6 +946,7 @@ async fn test_pages_with_null_values() {
         // (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows
         Some(30),
         7,
+        5,
     )
     .await;
 }
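
Every call site above passes 5, matching the value that was previously hard-coded in `make_test_file_page`, so the expected pruning counts are unchanged; the new argument only makes the page layout visible where those expectations are written. A hypothetical variant (not in this commit) could exercise a different layout, with the expectations recomputed for it:

    // Hypothetical: 10 rows per page; the pruning counts below are placeholders
    // and would have to be recalculated for this layout.
    test_prune(
        Scenario::Timestamps,
        "SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')",
        Some(0),
        Some(10),
        10,
        10,
    )
    .await;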
datafusion/core/tests/parquet/row_group_pruning.rs (13 changes: 7 additions & 6 deletions)
@@ -100,7 +100,7 @@ impl RowGroupPruningTest {
 
     // Execute the test with the current configuration
    async fn test_row_group_prune(self) {
-        let output = ContextWithParquet::new(self.scenario, RowGroup)
+        let output = ContextWithParquet::new(self.scenario, RowGroup(5))
             .await
             .query(&self.query)
             .await;
@@ -231,7 +231,7 @@ async fn prune_date64() {
         .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
     let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis()));
 
-    let output = ContextWithParquet::new(Scenario::Dates, RowGroup)
+    let output = ContextWithParquet::new(Scenario::Dates, RowGroup(5))
         .await
         .query_with_expr(col("date64").lt(lit(date)))
         // .query(
@@ -267,10 +267,11 @@ async fn prune_disabled() {
     let expected_rows = 10;
     let config = SessionConfig::new().with_parquet_pruning(false);
 
-    let output = ContextWithParquet::with_config(Scenario::Timestamps, RowGroup, config)
-        .await
-        .query(query)
-        .await;
+    let output =
+        ContextWithParquet::with_config(Scenario::Timestamps, RowGroup(5), config)
+            .await
+            .query(query)
+            .await;
     println!("{}", output.description());
 
     // This should not prune any
