Skip to content

Commit

Permalink
Fix predicate pushdown bugs: project columns within DatafusionArrowPredicate (apache#4005) (apache#4006) (apache#4021)
Browse files Browse the repository at this point in the history

* Project columns within DatafusionArrowPredicate (apache#4005) (apache#4006)

* Add test

* Format

* Fix merge blunder

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and Dandandan committed Nov 5, 2022
1 parent c0e7987 commit f9bff57
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
28 changes: 28 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1676,6 +1676,34 @@ mod tests {
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
async fn multi_column_predicate_pushdown() {
    // One batch with a nullable string column and a nullable int column.
    let strings: ArrayRef =
        Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
    let batch = create_batch(vec![("c1", strings.clone()), ("c2", ints.clone())]);

    // The predicate references the columns in the opposite order to the
    // schema, exercising the column projection inside the pushed-down filter.
    let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));

    // Round-trip the batch through a parquet file with pushdown enabled.
    let read = round_trip_to_parquet(vec![batch], None, None, Some(filter), true)
        .await
        .unwrap();

    // Rows 0 and 2 satisfy the predicate; row 1 (None, 2) is filtered out.
    let expected = vec![
        "+-----+----+",
        "| c1 | c2 |",
        "+-----+----+",
        "| Foo | 1 |",
        "| bar | |",
        "+-----+----+",
    ];
    assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
Expand Down
25 changes: 22 additions & 3 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ use crate::physical_plan::metrics;
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
physical_expr: Arc<dyn PhysicalExpr>,
projection: ProjectionMask,
projection_mask: ProjectionMask,
projection: Vec<usize>,
/// how many rows were filtered out by this predicate
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
Expand All @@ -90,9 +91,22 @@ impl DatafusionArrowPredicate {
let physical_expr =
create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?;

// ArrowPredicate::evaluate is passed columns in the order they appear in the file
// If the predicate has multiple columns, we therefore must project the columns based
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
len => {
let mut projection: Vec<_> = (0..len).collect();
projection.sort_unstable_by_key(|x| candidate.projection[*x]);
projection
}
};

Ok(Self {
physical_expr,
projection: ProjectionMask::roots(
projection,
projection_mask: ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
candidate.projection,
),
Expand All @@ -104,10 +118,15 @@ impl DatafusionArrowPredicate {

impl ArrowPredicate for DatafusionArrowPredicate {
fn projection(&self) -> &ProjectionMask {
&self.projection
&self.projection_mask
}

fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
let batch = match self.projection.is_empty() {
true => batch,
false => batch.project(&self.projection)?,
};

// scoped timer updates on drop
let mut timer = self.time.timer();
match self
Expand Down

0 comments on commit f9bff57

Please sign in to comment.