
Conversation

@mcdull-zhang
Contributor

What changes were proposed in this pull request?

For example, take the following statements:

cache table t1 as select a from testData3 group by a;
cache table t2 as select a,b from testData2 where a in (select a from t1);
select key,value,b from testData t3 join t2 on t3.key=t2.a;

The cached t2 is not used in the third statement.
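A quick way to observe this (a sketch, not part of the PR; it assumes a spark-shell session where the testData, testData2 and testData3 temp views from Spark's SQLTestData are registered):

// Run the statements above, then inspect the physical plan of the third
// query. Before this PR the plan string contains "Scan In-memory table t1"
// (inside the rewritten IN-subquery) but never "Scan In-memory table t2".
spark.sql("cache table t1 as select a from testData3 group by a")
spark.sql("cache table t2 as select a, b from testData2 where a in (select a from t1)")
val joined = spark.sql("select key, value, b from testData t3 join t2 on t3.key = t2.a")
val planText = joined.queryExecution.executedPlan.toString
assert(planText.contains("Scan In-memory table t2"))  // fails before this PR, passes after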

Before this PR:

Project [key#13, value#14, b#24]
+- SortMergeJoin [key#13], [a#23], Inner
   :- BroadcastHashJoin [key#13], [a#359], LeftSemi, BuildRight, false
   :  :- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
   :  :  +- Scan[obj#12]
   :  +- Scan In-memory table t1 [a#359]
   :        +- InMemoryRelation [a#359], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- *(2) HashAggregate(keys=[a#33], functions=[], output=[a#33])
   :                 +- Exchange hashpartitioning(a#33, 5), ENSURE_REQUIREMENTS, [plan_id=92]
   :                    +- *(1) HashAggregate(keys=[a#33], functions=[], output=[a#33])
   :                       +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).a AS a#33]
   :                          +- Scan[obj#32]
   +- BroadcastHashJoin [a#23], [a#359], LeftSemi, BuildRight, false
      :- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
      :  +- Scan[obj#22]
      +- Scan In-memory table t1 [a#359]
            +- InMemoryRelation [a#359], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(2) HashAggregate(keys=[a#33], functions=[], output=[a#33])
                     +- Exchange hashpartitioning(a#33, 5), ENSURE_REQUIREMENTS, [plan_id=92]
                        +- *(1) HashAggregate(keys=[a#33], functions=[], output=[a#33])
                           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).a AS a#33]
                              +- Scan[obj#32]

After this PR:

Project [key#13, value#14, b#358]
+- BroadcastHashJoin [key#13], [a#357], Inner, BuildRight, false
   :- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#14]
   :  +- Scan[obj#12]
   +- Scan In-memory table t2 [a#357, b#358]
         +- InMemoryRelation [a#357, b#358], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) BroadcastHashJoin [a#23], [a#261], LeftSemi, BuildRight, false
                  :- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
                  :  +- Scan[obj#22]
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=155]
                     +- Scan In-memory table t1 [a#261]
                           +- InMemoryRelation [a#261], StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- *(2) HashAggregate(keys=[a#33], functions=[], output=[a#33])
                                    +- Exchange hashpartitioning(a#33, 5), ENSURE_REQUIREMENTS, [plan_id=92]
                                       +- *(1) HashAggregate(keys=[a#33], functions=[], output=[a#33])
                                          +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).a AS a#33]
                                             +- Scan[obj#32]

Why are the changes needed?

Performance improvement: a query can now reuse a cached plan that contains an IN-subquery (ListQuery) instead of recomputing it.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a test.

@github-actions github-actions bot added the SQL label Nov 18, 2022
@AmplabJenkins

Can one of the admins verify this patch?

@mcdull-zhang
Contributor Author

ping @cloud-fan

The following review thread is anchored on this hunk from the ListQuery canonicalization:

outerAttrs.map(_.canonicalized),
ExprId(0),
childOutputs.map(_.canonicalized.asInstanceOf[Attribute]),
plan.canonicalized.output,
Contributor

This looks correct, but I don't see how it is related to the cache problem. Can you elaborate?

Contributor Author

The premise of reusing a cache entry is that the canonicalized forms of the two plans are equal.
Here, the canonicalized plans of the two ListQuery expressions are equal, but their canonicalized childOutputs differ because the exprIds differ.
As a result, the cache is never hit.

Plan to be executed:
[screenshot of the canonicalized plan to be executed]

Plan that has been cached:
[screenshot of the canonicalized plan that was cached]
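To see the exprId problem in isolation, here is a minimal sketch (not the PR's code; it assumes Spark 3.x Catalyst internals, e.g. pasted into spark-shell):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// Two references to the "same" column, each created with a fresh exprId.
val a1 = AttributeReference("a", IntegerType)()
val a2 = AttributeReference("a", IntegerType)()

// Canonicalization normalizes the attribute name but keeps the exprId,
// so the canonicalized forms are not equal. The same mismatch makes the
// childOutputs of two otherwise-equal ListQuery expressions unequal.
assert(a1.canonicalized != a2.canonicalized)

Plan-level canonicalization normalizes exprIds positionally against the plan's output (QueryPlan.normalizeExprId), which is why the two subquery plans compare equal while the standalone childOutputs do not.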

Contributor

@allisonwang-db do you know why we have childOutputs? It seems to just be child.output most of the time.

Contributor Author

Or should we delete childOutputs?


I'm experiencing a similar issue after upgrading Spark from 3.2 to 3.3 (moving AWS EMR from 6.7.0 to 6.9.0). May I ask which Spark version this issue starts in? And is there a plan to land this fix?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label May 22, 2023
@github-actions github-actions bot closed this May 23, 2023