From 323656872799b8dd636061220f3ed139379c9c79 Mon Sep 17 00:00:00 2001
From: maryannxue
Date: Wed, 8 Aug 2018 22:20:32 -0700
Subject: [PATCH 1/4] Add once-policy batch check

---
 .../sql/catalyst/analysis/Analyzer.scala      |  2 ++
 .../sql/catalyst/rules/RuleExecutor.scala     | 33 +++++++++++++------
 .../catalyst/trees/RuleExecutorSuite.scala    | 17 +++++++++-
 3 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d23d43bef76e..40b72b296704 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -115,6 +115,8 @@ class Analyzer(
     }
   }
 
+  override def verifyOnceStrategyIdempotence: Boolean = true
+
   override def execute(plan: LogicalPlan): LogicalPlan = {
     AnalysisContext.reset()
     try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index dccb44ddebfa..dc0cedd4f39e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -65,6 +65,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    */
   protected def isPlanIntegral(plan: TreeType): Boolean = true
 
+  /** Whether to verify batches with once strategy stabilize after one run. */
+  protected def verifyOnceStrategyIdempotence: Boolean = false
+
   /**
    * Executes the batches of rules defined by the subclass. The batches are executed serially
    * using the defined execution strategy. Within each batch, rules are also executed serially.
@@ -78,6 +81,12 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
       var iteration = 1
       var lastPlan = curPlan
       var continue = true
+      // Verify that once-strategy batches stabilize after one run when testing.
+      val maxIterations = if (batch.strategy.maxIterations == 1 && verifyOnceStrategyIdempotence) {
+        2
+      } else {
+        batch.strategy.maxIterations
+      }
 
       // Run until fix point (or the max number of iterations as specified in the strategy.
       while (continue) {
@@ -108,11 +117,19 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
 
             result
         }
-        iteration += 1
-        if (iteration > batch.strategy.maxIterations) {
-          // Only log if this is a rule that is supposed to run more than once.
-          if (iteration != 2) {
-            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
+
+        // Stop applying this batch if:
+        // 1) the plan hasn't changed over the last iteration; or
+        // 2) max number of iterations has been reached.
+        if (curPlan.fastEquals(lastPlan)) {
+          logTrace(
+            s"Fixed point reached for batch ${batch.name} after ${iteration} iterations.")
+          continue = false
+        } else if (iteration >= maxIterations) {
+          // Only log if the batch has run more than once.
+          if (iteration > 1) {
+            val message = s"Plan did not stabilize after max iterations " +
+              s"(${batch.strategy.maxIterations}) reached for batch ${batch.name}."
             if (Utils.isTesting) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
@@ -122,11 +139,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           continue = false
         }
 
-        if (curPlan.fastEquals(lastPlan)) {
-          logTrace(
-            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
-          continue = false
-        }
+        iteration += 1
         lastPlan = curPlan
       }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
index a67f54b263cc..7172afcf1c9e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
@@ -54,7 +54,9 @@ class RuleExecutorSuite extends SparkFunSuite {
     val message = intercept[TreeNodeException[LogicalPlan]] {
       ToFixedPoint.execute(Literal(100))
     }.getMessage
-    assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
+    assert(
+      message.contains(
+        "Plan did not stabilize after max iterations (10) reached for batch fixedPoint"))
   }
 
   test("structural integrity checker") {
@@ -73,4 +75,17 @@ class RuleExecutorSuite extends SparkFunSuite {
     }.getMessage
     assert(message.contains("the structural integrity of the plan is broken"))
   }
+
+  test("only once but did not stabilize") {
+    object ApplyOnce extends RuleExecutor[Expression] {
+      override def verifyOnceStrategyIdempotence: Boolean = true
+      val batches = Batch("once", Once, DecrementLiterals) :: Nil
+    }
+
+    val message = intercept[TreeNodeException[LogicalPlan]] {
+      ApplyOnce.execute(Literal(10))
+    }.getMessage
+    assert(
+      message.contains("Plan did not stabilize after max iterations (1) reached for batch once"))
+  }
 }

From 7986a4378a3f2aec601a4b1681488d06dde313da Mon Sep 17 00:00:00 2001
From: maryannxue
Date: Fri, 5 Oct 2018 12:11:26 -0500
Subject: [PATCH 2/4] Whitelist batch UDF

---
 .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index dc0cedd4f39e..2757aee72224 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -68,6 +68,10 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   /** Whether to verify batches with once strategy stabilize after one run. */
   protected def verifyOnceStrategyIdempotence: Boolean = false
 
+  private val knownUnstableBatches = Seq(
+    ("UDF", "org.apache.spark.ml.feature.StringIndexerSuite.transform")
+  )
+
   /**
    * Executes the batches of rules defined by the subclass. The batches are executed serially
    * using the defined execution strategy. Within each batch, rules are also executed serially.
@@ -130,7 +134,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           if (iteration > 1) {
             val message = s"Plan did not stabilize after max iterations " +
               s"(${batch.strategy.maxIterations}) reached for batch ${batch.name}."
-            if (Utils.isTesting) {
+            if (Utils.isTesting && !knownUnstableBatches.exists(_._1 == batch.name)) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
               logWarning(message)

From d595a0c3385faf127cefab6617fe00aad966007d Mon Sep 17 00:00:00 2001
From: maryannxue
Date: Fri, 5 Oct 2018 15:38:37 -0500
Subject: [PATCH 3/4] Whitelist batch View

---
 .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 2757aee72224..e3d34fc432f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -69,7 +69,9 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   protected def verifyOnceStrategyIdempotence: Boolean = false
 
   private val knownUnstableBatches = Seq(
-    ("UDF", "org.apache.spark.ml.feature.StringIndexerSuite.transform")
+    ("UDF", "org.apache.spark.ml.feature.StringIndexerSuite.transform"),
+    ("View", "org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite." +
+      "SPARK-22431: view with nested type")
   )
 
   /**

From 7fc1d11388babe169cf45ce2376d898d89f299b7 Mon Sep 17 00:00:00 2001
From: maryannxue
Date: Fri, 5 Oct 2018 20:29:13 -0500
Subject: [PATCH 4/4] Whitelist fix

---
 .../spark/sql/catalyst/rules/RuleExecutor.scala | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index e3d34fc432f5..409bd0555e48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -88,11 +88,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
       var lastPlan = curPlan
       var continue = true
       // Verify that once-strategy batches stabilize after one run when testing.
-      val maxIterations = if (batch.strategy.maxIterations == 1 && verifyOnceStrategyIdempotence) {
-        2
-      } else {
-        batch.strategy.maxIterations
-      }
+      val maxIterations =
+        if (batch.strategy.maxIterations == 1
+            && verifyOnceStrategyIdempotence
+            && !knownUnstableBatches.exists(_._1 == batch.name)) {
+          2
+        } else {
+          batch.strategy.maxIterations
+        }
 
       // Run until fix point (or the max number of iterations as specified in the strategy.
       while (continue) {
@@ -136,7 +139,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           if (iteration > 1) {
             val message = s"Plan did not stabilize after max iterations " +
               s"(${batch.strategy.maxIterations}) reached for batch ${batch.name}."
-            if (Utils.isTesting && !knownUnstableBatches.exists(_._1 == batch.name)) {
+            if (Utils.isTesting) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
               logWarning(message)
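
The mechanism added by this series can be summarized apart from the patches themselves: a batch declared with the Once strategy is expected to be idempotent, so under testing the executor runs it a second time and fails if that second run still changes the plan, while known-unstable batches ("UDF", "View") are exempted. The sketch below is a minimal, self-contained illustration of that check; Lit, Rule, Batch, and OnceIdempotenceSketch are simplified stand-ins invented for illustration, not the Catalyst classes these patches touch.

// Simplified stand-ins for Catalyst's rule/batch machinery (illustrative names only).
case class Lit(value: Int)

trait Rule {
  def apply(plan: Lit): Lit
}

case class Batch(name: String, once: Boolean, rules: Seq[Rule])

object OnceIdempotenceSketch {
  // Applies a batch once and, for Once batches under verification, a second time,
  // mirroring the "run Once batches with maxIterations = 2 when testing" idea of the patches.
  def run(batch: Batch, plan: Lit, verifyOnceIdempotence: Boolean): Lit = {
    val afterFirstRun = batch.rules.foldLeft(plan)((p, r) => r(p))
    if (batch.once && verifyOnceIdempotence) {
      val afterSecondRun = batch.rules.foldLeft(afterFirstRun)((p, r) => r(p))
      if (afterSecondRun != afterFirstRun) {
        throw new IllegalStateException(
          s"Plan did not stabilize after max iterations (1) reached for batch ${batch.name}")
      }
    }
    afterFirstRun
  }

  def main(args: Array[String]): Unit = {
    // An idempotent rule: clamping negatives to zero is stable after one run.
    val clamp = new Rule { def apply(p: Lit): Lit = Lit(math.max(p.value, 0)) }
    // A non-idempotent rule, analogous to DecrementLiterals in the new test:
    // every application keeps changing the plan.
    val decrement = new Rule { def apply(p: Lit): Lit = Lit(p.value - 1) }

    // Passes the check: the second run reproduces the same plan.
    println(run(Batch("clampOnce", once = true, Seq(clamp)), Lit(-5), verifyOnceIdempotence = true))

    // Fails the check: the second run changes the plan again.
    try {
      run(Batch("decrementOnce", once = true, Seq(decrement)), Lit(10), verifyOnceIdempotence = true)
    } catch {
      case e: IllegalStateException => println(e.getMessage)
    }
  }
}

Making the verification opt-in per executor, as the Analyzer does by overriding verifyOnceStrategyIdempotence, keeps the extra run confined to testing and to executors that claim their Once batches are idempotent, rather than doubling the cost of every Once batch.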