Decorrelate scalar subqueries with more complex filter expressions #14554

@duongcongtoai

Is your feature request related to a problem or challenge?

DataFusion already supports decorrelating simple scalar subqueries; this was added in PR #6457.

That PR follows the first approach in the TUM paper (simple unnesting) and allows decorrelating a simple query such as:

explain select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id) < t1.t1_int

However, if we add an OR condition to this subquery:

explain select t1.t1_int from t1 where (select count(*) from t2 where t1.t1_id = t2.t2_id or t1.t1_name=t2.t2_name) < t1.t1_int

DataFusion cannot decorrelate it, and the subquery remains in the logical plan:

+--------------+----------------------------------------------------------------------------------------+
| plan_type    | plan                                                                                   |
+--------------+----------------------------------------------------------------------------------------+
| logical_plan | Projection: t1.t1_int                                                                  |
|              |   Filter: (<subquery>) < CAST(t1.t1_int AS Int64)                                      |
|              |     Subquery:                                                                          |
|              |       Projection: count(*)                                                             |
|              |         Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]                  |
|              |           Filter: outer_ref(t1.t1_id) = t2.t2_id OR outer_ref(t1.t1_name) = t2.t2_name |
|              |             TableScan: t2                                                              |
|              |     TableScan: t1 projection=[t1_id, t1_name, t1_int]                                  |
+--------------+----------------------------------------------------------------------------------------+

Describe the solution you'd like

Support decorrelating this query by following the second method described in the paper.

Describe alternatives you've considered

No response

Additional context

A general framework for decorrelation may be discussed in #5492.

The steps needed to make this work are as follows:

Allow decorrelation for this type of filter expression in this code:

self.can_pull_over_aggregation = self.can_pull_over_aggregation

Add more logic to handle complex query decorrelation:

  • Build domain/magic relation
  • Rewrite the subquery to join the inner table (the table of the subquery) with the domain/magic relation using its complex filter expression (i.e., t2.t2_id = domain.t1_id OR t2.t2_name = domain.t1_name)
  • Rewrite aggregation to group by the additional columns mentioned in the domain/magic relation
  • Join the outer relation with the newly built aggregation
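
The four steps above can be sketched on in-memory rows. This is only an illustration of the idea, not DataFusion code: the sample rows and the function names `correlated` and `decorrelated` are hypothetical, and treating a missing group as a count of 0 is an assumption made here so that the result matches the per-row evaluation.

```python
from collections import Counter

# Hypothetical sample rows; column names follow the t1/t2 tables in this issue.
t1 = [
    {"t1_id": 1, "t1_name": "a", "t1_int": 3},
    {"t1_id": 2, "t1_name": "b", "t1_int": 1},
    {"t1_id": 3, "t1_name": "c", "t1_int": 1},
    {"t1_id": 1, "t1_name": "a", "t1_int": 1},
]
t2 = [
    {"t2_id": 1, "t2_name": "x"},
    {"t2_id": 9, "t2_name": "a"},
    {"t2_id": 2, "t2_name": "b"},
]


def correlated(outer, inner):
    """Reference semantics: evaluate the scalar subquery once per outer row."""
    result = []
    for o in outer:
        count = sum(
            1
            for i in inner
            if i["t2_id"] == o["t1_id"] or i["t2_name"] == o["t1_name"]
        )
        if count < o["t1_int"]:
            result.append(o["t1_int"])
    return result


def decorrelated(outer, inner):
    """Same result, computed via a domain relation and a single aggregation."""
    # Step 1: domain/magic relation = distinct values of the correlated columns.
    domain = {(o["t1_id"], o["t1_name"]) for o in outer}
    # Steps 2 and 3: join the inner table with the domain on the complex
    # filter, then count matches grouped by the domain columns.
    counts = Counter(
        (d_id, d_name)
        for (d_id, d_name) in domain
        for i in inner
        if i["t2_id"] == d_id or i["t2_name"] == d_name
    )
    # Step 4: join the outer relation back on the domain columns.
    # Assumption: a group with no matches counts as 0 (Counter's default).
    return [
        o["t1_int"]
        for o in outer
        if counts[(o["t1_id"], o["t1_name"])] < o["t1_int"]
    ]


print(correlated(t1, t2))
print(decorrelated(t1, t2))
```

The point of the domain relation is that the aggregation runs once per distinct (t1_id, t1_name) pair rather than once per outer row, which is why the duplicate (1, "a") rows in the sample share a single count.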

For example, the query above may be rewritten as:

explain select t1.t1_int from t1,
(
    select count(*) as count_all, domain.t1_id as t1_id, domain.t1_name as t1_name from (
        select distinct t1_id, t1_name from t1
    ) as domain join t2 on t2.t2_id = domain.t1_id or t2.t2_name = domain.t1_name
    group by domain.t1_id, domain.t1_name
) as pulled_up
where t1.t1_id=pulled_up.t1_id and pulled_up.count_all < t1.t1_int
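
One way to sanity-check a rewrite of this shape is to run both forms against a small dataset. The sketch below uses Python's built-in sqlite3 module rather than DataFusion, with made-up sample rows. The rewritten query is an adapted variant, not the exact text above, and the adaptations are assumptions on my part: it uses a LEFT JOIN with coalesce so that outer rows with no match get a count of 0 (as count(*) would return in the correlated form), and it joins back on both domain columns. NULL handling in the correlated columns is not covered.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical sample data; table and column names follow this issue.
conn.executescript(
    """
    create table t1 (t1_id int, t1_name text, t1_int int);
    create table t2 (t2_id int, t2_name text);
    insert into t1 values (1, 'a', 3), (2, 'b', 1), (3, 'c', 1);
    insert into t2 values (1, 'x'), (9, 'a'), (2, 'b');
    """
)

# The correlated form from this issue (ORDER BY added for a stable comparison).
ORIGINAL = """
    select t1.t1_int from t1
    where (select count(*) from t2
           where t1.t1_id = t2.t2_id or t1.t1_name = t2.t2_name) < t1.t1_int
    order by t1.t1_id
"""

# Adapted decorrelated form (assumptions: LEFT JOIN + coalesce for the
# zero-match case, and a join back on both domain columns).
REWRITTEN = """
    select t1.t1_int from t1
    left join (
        select count(*) as count_all,
               domain.t1_id as t1_id,
               domain.t1_name as t1_name
        from (select distinct t1_id, t1_name from t1) as domain
        join t2 on t2.t2_id = domain.t1_id or t2.t2_name = domain.t1_name
        group by domain.t1_id, domain.t1_name
    ) as pulled_up
      on t1.t1_id = pulled_up.t1_id and t1.t1_name = pulled_up.t1_name
    where coalesce(pulled_up.count_all, 0) < t1.t1_int
    order by t1.t1_id
"""

original_rows = conn.execute(ORIGINAL).fetchall()
rewritten_rows = conn.execute(REWRITTEN).fetchall()
print(original_rows == rewritten_rows)
```

In this sample, the row with t1_id = 3 has no match in t2, so it only survives in the rewritten form because of the LEFT JOIN and coalesce; with a plain inner join it would be dropped.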

The logical plan may look like:

| logical_plan  | Projection: t1.t1_int                                                                                                                           |
|               |   Inner Join: t1.t1_id = pulled_up.t1_id Filter: pulled_up.count_all < CAST(t1.t1_int AS Int64)                                                 |
|               |     TableScan: t1 projection=[t1_id, t1_int]                                                                                                    |
|               |     SubqueryAlias: pulled_up                                                                                                                    |
|               |       Projection: count(*) AS count_all, domain.t1_id                                                                                           |
|               |         Aggregate: groupBy=[[domain.t1_id, domain.t1_name]], aggr=[[count(Int64(1)) AS count(*)]]                                               |
|               |           Projection: domain.t1_id, domain.t1_name                                                                                              |
|               |             Inner Join:  Filter: t2.t2_id = domain.t1_id OR t2.t2_name = domain.t1_name                                                         |
|               |               SubqueryAlias: domain                                                                                                             |
|               |                 Aggregate: groupBy=[[t1.t1_id, t1.t1_name]], aggr=[[]]                                                                          |
|               |                   TableScan: t1 projection=[t1_id, t1_name]                                                                                     |
|               |               TableScan: t2 projection=[t2_id, t2_name] 
