Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.trees

import java.io.Writer
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.Map
import scala.reflect.ClassTag
Expand All @@ -29,6 +30,8 @@ import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.ScalaReflection._
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down Expand Up @@ -75,7 +78,7 @@ object CurrentOrigin {
}

// scalastyle:off
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Logging {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure TreeNode should have the baggage of its own logger. The catalyst.trees package object extends Logging, and there is a comment that tree nodes should just use that logger, but I don't see a way to do that since the logging methods are protected.

I could add a log function to the package object and then use that from TreeNode:

private[trees] def logWarning 

// scalastyle:on
self: BaseType =>

Expand Down Expand Up @@ -484,7 +487,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
writer: Writer,
verbose: Boolean,
addSuffix: Boolean): Unit = {
generateTreeString(0, Nil, writer, verbose, "", addSuffix)
treeString(writer, verbose, addSuffix, TreeNode.maxTreeToStringDepth)
}

/**
 * Renders this tree to the given writer, one node per line.
 *
 * @param writer destination for the rendered tree
 * @param verbose whether to render each node's verbose form instead of its simple form
 * @param addSuffix whether to append the verbose suffix to each node (only honored when verbose)
 * @param maxDepth nesting depth at which rendering is truncated
 */
def treeString(
    writer: Writer,
    verbose: Boolean,
    addSuffix: Boolean,
    maxDepth: Int): Unit = {
  generateTreeString(
    depth = 0,
    lastChildren = Nil,
    writer = writer,
    verbose = verbose,
    prefix = "",
    addSuffix = addSuffix,
    maxDepth = maxDepth)
}

/**
Expand Down Expand Up @@ -550,7 +561,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): Unit = {
addSuffix: Boolean = false,
maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {

if (depth > 0) {
lastChildren.init.foreach { isLast =>
Expand All @@ -559,30 +571,42 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
writer.write(if (lastChildren.last) "+- " else ":- ")
}

val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix else verboseString
} else {
simpleString
}
writer.write(prefix)
writer.write(str)
writer.write("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
addSuffix = addSuffix))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
addSuffix = addSuffix)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix)
}
if (depth < maxDepth) {
val str = if (verbose) {
if (addSuffix) verboseStringWithSuffix else verboseString
} else {
simpleString
}
writer.write(prefix)
writer.write(str)
writer.write("\n")

if (innerChildren.nonEmpty) {
innerChildren.init.foreach(_.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose,
addSuffix = addSuffix, maxDepth = maxDepth))
innerChildren.last.generateTreeString(
depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose,
addSuffix = addSuffix, maxDepth = maxDepth)
}

if (children.nonEmpty) {
children.init.foreach(_.generateTreeString(
depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxDepth))
children.last.generateTreeString(
depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxDepth)
}
}
else {
if (TreeNode.treeDepthWarningPrinted.compareAndSet(false, true)) {
logWarning(
"Truncated the string representation of a plan since it was nested too deeply. " +
"This behavior can be adjusted by setting 'spark.debug.maxToStringTreeDepth' in " +
"SparkEnv.conf.")
}
writer.write(prefix)
writer.write("...\n")
}
}

/**
Expand Down Expand Up @@ -701,3 +725,23 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case _ => false
}
}

object TreeNode {
  /**
   * Default cap on how deeply the tree-string rendering recurses. String representations of
   * large, deeply nested query plans can get extremely large; rendering stops once this depth
   * is reached. Override via the 'spark.debug.maxToStringTreeDepth' conf in SparkEnv.
   */
  val DEFAULT_MAX_TO_STRING_TREE_DEPTH = 15

  /**
   * The effective depth limit: 'spark.debug.maxToStringTreeDepth' when a SparkEnv is
   * available, otherwise [[DEFAULT_MAX_TO_STRING_TREE_DEPTH]].
   */
  def maxTreeToStringDepth: Int = {
    Option(SparkEnv.get)
      .map(_.conf.getInt("spark.debug.maxToStringTreeDepth", DEFAULT_MAX_TO_STRING_TREE_DEPTH))
      .getOrElse(DEFAULT_MAX_TO_STRING_TREE_DEPTH)
  }

  /** Ensures the plan-truncation warning is logged at most once per JVM. */
  private val treeDepthWarningPrinted = new AtomicBoolean(false)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure where to put this code, so made a TreeNode companion object. If there is a better place let me know.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
Expand Down Expand Up @@ -454,8 +455,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false)
addSuffix: Boolean = false,
maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, prefix, addSuffix, maxDepth)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefix and addSuffix are ignored in the original code but I can't tell why. Seems like they should be passed through from parent to child here. I adjusted, but it might not be a bug or maybe should go in a separate PR

Suggested change
child.generateTreeString(depth, lastChildren, writer, verbose, prefix, addSuffix, maxDepth)
child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false, maxDepth)

}

override def needCopyResult: Boolean = false
Expand Down Expand Up @@ -730,8 +732,16 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
writer: Writer,
verbose: Boolean,
prefix: String = "",
addSuffix: Boolean = false): Unit = {
child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false)
addSuffix: Boolean = false,
maxDepth: Int = TreeNode.maxTreeToStringDepth): Unit = {
child.generateTreeString(
depth,
lastChildren,
writer,
verbose,
s"*($codegenStageId) ",
false,
maxDepth)
}

override def needStopCheck: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.test.SharedSQLContext

// Test fixture: a two-column row type used to build small DataFrames in this suite.
// Marked final — case classes should not be extended.
final case class Simple(a: String, b: Int)

class QueryExecutionSuite extends SharedSQLContext {
def checkDumpedPlans(path: String, expected: Int): Unit = {
assert(Source.fromFile(path).getLines.toList
Expand Down Expand Up @@ -108,4 +110,16 @@ class QueryExecutionSuite extends SharedSQLContext {
val error = intercept[Error](qe.toString)
assert(error.getMessage.contains("error"))
}

test("toString() tree depth") {
  import testImplicits._

  // Build a deeply nested plan by repeatedly self-joining a tiny DataFrame.
  val rows = Seq(Simple("a", 1), Simple("b", 3), Simple("c", 4))
  val start = rows.toDF()
  val deeplyNested = (1 until 30).foldLeft(start)((plan, _) => plan.join(rows.toDF(), "a"))

  // The rendered plan must be truncated by the depth limit rather than growing with the joins.
  val lineCount = deeplyNested.queryExecution.optimizedPlan.toString.split("\n").length
  assert(lineCount <= 31)
}
}