From 8de39583a23bde3a29fa1af5cc98d9aa4de0b68d Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 16 Dec 2020 18:50:33 +0200 Subject: [PATCH 1/2] [SPARK-33784][SQL] Rename dataSourceRewriteRules batch --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 ++++++------ .../apache/spark/sql/SparkSessionExtensions.scala | 15 ++++++++------- .../sql/internal/BaseSessionStateBuilder.scala | 10 +++++----- .../spark/sql/SparkSessionExtensionSuite.scala | 6 +++--- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aa8540fb4455..0ed1625350a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -185,9 +185,9 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ - // This batch rewrites data source plans and should be run after the operator - // optimization batch and before any batches that depend on stats. - Batch("Data Source Rewrite Rules", Once, dataSourceRewriteRules: _*) :+ + // This batch rewrites plans after the operator optimization batch and + // before any batches that depend on stats. + Batch("Post Operator Optimization Rules", Once, postOperatorOptimizationRules: _*) :+ // This batch pushes filters and projections into scan nodes. Before this batch, the logical // plan may contain nodes that do not report stats. Anything that uses stats must run after // this batch. @@ -293,10 +293,10 @@ abstract class Optimizer(catalogManager: CatalogManager) def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Override to provide additional rules for rewriting data source plans. Such rules will be - * applied after operator optimization rules and before any rules that depend on stats. + * Override to provide additional rules for rewriting plans after operator optimization rules and + * before any rules that depend on stats. */ - def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil + def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala index d5d969032a5e..28e551da11cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} *
  • Analyzer Rules.
  • *
  • Check Analysis Rules.
  • *
  • Optimizer Rules.
  • - *
  • Data Source Rewrite Rules.
  • + *
  • Post Operator Optimization Rules.
  • *
  • Planning Strategies.
  • *
  • Customized Parser.
  • *
  • (External) Catalog listeners.
  • @@ -200,19 +200,20 @@ class SparkSessionExtensions { optimizerRules += builder } - private[this] val dataSourceRewriteRules = mutable.Buffer.empty[RuleBuilder] + private[this] val postOperatorOptimizationRules = mutable.Buffer.empty[RuleBuilder] - private[sql] def buildDataSourceRewriteRules(session: SparkSession): Seq[Rule[LogicalPlan]] = { - dataSourceRewriteRules.map(_.apply(session)).toSeq + private[sql] def buildPostOperatorOptimizationRules( + session: SparkSession): Seq[Rule[LogicalPlan]] = { + postOperatorOptimizationRules.map(_.apply(session)).toSeq } /** - * Inject an optimizer `Rule` builder that rewrites data source plans into the [[SparkSession]]. + * Inject an optimizer `Rule` builder that rewrites logical plans into the [[SparkSession]]. * The injected rules will be executed after the operator optimization batch and before rules * that depend on stats. */ - def injectDataSourceRewriteRule(builder: RuleBuilder): Unit = { - dataSourceRewriteRules += builder + def injectPostOperatorOptimizationRule(builder: RuleBuilder): Unit = { + postOperatorOptimizationRules += builder } private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 8fb351a2a3b2..f48afc6af429 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -231,8 +231,8 @@ abstract class BaseSessionStateBuilder( override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = super.earlyScanPushDownRules ++ customEarlyScanPushDownRules - override def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] = - super.dataSourceRewriteRules ++ customDataSourceRewriteRules + override def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = + super.postOperatorOptimizationRules ++ customPostOperatorOptimizationRules override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules @@ -258,13 +258,13 @@ abstract class BaseSessionStateBuilder( protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Custom rules for rewriting data source plans to add to the Optimizer. Prefer overriding + * Custom rules for rewriting plans after operator optimization. Prefer overriding * this instead of creating your own Optimizer. * * Note that this may NOT depend on the `optimizer` function. */ - protected def customDataSourceRewriteRules: Seq[Rule[LogicalPlan]] = { - extensions.buildDataSourceRewriteRules(session) + protected def customPostOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = { + extensions.buildPostOperatorOptimizationRules(session) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index 7c19f98b762f..c261787b80bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -88,9 +88,9 @@ class SparkSessionExtensionSuite extends SparkFunSuite { } } - test("SPARK-33621: inject data source rewrite rule") { - withSession(Seq(_.injectDataSourceRewriteRule(MyRule))) { session => - assert(session.sessionState.optimizer.dataSourceRewriteRules.contains(MyRule(session))) + test("SPARK-33621: inject post operator optimization rule") { + withSession(Seq(_.injectPostOperatorOptimizationRule(MyRule))) { session => + assert(session.sessionState.optimizer.postOperatorOptimizationRules.contains(MyRule(session))) } } From dcd0b7c5fa972572d53c8322071b85169d6682ff Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 21 Dec 2020 13:26:26 +0200 Subject: [PATCH 2/2] Switch to preCBORules --- .../sql/catalyst/optimizer/Optimizer.scala | 8 ++++---- .../spark/sql/SparkSessionExtensions.scala | 17 ++++++++--------- .../sql/internal/BaseSessionStateBuilder.scala | 12 ++++++------ .../spark/sql/SparkSessionExtensionSuite.scala | 6 +++--- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0ed1625350a5..63299a39a9f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -185,9 +185,9 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ - // This batch rewrites plans after the operator optimization batch and + // This batch rewrites plans after the operator optimization and // before any batches that depend on stats. - Batch("Post Operator Optimization Rules", Once, postOperatorOptimizationRules: _*) :+ + Batch("Pre CBO Rules", Once, preCBORules: _*) :+ // This batch pushes filters and projections into scan nodes. Before this batch, the logical // plan may contain nodes that do not report stats. Anything that uses stats must run after // this batch. @@ -294,9 +294,9 @@ abstract class Optimizer(catalogManager: CatalogManager) /** * Override to provide additional rules for rewriting plans after operator optimization rules and - * before any rules that depend on stats. + * before any cost-based optimization rules that depend on stats. */ - def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = Nil + def preCBORules: Seq[Rule[LogicalPlan]] = Nil /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala index 28e551da11cc..074906a971b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan} *
  • Analyzer Rules.
  • *
  • Check Analysis Rules.
  • *
  • Optimizer Rules.
  • - *
  • Post Operator Optimization Rules.
  • + *
  • Pre CBO Rules.
  • *
  • Planning Strategies.
  • *
  • Customized Parser.
  • *
  • (External) Catalog listeners.
  • @@ -200,20 +200,19 @@ class SparkSessionExtensions { optimizerRules += builder } - private[this] val postOperatorOptimizationRules = mutable.Buffer.empty[RuleBuilder] + private[this] val preCBORules = mutable.Buffer.empty[RuleBuilder] - private[sql] def buildPostOperatorOptimizationRules( - session: SparkSession): Seq[Rule[LogicalPlan]] = { - postOperatorOptimizationRules.map(_.apply(session)).toSeq + private[sql] def buildPreCBORules(session: SparkSession): Seq[Rule[LogicalPlan]] = { + preCBORules.map(_.apply(session)).toSeq } /** * Inject an optimizer `Rule` builder that rewrites logical plans into the [[SparkSession]]. - * The injected rules will be executed after the operator optimization batch and before rules - * that depend on stats. + * The injected rules will be executed once after the operator optimization batch and + * before any cost-based optimization rules that depend on stats. */ - def injectPostOperatorOptimizationRule(builder: RuleBuilder): Unit = { - postOperatorOptimizationRules += builder + def injectPreCBORule(builder: RuleBuilder): Unit = { + preCBORules += builder } private[this] val plannerStrategyBuilders = mutable.Buffer.empty[StrategyBuilder] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index f48afc6af429..6b84f0e636c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -231,8 +231,8 @@ abstract class BaseSessionStateBuilder( override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = super.earlyScanPushDownRules ++ customEarlyScanPushDownRules - override def postOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = - super.postOperatorOptimizationRules ++ customPostOperatorOptimizationRules + override def preCBORules: Seq[Rule[LogicalPlan]] = + super.preCBORules ++ customPreCBORules override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules @@ -258,13 +258,13 @@ abstract class BaseSessionStateBuilder( protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Custom rules for rewriting plans after operator optimization. Prefer overriding - * this instead of creating your own Optimizer. + * Custom rules for rewriting plans after operator optimization and before CBO. + * Prefer overriding this instead of creating your own Optimizer. * * Note that this may NOT depend on the `optimizer` function. */ - protected def customPostOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = { - extensions.buildPostOperatorOptimizationRules(session) + protected def customPreCBORules: Seq[Rule[LogicalPlan]] = { + extensions.buildPreCBORules(session) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index c261787b80bd..35d251383561 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -88,9 +88,9 @@ class SparkSessionExtensionSuite extends SparkFunSuite { } } - test("SPARK-33621: inject post operator optimization rule") { - withSession(Seq(_.injectPostOperatorOptimizationRule(MyRule))) { session => - assert(session.sessionState.optimizer.postOperatorOptimizationRules.contains(MyRule(session))) + test("SPARK-33621: inject a pre CBO rule") { + withSession(Seq(_.injectPreCBORule(MyRule))) { session => + assert(session.sessionState.optimizer.preCBORules.contains(MyRule(session))) } }