diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 47cd3c7d62a7..6dda541b9c01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -522,6 +522,15 @@ object SQLConf { .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.") .createWithDefault(0.2) + val ADAPTIVE_OPTIMIZER_EXCLUDED_RULES = + buildConf("spark.sql.adaptive.optimizer.excludedRules") + .doc("Configures a list of rules to be disabled in the adaptive optimizer, in which the " + + "rules are specified by their rule names and separated by comma. The optimizer will log " + + "the rules that have indeed been excluded.") + .version("3.1.0") + .stringConf + .createOptional + val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala new file mode 100644 index 000000000000..c82b264a600e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * The optimizer for re-optimizing the logical plan used by AdaptiveSparkPlanExec. + */ +class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] { + private val defaultBatches = Seq( + Batch("Demote BroadcastHashJoin", Once, + DemoteBroadcastHashJoin(conf)), + Batch("Eliminate Join to Empty Relation", Once, EliminateJoinToEmptyRelation) + ) + + final override protected def batches: Seq[Batch] = { + val excludedRules = conf.getConf(SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES) + .toSeq.flatMap(Utils.stringToSeq) + defaultBatches.flatMap { batch => + val filteredRules = batch.rules.filter { rule => + val exclude = excludedRules.contains(rule.ruleName) + if (exclude) { + logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + } + !exclude + } + if (batch.rules == filteredRules) { + Some(batch) + } else if (filteredRules.nonEmpty) { + Some(Batch(batch.name, batch.strategy, filteredRules: _*)) + } else { + logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + + s"as all enclosed rules have been excluded.") + None + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index d4018f8ce3a9..014358b663bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -76,13 +76,7 @@ case class AdaptiveSparkPlanExec( } // The logical plan optimizer for re-optimizing the current logical plan. - @transient private val optimizer = new RuleExecutor[LogicalPlan] { - // TODO add more optimization rules - override protected def batches: Seq[Batch] = Seq( - Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)), - Batch("Eliminate Join to Empty Relation", Once, EliminateJoinToEmptyRelation) - ) - } + @transient private val optimizer = new AQEOptimizer(conf) @transient private val removeRedundantProjects = RemoveRedundantProjects(conf) @transient private val ensureRequirements = EnsureRequirements(conf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 3bd079cf6543..f892e66954f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1184,6 +1184,25 @@ class AdaptiveQueryExecSuite } } + test("SPARK-32717: AQEOptimizer should respect excludedRules configuration") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString, + // This test is a copy of test(SPARK-32573), in order to test the configuration + // `spark.sql.adaptive.optimizer.excludedRules` works as expect. + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> EliminateJoinToEmptyRelation.ruleName) { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)") + val bhj = findTopLevelBroadcastHashJoin(plan) + assert(bhj.size == 1) + val join = findTopLevelBaseJoin(adaptivePlan) + // this is different compares to test(SPARK-32573) due to the rule + // `EliminateJoinToEmptyRelation` has been excluded. + assert(join.nonEmpty) + checkNumLocalShuffleReaders(adaptivePlan) + } + } + test("SPARK-32649: Eliminate inner and semi join to empty relation") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",