Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ class Analyzer(
}
}

override def verifyOnceStrategyIdempotence: Boolean = true

override def execute(plan: LogicalPlan): LogicalPlan = {
AnalysisContext.reset()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
*/
protected def isPlanIntegral(plan: TreeType): Boolean = true

/** Whether to verify batches with once strategy stabilize after one run. */
protected def verifyOnceStrategyIdempotence: Boolean = false

private val knownUnstableBatches = Seq(
("UDF", "org.apache.spark.ml.feature.StringIndexerSuite.transform"),
("View", "org.apache.spark.sql.hive.execution.HiveCatalogedDDLSuite." +
"SPARK-22431: view with nested type")
)

/**
* Executes the batches of rules defined by the subclass. The batches are executed serially
* using the defined execution strategy. Within each batch, rules are also executed serially.
Expand All @@ -78,6 +87,15 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
var iteration = 1
var lastPlan = curPlan
var continue = true
// Verify that once-strategy batches stabilize after one run when testing.
val maxIterations =
if (batch.strategy.maxIterations == 1
&& verifyOnceStrategyIdempotence
&& !knownUnstableBatches.exists(_._1 == batch.name)) {
2
} else {
batch.strategy.maxIterations
}

// Run until fix point (or the max number of iterations as specified in the strategy.
while (continue) {
Expand Down Expand Up @@ -108,11 +126,19 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

result
}
iteration += 1
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"

// Stop applying this batch if:
// 1) the plan hasn't changed over the last iteration; or
// 2) max number of iterations has been reached.
if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration} iterations.")
continue = false
} else if (iteration >= maxIterations) {
// Only log if the batch has run more than once.
if (iteration > 1) {
val message = s"Plan did not stabilize after max iterations " +
s"(${batch.strategy.maxIterations}) reached for batch ${batch.name}."
if (Utils.isTesting) {
throw new TreeNodeException(curPlan, message, null)
} else {
Expand All @@ -122,11 +148,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
continue = false
}

if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
iteration += 1
lastPlan = curPlan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class RuleExecutorSuite extends SparkFunSuite {
val message = intercept[TreeNodeException[LogicalPlan]] {
ToFixedPoint.execute(Literal(100))
}.getMessage
assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
assert(
message.contains(
"Plan did not stabilize after max iterations (10) reached for batch fixedPoint"))
}

test("structural integrity checker") {
Expand All @@ -73,4 +75,17 @@ class RuleExecutorSuite extends SparkFunSuite {
}.getMessage
assert(message.contains("the structural integrity of the plan is broken"))
}

test("only once but did not stabilize") {
object ApplyOnce extends RuleExecutor[Expression] {
override def verifyOnceStrategyIdempotence: Boolean = true
val batches = Batch("once", Once, DecrementLiterals) :: Nil
}

val message = intercept[TreeNodeException[LogicalPlan]] {
ApplyOnce.execute(Literal(10))
}.getMessage
assert(
message.contains("Plan did not stabilize after max iterations (1) reached for batch once"))
}
}