From b3409172bc97f1d7de6a6685a580f97e1a417155 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 30 Nov 2020 21:04:59 +0200 Subject: [PATCH 1/3] [SPARK-33612][SQL] Add v2SourceRewriteRules batch to Optimizer --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 8 ++++++++ .../spark/sql/internal/BaseSessionStateBuilder.scala | 11 +++++++++++ 2 files changed, 19 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9eee7c2b914a..e5c71c80d213 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -189,6 +189,9 @@ abstract class Optimizer(catalogManager: CatalogManager) // plan may contain nodes that do not report stats. Anything that uses stats must run after // this batch. Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+ + // This batch rewrites plans for v2 tables. It should be run after the operator + // optimization batch and before any batches that depend on stats. + Batch("V2 Source Rewrite Rules", Once, v2SourceRewriteRules: _*) :+ // Since join costs in AQP can change between multiple runs, there is no reason that we have an // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once. Batch("Join Reorder", FixedPoint(1), @@ -289,6 +292,11 @@ abstract class Optimizer(catalogManager: CatalogManager) */ def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil + /** + * Override to provide additional rules for rewriting plans for v2 data sources. + */ + def v2SourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil + /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that * eventually run in the Optimizer. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 538a5408723b..0ed1bf20dfb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -240,6 +240,9 @@ abstract class BaseSessionStateBuilder( override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = super.earlyScanPushDownRules ++ customEarlyScanPushDownRules + override def v2SourceRewriteRules: Seq[Rule[LogicalPlan]] = + super.v2SourceRewriteRules ++ customV2SourceRewriteRules + override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules } @@ -263,6 +266,14 @@ abstract class BaseSessionStateBuilder( */ protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil + /** + * Custom rules for rewriting plans for v2 tables to add to the Optimizer. Prefer overriding + * this instead of creating your own Optimizer. + * + * Note that this may NOT depend on the `optimizer` function. + */ + protected def customV2SourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil + /** * Planner that converts optimized logical plans to physical plans. * From ec31aec6725a0f16e7bf66c031e7ff6d97da7b12 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 1 Dec 2020 10:18:17 +0200 Subject: [PATCH 2/3] Rework approach --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 ++++++----- .../spark/sql/internal/BaseSessionStateBuilder.scala | 10 +++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e5c71c80d213..d831ba20ffda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -185,13 +185,13 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ + // This batch rewrites plans and should be run after the operator + // optimization batch and before any batches that depend on stats. + Batch("Rewrite Rules", Once, rewriteRules: _*) :+ // This batch pushes filters and projections into scan nodes. Before this batch, the logical // plan may contain nodes that do not report stats. Anything that uses stats must run after // this batch. Batch("Early Filter and Projection Push-Down", Once, earlyScanPushDownRules: _*) :+ - // This batch rewrites plans for v2 tables. It should be run after the operator - // optimization batch and before any batches that depend on stats. - Batch("V2 Source Rewrite Rules", Once, v2SourceRewriteRules: _*) :+ // Since join costs in AQP can change between multiple runs, there is no reason that we have an // idempotence enforcement on this batch. We thus make it FixedPoint(1) instead of Once. Batch("Join Reorder", FixedPoint(1), @@ -293,9 +293,10 @@ abstract class Optimizer(catalogManager: CatalogManager) def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Override to provide additional rules for rewriting plans for v2 data sources. + * Override to provide additional rules for rewriting plans. Such rules will be executed + * after operator optimization rules and before any rules that depend on stats. */ - def v2SourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil + def rewriteRules: Seq[Rule[LogicalPlan]] = Nil /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 0ed1bf20dfb7..6bc7547c83c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -240,8 +240,8 @@ abstract class BaseSessionStateBuilder( override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = super.earlyScanPushDownRules ++ customEarlyScanPushDownRules - override def v2SourceRewriteRules: Seq[Rule[LogicalPlan]] = - super.v2SourceRewriteRules ++ customV2SourceRewriteRules + override def rewriteRules: Seq[Rule[LogicalPlan]] = + super.rewriteRules ++ customRewriteRules override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules @@ -267,12 +267,12 @@ abstract class BaseSessionStateBuilder( protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Custom rules for rewriting plans for v2 tables to add to the Optimizer. Prefer overriding - * this instead of creating your own Optimizer. + * Custom rules for rewriting plans to add to the Optimizer. Prefer overriding this instead + * of creating your own Optimizer. * * Note that this may NOT depend on the `optimizer` function. */ - protected def customV2SourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil + protected def customRewriteRules: Seq[Rule[LogicalPlan]] = Nil /** * Planner that converts optimized logical plans to physical plans. From 11f07cd7c44bb92543dc534a30eac491a1553954 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 1 Dec 2020 13:21:40 +0200 Subject: [PATCH 3/3] Change name --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 10 +++++----- .../spark/sql/internal/BaseSessionStateBuilder.scala | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d831ba20ffda..b7c8f775b857 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -185,9 +185,9 @@ abstract class Optimizer(catalogManager: CatalogManager) RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ - // This batch rewrites plans and should be run after the operator + // This batch rewrites data source plans and should be run after the operator // optimization batch and before any batches that depend on stats. - Batch("Rewrite Rules", Once, rewriteRules: _*) :+ + Batch("Data Source Rewrite Rules", Once, dataSourceRewriteRules: _*) :+ // This batch pushes filters and projections into scan nodes. Before this batch, the logical // plan may contain nodes that do not report stats. Anything that uses stats must run after // this batch. @@ -293,10 +293,10 @@ abstract class Optimizer(catalogManager: CatalogManager) def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Override to provide additional rules for rewriting plans. Such rules will be executed - * after operator optimization rules and before any rules that depend on stats. + * Override to provide additional rules for rewriting data source plans. Such rules will be + * applied after operator optimization rules and before any rules that depend on stats. */ - def rewriteRules: Seq[Rule[LogicalPlan]] = Nil + def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil /** * Returns (defaultBatches - (excludedRules - nonExcludableRules)), the rule batches that diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 6bc7547c83c6..e159d88bd822 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -240,8 +240,8 @@ abstract class BaseSessionStateBuilder( override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = super.earlyScanPushDownRules ++ customEarlyScanPushDownRules - override def rewriteRules: Seq[Rule[LogicalPlan]] = - super.rewriteRules ++ customRewriteRules + override def dataSourceRewriteRules: Seq[Rule[LogicalPlan]] = + super.dataSourceRewriteRules ++ customDataSourceRewriteRules override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules @@ -267,12 +267,12 @@ abstract class BaseSessionStateBuilder( protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil /** - * Custom rules for rewriting plans to add to the Optimizer. Prefer overriding this instead - * of creating your own Optimizer. + * Custom rules for rewriting data source plans to add to the Optimizer. Prefer overriding + * this instead of creating your own Optimizer. * * Note that this may NOT depend on the `optimizer` function. */ - protected def customRewriteRules: Seq[Rule[LogicalPlan]] = Nil + protected def customDataSourceRewriteRules: Seq[Rule[LogicalPlan]] = Nil /** * Planner that converts optimized logical plans to physical plans.