132 changes: 62 additions & 70 deletions datafusion/core/tests/parquet/page_pruning.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
use crate::parquet::Unit::Page;
use crate::parquet::{ContextWithParquet, Scenario};

use arrow::array::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::PartitionedFile;
@@ -40,7 +41,11 @@ use futures::StreamExt;
use object_store::path::Path;
use object_store::ObjectMeta;

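// Builds a DataSourceExec over the test parquet file with the given predicate;
// `pushdown_filters` controls whether the predicate is also evaluated against
// individual rows during the scan, in addition to page-index pruning.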
async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec {
async fn get_parquet_exec(
state: &SessionState,
filter: Expr,
pushdown_filters: bool,
) -> DataSourceExec {
let object_store_url = ObjectStoreUrl::local_filesystem();
let store = state.runtime_env().object_store(&object_store_url).unwrap();

@@ -78,7 +83,8 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
let source = Arc::new(
ParquetSource::default()
.with_predicate(predicate)
.with_enable_page_index(true),
.with_enable_page_index(true)
.with_pushdown_filters(pushdown_filters),
);
let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)
.with_file(partitioned_file)
@@ -87,38 +93,44 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
DataSourceExec::new(Arc::new(base_config))
}

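// Runs partition 0 of a scan built with the given filter and pushdown setting,
// collecting every successfully produced batch.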
async fn get_filter_results(
state: &SessionState,
filter: Expr,
pushdown_filters: bool,
) -> Vec<RecordBatch> {
let parquet_exec = get_parquet_exec(state, filter, pushdown_filters).await;
let task_ctx = state.task_ctx();
let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
let mut batches = Vec::new();
while let Some(Ok(batch)) = results.next().await {
batches.push(batch);
}
batches
}

#[tokio::test]
async fn page_index_filter_one_col() {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();

// 1. create filter month == 1;
let filter = col("month").eq(lit(1_i32));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();

let batches = get_filter_results(&state, filter.clone(), false).await;
// Evaluating `month = 1` against the page index should produce the RowSelection below:
// vec.push(RowSelector::select(312));
// vec.push(RowSelector::skip(3330));
// vec.push(RowSelector::select(339));
// vec.push(RowSelector::skip(3319));
// 651 selected rows in total (312 + 339)
assert_eq!(batch.num_rows(), 651);
assert_eq!(batches[0].num_rows(), 651);

let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 620);
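// Only 620 of the 651 page-selected rows actually satisfy `month = 1`:
// the page index prunes at page granularity, while pushdown_filters
// re-evaluates the predicate on the decoded rows.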

// 2. create filter month == 1 or month == 2;
let filter = col("month").eq(lit(1_i32)).or(col("month").eq(lit(2_i32)));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();

let batches = get_filter_results(&state, filter.clone(), false).await;
// Evaluating `month = 1 or month = 2` against the page index should produce the RowSelection below:
// vec.push(RowSelector::select(312));
// vec.push(RowSelector::skip(900));
@@ -128,95 +140,78 @@ async fn page_index_filter_one_col() {
// vec.push(RowSelector::skip(873));
// vec.push(RowSelector::select(318));
// vec.push(RowSelector::skip(2128));
assert_eq!(batch.num_rows(), 1281);
assert_eq!(batches[0].num_rows(), 1281);

let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 1180);

// 3. create filter month == 1 and month == 12;
let filter = col("month")
.eq(lit(1_i32))
.and(col("month").eq(lit(12_i32)));
let batches = get_filter_results(&state, filter.clone(), false).await;
assert!(batches.is_empty());
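// `month = 1 AND month = 12` is unsatisfiable, so every page is pruned and
// the scan produces no batches at all.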

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await;

assert!(batch.is_none());
let batches = get_filter_results(&state, filter, true).await;
assert!(batches.is_empty());

// 4. create filter 0 < month < 2;
let filter = col("month").gt(lit(0_i32)).and(col("month").lt(lit(2_i32)));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();

let batches = get_filter_results(&state, filter.clone(), false).await;
// should be the same as `month = 1`
assert_eq!(batch.num_rows(), 651);

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
assert_eq!(batches[0].num_rows(), 651);
let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 620);

// 5. create filter date_string_col == "01/01/09";
// Note this test doesn't apply type coercion so the literal must match the actual view type
let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09")));
let parquet_exec = get_parquet_exec(&state, filter).await;
let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();
let batch = results.next().await.unwrap().unwrap();
let batches = get_filter_results(&state, filter.clone(), false).await;

// only two pages should match the filter
// min max
// page-20 0 01/01/09 01/02/09
// page-21 0 01/01/09 01/01/09
// each 7 rows
assert_eq!(batch.num_rows(), 14);
assert_eq!(batches[0].num_rows(), 14);
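// page-20 spans 01/01/09..01/02/09, so it also contains rows that fail the
// exact equality; row-level pushdown trims the page-granular 14 down to 10.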
let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 10);
}

#[tokio::test]
async fn page_index_filter_multi_col() {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = session_ctx.task_ctx();

// create filter month == 1 and year = 2009;
let filter = col("month").eq(lit(1_i32)).and(col("year").eq(lit(2009)));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();

let batches = get_filter_results(&state, filter.clone(), false).await;
// Evaluating `year = 2009` against the page index should produce the RowSelection below:
// vec.push(RowSelector::select(3663));
// vec.push(RowSelector::skip(3642));
// combined with `month = 1`, 333 rows in total
assert_eq!(batch.num_rows(), 333);
assert_eq!(batches[0].num_rows(), 333);
let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 310);

// create filter (year = 2009 or id = 1) and month = 1;
// only the `month = 1` conjunct can be used to evaluate the page index,
// since the OR branch references two different columns.
let filter = col("month")
.eq(lit(1_i32))
.and(col("year").eq(lit(2009)).or(col("id").eq(lit(1))));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();
assert_eq!(batch.num_rows(), 651);
let batches = get_filter_results(&state, filter.clone(), false).await;
assert_eq!(batches[0].num_rows(), 651);
let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 310);
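// with pushdown the full predicate is applied row-wise; the count matches
// `month = 1 AND year = 2009` above, so the `id = 1` disjunct adds no extra rows.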

// create filter (year = 2009 or id = 1)
// this filter references two columns, so it cannot be pushed down to the page index
let filter = col("year").eq(lit(2009)).or(col("id").eq(lit(1)));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();
assert_eq!(batch.num_rows(), 7300);
let batches = get_filter_results(&state, filter.clone(), false).await;
assert_eq!(batches[0].num_rows(), 7300);
let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 3650);
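// with no page pruning possible, the scan returns all 7300 rows; row-level
// pushdown still applies the multi-column predicate, keeping the 3650 matches.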

// create filter (year = 2009 and id = 1) or (year = 2010)
// this filter references two columns, so it cannot be pushed down to the page index
@@ -226,13 +221,10 @@ async fn page_index_filter_multi_col() {
.eq(lit(2009))
.and(col("id").eq(lit(1)))
.or(col("year").eq(lit(2010)));

let parquet_exec = get_parquet_exec(&state, filter).await;

let mut results = parquet_exec.execute(0, task_ctx.clone()).unwrap();

let batch = results.next().await.unwrap().unwrap();
assert_eq!(batch.num_rows(), 7300);
let batches = get_filter_results(&state, filter.clone(), false).await;
assert_eq!(batches[0].num_rows(), 7300);
let batches = get_filter_results(&state, filter, true).await;
assert_eq!(batches[0].num_rows(), 3651);
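// 3650 rows from `year = 2010` plus the single row with `id = 1` in 2009.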
}

async fn test_prune(