Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(pruning): Support IS NOT NULL predicates in PruningPredicate #9208

Merged
49 changes: 44 additions & 5 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,20 @@ mod tests {
ParquetStatistics::boolean(Some(false), Some(true), None, 1, false),
],
);
vec![rgm1, rgm2]
let rgm3 = get_row_group_meta_data(
&schema_descr,
vec![
ParquetStatistics::int32(Some(17), Some(30), None, 1, false),
ParquetStatistics::boolean(Some(false), Some(true), None, 0, false),
],
);
vec![rgm1, rgm2, rgm3]
}

#[test]
fn row_group_pruning_predicate_null_expr() {
use datafusion_expr::{col, lit};
// int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
// c1 > 15 and IsNull(c2) => c1_max > 15 and c2_null_count > 0
Comment on lines -629 to +636
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment to reflect the examples given in the test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 This reads more smoothly.

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
Expand Down Expand Up @@ -657,7 +664,7 @@ mod tests {
use datafusion_expr::{col, lit};
// test row group predicate with an unknown (Null) expr
//
// int > 1 and bool = NULL => c1_max > 1 and null
// c1 > 15 and c2 = NULL => c1_max > 15 and NULL
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, false),
Field::new("c2", DataType::Boolean, false),
Expand All @@ -672,7 +679,7 @@ mod tests {

let metrics = parquet_file_metrics();
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates. Ideally these should both be false
// pass predicates. Ideally these should all be false
assert_eq!(
prune_row_groups_by_statistics(
&schema,
Expand All @@ -682,7 +689,39 @@ mod tests {
Some(&pruning_predicate),
&metrics
),
vec![1]
vec![1, 2]
);
}

#[test]
fn row_group_pruning_predicate_not_null_expr() {
    use datafusion_expr::{col, lit};

    // Predicate under test and the pruning rewrite it is expected to produce:
    //   c1 > 15 AND c2 IS NOT NULL  =>  c1_max > 15 AND c2_null_count = 0
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Boolean, false),
    ]));
    let schema_descr = arrow_to_parquet_schema(&schema).unwrap();

    let logical_expr = col("c1").gt(lit(15)).and(col("c2").is_not_null());
    let physical_expr = logical2physical(&logical_expr, &schema);
    let pruning_predicate =
        PruningPredicate::try_new(physical_expr, schema.clone()).unwrap();

    let row_groups = gen_row_group_meta_data_for_pruning_predicate();
    let metrics = parquet_file_metrics();

    let kept = prune_row_groups_by_statistics(
        &schema,
        &schema_descr,
        &row_groups,
        None,
        Some(&pruning_predicate),
        &metrics,
    );

    // Expected outcome per row group (indices are zero based):
    //   0: pruned - c1_max is 10, which is not greater than 15.
    //   1: pruned - its "c2" statistics report a null value.
    //   2: kept   - c1_max is 30 (> 15) and "c2" reports no null values.
    //
    // NOTE(review): row group 1 is dropped only because "c2" has *some* nulls.
    // If it also holds non-null "c2" values, rows in it could still satisfy
    // `c2 IS NOT NULL`; confirm that pruning it here is really intended.
    assert_eq!(kept, vec![2]);
}

Expand Down
63 changes: 63 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ pub trait PruningStatistics {
/// `x < 5` | `x_max < 5`
/// `x = 5 AND y = 10` | `x_min <= 5 AND 5 <= x_max AND y_min <= 10 AND 10 <= y_max`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
///
/// ## Predicate Evaluation
/// The PruningPredicate works in two passes
Expand Down Expand Up @@ -1120,6 +1121,34 @@ fn build_is_null_column_expr(
}
}

/// Builds the pruning expression used for `<expr> IS NOT NULL`.
///
/// `expr` is the argument of the `IS NOT NULL` predicate. When it is a plain
/// column reference whose name resolves in `schema`, the null-count statistic
/// expression for that column is requested from `required_columns` and the
/// result is the comparison `null_count = 0`.
///
/// Returns `None` when `expr` is not a column, when the column cannot be
/// found in `schema`, or when the null-count expression cannot be built.
///
/// NOTE(review): `null_count = 0` is false for a container that holds both
/// null and non-null values, even though such a container still has rows
/// matching `IS NOT NULL`. That looks stricter than a pruning predicate should
/// be (a comparison against the container's row count would be the safe
/// form) - confirm against the `PruningPredicate` contract.
fn build_is_not_null_column_expr(
    expr: &Arc<dyn PhysicalExpr>,
    schema: &Schema,
    required_columns: &mut RequiredColumns,
) -> Option<Arc<dyn PhysicalExpr>> {
    // Only direct column references can be rewritten in terms of statistics.
    let column = expr.as_any().downcast_ref::<phys_expr::Column>()?;
    let source_field = schema.field_with_name(column.name()).ok()?;

    // The statistic keeps the column's name but is always a nullable UInt64.
    let stat_field = Field::new(source_field.name(), DataType::UInt64, true);
    let null_count = required_columns
        .null_count_column_expr(column, expr, &stat_field)
        .ok()?;

    // IsNotNull(column) => null_count = 0
    let zero: Arc<dyn PhysicalExpr> =
        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0))));
    let predicate: Arc<dyn PhysicalExpr> =
        Arc::new(phys_expr::BinaryExpr::new(null_count, Operator::Eq, zero));
    Some(predicate)
}

/// The maximum number of entries in an `InList` that might be rewritten into
/// an OR chain
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
Expand All @@ -1146,6 +1175,14 @@ fn build_predicate_expression(
return build_is_null_column_expr(is_null.arg(), schema, required_columns)
.unwrap_or(unhandled);
}
if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
return build_is_not_null_column_expr(
is_not_null.arg(),
schema,
required_columns,
)
.unwrap_or(unhandled);
}
if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
return build_single_column_expr(col, schema, required_columns, false)
.unwrap_or(unhandled);
Expand Down Expand Up @@ -2052,6 +2089,32 @@ mod tests {
Ok(())
}

#[test]
fn row_group_predicate_is_null() -> Result<()> {
    // `c1 IS NULL` is expected to be rewritten into a check that the
    // null-count statistic of `c1` is greater than zero.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let mut required_columns = RequiredColumns::new();

    let rewritten = test_build_predicate_expression(
        &col("c1").is_null(),
        &schema,
        &mut required_columns,
    );

    assert_eq!(rewritten.to_string(), "c1_null_count@0 > 0");
    Ok(())
}

#[test]
fn row_group_predicate_is_not_null() -> Result<()> {
    // `c1 IS NOT NULL` is expected to be rewritten into a check that the
    // null-count statistic of `c1` equals zero.
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    let mut required_columns = RequiredColumns::new();

    let rewritten = test_build_predicate_expression(
        &col("c1").is_not_null(),
        &schema,
        &mut required_columns,
    );

    assert_eq!(rewritten.to_string(), "c1_null_count@0 = 0");
    Ok(())
}

#[test]
fn row_group_predicate_required_columns() -> Result<()> {
let schema = Schema::new(vec![
Expand Down
Loading