diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index 9da879a32f6b..5b37c55c09e4 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 
 use crate::parquet::Unit::Page;
 use crate::parquet::{ContextWithParquet, Scenario};
+use arrow::array::RecordBatch;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::listing::PartitionedFile;
@@ -40,7 +41,11 @@ use futures::StreamExt;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 
-async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec {
+async fn get_parquet_exec(
+    state: &SessionState,
+    filter: Expr,
+    pushdown_filters: bool,
+) -> DataSourceExec {
     let object_store_url = ObjectStoreUrl::local_filesystem();
     let store = state.runtime_env().object_store(&object_store_url).unwrap();
 
@@ -78,7 +83,8 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
     let source = Arc::new(
         ParquetSource::default()
             .with_predicate(predicate)
-            .with_enable_page_index(true),
+            .with_enable_page_index(true)
+            .with_pushdown_filters(pushdown_filters),
     );
     let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)
         .with_file(partitioned_file)
@@ -87,38 +93,44 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
     DataSourceExec::new(Arc::new(base_config))
 }
 
+async fn get_filter_results(
+    state: &SessionState,
+    filter: Expr,
+    pushdown_filters: bool,
+) -> Vec<RecordBatch> {
+    let parquet_exec = get_parquet_exec(state, filter, pushdown_filters).await;
+    let task_ctx = state.task_ctx();
+    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
+    let mut batches = Vec::new();
+    while let Some(Ok(batch)) = results.next().await {
+        batches.push(batch);
+    }
+    batches
+}
+
 #[tokio::test]
 async fn page_index_filter_one_col() {
     let session_ctx = SessionContext::new();
     let state = session_ctx.state();
-    let task_ctx = state.task_ctx();
 
     // 1.create filter month == 1;
     let filter = col("month").eq(lit(1_i32));
 
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-
+    let batches = get_filter_results(&state, filter.clone(), false).await;
     // `month = 1` from the page index should create below RowSelection
     // vec.push(RowSelector::select(312));
     // vec.push(RowSelector::skip(3330));
     // vec.push(RowSelector::select(339));
     // vec.push(RowSelector::skip(3319));
     // total 651 row
-    assert_eq!(batch.num_rows(), 651);
+    assert_eq!(batches[0].num_rows(), 651);
+
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 620);
 
     // 2. create filter month == 1 or month == 2;
     let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));
-
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-
+    let batches = get_filter_results(&state, filter.clone(), false).await;
     // `month = 1` or `month = 2` from the page index should create below RowSelection
     // vec.push(RowSelector::select(312));
     // vec.push(RowSelector::skip(900));
@@ -128,95 +140,78 @@ async fn page_index_filter_one_col() {
     // vec.push(RowSelector::skip(873));
     // vec.push(RowSelector::select(318));
     // vec.push(RowSelector::skip(2128));
-    assert_eq!(batch.num_rows(), 1281);
+    assert_eq!(batches[0].num_rows(), 1281);
+
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 1180);
 
     // 3. create filter month == 1 and month == 12;
     let filter = col("month")
         .eq(lit(1_i32))
        .and(col("month").eq(lit(12_i32)));
+    let batches = get_filter_results(&state, filter.clone(), false).await;
+    assert!(batches.is_empty());
 
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await;
-
-    assert!(batch.is_none());
+    let batches = get_filter_results(&state, filter, true).await;
+    assert!(batches.is_empty());
 
     // 4.create filter 0 < month < 2 ;
     let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32)));
-
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-
+    let batches = get_filter_results(&state, filter.clone(), false).await;
    // should same with `month = 1`
-    assert_eq!(batch.num_rows(), 651);
-
-    let session_ctx = SessionContext::new();
-    let task_ctx = session_ctx.task_ctx();
+    assert_eq!(batches[0].num_rows(), 651);
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 620);
 
     // 5.create filter date_string_col == "01/01/09"`;
     // Note this test doesn't apply type coercion so the literal must match the actual view type
     let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09")));
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-    let batch = results.next().await.unwrap().unwrap();
+    let batches = get_filter_results(&state, filter.clone(), false).await;
+    assert_eq!(batches[0].num_rows(), 14);
 
     // there should only two pages match the filter
     //              min       max
     // page-20    0  01/01/09  01/02/09
     // page-21    0  01/01/09  01/01/09
     // each 7 rows
-    assert_eq!(batch.num_rows(), 14);
+    assert_eq!(batches[0].num_rows(), 14);
+
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 10);
 }
 
 #[tokio::test]
 async fn page_index_filter_multi_col() {
     let session_ctx = SessionContext::new();
     let state = session_ctx.state();
-    let task_ctx = session_ctx.task_ctx();
 
     // create filter month == 1 and year = 2009;
     let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009)));
-
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-
+    let batches = get_filter_results(&state, filter.clone(), false).await;
     // `year = 2009` from the page index should create below RowSelection
     // vec.push(RowSelector::select(3663));
     // vec.push(RowSelector::skip(3642));
     // combine with `month = 1` total 333 row
-    assert_eq!(batch.num_rows(), 333);
+    assert_eq!(batches[0].num_rows(), 333);
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 310);
 
     // create filter (year = 2009 or id = 1) and month = 1;
     // this should only use `month = 1` to evaluate the page index.
     let filter = col("month")
         .eq(lit(1_i32))
        .and(col("year").eq(lit(2009)).or(col("id").eq(lit(1))));
-
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-    assert_eq!(batch.num_rows(), 651);
+    let batches = get_filter_results(&state, filter.clone(), false).await;
+    assert_eq!(batches[0].num_rows(), 651);
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 310);
 
     // create filter (year = 2009 or id = 1)
     // this filter use two columns will not push down
     let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1)));
-
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-    assert_eq!(batch.num_rows(), 7300);
+    let batches = get_filter_results(&state, filter.clone(), false).await;
+    assert_eq!(batches[0].num_rows(), 7300);
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 3650);
 
     // create filter (year = 2009 and id = 1) or (year = 2010)
     // this filter use two columns will not push down
@@ -226,13 +221,10 @@ async fn page_index_filter_multi_col() {
         .eq(lit(2009))
         .and(col("id").eq(lit(1)))
        .or(col("year").eq(lit(2010)));
-
-    let parquet_exec = get_parquet_exec(&state, filter).await;
-
-    let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
-
-    let batch = results.next().await.unwrap().unwrap();
-    assert_eq!(batch.num_rows(), 7300);
+    let batches = get_filter_results(&state, filter.clone(), false).await;
+    assert_eq!(batches[0].num_rows(), 7300);
+    let batches = get_filter_results(&state, filter, true).await;
+    assert_eq!(batches[0].num_rows(), 3651);
 }
 
 async fn test_prune(
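Not part of the patch above: a minimal, reviewer-facing sketch of reproducing the two behaviours the tests compare (page-index pruning alone vs. page-index pruning plus row-level filter pushdown) through `SessionConfig` instead of building a `ParquetSource` by hand. The `datafusion.execution.parquet.*` config keys are DataFusion's standard options, but the table name `t` and the `alltypes_tiny_pages.parquet` path are placeholders, not taken from the diff.

```rust
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Page-index pruning keeps whole pages whose min/max statistics overlap the
    // predicate; pushdown_filters additionally drops non-matching rows inside
    // those pages, which is why the asserted row counts above shrink
    // (e.g. 651 -> 620 for `month = 1`).
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.enable_page_index", true)
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    // Placeholder path: the tests read `alltypes_tiny_pages.parquet` from the
    // parquet-testing data; point this at any parquet file with a page index.
    ctx.register_parquet("t", "alltypes_tiny_pages.parquet", ParquetReadOptions::default())
        .await?;

    let batches = ctx.sql("SELECT * FROM t WHERE month = 1").await?.collect().await?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    println!("matching rows: {rows}");
    Ok(())
}
```

Flipping `pushdown_filters` back to `false` in the sketch should reproduce the larger, page-granularity counts asserted in the `false` branches of the tests.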