
[SPARK-35794][SQL] Allow custom plugin for AQE cost evaluator #32944

Closed
wants to merge 5 commits into master from c21:aqe-cost

Conversation

@c21 (Contributor) commented Jun 17, 2021

What changes were proposed in this pull request?

AQE currently uses a cost evaluator to decide whether to adopt the new plan after replanning. The evaluator used today is SimpleCostEvaluator, which makes the decision based on the number of shuffles in the query plan. This is not a perfect cost model, and different production environments may want different custom evaluators. E.g., sometimes we may still want to handle a skew join even though it introduces an extra shuffle (trading resources for better latency), and sometimes we may want to take sorts into account as well. Taking our own setup as an example, we use a custom remote shuffle service (Cosco), and its cost model is more complicated. So we want to make the cost evaluator pluggable, so that developers can implement their own CostEvaluator subclass and plug it in dynamically based on configuration.

The approach is to introduce a new config, spark.sql.adaptive.customCostEvaluatorClass, that defines the CostEvaluator subclass to use, and to add CostEvaluator.instantiate to instantiate the configured class in AdaptiveSparkPlanExec.costEvaluator.
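As a standalone sketch of that mechanism (the trait and object names mirror the PR's, but this toy version scores a plan given as a string, whereas Spark's real evaluator operates on a SparkPlan):

```scala
// Sketch of the pluggable-evaluator idea. Names mirror the PR's
// CostEvaluator / SimpleCostEvaluator, but the types are simplified.
trait CostEvaluator {
  def evaluateCost(plan: String): Long
}

// Default evaluator: counts "Shuffle" nodes in the toy plan string.
case class SimpleCostEvaluator() extends CostEvaluator {
  def evaluateCost(plan: String): Long =
    "Shuffle".r.findAllIn(plan).length.toLong
}

object CostEvaluator {
  // Mirrors the PR's CostEvaluator.instantiate: reflectively load the class
  // named by spark.sql.adaptive.customCostEvaluatorClass; fall back to the
  // builtin evaluator when the config is unset.
  def instantiate(className: Option[String]): CostEvaluator =
    className match {
      case Some(name) =>
        Class.forName(name).getDeclaredConstructor()
          .newInstance().asInstanceOf[CostEvaluator]
      case None => SimpleCostEvaluator()
    }
}

val evaluator =
  CostEvaluator.instantiate(Some(classOf[SimpleCostEvaluator].getName))
println(evaluator.evaluateCost("Join(Shuffle(Scan), Shuffle(Scan))")) // 2
```

The Option-based fallback matches the review outcome below: the conf is optional, and the builtin evaluator is used when it is not set.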

Why are the changes needed?

Make AQE cost evaluation more flexible.

Does this PR introduce any user-facing change?

No, but an internal config, spark.sql.adaptive.customCostEvaluatorClass, is introduced to allow a custom implementation of CostEvaluator.

How was this patch tested?

Added unit test in AdaptiveQueryExecSuite.scala.

@github-actions github-actions bot added the SQL label Jun 17, 2021
@c21 (Contributor, Author) commented Jun 17, 2021

cc @cloud-fan could you help take a look when you have time? Thanks.

@cloud-fan (Contributor)

does it work well with #32816 ?

@c21 (Contributor, Author) commented Jun 17, 2021

does it work well with #32816 ?

@cloud-fan - I think so. If we decide to merge this first, then in #32816 we won't need the extra config spark.sql.adaptive.forceEnableSkewJoin. Developers/users can set spark.sql.adaptive.costEvaluatorClass to SkewJoinAwareCostEvaluator and it should work. cc @ulysses-you FYI, thanks.

@SparkQA commented Jun 17, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44440/

@SparkQA commented Jun 17, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/44440/

@ulysses-you (Contributor)

@c21 thank you for pinging me.

Not sure it's worth making the cost evaluator a plugin. You mentioned sort (I think it's a local sort, isn't it?); can you provide a real use case for it?

@SparkQA commented Jun 17, 2021

Test build #139911 has finished for PR 32944 at commit 6670938.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

Developers/users can set spark.sql.adaptive.costEvaluatorClass to SkewJoinAwareCostEvaluator and it should work

I don't think it's that simple. If force-skew-join-handling is enabled, Spark must use SkewJoinAwareCostEvaluator, not a user-specified one.

@c21 (Contributor, Author) commented Jun 17, 2021

You mentioned sort (I think it's local sort, isn't it ?), and can you provide a real use case about it ?

@ulysses-you - e.g.

SortAggregate
- SortMergeJoin
  - Sort(Shuffle(Scan))
  - Sort(Shuffle(Scan))       

AQE might change it to

SortAggregate
- Sort
  - ShuffledHashJoin
    - Shuffle(Scan)
    - Shuffle(Scan)

With our Cosco remote shuffle service, we have already implemented sorted shuffle (Sort(Shuffle), where the sort and the shuffle are done together on the shuffle service side), and it is more efficient than doing the Sort separately in Spark. So a Sort(Shuffle) is cheaper than a separate Shuffle plus Sort in our case. This influences the AQE decision, and we have to use a custom cost evaluator. As we can see, a separate cost evaluator is needed for forcing skew join, and we may have more in the future. Another aspect is that Cosco has not been open sourced yet, so we want a clean interface for a custom cost evaluator instead of always maintaining a fork on our side.
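The tradeoff above can be sketched with a toy cost model (hypothetical types, not Spark's API) in which a Sort sitting directly on a Shuffle is as cheap as the shuffle alone, because the shuffle service sorts for free:

```scala
// Toy plan tree and cost model for the sorted-shuffle scenario above.
// A Sort directly on top of a Shuffle is costed as the shuffle alone
// (the shuffle service sorts for free); a standalone Sort adds one cost
// unit, as does each Shuffle.
sealed trait Node
case object Scan extends Node
case class Shuffle(child: Node) extends Node
case class Sort(child: Node) extends Node
case class Join(left: Node, right: Node) extends Node

def cost(plan: Node): Long = plan match {
  case Sort(Shuffle(child)) => 1 + cost(child) // sorted shuffle: one unit total
  case Sort(child)          => 1 + cost(child) // standalone sort: one unit
  case Shuffle(child)       => 1 + cost(child)
  case Join(l, r)           => cost(l) + cost(r)
  case Scan                 => 0
}

// Sort-merge join keeps each sort directly on top of a shuffle...
val smj = Join(Sort(Shuffle(Scan)), Sort(Shuffle(Scan))) // cost 2
// ...while switching to shuffled hash join strands a standalone Sort.
val shj = Sort(Join(Shuffle(Scan), Shuffle(Scan)))       // cost 3
```

Under this model the sort-merge-join plan is cheaper, so a shuffle-count-only evaluator like SimpleCostEvaluator (which scores both plans as 2 shuffles) would miss the difference.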

@c21 (Contributor, Author) commented Jun 17, 2021

I don't think it's that simple. If force-skew-join-handling is enabled, Spark must use SkewJoinAwareCostEvaluator, not a user-specified one.

@cloud-fan - from my checking of #32816, it looks like the only logic controlled by the new config spark.sql.adaptive.forceEnableSkewJoin is choosing a different cost evaluator, SkewJoinAwareCostEvaluator. My idea is to not introduce the new config; we can just set spark.sql.adaptive.costEvaluatorClass to SkewJoinAwareCostEvaluator to enable force skew join.

@ulysses-you (Contributor)

@c21 thanks for the explanation; the example of SortAggregate(SMJ) becoming SortAggregate(SHJ) seems useful. But about the usage, I agree with @cloud-fan: the boolean config forceEnableSkewJoin is necessary and easier for users. A class name is a bit hacky for users who just want to optimize skew joins anyway.

@c21 (Contributor, Author) commented Jun 18, 2021

the boolean config forceEnableSkewJoin is necessary and easier for users. A class name is a bit hacky for users who just want to optimize skew joins anyway.

@ulysses-you - sure, I agree that the boolean config is more intuitive and easier to use. If we do need the boolean config, we can add special logic in AdaptiveSparkPlanExec.costEvaluator to use SkewJoinAwareCostEvaluator when spark.sql.adaptive.forceEnableSkewJoin is true, regardless of what the user sets for spark.sql.adaptive.costEvaluatorClass. It's just a matter of priority between the different configs.
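A minimal sketch of that priority order (the forceEnableSkewJoin flag and evaluator names come from the linked discussion, not a final Spark API):

```scala
// Sketch of the config-priority logic discussed above: when the (proposed)
// spark.sql.adaptive.forceEnableSkewJoin flag is on, the skew-join-aware
// evaluator wins regardless of the user-configured evaluator class;
// otherwise the user's class applies, falling back to the builtin default.
def chooseEvaluatorClass(
    forceEnableSkewJoin: Boolean,
    customEvaluatorClass: Option[String]): String =
  if (forceEnableSkewJoin) "SkewJoinAwareCostEvaluator"
  else customEvaluatorClass.getOrElse("SimpleCostEvaluator")
```

This keeps both configs usable at once while making the boolean flag the higher-priority switch, which addresses @cloud-fan's concern above.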

.version("3.2.0")
.internal()
.stringConf
.createWithDefault("org.apache.spark.sql.execution.adaptive.SimpleCostEvaluator")
@cloud-fan (Contributor)

We can make it an optional conf: spark.sql.adaptive.customCostEvaluatorClass. If not set, we use the builtin impl.

@c21 (Contributor, Author)

@cloud-fan - sure, updated.

*/
def instantiate(className: String): CostEvaluator = {
logDebug(s"Creating CostEvaluator $className")
val clazz = Utils.classForName[CostEvaluator](className)
@cloud-fan (Contributor) commented Jul 1, 2021

We can use the standard API in Spark: Utils.loadExtensions

@c21 (Contributor, Author)

@cloud-fan - good call, updated.

@@ -38,7 +38,7 @@ case class SimpleCost(value: Long) extends Cost {
* A simple implementation of [[CostEvaluator]], which counts the number of
* [[ShuffleExchangeLike]] nodes in the plan.
*/
-object SimpleCostEvaluator extends CostEvaluator {
+case class SimpleCostEvaluator() extends CostEvaluator {
@cloud-fan (Contributor)

@c21 (Contributor, Author)

@cloud-fan - yeah, updated.

@SparkQA commented Jul 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45060/

@SparkQA commented Jul 2, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45060/

@@ -3555,6 +3564,9 @@ class SQLConf extends Serializable with Logging {

def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)

def adaptiveCustomCostEvaluatorClass: Option[String] =
@cloud-fan (Contributor)

nit: we don't have to create a method here if it's only called once

@c21 (Contributor, Author)

@cloud-fan - sure, updated.

val query = "SELECT * FROM testData join testData2 ON key = a where value = '1'"

withSQLConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key ->
"org.apache.spark.sql.execution.adaptive.SimpleShuffleSortCostEvaluator") {
@cloud-fan (Contributor)

does this custom cost evaluator change the query plan? It seems to be the same as the builtin cost evaluator.

@c21 (Contributor, Author)

@cloud-fan - this evaluator does not change the plan, and behaves the same as the builtin evaluator for this query. Do we want to come up with a different one here? I think this just validates that the custom evaluator works.

@cloud-fan (Contributor) commented Jul 2, 2021

SGTM, let's leave it then

@SparkQA commented Jul 2, 2021

Test build #140547 has finished for PR 32944 at commit 404fe35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc("The custom cost evaluator class to be used for adaptive execution. If not being set," +

@cloud-fan (Contributor)

@c21 can you fix the code conflicts?

@c21 (Contributor, Author) commented Jul 2, 2021

@cloud-fan - thanks, just rebased to latest master.

@SparkQA commented Jul 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45079/

@HyukjinKwon (Member)

@c21, can you at least mark CostEvaluator with the @Unstable API tag? Also please add a note that it is subject to being moved or changed in the near future.

@SparkQA commented Jul 2, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45079/

@SparkQA commented Jul 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45082/

@SparkQA commented Jul 2, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45082/

@SparkQA commented Jul 2, 2021

Test build #140567 has finished for PR 32944 at commit e202aa8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 2, 2021

Test build #140570 has finished for PR 32944 at commit c5ed8e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21 (Contributor, Author) commented Jul 4, 2021
c21 commented Jul 4, 2021

@HyukjinKwon - updated per discussion, and this is ready for review again, thanks.

@SparkQA commented Jul 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45131/

@SparkQA commented Jul 4, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45131/

@SparkQA commented Jul 4, 2021

Test build #140618 has finished for PR 32944 at commit ac5c121.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

buildConf("spark.sql.adaptive.customCostEvaluatorClass")
.doc("The custom cost evaluator class to be used for adaptive execution. If not being set," +
" Spark will use its own SimpleCostEvaluator by default.")
.version("3.2.0")
@HyukjinKwon (Member) commented Jul 5, 2021

the only thing is that the version has to be 3.3.0 since we cut the branch now. Since this PR likely won't affect anything in the main code, I am okay with merging to 3.2.0 either way, though. I will leave it to @cloud-fan and you.

@cloud-fan (Contributor)

3.2 is the first version that enables AQE by default, and this seems to be a useful extension. Let's include it in 3.2.

@cloud-fan (Contributor)

thanks, merging to master/3.2!

@cloud-fan cloud-fan closed this in 044dddf Jul 5, 2021
cloud-fan pushed a commit that referenced this pull request Jul 5, 2021

Closes #32944 from c21/aqe-cost.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 044dddf)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@c21 (Contributor, Author) commented Jul 6, 2021

Thank you @cloud-fan and @HyukjinKwon for review!

@c21 c21 deleted the aqe-cost branch July 6, 2021 04:55