Skip to content

Conversation

@cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented Jul 11, 2019

What changes were proposed in this pull request?

query plan was designed to be immutable, but sometimes we do allow it to carry mutable states, because of the complexity of the SQL system. One example is TreeNodeTag. It's a state of TreeNode and can be carried over during copy and transform. The adaptive execution framework relies on it to link the logical and physical plans.

This leads to a problem: when we get QueryExecution#analyzed, the plan can be changed unexpectedly because it's mutable. I hit a real issue in #25107 : I use TreeNodeTag to carry dataset id in logical plans. However, the analyzed plan ends up with many duplicated dataset id tags in different nodes. It turns out that, the optimizer transforms the logical plan and add the tag to more nodes.

For example, the logical plan is SubqueryAlias(Filter(...)), and I expect only the SubqueryAlais has the dataset id tag. However, the optimizer removes SubqueryAlias and carries over the dataset id tag to Filter. When I go back to the analyzed plan, both SubqueryAlias and Filter has the dataset id tag, which breaks my assumption.

Since now query plan is mutable, I think it's better to limit the life cycle of a query plan instance. We can clone the query plan between analyzer, optimizer and planner, so that the life cycle is limited in one stage.

How was this patch tested?

new test

@cloud-fan cloud-fan changed the title [SPARK-xxx][SQL] clone the query plan between analyzer, optimizer and planner [SPARK-28346][SQL] clone the query plan between analyzer, optimizer and planner Jul 11, 2019
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fragile to use member variable to keep stats, as they will be lost after copy.

Copy link
Contributor Author

@cloud-fan cloud-fan Jul 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's fragile to use member variable to keep stats, as they will be lost after copy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clone defined in TreeNode doesn't work for case object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mapChildren in TreeNode will change the map type. (from CaseInsensitiveMap to a normal map)

@cloud-fan
Copy link
Contributor Author

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107513 has finished for PR 25111 at commit 656ae55.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107523 has finished for PR 25111 at commit 656ae55.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 11, 2019

Test build #107534 has finished for PR 25111 at commit 7ab8e49.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 12, 2019

Test build #107573 has finished for PR 25111 at commit 58ff049.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 12, 2019

Test build #107587 has finished for PR 25111 at commit 92095b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
}

lazy val withCachedData: LogicalPlan = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not necessary, but should we clone logical too before sending to analyzer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea I think we should

@volatile var statsOfPlanToCache: Statistics = null
def getStatsOfPlanToCache(): Statistics = {
getTagValue(STATS_OF_PLAN_TO_CACHE_TAG).get
}
Copy link
Member

@viirya viirya Jul 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, statsOfPlanToCache has volatile semantics. But making it as a TreeNodeTag, seems we don't preserve that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. tree node tag should be thread safe as well.

@SparkQA
Copy link

SparkQA commented Jul 15, 2019

Test build #107657 has finished for PR 25111 at commit f614cc6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 15, 2019

Test build #107686 has finished for PR 25111 at commit 5e0ab9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

cc @maryannxue @hvanhovell

@SparkQA
Copy link

SparkQA commented Jul 16, 2019

Test build #107753 has finished for PR 25111 at commit 6f9b59f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maryannxue
Copy link
Contributor

The code looks good, but one thing we should be aware of:
clone() actually creates different copies/instances of MultiInstanceRelation objects, while before this change identical relations in a single query always share the same instance. This is what exactly needed by AQE, but would this contradict with any other existing assumptions (e.g., in datasource)?

@cloud-fan
Copy link
Contributor Author

@maryannxue AFAIK we don't rely on plan instance equality in Spark. AQE is the only one I'm aware of that needs to check plan instance equality.

@maryannxue
Copy link
Contributor

Thank you, @cloud-fan! LGTM.


lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData, tracker)
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since now query plan is mutable, I think it's better to limit the life cycle of a query plan instance. We can clone the query plan between analyzer, optimizer and planner, so that the life cycle is limited in one stage.

If we decide to clone the plan after each stage, will any test fail if we do not clone it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test added

@SparkQA
Copy link

SparkQA commented Jul 19, 2019

Test build #107914 has finished for PR 25111 at commit 66f1281.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

assert(error.getMessage.contains("error"))
}

test("analyzed plan should not change after it's generated") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests still can pass without calling clone() in QueryExecution

spark.experimental.extraStrategies = Nil
}

test("SPARK-28346: clone the query plan between analyzer, optimizer and planner") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails in the latest master branch.

@SparkQA
Copy link

SparkQA commented Jul 23, 2019

Test build #108044 has finished for PR 25111 at commit 4f75ba4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

LGTM

Thanks! Merged to master

dongjoon-hyun pushed a commit that referenced this pull request Nov 25, 2019
### What changes were proposed in this pull request?

Since JIRA SPARK-28346,PR [25111](#25111), QueryExecution will copy all node stage-by-stage. This make all node instance twice almost. So we should make all class fields lazy to avoid create more unexpected object.

### Why are the changes needed?

Avoid create more unexpected object.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Exists UT.

Closes #26565 from ulysses-you/make-val-lazy.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
rshkv pushed a commit to palantir/spark that referenced this pull request Jan 29, 2021
…nd planner

query plan was designed to be immutable, but sometimes we do allow it to carry mutable states, because of the complexity of the SQL system. One example is `TreeNodeTag`. It's a state of `TreeNode` and can be carried over during copy and transform. The adaptive execution framework relies on it to link the logical and physical plans.

This leads to a problem: when we get `QueryExecution#analyzed`, the plan can be changed unexpectedly because it's mutable. I hit a real issue in apache#25107 : I use `TreeNodeTag` to carry dataset id in logical plans. However, the analyzed plan ends up with many duplicated dataset id tags in different nodes. It turns out that, the optimizer transforms the logical plan and add the tag to more nodes.

For example, the logical plan is `SubqueryAlias(Filter(...))`, and I expect only the `SubqueryAlais` has the dataset id tag. However, the optimizer removes `SubqueryAlias` and carries over the dataset id tag to `Filter`. When I go back to the analyzed plan, both `SubqueryAlias` and `Filter` has the dataset id tag, which breaks my assumption.

Since now query plan is mutable, I think it's better to limit the life cycle of a query plan instance. We can clone the query plan between analyzer, optimizer and planner, so that the life cycle is limited in one stage.

new test

Closes apache#25111 from cloud-fan/clone.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@MasterDDT
Copy link
Contributor

MasterDDT commented Feb 5, 2024

@cloud-fan I'm seeing some memory issues because of all these clone calls. I have a big query tree, maybe of ~20 height, so all the clone calls are recursive and keep everything in the stack alive: https://gist.github.com/MasterDDT/af98ad20ab0ed301476b9e8c58d8f5bb. 4g driver memory isnt enough on Spark 3.3, but I can run exact same workload on Spark 2.4 without any problems.

In my forked code, could I disable all the clone calls if AQE is off? Will that cause any correctness issues?

@dongjoon-hyun
Copy link
Member

To @MasterDDT , I'd like to recommend to file an official JIRA issue. Otherwise, it's difficult to get any further discussion or help because this is too old thread.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants