-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: ParquetExec logical expr. => phys. expr. #5419
Conversation
&expr, | ||
&table_df_schema, | ||
&self.table_schema, | ||
state.execution_props(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uses the real ExeuctionProps
now :)
use parquet::arrow::parquet_to_arrow_schema; | ||
use parquet::file::reader::{FileReader, SerializedFileReader}; | ||
use rand::prelude::*; | ||
|
||
// Assume a column expression for a column not in the table schema is a projected column and ignore it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is no longer relevant because you cannot lower such expression from logical to physical.
I've also fixed a few test expressions that where buggy (or at least would require type coercion which would run on the logical plan), most notably:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @crepererum -- I went through this PR's code and tsets carefully and it made sense to me. Thank you
Can you please confirm that passing a false
predicate to the pruning logic on the main branch will not prune anything as well (aka that there is no behavior change?)
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> { | ||
if let Expr::Column(column) = expr { | ||
if let Ok(idx) = self.file_schema.index_of(&column.name) { | ||
impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for FilterCandidateBuilder<'a> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is cool to see that there is an equivalent logical and physical rewriter
@@ -324,12 +329,15 @@ mod tests { | |||
fn row_group_pruning_predicate_partial_expr() { | |||
use datafusion_expr::{col, lit}; | |||
// test row group predicate with partially supported expression | |||
// int > 1 and int % 2 => c1_max > 1 and true | |||
let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2))); | |||
// (int > 1) and ((int % 2) = 0) => c1_max > 1 and true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you - this is a good change
/// | ||
/// This works recursively. | ||
pub fn get_phys_expr_columns(pred: &Arc<dyn PhysicalExpr>) -> HashSet<Column> { | ||
let mut rewriter = ColumnCollector::default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 this is quite clever
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! We had the same need (collecting columns) emerge in SHJ implementation, so we used this more lightweight recursion:
fn collect_columns_recursive(expr: &Arc<dyn PhysicalExpr>, columns: &mut Vec<Column>) {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
if !columns.iter().any(|c| c.eq(column)) {
columns.push(column.clone())
}
}
expr.children()
.iter()
.for_each(|e| collect_columns_recursive(e, columns))
}
fn collect_columns(expr: &Arc<dyn PhysicalExpr>) -> Vec<Column> {
let mut columns = vec![];
collect_columns_recursive(expr, &mut columns);
columns
}
We used a Vec
instead of a HashSet
due to anticipated small sizes, but the code is essentially the same 🙂
This makes me think that doing a comprehensive code review and collecting/coalescing/documenting utilities such as this may simplify the codebase, and could be a worthy pursuit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ozankabak the issue w/ Vec
is that you have a O(n^2)
complexity in the number of used columns. In InfluxDB IOx we sometimes have schemas w/ over 200 columns and I'm somewhat worried that such a simple oversight quickly turns into a performance bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me think that doing a comprehensive code review and collecting/coalescing/documenting utilities such as this may simplify the codebase, and could be a worthy pursuit.
This absolutely would be a worthy pursuit -- is that something you could help lead @ozankabak ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, happy to help!
@@ -84,7 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { | |||
&self, | |||
state: &SessionState, | |||
conf: FileScanConfig, | |||
filters: &[Expr], | |||
filters: Option<&Arc<dyn PhysicalExpr>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is a good change to have this take a single PhysicalExpr
rather than a slice as it is more consistent with TableProvider::scan
👍
let stat_schema = Schema::new(stat_fields); | ||
let stat_dfschema = DFSchema::try_from(stat_schema.clone())?; | ||
|
||
// TODO allow these properties to be passed in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 now it uses the real thing . Nice!
min_column_expr | ||
.lt_eq(expr_builder.scalar_expr().clone()) | ||
.and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr)) | ||
Arc::new(phys_expr::BinaryExpr::new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is unfortunate there isn't as nice "fluent" style APIs for building physical expressions. Maybe we can add a PhysicalExprExt
trait that knew how to do so.
I'll file a ticket to track doing so as a follow on PR
// false | ||
// constant literals that do NOT refer to any columns are currently not evaluated at all, hence the result is | ||
// "all true" | ||
let expr = lit(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is unfortunate (an all false predicate should skip the entire file). However, as long as this PR doesn't make the pruning worse in this situation I think it is fine (and we can improve things in a follow on PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not making things worse. This was the state of the art even before #5386, but I was asked here #5386 (comment) for the concrete semantics and I couldn't find a test that already covered that, so I've added one.
Use `Arc<dyn PhysicalExpr>` instead of `Expr` within `ParquetExec` and move lowering from logical to physical expression into plan lowering (e.g. `ListingTable`). This is in line w/ all other physical plan nodes (e.g. `FilterExpr`) and simplifies reasoning within physical optimizer but also allows correct passing of `ExecutionProps` into the conversion. Closes apache#4695.
3dae660
to
b909edf
Compare
Benchmark runs are scheduled for baseline = c4b8958 and contender = 03fbf9f. 03fbf9f is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
This PR appears to have introduced a regression -- see #5708 |
This feature is not implemented: Physical plan does not support logical expression EXISTS () version: 19.0.0 worked. |
Which issue does this PR close?
Closes #4695.
Rationale for this change
Use
Arc<dyn PhysicalExpr>
instead ofExpr
withinParquetExec
and move lowering from logical to physical expression into plan lowering (e.g.ListingTable
).This is in line w/ all other physical plan nodes (e.g.
FilterExpr
) and simplifies reasoning within physical optimizer but also allows correct passing ofExecutionProps
into the conversion.What changes are included in this PR?
Basically a bunch of type changes. The tests still use logical expressions because they are easier to construct and its a simple conversion from logical to phys. (at least in the test cases).
Are these changes tested?
All existing tests pass. Removed tests that no longer apply. Added one to demonstrate
PruningPredicate
behavior when no columns are referenced (see #5386 (comment) ).Are there any user-facing changes?
ParquetExec
arguments changed.