-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Projection Pushdown through user defined LogicalPlan nodes. #9690
Projection Pushdown through user defined LogicalPlan nodes. #9690
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mustafasrepo -- I think this is a very interesting and useful PR, but I am not sure the handling of parent indicies is quite correct
@@ -1192,4 +1291,24 @@ mod tests { | |||
\n TableScan: test projection=[a]"; | |||
assert_optimized_plan_equal(&plan, expected) | |||
} | |||
|
|||
// Optimize Projections Rule, pushes down projection through users defined logical plan node. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please add a few more tests:
- The NoOpUserDefined plan itself refers to column
a
in its expressions - The NoOpUserDefined plan itself refers to column
b
in its expressions - The NoOpUserDefined plan itself refers to column
a + b
in its expressions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests for these cases.
let child = children[0]; | ||
let node_schema = extension.node.schema(); | ||
let child_schema = child.schema(); | ||
if let Some(parent_required_indices_mapped) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem right to me -- I don't think we can assume that a column named "foo" on the input is the same as a column named "foo" in the output schema (this isn't true for LogicalPlan::Projection
for example).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the information that is needed is "given the parent only needs some of the output columns of this UserDefinedPlan, can it avoid requiring additional input columns"
Given the UserDefinedPlan is basically a black box and the optimizer doesn't know how it works, I think the only way to get this information would be to add some sort of API to UserDefined node
I suggest for this PR, only push down information about used expressions (don't try and incorporate a projection from the parent) and we can handle the parent projection information in a follow on PR
I am thinking of an API like
trait UserDefinedLogicalNode {
...
/// If only the specified indices of the output of node are needed
/// by the consumer, optionally returns a new UserDefinedNode that
/// computes only those indices, in the specified order.
///
/// Return Ok(None), the default, if no such pushdown is possible
///
/// This is used to implement projection pushdown for user defined nodes
fn try_project(&self, indices: &[usize]) -> Result<Option<Arc<dyn Self>>> {
Ok(None)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest for this PR, only push down information about used expressions (don't try and incorporate a projection from the parent) and we can handle the parent projection information in a follow on PR.
This behavior may not be safe. Assuming a user defined operator similar to LogicalPlan::Filter
. Following Plan
Projection(a,b,c)
--UserDefineFilter(d=0)
----TableScan(a,b,c,d)
If we don't incorporate requirements from the parent, user defined filter may insert Projection(d)
below it. Which would produce following invalid plan:
Projection(a,b,c) (a,b,c is missing at its input)
--UserDefineFilter(d=0)
----Projection(d)
------TableScan(a,b,c,d)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am thinking of an API like
trait UserDefinedLogicalNode { ... /// If only the specified indices of the output of node are needed /// by the consumer, optionally returns a new UserDefinedNode that /// computes only those indices, in the specified order. /// /// Return Ok(None), the default, if no such pushdown is possible /// /// This is used to implement projection pushdown for user defined nodes fn try_project(&self, indices: &[usize]) -> Result<Option<Arc<dyn Self>>> { Ok(None) }
I think, implementers of the try_project
needs to insert LogicalPlan::Projection
on top of their input to pushdown projection. This might not be the case, if I am missing something. If that is the case, this might be cumbersome for the users. What about an API something like
/// Returns the necessary expressions at each child to calculate argument.
/// assuming exprs refers to valid fields at the output of the operator.
/// Return Ok(None), the default, Cannot determine necessary expressions at the children to calculate given exprs.
fn necessary_children_exprs(&self, exprs: &[Expr]) -> Option<Vec<Vec<Expr>>> {
Ok(None)
}
with an API like above, we can handle projection insertion outside the trait method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which would produce following invalid plan:
That is a good point.
What about an API something like
That is a great idea, though I don't fully understand why it uses Expr
s
How about a slight refinement in terms of input/output columns:
/// Returns the necessary input columns to this node required to compute
/// the columns in the output schema
///
/// This is used for projection pushdown when DataFusion has determined that
/// only a subset of the output columns of this node are needed by its parents.
/// This API is used to tell DataFusion which, if any, of the input columns are no longer
/// needed.
///
/// Return `Ok(None)`, the default, if this information can not be determined
/// Returns `Ok(input_columns)` with the column indexes from this nodes' input that are
/// needed to compute `output_columns`)
fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<usize>> {
Ok(None)
}
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is much better. Knowing the mapping of the Column
s is adequate. Using Expr
has no additional support. Thanks @alamb for this suggestion. I will update this PR to use this mechanism. Then we can discuss further. I will let you know when it ready for further review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb I introduced new API fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>>
to this PR (where return type is changed to return result per child). I also added a new test case showing the support for user defined nodes with multiple children. This Pr is ready for further review.
I also filed #9698 to try and clarify the documentation about what |
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
I think, you are right, we shouldn't rely on name match during analysis. |
I am sorry -- I am behind this week on reviews. I will give this one another look shortly |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mustafasrepo -- this PR looks good to me.
I had a few suggestions that would be nice to fix prior to merging but I don't think they are required
Thanks again @mustafasrepo |
) * Naive support for schema preserving plans * Add mapping support between schemas * Fix name * Update comment * Update comment * Do not calculate mapping for unnecessary sections * Update datafusion/optimizer/src/optimize_projections.rs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * Add new tests * Add new api to get necessary columns * Add new test for multi children * Address reviews --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Which issue does this PR close?
Improves the situation on #9146.
Rationale for this change
Currently, we do not have support for projection pushdown through user defined logical plan nodes. This can cause moving around unnecessary columns during execution, when plan involves user defined logical plan node.
What changes are included in this PR?
This PR adds support for projection pushdown through user defined logical plan nodes, when
Are these changes tested?
Yes
Are there any user-facing changes?