From 68d4b9e29c30b87f0716f8d74e7061500f9a9eb4 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Wed, 17 Apr 2024 15:46:06 +0800
Subject: [PATCH] [minor] make parquet prune tests more readable

---
 datafusion/core/tests/parquet/mod.rs          | 26 +++++++------
 datafusion/core/tests/parquet/page_pruning.rs | 37 +++++++++++++++++--
 .../core/tests/parquet/row_group_pruning.rs   | 13 ++++---
 3 files changed, 54 insertions(+), 22 deletions(-)

diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index f90d0e8afb4c6..43064df7df9bb 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -81,8 +81,10 @@ enum Scenario {
 }
 
 enum Unit {
-    RowGroup,
-    Page,
+    // pass max row per row group in parquet writer
+    RowGroup(usize),
+    // pass max row per page in parquet writer
+    Page(usize),
 }
 
 /// Test fixture that has an execution context that has an external
@@ -185,13 +187,13 @@ impl ContextWithParquet {
         mut config: SessionConfig,
     ) -> Self {
         let file = match unit {
-            Unit::RowGroup => {
+            Unit::RowGroup(row_per_group) => {
                 config = config.with_parquet_bloom_filter_pruning(true);
-                make_test_file_rg(scenario).await
+                make_test_file_rg(scenario, row_per_group).await
             }
-            Unit::Page => {
+            Unit::Page(row_per_page) => {
                 config = config.with_parquet_page_index_pruning(true);
-                make_test_file_page(scenario).await
+                make_test_file_page(scenario, row_per_page).await
             }
         };
         let parquet_path = file.path().to_string_lossy();
@@ -880,7 +882,7 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
 }
 
 /// Create a test parquet file with various data types
-async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {
+async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquet_pruning")
         .suffix(".parquet")
         .tempfile()
         .expect("tempfile creation");
 
     let props = WriterProperties::builder()
-        .set_max_row_group_size(5)
+        .set_max_row_group_size(row_per_group)
         .set_bloom_filter_enabled(true)
         .build();
@@ -906,17 +908,17 @@ async fn make_test_file_rg(scenario: Scenario) -> NamedTempFile {
     output_file
 }
 
-async fn make_test_file_page(scenario: Scenario) -> NamedTempFile {
+async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTempFile {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquet_page_pruning")
         .suffix(".parquet")
         .tempfile()
         .expect("tempfile creation");
 
-    // set row count to 5, should get same result as rowGroup
+    // set row count to row_per_page, should get same result as rowGroup
     let props = WriterProperties::builder()
-        .set_data_page_row_count_limit(5)
-        .set_write_batch_size(5)
+        .set_data_page_row_count_limit(row_per_page)
+        .set_write_batch_size(row_per_page)
         .build();
 
     let batches = create_data_batch(scenario);
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 1615a1c5766a5..e305c9c3f2551 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -241,9 +241,10 @@ async fn test_prune(
     expected_errors: Option<usize>,
     expected_row_pages_pruned: Option<usize>,
     expected_results: usize,
+    row_per_page: usize,
 ) {
     let output: crate::parquet::TestOutput =
-        ContextWithParquet::new(case_data_type, Page)
+        ContextWithParquet::new(case_data_type, Page(row_per_page))
             .await
             .query(sql)
             .await;
@@ -272,6 +273,7 @@ async fn prune_timestamps_nanos() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -289,6 +291,7 @@ async fn prune_timestamps_micros() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -306,6 +309,7 @@ async fn prune_timestamps_millis() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -324,6 +328,7 @@ async fn prune_timestamps_seconds() {
         Some(0),
         Some(5),
         10,
+        5,
     )
     .await;
 }
@@ -341,6 +346,7 @@ async fn prune_date32() {
         Some(0),
         Some(15),
         1,
+        5,
     )
     .await;
 }
@@ -359,7 +365,7 @@ async fn prune_date64() {
         .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
     let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis()));
 
-    let output = ContextWithParquet::new(Scenario::Dates, Page)
+    let output = ContextWithParquet::new(Scenario::Dates, Page(5))
         .await
         .query_with_expr(col("date64").lt(lit(date)))
         .await;
@@ -642,6 +648,7 @@ async fn prune_f64_lt() {
         Some(0),
         Some(5),
         11,
+        5,
     )
     .await;
     test_prune(
         Some(0),
         Some(5),
         11,
+        5,
     )
     .await;
 }
@@ -664,6 +672,7 @@ async fn prune_f64_scalar_fun_and_gt() {
         Some(0),
         Some(10),
         1,
+        5,
     )
     .await;
 }
@@ -677,6 +686,7 @@ async fn prune_f64_scalar_fun() {
         Some(0),
         Some(0),
         1,
+        5,
     )
     .await;
 }
@@ -690,6 +700,7 @@ async fn prune_f64_complex_expr() {
         Some(0),
         Some(0),
         9,
+        5,
     )
     .await;
 }
@@ -703,6 +714,7 @@ async fn prune_f64_complex_expr_subtract() {
         Some(0),
         Some(0),
         9,
+        5,
     )
     .await;
 }
@@ -718,6 +730,7 @@ async fn prune_decimal_lt() {
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
     // compare with the casted decimal value
         Some(0),
         Some(5),
         8,
+        5,
     )
     .await;
 
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
     // compare with the casted decimal value
         Some(0),
         Some(5),
         8,
+        5,
     )
     .await;
 }
@@ -761,6 +777,7 @@ async fn prune_decimal_eq() {
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
     test_prune(
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
 
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
     test_prune(
         Some(0),
         Some(5),
         2,
+        5,
     )
     .await;
     test_prune(
         Some(0),
         Some(10),
         2,
+        5,
     )
     .await;
 }
@@ -810,6 +831,7 @@ async fn prune_decimal_in_list() {
         Some(0),
         Some(5),
         5,
+        5,
     )
     .await;
     test_prune(
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
 
         Some(0),
         Some(5),
         5,
+        5,
     )
     .await;
     test_prune(
         Some(0),
         Some(5),
         6,
+        5,
     )
     .await;
 }
 
 #[tokio::test]
 async fn without_pushdown_filter() {
-    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page(5)).await;
 
     let output1 = context.query("SELECT * FROM t").await;
 
-    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page(5)).await;
 
     let output2 = context
         .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
         .await;
@@ -887,6 +912,7 @@ async fn test_pages_with_null_values() {
         // (row_group1, page2), (row_group4, page2)
         Some(10),
         22,
+        5,
     )
     .await;
 
         Some(0),
         // expect prune (row_group1, page2) and (row_group4, page2) = 10 rows
         Some(10),
         29,
+        5,
     )
     .await;
 
         Some(0),
         // expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows
         Some(25),
         11,
+        5,
     )
     .await;
 
         Some(0),
         // expect prune (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows
         Some(30),
         7,
+        5,
     )
     .await;
 }
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index b3f1fec1753bb..d6de2b6f8ef05 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -100,7 +100,7 @@ impl RowGroupPruningTest {
     // Execute the test with the current configuration
     async fn test_row_group_prune(self) {
-        let output = ContextWithParquet::new(self.scenario, RowGroup)
+        let output = ContextWithParquet::new(self.scenario, RowGroup(5))
             .await
             .query(&self.query)
             .await;
@@ -231,7 +231,7 @@ async fn prune_date64() {
         .and_time(chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap());
     let date = ScalarValue::Date64(Some(date.and_utc().timestamp_millis()));
 
-    let output = ContextWithParquet::new(Scenario::Dates, RowGroup)
+    let output = ContextWithParquet::new(Scenario::Dates, RowGroup(5))
         .await
         .query_with_expr(col("date64").lt(lit(date)))
         // .query(
@@ -267,10 +267,11 @@ async fn prune_disabled() {
     let expected_rows = 10;
     let config = SessionConfig::new().with_parquet_pruning(false);
 
-    let output = ContextWithParquet::with_config(Scenario::Timestamps, RowGroup, config)
-        .await
-        .query(query)
-        .await;
+    let output =
+        ContextWithParquet::with_config(Scenario::Timestamps, RowGroup(5), config)
+            .await
+            .query(query)
+            .await;
 
     println!("{}", output.description());
     // This should not prune any
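
For context, a minimal sketch of how a test reads once `Unit` carries the row count. It reuses `ContextWithParquet`, `Scenario`, `Unit::Page`, and `TestOutput::description` from the files touched above; the test name, the page size of 10, and the query reuse are illustrative assumptions, not part of this patch.

    // Hypothetical example (not included in the patch): exercise the
    // parametrized Unit with a non-default page size.
    #[tokio::test]
    async fn prune_timestamps_nanos_wider_pages() {
        // Write 10 rows per data page instead of the usual 5, so each page
        // index entry now covers twice as many rows.
        let output = ContextWithParquet::new(Scenario::Timestamps, Unit::Page(10))
            .await
            .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
            .await;
        println!("{}", output.description());
        // With larger pages fewer rows can be skipped by the page index, so
        // the expected pruning counts are chosen per page size at the call site.
    }

Passing the row count explicitly keeps the 5-rows-per-group assumption visible at each test call site instead of leaving it buried in the writer setup.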