Skip to content

Commit

Permalink
Project columns within DatafusionArrowPredicate (apache#4005) (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 29, 2022
1 parent 6860ae4 commit 34229e8
Showing 1 changed file with 23 additions and 4 deletions.
27 changes: 23 additions & 4 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ use std::sync::Arc;
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
physical_expr: Arc<dyn PhysicalExpr>,
projection: ProjectionMask,
projection_mask: ProjectionMask,
projection: Vec<usize>,
}

impl DatafusionArrowPredicate {
Expand All @@ -82,22 +83,40 @@ impl DatafusionArrowPredicate {
let physical_expr =
create_physical_expr(&candidate.expr, &df_schema, &schema, &props)?;

// ArrowPredicate::evaluate is passed columns in the order they appear in the file
// If the predicate has multiple columns, we therefore must project the columns based
// on the order they appear in the file
let projection = match candidate.projection.len() {
0 | 1 => vec![],
len => {
let mut projection: Vec<_> = (0..len).collect();
projection.sort_by_key(|x| candidate.projection[*x]);
projection
}
};

Ok(Self {
physical_expr,
projection: ProjectionMask::roots(
projection,
projection_mask: ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
candidate.projection,
),
)
})
}
}

impl ArrowPredicate for DatafusionArrowPredicate {
fn projection(&self) -> &ProjectionMask {
&self.projection
&self.projection_mask
}

fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
let batch = match self.projection.is_empty() {
true => batch,
false => batch.project(&self.projection)?
};

match self
.physical_expr
.evaluate(&batch)
Expand Down

0 comments on commit 34229e8

Please sign in to comment.