fix: make UnKnownColumns not equal to others physical exprs #11536

Merged
merged 4 commits into from
Jul 19, 2024

Conversation

jonahgao
Member

@jonahgao jonahgao commented Jul 18, 2024

Which issue does this PR close?

Closes #11409.

Rationale for this change

After certain optimizations such as ProjectionPushdown, the children of InterleaveExec can no longer meet the interleave condition because their output_partitioning becomes inconsistent.
Perhaps in this situation, we could consider falling back to UnionExec from InterleaveExec instead of throwing an error.

Update

We determine that union inputs can interleave when they have the same hash partition spec.

pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(

But this may be incorrect if the expressions in the partition spec contain UnKnownColumn.

When a partitioning expression no longer appears in the plan's output (for example, when the projection expressions in ProjectionExec no longer include it), it is converted into an UnKnownColumn that uses expr::to_string() as its name. This expression comes from the child of the current plan. It is invalid for the current plan and shouldn't be used for evaluation or for making decisions about the current plan.

Arc::new(UnKnownColumn::new(&expr.to_string()))

Even if two UnKnownColumns have the same name, their source expressions might not be the same.
For example, in issue #11409, the partition expressions of both inputs of InterleaveExec are UnKnownColumn { name: "v0@0" }, which corresponds to a Column expression named v0 with index 0. However, this Column expression references the output of the child plan of these inputs, and their corresponding source expressions are CAST(t1.v0 AS Float64) and t2.v0, respectively.

# InterleaveExec Input-0
ProjectionExec: expr=[v2@2 as v2, v0@1 as v0], schema=[v2:Int64;N, v0:Int64;N], O=Hash([UnKnownColumn { name: "v0@0" }], 2)
  ProjectionExec: expr=[v0@0 as v0, v0@1 as v0, v2@2 as v2], schema=[v0:Float64;N, v0:Int64;N, v2:Int64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(v0@0, CAST(t1.v0 AS Float64)@2)], schema=[v0:Float64;N, v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 3 }], 2)
      RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, schema=[v0:Float64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
        MemoryExec: partitions=1, partition_sizes=[1], schema=[v0:Float64;N], O=UnknownPartitioning(1)
      RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=1, schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
        ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=UnknownPartitioning(1)
          MemoryExec: partitions=1, partition_sizes=[4], schema=[v0:Int64;N, v2:Int64;N], O=UnknownPartitioning(1)

# InterleaveExec Input-1
ProjectionExec: expr=[v2@2 as v2, v0@1 as v0], schema=[v2:Int64;N, v0:Int64;N], O=Hash([UnKnownColumn { name: "v0@0" }], 2)
  ProjectionExec: expr=[v0@0 as v0, v0@1 as v0, v2@2 as v2], schema=[v0:Float64;N, v0:Int64;N, v2:Int64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
    ProjectionExec: expr=[v0@3 as v0, v0@0 as v0, v2@1 as v2, CAST(t1.v0 AS Float64)@2 as CAST(t1.v0 AS Float64)], schema=[v0:Float64;N, v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
      HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@2, v0@0)], schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N, v0:Float64;N], O=Hash([Column { name: "v0", index: 3 }], 2)
        RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=2, schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
          ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=RoundRobinBatch(2)
            RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, schema=[v0:Int64;N, v2:Int64;N], O=RoundRobinBatch(2)
              FilterExec: v2@1 IS NULL, schema=[v0:Int64;N, v2:Int64;N], O=UnknownPartitioning(1)
                MemoryExec: partitions=1, partition_sizes=[4], schema=[v0:Int64;N, v2:Int64;N], O=UnknownPartitioning(1)
        RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, schema=[v0:Float64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
          MemoryExec: partitions=1, partition_sizes=[1], schema=[v0:Float64;N], O=UnknownPartitioning(1)
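
The name collision in the two plans above can be reproduced with a toy model. This is only a sketch: `Expr`, `Input`, and `partitioning_after_projection` are hypothetical stand-ins invented for illustration, not DataFusion types or functions. It assumes that a dropped partition expression keeps nothing but its display string, as described above.

```rust
use std::fmt;

/// Toy stand-in for a physical expression (hypothetical, not DataFusion's types).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Column reference resolved against the child's output schema.
    Column { name: String, index: usize },
    /// Placeholder for an expression that is no longer part of the output.
    Unknown { name: String },
}

impl fmt::Display for Expr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Expr::Column { name, index } => write!(f, "{}@{}", name, index),
            Expr::Unknown { name } => write!(f, "{}", name),
        }
    }
}

/// One input of the union. `source` records what the partition column was
/// really computed from; the projected partitioning cannot see this field.
struct Input {
    source: &'static str,
    partition_expr: Expr,
}

/// Hypothetical helper: derive the partition expression after a projection
/// that keeps only the child expressions listed in `kept`.
fn partitioning_after_projection(input: &Input, kept: &[Expr]) -> Expr {
    let expr = &input.partition_expr;
    match kept.iter().position(|e| e == expr) {
        // Still in the output: re-index the column to its new position.
        Some(new_index) => match expr {
            Expr::Column { name, .. } => Expr::Column {
                name: name.clone(),
                index: new_index,
            },
            other => other.clone(),
        },
        // Dropped from the output: only the display string survives.
        None => Expr::Unknown {
            name: expr.to_string(),
        },
    }
}
```

With this model, two inputs whose `source` values are `CAST(t1.v0 AS Float64)` and `t2.v0` both end up with `Unknown { name: "v0@0" }`, and a name-based comparison treats them as the same partitioning even though the underlying data is hashed on different expressions.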

After ProjectionPushdown creates the new ProjectionExec, it partially restores the UnKnownColumn expressions. They then differ from each other, so the inputs no longer meet the interleave condition.

# New InterleaveExec Input-0:
ProjectionExec: expr=[v2@1 as v2, v0@0 as v0], schema=[v2:Int64;N, v0:Int64;N], O=Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(v0@0, CAST(t1.v0 AS Float64)@2)], projection=[v0@1, v2@2], schema=[v0:Int64;N, v2:Int64;N], O=Hash([UnKnownColumn { name: "CAST(t1.v0 AS Float64)@3" }], 2)
    RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, schema=[v0:Float64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
      MemoryExec: partitions=1, partition_sizes=[1], schema=[v0:Float64;N], O=UnknownPartitioning(1)
    RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=1, schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
      ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=UnknownPartitioning(1)
        MemoryExec: partitions=1, partition_sizes=[4], schema=[v0:Int64;N, v2:Int64;N], O=UnknownPartitioning(1)

# New InterleaveExec Input-1:
ProjectionExec: expr=[v2@1 as v2, v0@0 as v0], schema=[v2:Int64;N, v0:Int64;N], O=Hash([UnKnownColumn { name: "v0@3" }], 2)
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(CAST(t1.v0 AS Float64)@2, v0@0)], projection=[v0@0, v2@1], schema=[v0:Int64;N, v2:Int64;N], O=Hash([UnKnownColumn { name: "v0@3" }], 2)
    RepartitionExec: partitioning=Hash([CAST(t1.v0 AS Float64)@2], 2), input_partitions=2, schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=Hash([Column { name: "CAST(t1.v0 AS Float64)", index: 2 }], 2)
      ProjectionExec: expr=[v0@0 as v0, v2@1 as v2, CAST(v0@0 AS Float64) as CAST(t1.v0 AS Float64)], schema=[v0:Int64;N, v2:Int64;N, CAST(t1.v0 AS Float64):Float64;N], O=RoundRobinBatch(2)
        RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, schema=[v0:Int64;N, v2:Int64;N], O=RoundRobinBatch(2)
          FilterExec: v2@1 IS NULL, schema=[v0:Int64;N, v2:Int64;N], O=UnknownPartitioning(1)
            MemoryExec: partitions=1, partition_sizes=[4], schema=[v0:Int64;N, v2:Int64;N], O=UnknownPartitioning(1)
    RepartitionExec: partitioning=Hash([v0@0], 2), input_partitions=1, schema=[v0:Float64;N], O=Hash([Column { name: "v0", index: 0 }], 2)
      MemoryExec: partitions=1, partition_sizes=[1], schema=[v0:Float64;N], O=UnknownPartitioning(1)

What changes are included in this PR?

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jul 18, 2024
Contributor

@alamb alamb left a comment

I am approving this PR as it fixes a bug and a query that didn't run before will now run

However, I think the proposed fix might be treating the symptom rather than the root cause and I think more investigation is warranted. We could merge this PR and file a follow on ticket to investigate for example

Thank you very much @jonahgao

@@ -431,7 +431,12 @@ impl ExecutionPlan for InterleaveExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(InterleaveExec::try_new(children)?))
+        // New children may not be able to interleave; in that case, we fall back to UnionExec.
Contributor

I see how this particular change is beneficial as now the plan runs where it didn't before, so that is hard to argue with and I would be ok merging it in

However, I feel like this might be silently masking some bug in the planner. My personal preference would be to make this an error

if !can_interleave(children.iter()) {
  return internal_err!("Can not create InterleaveExec: new children can not be interleaved");
}

And then track down / fix whatever optimizer pass is causing the children to no longer be interleavable

FYI @mustafasrepo
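
To make the suggested invariant check concrete, here is a minimal, self-contained sketch. `Partitioning`, `can_interleave`, and `with_new_children` below are simplified stand-ins written for illustration (partition expressions are plain strings, and the error is a `String` rather than DataFusion's error type), so the real signatures differ. It assumes that interleaving requires every child to report the same hash partitioning.

```rust
/// Simplified stand-in for an output partitioning (illustrative only).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    /// Hash partitioning on the given expressions into `n` partitions.
    Hash(Vec<String>, usize),
    /// Partitioning scheme is not known.
    Unknown(usize),
}

/// Children can interleave only if the first child is hash partitioned
/// and every child reports exactly the same partitioning.
fn can_interleave(children: &[Partitioning]) -> bool {
    let first = match children.first() {
        Some(p) => p,
        None => return false,
    };
    matches!(first, Partitioning::Hash(_, _)) && children.iter().all(|p| p == first)
}

/// Rebuild the operator with new children, returning an error instead of
/// silently accepting children that violate the interleave invariant.
fn with_new_children(children: Vec<Partitioning>) -> Result<Vec<Partitioning>, String> {
    if !can_interleave(&children) {
        return Err(
            "Can not create InterleaveExec: new children can not be interleaved".to_string(),
        );
    }
    Ok(children)
}
```

In this sketch, children hashed on `CAST(t1.v0 AS Float64)@3` and `v0@3` are rejected with an error, which surfaces the planner problem rather than hiding it behind a fallback.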

Member Author

Applied. Thank you @alamb

Contributor

I see how this particular change is beneficial as now the plan runs where it didn't before, so that is hard to argue with and I would be ok merging it in

However, I feel like this might be silently masking some bug in the planner. My personal preference would be to make this an error

if !can_interleave(children.iter()) {
  return internal_err!("Can not create InterleaveExec: new children can not be interleaved");
}

And then track down / fix whatever optimizer pass is causing the children to no longer be interleavable

FYI @mustafasrepo

This makes sense; invariants should be handled outside the operator, during planning. This check verifies whether those assumptions are met. Thanks for this.

@github-actions github-actions bot added the physical-expr Changes to the physical-expr crates label Jul 19, 2024
@jonahgao jonahgao changed the title fix: fall back to UnionExec if can't interleave fix: make UnKnownColumns not equal to each other Jul 19, 2024
@jonahgao jonahgao changed the title fix: make UnKnownColumns not equal to each other fix: make UnKnownColumns not equal to each other Jul 19, 2024
@jonahgao jonahgao marked this pull request as draft July 19, 2024 07:33
@jonahgao
Member Author

However, I think the proposed fix might be treating the symptom rather than the root cause and I think more investigation is warranted. We could merge this PR and file a follow on ticket to investigate for example

@alamb I think the root cause is UnKnownColumn. I switched to an alternative fix and updated the PR description.

@jonahgao jonahgao marked this pull request as ready for review July 19, 2024 08:01
@jonahgao jonahgao changed the title fix: make UnKnownColumns not equal to each other fix: make UnKnownColumns not equal to others physical exprs Jul 19, 2024
Contributor

@alamb alamb left a comment

Makes sense to me -- thank you @jonahgao and @mustafasrepo

.map(|x| self == x)
.unwrap_or(false)
fn eq(&self, _other: &dyn Any) -> bool {
// UnknownColumn is not a valid expression, so it should not be equal to any other expression.
Contributor

👍
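
For readers who want to see the effect of the `eq` change in isolation, here is a self-contained sketch using only the standard library. The `Column` and `UnKnownColumn` structs are reduced to bare fields, and `same_expr` is a hypothetical helper added for the example; the real implementations live in the physical-expr crate and go through additional downcast helpers.

```rust
use std::any::Any;

/// Reduced stand-in for a resolved column expression.
#[derive(Debug, PartialEq, Eq)]
struct Column {
    name: String,
    index: usize,
}

/// Reduced stand-in for the placeholder expression.
#[derive(Debug)]
struct UnKnownColumn {
    name: String,
}

impl PartialEq<dyn Any> for Column {
    /// Equal only to another `Column` with the same name and index.
    fn eq(&self, other: &dyn Any) -> bool {
        other
            .downcast_ref::<Self>()
            .map(|x| self == x)
            .unwrap_or(false)
    }
}

impl PartialEq<dyn Any> for UnKnownColumn {
    /// Not a valid expression for the current plan, so it never compares
    /// equal to anything, including another placeholder with the same name.
    fn eq(&self, _other: &dyn Any) -> bool {
        false
    }
}

/// Hypothetical helper that compares an expression against a type-erased one.
fn same_expr<T: PartialEq<dyn Any>>(lhs: &T, rhs: &dyn Any) -> bool {
    <T as PartialEq<dyn Any>>::eq(lhs, rhs)
}
```

Under this sketch, two `UnKnownColumn` values named `v0@0` no longer compare equal, so a partitioning check built on this comparison cannot treat them as the same hash spec. One consequence of the design is that the comparison is not reflexive for the placeholder, which is intentional here.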

@alamb alamb merged commit ebe61ba into apache:main Jul 19, 2024
27 checks passed
@jonahgao
Member Author

Thank you @alamb for the review.

@jonahgao jonahgao deleted the union_fallback branch July 20, 2024 00:50
Lordworms pushed a commit to Lordworms/arrow-datafusion that referenced this pull request Jul 23, 2024
…#11536)

* fix: fall back to `UnionExec` if can't interleave

* alternative fix

* check interleavable in with_new_children

* link to pr
wiedld pushed a commit to influxdata/arrow-datafusion that referenced this pull request Jul 31, 2024
…#11536)

* fix: fall back to `UnionExec` if can't interleave

* alternative fix

* check interleavable in with_new_children

* link to pr
Labels
physical-expr Changes to the physical-expr crates sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

A projection pushdown related bug (SQLancer-TLP)
3 participants