-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Project record batches to avoid filtering unused columns in CASE evaluation
#18329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| let mut used_column_indices = HashSet::<usize>::new(); | ||
| let mut collect_column_indices = |expr: &Arc<dyn PhysicalExpr>| { | ||
| expr.apply(|expr| { | ||
| if let Some(column) = expr.as_any().downcast_ref::<Column>() { | ||
| used_column_indices.insert(column.index()); | ||
| } | ||
| Ok(TreeNodeRecursion::Continue) | ||
| }) | ||
| .expect("Closure cannot fail"); | ||
| }; | ||
|
|
||
| if let Some(e) = &self.expr { | ||
| collect_column_indices(e); | ||
| } | ||
| self.when_then_expr.iter().for_each(|(w, t)| { | ||
| collect_column_indices(w); | ||
| collect_column_indices(t); | ||
| }); | ||
| if let Some(e) = &self.else_expr { | ||
| collect_column_indices(e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like in projection.rs we have something similar (not pub though)
datafusion/datafusion/physical-plan/src/projection.rs
Lines 900 to 912 in 6eb8d45
| /// Collect all column indices from the given projection expressions. | |
| fn collect_column_indices(exprs: &[ProjectionExpr]) -> Vec<usize> { | |
| // Collect indices and remove duplicates. | |
| let mut indices = exprs | |
| .iter() | |
| .flat_map(|proj_expr| collect_columns(&proj_expr.expr)) | |
| .map(|x| x.index()) | |
| .collect::<std::collections::HashSet<_>>() | |
| .into_iter() | |
| .collect::<Vec<_>>(); | |
| indices.sort(); | |
| indices | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend you use ProjectionExprs from https://github.com/apache/datafusion/blob/main/datafusion/physical-expr/src/projection.rs, if there's manipulations that you think might be duplicated elsewhere or useful to have abstracted feel free to propose adding them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rluvaton I don't think I'll be able to access code in physical_plan since that would create a dependency loop. Should I turn this into a public util function in physical_expr and use that from physical_plan?
@adriangb It's not clear to me how I could make use of ProjectionExprs. update_expr is I think the closest to what this code is trying to do, but what I need is a very limited version of it where I'm only renumbering columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's ProjectionExprs::column_indices which is pub and similar to the non-pub collect_column_indices referenced above. I haven't reviewed this PR in detail but there may be other helper bits that you can use and generally it would be nice if we coalesce projection manipulation into ProjectionExprs because I feel like there's a lot of duplicate code in random places right now (obviously needs to be balanced with keeping the API surface area on ProjectionExprs reasonable).
datafusion/datafusion/physical-expr/src/projection.rs
Lines 314 to 325 in e9431fc
| /// Extract the column indices used in this projection. | |
| /// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1, | |
| /// this function would return `[0, 1]`. | |
| /// Repeated indices are returned only once, and the order is ascending. | |
| pub fn column_indices(&self) -> Vec<usize> { | |
| self.exprs | |
| .iter() | |
| .flat_map(|e| collect_columns(&e.expr).into_iter().map(|col| col.index())) | |
| .sorted_unstable() | |
| .dedup() | |
| .collect_vec() | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
. I haven't reviewed this PR in detail but there may be other helper bits that you can use and generally it would be nice if we coalesce projection manipulation into ProjectionExprs because I feel like there's a lot of duplicate code in random places right now (obviously needs to be balanced with keeping the API surface area on ProjectionExprs reasonable).
I also agree it feels like there is lots of random remapping code floating around
However, that being said it is not a problem this PR introduces (though it may make it slightly worse)
|
Can you please add tests for each eval method and when used all columns and not also check when there are duplicate columns, columns with different name but same index and so on and can you please provide benchmark results |
Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com>
|
@rluvaton I've added some SLTs which cover what you asked for
|
|
Benchmark results so far. I'll do another run with all the lookup table ones, but those take much longer to complete. |
|
🤖 |
|
🤖: Benchmark completed Details
|
|
Benchmark results confirm, I think, the waste in filtering unused columns and that the gains can be significant. I am still a bit worried about the heavy handedness of the approach I implemented here. Does the gain in execution speed cost too much during planning? Or is there maybe a simpler way to achieve the same end result? The second version I made of this was more like an optimizer pass. Instead of narrowing the record batch inside the CaseExpr, a “project record batch” expr node was wrapped around it. That has the benefit of not needing the duplicate expr tree. The downside was that this becomes visible in the physical expr tree. Much less an internal implementation detail that way. I was experimenting with an actual |
At the logical level this would be possible though, but there's no Perhaps at the logical-to-physical translation point? Although you need insight into the internals of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @pepijnve -- in my opinion this PR could be merged as is and is a very nice refinement.
However, I think we could potentially simplify it / reduce some complexity by avoiding multiple copies of CaseExpr if possible -- I left some suggestions
| let mut used_column_indices = HashSet::<usize>::new(); | ||
| let mut collect_column_indices = |expr: &Arc<dyn PhysicalExpr>| { | ||
| expr.apply(|expr| { | ||
| if let Some(column) = expr.as_any().downcast_ref::<Column>() { | ||
| used_column_indices.insert(column.index()); | ||
| } | ||
| Ok(TreeNodeRecursion::Continue) | ||
| }) | ||
| .expect("Closure cannot fail"); | ||
| }; | ||
|
|
||
| if let Some(e) = &self.expr { | ||
| collect_column_indices(e); | ||
| } | ||
| self.when_then_expr.iter().for_each(|(w, t)| { | ||
| collect_column_indices(w); | ||
| collect_column_indices(t); | ||
| }); | ||
| if let Some(e) = &self.else_expr { | ||
| collect_column_indices(e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
. I haven't reviewed this PR in detail but there may be other helper bits that you can use and generally it would be nice if we coalesce projection manipulation into ProjectionExprs because I feel like there's a lot of duplicate code in random places right now (obviously needs to be balanced with keeping the API surface area on ProjectionExprs reasonable).
I also agree it feels like there is lots of random remapping code floating around
However, that being said it is not a problem this PR introduces (though it may make it slightly worse)
| let mut result_builder = ResultBuilder::new(return_type, batch.num_rows()); | ||
|
|
||
| // `remainder_rows` contains the indices of the rows that need to be evaluated | ||
| let mut remainder_rows: ArrayRef = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it took me som reading to understand the point of the PR is to remove unused columns carried along in remainder_rows
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been living in this little corner of DataFusion for too long already it seems 😄 The reason for this PR is indeed that:
- we need to filter
RecordBatchvalues - filtering a
RecordBatchfilters all columns of the batch - the when, then, and else expressions may only reference a few of the columns of the batch
Any time spent filtering the unreferenced columns is unnecessary work.
The comment you suggested will hopefully clarify that for future readers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the other key thing to realize is that Case is being evaluated incrementally -- and any rows that have not yet matched one of the case expressions are carried (copied) through (rather than tracked with a bitmask for example)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed. What's easy to miss in the original code is that, even though it was tracking remaining rows with a bit mask, by calling PhysicalExpr#evaluate_selection you still pay the cost of carrying/copying because there are no true selection vector based conditional evaluation implementations. evaluate_selection just hides the filtering/scattering cost from sight.
This led me to wonder if evaluate_selection is actually useful. In DataFusion itself case is the only user of it and at the moment the only remaining usage is in expr_or_expr. Even there its usage is dubious. I did a little experiment in https://github.com/pepijnve/datafusion/tree/expr_or_expr where I avoid the 'scatter' cost using an unaligned zip implementation and get better performance that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like another follow on PR 🤪
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here it is #18444.
| projected: &ProjectedCaseBody, | ||
| ) -> Result<ColumnarValue> { | ||
| let return_type = self.data_type(&batch.schema())?; | ||
| if projected.projection.len() < batch.num_columns() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was confused about this check -- when would projection.len() be greater? When all the input columns are used?
If that is correct, can you please add some comments about that invariant in ProjectedCaseBody? Or maybe even better you could represent the idea of no projection more explicity
Perhaps soemthing like
pub struct CaseExpr {
/// The case expression body
body: CaseBody,
/// Optional projection to apply
projection: Option<Projection>,
/// Evaluation method to use
eval_method: EvalMethod,
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when would projection.len() be greater?
It will never be greater (or shouldn't be at least), but may be equal when all the input columns are used indeed.
The reason this is necessary in the first place is because at construction time of the CaseExpr you're flying blind wrt the schema. If the set of used columns is for instance 0, 1, 2 there's no way to know if that's all of them or a prefix of the full schema. Unfortunately that necessitates this per evaluate check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb maybe this is another instance where some sort of "optimize/specialize/compile given the schema / children" would be helpful
| /// [ELSE result] | ||
| /// END | ||
| NoExpression, | ||
| NoExpression(ProjectedCaseBody), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am somewhat concerned about the duplication here -- the EvalMethod has a CaseBody (embedded in ProjectedCaseBody) but the CaseExpr also (still) has a CaseBody -- could they get out of sync?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory yes, in practice no. with_new_children calls CaseExpr::try_new which will derive a new projected version. So at least through the public API there's no way to get the two out of sync.
I wanted to avoid that case would silently rewrite the externally visible expressions. The projected variant may not match the schema of the execution plan anymore so I felt it would be a bad idea to let this leak out.
I need to keep the rewritten version somewhere in order to evaluate it so I ended up putting that in the EvalMethods that would actually make use of it.
That being said, I'm not entirely happy with this solution myself either. It's the best I could come up with that completely hides what's going on from the outside world.
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
|
I'll plan to merge this PR tomorrow unless anyone would like more time to comment |
CASE evaluation
|
Thanks again @pepijnve -- this one is very exciting |
…aluation (apache#18329) ## Which issue does this PR close? - Closes apache#18056 - Part of apache#18075 ## Rationale for this change When `CaseExpr` needs to evaluate a `PhysicalExpr` for a subset of the rows of the input `RecordBatch` it will first filter the record batch using a selection vector. This filter steps filters all columns of the `RecordBatch`, including ones that may not be accessed by the `PhysicalExpr`. For wide (many columns) record batches and narrow expressions (few column references) it can be beneficial to project the record batch first to reduce the amount of wasted filtering work. ## What changes are included in this PR? This PR attempts to reduce the amount of time spent filtering columns unnecessarily by reducing the columns of the record batch prior to filtering. Since this renumbers the columns, it is also required to derive new versions of the `when`, `then`, and `else` expressions that have corrected column references. To make this more manageable the set of child expressions of a `case` expression are collected in a new struct named `CaseBody`. The projection logic derives a projection vector and a projected `CaseBody`. This logic is only used when the number of used columns (the length of the projection vector) is less than the number of columns of the incoming record batch. Certain evaluation methods in `case` do not perform any filtering. These remain unchanged and will never perform the projection logic since this is only beneficial when filtering of record batches is required. ## Are these changes tested? - Covered by existing tests ## Are there any user-facing changes? No --------- Co-authored-by: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
SELECT *, CASE ... END#18056Rationale for this change
When
CaseExprneeds to evaluate aPhysicalExprfor a subset of the rows of the inputRecordBatchit will first filter the record batch using a selection vector. This filter steps filters all columns of theRecordBatch, including ones that may not be accessed by thePhysicalExpr. For wide (many columns) record batches and narrow expressions (few column references) it can be beneficial to project the record batch first to reduce the amount of wasted filtering work.What changes are included in this PR?
This PR attempts to reduce the amount of time spent filtering columns unnecessarily by reducing the columns of the record batch prior to filtering. Since this renumbers the columns, it is also required to derive new versions of the
when,then, andelseexpressions that have corrected column references.To make this more manageable the set of child expressions of a
caseexpression are collected in a new struct namedCaseBody. The projection logic derives a projection vector and a projectedCaseBody.This logic is only used when the number of used columns (the length of the projection vector) is less than the number of columns of the incoming record batch.
Certain evaluation methods in
casedo not perform any filtering. These remain unchanged and will never perform the projection logic since this is only beneficial when filtering of record batches is required.Are these changes tested?
Are there any user-facing changes?
No