Initial support for functional dependencies handling primary key and unique constraints #7040
Conversation
Marking as draft per @ozankabak's suggestion #7040 (comment) while the feedback is incorporated.
Sorry -- I missed the update. I will review this shortly.
@alamb, we decided to do a couple more refactors to support constraints involving composite keys, primary key constraints and uniqueness constraints neatly within this PR. We will only leave foreign keys as future work so that TD is minimal. I am marking this as a draft until it is ready for you.
Sounds good -- thank you @ozankabak
@alamb, I have implemented your suggestions. Now we support unique and primary key constraints at the source (foreign keys are not supported yet). I have extended the existing tests to cover unique and composite primary key cases. It is now ready for further review.
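For reference, a constraint of this kind can be declared at the source via SQL DDL. A minimal sketch (the table and column names here are illustrative, not taken from the actual tests, and the exact DDL accepted may differ):

```sql
-- sn is declared as the primary key, so its values are guaranteed unique
CREATE TABLE sales_global (
    sn BIGINT PRIMARY KEY,
    amount DOUBLE
);
```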
This is looking very good now! It provides a good foundation to build even more functional dependencies as we move forward and we already have some plans for that :)
Awesome -- can't wait to review this. Will do so first thing tomorrow.
Thank you @mustafasrepo -- I went over this PR carefully and it looks really nice to me
Suggestions for this PR or follow-on PRs:
- Double check whether the constraint calculation should be in the SQL planner.
- Consider adding Constraints to make the TableProvider cleaner.
I left some other minor comments but otherwise I think this is good to merge
SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
--SortExec: expr=[sn@0 ASC NULLS LAST]
----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS Int64) as Int64(2) * s.sn]
------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
If the plan is grouping on a unique column, I think the AggregateExec could be avoided entirely, because each group will have exactly one non-null row.
Maybe that would be a good optimization to add in a follow-on PR.
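As an illustration of that idea, a hedged SQL sketch (assuming sn is the unique primary key of sales_global, and ignoring NULL handling):

```sql
-- Because sn is unique, every group below contains exactly one row ...
SELECT sn, amount FROM sales_global GROUP BY sn;

-- ... so the result matches a plain projection, and an optimizer rule
-- could in principle elide the AggregateExec entirely.
SELECT sn, amount FROM sales_global;
```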
datafusion/sql/src/select.rs
@@ -431,6 +432,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
        group_by_exprs: Vec<Expr>,
        aggr_exprs: Vec<Expr>,
    ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
        let schema = input.schema();
Could you explain why the dependency logic needs to be in the SQL planner? It seems more general than just SQL (it should apply to dataframes as well).
Also, this function calls LogicalPlanBuilder::aggregate, which then eventually calls Aggregate::try_new_with_schema, which also re-calculates functional dependency information. Maybe this logic could be put in Aggregate::try_new_with_schema if it is not redundant.
Here, we update the group-by expressions with the expressions at the target indices (if they are among the select expressions). With this change, we can use dependent columns that appear in the select expressions when their determinant is inside the group-by expressions.
We could do this inside Aggregate::try_new_with_schema if we passed a select_expr argument to it, but I thought it is a bit weird for the aggregate to receive a select_expr argument. After this PR merges, I can file another PR that implements this change; then we can decide which implementation is better. In any case, I have moved the group-by update logic into its own function to not clutter this method.
I thought it is a bit weird for aggregate to receive select_expr argument.
Yes I agree that sounds weird. I still don't understand why the functional dependency can't be computed from the plan itself (e.g. from the annotations on DFSchema and then the structure of the LogicalPlan::Aggregate / LogicalPlan::Project, etc)
If the logic is in the sql planner, that implies the calculation relies on information that is not encoded in the plan itself (and thus may be lost in subsequent optimizer passes, for example)
After this PR merges, I can file another PR, that implements this change. Then we can decide which implementation is better. In any case, I have moved groupby update logic to its own function to not clutter this method.
Sounds good
Yes I agree that sounds weird. I still don't understand why the functional dependency can't be computed from the plan itself
Currently, if the query is

SELECT sn, amount
FROM sales_global
GROUP BY sn

and we know that sn is the primary key, we rewrite the query above as

SELECT sn, amount
FROM sales_global
GROUP BY sn, amount

so that the select list contains either group-by expressions or aggregate expressions. This update is done here.
However, if the query were

SELECT sn
FROM sales_global
GROUP BY sn

we wouldn't rewrite it as

SELECT sn
FROM sales_global
GROUP BY sn, amount

since amount is not used, and we do not need to update the group by here.
For this reason, we need the information in the select exprs to decide how to extend the group-by expressions to support a query.
However, if we were to extend the group-by expressions with their functional dependencies in all cases, we wouldn't need the select exprs for this decision. For instance, we would treat

SELECT sn
FROM sales_global
GROUP BY sn

as

SELECT sn
FROM sales_global
GROUP BY sn, amount

under the hood.
What do you think about extending the group-by expressions in all cases, without checking the select exprs? I think this is a bit confusing.
In short, the functional dependency can be calculated without external information. However, the query rewrite cannot be done without resorting to the information in the select exprs in the current design.
What do you think about extending group by expression in all cases, without checking for select exprs. I think this is a bit confusing.
I agree
In short, functional dependency can be calculated without external information. However, query rewrite cannot be done without resorting to the information in the select exprs in current design.
I think this is the key point I was missing -- that the code in the sql planner is actually doing a SQL level rewrite. Thank you for clarifying.
What would you think about pulling the rewrite code into its own function and adding a clear explanation of what it was doing (basically copy/paste your example from above)?
Thanks @mustafasrepo and @ozankabak -- this is a really nice step forward
Which issue does this PR close?
Closes #6190.
Rationale for this change
Primary key information specifies that some of the columns in the table consist of unique values. This contract enables us to support additional queries (especially on unbounded data) that currently cannot be run by DataFusion.
Consider the query below.
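A sketch of the kind of query in question (assuming the sales_global table from the review thread above, with sn as its primary key):

```sql
SELECT sn, amount
FROM sales_global
GROUP BY sn;
```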
When sn is the PRIMARY KEY (each sn value is unique), we know that all the columns of the table sales_global can be emitted after aggregation, since for each group all rows have the same value. In terms of functionality, the query above and the following query are the same.
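For concreteness, a sketch of that equivalent query (same assumed sales_global example):

```sql
SELECT sn, amount
FROM sales_global
GROUP BY sn, amount;
```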
The reason is that, since column sn already consists of unique values, both GROUP BY sn, amount and GROUP BY sn will produce groups that consist of a single row. The first query above can run in Postgres; however, DataFusion can only emit sn after aggregation from the original table. With this PR, that query is also supported by DataFusion.
As part of this PR, we keep track of identifier keys (where a primary key is a special case) to accomplish this. To illustrate what an identifier key is:
Consider a table with two columns, a and b, where column a consists of unique values: it is a primary key. We store this information as a functional dependency stating that, when we know the value of the 0th column, we know the values of the 0th and 1st columns deterministically, and that each key is unique.
As another example, consider a table where column a is not a primary key, but knowing the value of a still enables us to know the value of b deterministically. We encode this information similarly, as a functional dependency from a to b.
All of this information is propagated from the primary key information at the source. This analysis enables us to support complex queries. As an example, we can support a query of the kind sketched just below.
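A hedged sketch of the supported pattern (the self-join and its ON condition are illustrative assumptions, not copied from the test file):

```sql
-- grouping on r.sn; the projected r.amount is functionally dependent on r.sn
SELECT r.sn, r.amount, SUM(l.amount)
FROM sales_global AS l
JOIN sales_global AS r ON l.sn >= r.sn
GROUP BY r.sn;
```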
However, we reject a query of the following kind.
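A hedged sketch of the rejected pattern (same illustrative self-join):

```sql
-- grouping on l.sn; r.sn is not functionally dependent on l.sn, so this query is rejected
SELECT l.sn, r.sn, SUM(l.amount)
FROM sales_global AS l
JOIN sales_global AS r ON l.sn >= r.sn
GROUP BY l.sn;
```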
The reason is that the second query uses an unassociated column (r.sn) with l.sn after the group by, whereas the first query uses only associated columns (r.sn, r.amount) with r.sn during projection.
What changes are included in this PR?
Are these changes tested?
Yes, new tests that show the supported functionality are added to the groupby.slt file.
Are there any user-facing changes?