diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 93b5826f8a74..888cc446d518 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -93,53 +93,6 @@ private[spark] object Utils extends Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null - /** - * The performance overhead of creating and logging strings for wide schemas can be large. To - * limit the impact, we bound the number of fields to include by default. This can be overridden - * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv. - */ - val DEFAULT_MAX_TO_STRING_FIELDS = 25 - - private[spark] def maxNumToStringFields = { - if (SparkEnv.get != null) { - SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS) - } else { - DEFAULT_MAX_TO_STRING_FIELDS - } - } - - /** Whether we have warned about plan string truncation yet. */ - private val truncationWarningPrinted = new AtomicBoolean(false) - - /** - * Format a sequence with semantics similar to calling .mkString(). Any elements beyond - * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder. - * - * @return the trimmed and formatted string. - */ - def truncatedString[T]( - seq: Seq[T], - start: String, - sep: String, - end: String, - maxNumFields: Int = maxNumToStringFields): String = { - if (seq.length > maxNumFields) { - if (truncationWarningPrinted.compareAndSet(false, true)) { - logWarning( - "Truncated the string representation of a plan since it was too large. This " + - "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.") - } - val numFields = math.max(0, maxNumFields - 1) - seq.take(numFields).mkString( - start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end) - } else { - seq.mkString(start, sep, end) - } - } - - /** Shorthand for calling truncatedString() without start or end strings. */ - def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "") - /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 39f4fba78583..534b33be7efc 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -45,14 +45,6 @@ import org.apache.spark.scheduler.SparkListener class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { - test("truncatedString") { - assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]") - assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]") - assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]") - assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 
3 more fields]") - assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3") - } - test("timeConversion") { // Test -1 assert(Utils.timeStringAsSeconds("-1") === -1) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c2d22c5e7ce6..896deb625eec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -953,7 +953,7 @@ class Analyzer( case plan if containsDeserializer(plan.expressions) => plan case q: LogicalPlan => - logTrace(s"Attempting to resolve ${q.simpleString}") + logTrace(s"Attempting to resolve ${q.simpleString(maxFields = None)}") q.mapExpressions(resolve(_, q)) } @@ -1733,7 +1733,7 @@ class Analyzer( case p if p.expressions.exists(hasGenerator) => throw new AnalysisException("Generators are not supported outside the SELECT clause, but " + - "got: " + p.simpleString) + "got: " + p.simpleString(maxFields = None)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 6a91d556b2f3..703c035a8624 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -308,7 +308,7 @@ trait CheckAnalysis extends PredicateHelper { val missingAttributes = o.missingInput.mkString(",") val input = o.inputSet.mkString(",") val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " + - s"from $input in operator ${operator.simpleString}." + s"from $input in operator ${operator.simpleString(maxFields = None)}." 
val resolver = plan.conf.resolver val attrsWithSameName = o.missingInput.filter { missing => @@ -373,7 +373,7 @@ trait CheckAnalysis extends PredicateHelper { s"""nondeterministic expressions are only allowed in |Project, Filter, Aggregate or Window, found: | ${o.expressions.map(_.sql).mkString(",")} - |in operator ${operator.simpleString} + |in operator ${operator.simpleString(maxFields = None)} """.stripMargin) case _: UnresolvedHint => @@ -385,7 +385,8 @@ trait CheckAnalysis extends PredicateHelper { } extendedCheckRules.foreach(_(plan)) plan.foreachUp { - case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}") + case o if !o.resolved => + failAnalysis(s"unresolved operator ${o.simpleString(maxFields = None)}") case _ => } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 72ac80e0a0a1..8b8883d20799 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -1038,7 +1038,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging { case Some(newType) if a.dataType == newType.dataType => a case Some(newType) => logDebug( - s"Promoting $a from ${a.dataType} to ${newType.dataType} in ${q.simpleString}") + s"Promoting $a from ${a.dataType} to ${newType.dataType} in " + + q.simpleString(maxFields = None)) newType } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 592520c59a76..2e6ad67a2b8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -318,8 +318,9 @@ case class ExpressionEncoder[T]( extractProjection(inputRow) } catch { case e: Exception => + val encoded = serializer.map(_.simpleString(maxFields = None)).mkString("\n") throw new RuntimeException( - s"Error while encoding: $e\n${serializer.map(_.simpleString).mkString("\n")}", e) + s"Error while encoding: $e\n${encoded}", e) } /** @@ -331,7 +332,8 @@ case class ExpressionEncoder[T]( constructProjection(row).get(0, ObjectType(clsTag.runtimeClass)).asInstanceOf[T] } catch { case e: Exception => - throw new RuntimeException(s"Error while decoding: $e\n${deserializer.simpleString}", e) + val decoded = deserializer.simpleString(maxFields = None) + throw new RuntimeException(s"Error while decoding: $e\n${decoded}", e) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 141fcffcb6fa..f48599195f7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.trees.TreeNode +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ 
-import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the basic expression abstract classes in Catalyst. @@ -233,12 +233,12 @@ abstract class Expression extends TreeNode[Expression] { // Marks this as final, Expression.verboseString should never be called, and thus shouldn't be // overridden by concrete classes. - final override def verboseString: String = simpleString + final override def verboseString(maxFields: Option[Int]): String = simpleString(maxFields) - override def simpleString: String = toString + override def simpleString(maxFields: Option[Int]): String = toString - override def toString: String = prettyName + Utils.truncatedString( - flatArguments.toSeq, "(", ", ", ")") + override def toString: String = prettyName + truncatedString( + flatArguments.toSeq, "(", ", ", ")", maxFields = None) /** * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala index 17d4a0dc4e88..7e88e0533727 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala @@ -197,7 +197,7 @@ trait Block extends TreeNode[Block] with JavaCode { case _ => code"$this\n$other" } - override def verboseString: String = toString + override def verboseString(maxFields: Option[Int]): String = toString } object Block { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index d6e67b9ac3d1..a14c417b7e7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -101,7 +101,7 @@ case class UserDefinedGenerator( inputRow = new InterpretedProjection(children) convertToScala = { val inputSchema = StructType(children.map { e => - StructField(e.simpleString, e.dataType, nullable = true) + StructField(e.simpleString(maxFields = None), e.dataType, nullable = true) }) CatalystTypeConverters.createToScalaConverter(inputSchema) }.asInstanceOf[InternalRow => Row] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index b07d9466ba0d..9970814e3ec0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -54,7 +54,9 @@ case class NamedLambdaVariable( override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" - override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" + override def simpleString(maxFields: Option[Int]): String = { + s"lambda $name#${exprId.id}: ${dataType.simpleString}" + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 0cdeda9b1051..5b46f4b937ca 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -40,7 +40,7 @@ case class PrintToStderr(child: Expression) extends UnaryExpression { input } - private val outputPrefix = s"Result of ${child.simpleString} is " + private val outputPrefix = s"Result of ${child.simpleString(maxFields = None)} is " override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix) @@ -72,7 +72,7 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa override def prettyName: String = "assert_true" - private val errMsg = s"'${child.simpleString}' is not true!" + private val errMsg = s"'${child.simpleString(maxFields = None)}' is not true!" override def eval(input: InternalRow) : Any = { val v = child.eval(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 584a2946bd56..8a6fab08100b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -326,7 +326,9 @@ case class AttributeReference( // Since the expression id is not in the first constructor it is missing from the default // tree string. - override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}" + override def simpleString(maxFields: Option[Int]): String = { + s"$name#${exprId.id}: ${dataType.simpleString}" + } override def sql: String = { val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else "" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index ca0cea6ba7de..df1109374dd6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -172,9 +172,11 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT */ protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else "" - override def simpleString: String = statePrefix + super.simpleString + override def simpleString(maxFields: Option[Int]): String = { + statePrefix + super.simpleString(maxFields) + } - override def verboseString: String = simpleString + override def verboseString(maxFields: Option[Int]): String = simpleString(maxFields) /** * All the subqueries of current plan. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index a520eba001af..f17bdf48f66e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -36,8 +36,8 @@ abstract class LogicalPlan /** Returns true if this subtree has data from a streaming data source. 
*/ def isStreaming: Boolean = children.exists(_.isStreaming == true) - override def verboseStringWithSuffix: String = { - super.verboseString + statsCache.map(", " + _.toString).getOrElse("") + override def verboseStringWithSuffix(maxFields: Option[Int]): String = { + super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("") } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f09c5ceefed1..c5ffc77c1f52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils import org.apache.spark.util.random.RandomSampler /** @@ -468,7 +468,7 @@ case class View( override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})" } } @@ -484,8 +484,8 @@ case class View( case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode { override def output: Seq[Attribute] = child.output - override def simpleString: String = { - val cteAliases = Utils.truncatedString(cteRelations.map(_._1), "[", ", ", "]") + override def simpleString(maxFields: Option[Int]): String = { + val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields) s"CTE $cteAliases" } @@ -557,7 +557,7 @@ case class Range( override def newInstance(): Range = copy(output = output.map(_.newInstance())) - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { s"Range ($start, $end, step=$step, splits=$numSlices)" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index becfa8d98221..17529c997513 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -17,11 +17,13 @@ package org.apache.spark.sql.catalyst.trees +import java.io._ import java.util.UUID import scala.collection.Map import scala.reflect.ClassTag +import org.apache.commons.io.output.StringBuilderWriter import org.apache.commons.lang3.ClassUtils import org.json4s.JsonAST._ import org.json4s.JsonDSL._ @@ -30,14 +32,14 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.errors._ import 
org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.Utils /** Used by [[TreeNode.getNodeNumbered]] when traversing the tree for a given number */ private class MutableInt(var i: Int) @@ -431,17 +433,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]] /** Returns a string representing the arguments to this node, minus any children */ - def argString: String = stringArgs.flatMap { + def argString(maxFields: Option[Int]): String = stringArgs.flatMap { case tn: TreeNode[_] if allChildren.contains(tn) => Nil case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil - case Some(tn: TreeNode[_]) => tn.simpleString :: Nil - case tn: TreeNode[_] => tn.simpleString :: Nil + case Some(tn: TreeNode[_]) => tn.simpleString(maxFields) :: Nil + case tn: TreeNode[_] => tn.simpleString(maxFields) :: Nil case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil case iter: Iterable[_] if iter.isEmpty => Nil - case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil - case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil + case seq: Seq[_] => truncatedString(seq, "[", ", ", "]", maxFields) :: Nil + case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}", maxFields) :: Nil case array: Array[_] if array.isEmpty => Nil - case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil + case array: Array[_] => truncatedString(array, "[", ", ", "]", maxFields) :: Nil case null => Nil case None => Nil case Some(null) => Nil @@ -454,14 +456,22 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case other => other :: Nil }.mkString(", ") - /** ONE line description of this node. */ - def simpleString: String = s"$nodeName $argString".trim + /** + * ONE line description of this node. + * @param maxFields Maximum number of fields that will be converted to strings. + * Any elements beyond the limit will be dropped. + * `None` means the limit is defined by the SQL config + * `spark.sql.debug.maxToStringFields`. 
+ */ + def simpleString(maxFields: Option[Int]): String = { + s"$nodeName ${argString(maxFields)}".trim + } /** ONE line description of this node with more information */ - def verboseString: String + def verboseString(maxFields: Option[Int]): String /** ONE line description of this node with some suffix information */ - def verboseStringWithSuffix: String = verboseString + def verboseStringWithSuffix(maxFields: Option[Int]): String = verboseString(maxFields) override def toString: String = treeString @@ -469,7 +479,21 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def treeString: String = treeString(verbose = true) def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { - generateTreeString(0, Nil, new StringBuilder, verbose = verbose, addSuffix = addSuffix).toString + val writer = new StringBuilderWriter() + try { + treeString(writer, verbose, addSuffix, None) + writer.toString + } finally { + writer.close() + } + } + + def treeString( + writer: Writer, + verbose: Boolean, + addSuffix: Boolean, + maxFields: Option[Int]): Unit = { + generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields) } /** @@ -521,7 +545,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { protected def innerChildren: Seq[TreeNode[_]] = Seq.empty /** - * Appends the string representation of this node and its children to the given StringBuilder. + * Appends the string representation of this node and its children to the given Writer. * * The `i`-th element in `lastChildren` indicates whether the ancestor of the current node at * depth `i + 1` is the last child of its own parent node. The depth of the root node is 0, and @@ -532,44 +556,43 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - builder: StringBuilder, + writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): StringBuilder = { + addSuffix: Boolean = false, + maxFields: Option[Int]): Unit = { if (depth > 0) { lastChildren.init.foreach { isLast => - builder.append(if (isLast) " " else ": ") + writer.write(if (isLast) " " else ": ") } - builder.append(if (lastChildren.last) "+- " else ":- ") + writer.write(if (lastChildren.last) "+- " else ":- ") } val str = if (verbose) { - if (addSuffix) verboseStringWithSuffix else verboseString + if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields) } else { - simpleString + simpleString(maxFields) } - builder.append(prefix) - builder.append(str) - builder.append("\n") + writer.write(prefix) + writer.write(str) + writer.write("\n") if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ false, builder, verbose, - addSuffix = addSuffix)) + depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose, + addSuffix = addSuffix, maxFields = maxFields)) innerChildren.last.generateTreeString( - depth + 2, lastChildren :+ children.isEmpty :+ true, builder, verbose, - addSuffix = addSuffix) + depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose, + addSuffix = addSuffix, maxFields = maxFields) } if (children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, builder, verbose, prefix, addSuffix)) + depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields)) children.last.generateTreeString( - depth + 1, lastChildren :+ true, builder, verbose, 
prefix, addSuffix) + depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields) } - - builder } /** @@ -651,7 +674,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { t.forall(_.isInstanceOf[Partitioning]) || t.forall(_.isInstanceOf[DataType]) => JArray(t.map(parseToJson).toList) case t: Seq[_] if t.length > 0 && t.head.isInstanceOf[String] => - JString(Utils.truncatedString(t, "[", ", ", "]")) + JString(truncatedString(t, "[", ", ", "]", maxFields = None)) case t: Seq[_] => JNull case m: Map[_, _] => JNull // if it's a scala object, we can simply keep the full class path. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 0978e92dd4f7..3c8727727abd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -19,13 +19,18 @@ package org.apache.spark.sql.catalyst import java.io._ import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{NumericType, StringType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils -package object util { +package object util extends Logging { /** Silences output to stderr or stdout for the duration of f */ def quietly[A](f: => A): A = { @@ -167,6 +172,43 @@ package object util { builder.toString() } + /** Whether we have warned about plan string truncation yet. */ + private val truncationWarningPrinted = new AtomicBoolean(false) + + /** + * Format a sequence with semantics similar to calling .mkString(). Any elements beyond + * maxFields will be dropped and replaced by a "... N more fields" placeholder. + * If maxFields is set to `None`, maximum number of fields is defined by the SQL config + * `spark.sql.debug.maxToStringFields`. + * + * @return the trimmed and formatted string. + */ + def truncatedString[T]( + seq: Seq[T], + start: String, + sep: String, + end: String, + maxFields: Option[Int]): String = { + val maxNumFields = maxFields.getOrElse(SQLConf.get.maxToStringFields) + if (seq.length > maxNumFields) { + if (truncationWarningPrinted.compareAndSet(false, true)) { + logWarning( + "Truncated the string representation of a plan since it was too large. This " + + s"behavior can be adjusted by setting '${SQLConf.MAX_TO_STRING_FIELDS.key}'") + } + val numFields = math.max(0, maxNumFields - 1) + seq.take(numFields).mkString( + start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end) + } else { + seq.mkString(start, sep, end) + } + } + + /** Shorthand for calling truncatedString() without start or end strings. 
*/ + def truncatedString[T](seq: Seq[T], sep: String, maxFields: Option[Int]): String = { + truncatedString(seq, "", sep, "", maxFields) + } + /* FIX ME implicit class debugLogging(a: Any) { def debugLogging() { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fa59fa578a96..51f2f8dfbc83 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1594,6 +1594,14 @@ object SQLConf { "WHERE, which does not follow SQL standard.") .booleanConf .createWithDefault(false) + + val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields") + .internal() + .doc("Maximum number of fields of a tree node that can be converted to strings " + + "in debug output. Any elements beyond the limit will be dropped and replaced by a" + + """ "... N more fields" placeholder.""") + .intConf + .createWithDefault(25) } /** @@ -2009,6 +2017,8 @@ class SQLConf extends Serializable with Logging { def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG) + def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index e53628d11ccf..317899a947a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -75,8 +75,14 @@ abstract class DataType extends AbstractDataType { /** String representation for the type saved in external catalogs. */ def catalogString: String = simpleString - /** Readable string representation for the type with truncation */ - private[sql] def simpleString(maxNumberFields: Int): String = simpleString + /** + * Readable string representation for the type with truncation. + * @param maxFields Maximum number of fields that will be converted to strings. + * Any elements beyond the limit will be dropped. + * `None` means the limit is defined by the SQL config + * `spark.sql.debug.maxToStringFields`. 
+ */ + private[sql] def simpleString(maxFields: Option[Int]): String = simpleString def sql: String = simpleString.toUpperCase(Locale.ROOT) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 06289b148320..67c95e6445ef 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -27,8 +27,8 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} -import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier} -import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString} +import org.apache.spark.sql.internal.SQLConf /** * A [[StructType]] object can be constructed by @@ -346,7 +346,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def simpleString: String = { val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}") - Utils.truncatedString(fieldTypes, "struct<", ",", ">") + truncatedString(fieldTypes, "struct<", ",", ">", maxFields = None) } override def catalogString: String = { @@ -370,10 +370,11 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru */ def toDDL: String = fields.map(_.toDDL).mkString(",") - private[sql] override def simpleString(maxNumberFields: Int): String = { + private[sql] override def simpleString(maxFields: Option[Int]): String = { val builder = new StringBuilder + val maxNumberFields = maxFields.getOrElse(SQLConf.get.maxToStringFields) val fieldTypes = fields.take(maxNumberFields).map { - f => s"${f.name}: ${f.dataType.simpleString(maxNumberFields)}" + f => s"${f.name}: ${f.dataType.simpleString(maxFields)}" } builder.append("struct<") builder.append(fieldTypes.mkString(", ")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala index 294fce8e9a10..d5c7d9e3d4f2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala @@ -215,7 +215,7 @@ class PercentileSuite extends SparkFunSuite { val percentile2 = new Percentile(child, percentage) assertEqual(percentile2.checkInputDataTypes(), TypeCheckFailure(s"Percentage(s) must be between 0.0 and 1.0, " + - s"but got ${percentage.simpleString}")) + s"but got ${percentage.simpleString(maxFields = None)}")) } val nonFoldablePercentage = Seq(NonFoldableLiteral(0.5), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilsSuite.scala new file mode 100644 index 000000000000..58cd20387ebe --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilsSuite.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.util + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util._ + +class UtilsSuite extends SparkFunSuite { + + test("truncatedString") { + assert(truncatedString(Nil, "[", ", ", "]", Some(2)) == "[]") + assert(truncatedString(Seq(1, 2), "[", ", ", "]", Some(2)) == "[1, 2]") + assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", Some(2)) == "[1, ... 2 more fields]") + assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", Some(-5)) == "[, ... 3 more fields]") + assert(truncatedString(Seq(1, 2, 3), ", ", maxFields = None) == "1, 2, 3") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c91b0d778fab..70fe19f5a254 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -375,7 +375,7 @@ class Dataset[T] private[sql]( try { val builder = new StringBuilder val fields = schema.take(2).map { - case f => s"${f.name}: ${f.dataType.simpleString(2)}" + case f => s"${f.name}: ${f.dataType.simpleString(Some(2))}" } builder.append("[") builder.append(fields.mkString(", ")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 555bcdffb6ee..190648dda4ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -567,10 +567,10 @@ class KeyValueGroupedDataset[K, V] private[sql]( override def toString: String = { val builder = new StringBuilder val kFields = kExprEnc.schema.map { - case f => s"${f.name}: ${f.dataType.simpleString(2)}" + case f => s"${f.name}: ${f.dataType.simpleString(Some(2))}" } val vFields = vExprEnc.schema.map { - case f => s"${f.name}: ${f.dataType.simpleString(2)}" + case f => s"${f.name}: ${f.dataType.simpleString(Some(2))}" } builder.append("KeyValueGroupedDataset: [key: [") builder.append(kFields.take(2).mkString(", ")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index d4e75b5ebd40..cee27a013be5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -528,7 +528,7 @@ class RelationalGroupedDataset protected[sql]( builder.append("RelationalGroupedDataset: [grouping expressions: [") val kFields = groupingExprs.collect { case expr: NamedExpression if expr.resolved => - s"${expr.name}: ${expr.dataType.simpleString(2)}" + s"${expr.name}: ${expr.dataType.simpleString(Some(2))}" case expr: NamedExpression => 
expr.name case o => o.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index a9b18ab57237..c7f518d2848a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics @@ -51,16 +52,19 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport { // Metadata that describes more details of this scan. protected def metadata: Map[String, String] - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { val metadataEntries = metadata.toSeq.sorted.map { case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100) } - val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") - s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" + val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields) + val outputStr = truncatedString(output, "[", ",", "]", maxFields) + s"$nodeNamePrefix$nodeName${outputStr}$metadataStr" } - override def verboseString: String = redact(super.verboseString) + override def verboseString(maxFields: Option[Int]): String = { + redact(super.verboseString(maxFields)) + } override def treeString(verbose: Boolean, addSuffix: Boolean): String = { redact(super.treeString(verbose, addSuffix)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 2962becb64e8..c4e735f08430 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.DataType -import org.apache.spark.util.Utils object RDDConversions { def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { @@ -119,7 +119,7 @@ case class ExternalRDDScanExec[T]( } } - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { s"$nodeName${output.mkString("[", ",", "]")}" } } @@ -196,7 +196,7 @@ case class RDDScanExec( } } - override def simpleString: String = { - s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + override def simpleString(maxFields: Option[Int]): String = { + s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}" } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 64f49e2d0d4e..39dad51ba851 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -17,16 +17,20 @@ package org.apache.spark.sql.execution +import java.io.{BufferedWriter, OutputStreamWriter, Writer} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} +import org.apache.commons.io.output.StringBuilderWriter +import org.apache.hadoop.fs.Path + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{truncatedString, DateTimeUtils} import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} @@ -98,7 +102,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { protected def stringOrError[A](f: => A): String = try f.toString catch { case e: AnalysisException => e.toString } - /** * Returns the result as a hive compatible sequence of strings. This is used in tests and * `SparkSQLDriver` for CLI applications. @@ -189,23 +192,38 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { """.stripMargin.trim } + private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = { + try f(writer) + catch { + case e: AnalysisException => writer.write(e.toString) + } + } + + private def writePlans(writer: Writer, maxFields: Option[Int]): Unit = { + val (verbose, addSuffix) = (true, false) + + writer.write("== Parsed Logical Plan ==\n") + writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields)) + writer.write("\n== Analyzed Logical Plan ==\n") + val analyzedOutput = stringOrError(truncatedString( + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)) + writer.write(analyzedOutput) + writer.write("\n") + writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields)) + writer.write("\n== Optimized Logical Plan ==\n") + writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields)) + writer.write("\n== Physical Plan ==\n") + writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields)) + } + override def toString: String = withRedaction { - def output = Utils.truncatedString( - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ") - val analyzedPlan = Seq( - stringOrError(output), - stringOrError(analyzed.treeString(verbose = true)) - ).filter(_.nonEmpty).mkString("\n") - - s"""== Parsed Logical Plan == - |${stringOrError(logical.treeString(verbose = true))} - |== Analyzed Logical Plan == - |$analyzedPlan - |== Optimized Logical Plan == - |${stringOrError(optimizedPlan.treeString(verbose = true))} - |== Physical Plan == - |${stringOrError(executedPlan.treeString(verbose = true))} - """.stripMargin.trim + val writer = new 
StringBuilderWriter() + try { + writePlans(writer, None) + writer.toString + } finally { + writer.close() + } } def stringWithStats: String = withRedaction { @@ -250,5 +268,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { def codegenToSeq(): Seq[(String, String)] = { org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan) } + + /** + * Dumps debug information about query execution into the specified file. + */ + def toFile(path: String): Unit = { + val filePath = new Path(path) + val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf()) + val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) + + try { + writePlans(writer, Some(Int.MaxValue)) + writer.write("\n== Whole Stage Codegen ==\n") + org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan) + } finally { + writer.close() + } + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 59ffd1638111..ad2c023bd32a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -62,7 +62,10 @@ private[execution] object SparkPlanInfo { case fileScan: FileSourceScanExec => fileScan.metadata case _ => Map[String, String]() } - new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), + new SparkPlanInfo( + plan.nodeName, + plan.simpleString(maxFields = None), + children.map(fromSparkPlan), metadata, metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5f81b6fe743c..50e4d47fe54a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import java.io.Writer import java.util.Locale import java.util.function.Supplier @@ -86,7 +87,7 @@ trait CodegenSupport extends SparkPlan { this.parent = parent ctx.freshNamePrefix = variablePrefix s""" - |${ctx.registerComment(s"PRODUCE: ${this.simpleString}")} + |${ctx.registerComment(s"PRODUCE: ${this.simpleString(maxFields = None)}")} |${doProduce(ctx)} """.stripMargin } @@ -187,7 +188,7 @@ trait CodegenSupport extends SparkPlan { parent.doConsume(ctx, inputVars, rowVar) } s""" - |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} + |${ctx.registerComment(s"CONSUME: ${parent.simpleString(maxFields = None)}")} |$evaluated |$consumeFunc """.stripMargin @@ -450,11 +451,19 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - builder: StringBuilder, + writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): StringBuilder = { - child.generateTreeString(depth, lastChildren, builder, verbose, "") + addSuffix: Boolean = false, + maxFields: Option[Int]): Unit = { + child.generateTreeString( + depth, + lastChildren, + writer, + verbose, + prefix = "", + addSuffix = false, + maxFields) } override def needCopyResult: Boolean = false @@ -726,11 +735,19 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - 
builder: StringBuilder, + writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): StringBuilder = { - child.generateTreeString(depth, lastChildren, builder, verbose, s"*($codegenStageId) ") + addSuffix: Boolean = false, + maxFields: Option[Int]): Unit = { + child.generateTreeString( + depth, + lastChildren, + writer, + verbose, + prefix = s"*($codegenStageId) ", + addSuffix = false, + maxFields) } override def needStopCheck: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 25d8e7dff3d9..dee121547045 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.MutableColumnarRow @@ -919,18 +920,19 @@ case class HashAggregateExec( """ } - override def verboseString: String = toString(verbose = true) + override def verboseString(maxFields: Option[Int]): String = toString(verbose = true, maxFields) - override def simpleString: String = toString(verbose = false) + override def simpleString(maxFields: Option[Int]): String = toString(verbose = false, maxFields) - private def toString(verbose: Boolean): String = { + private def toString(verbose: Boolean, maxFields: Option[Int]): String = { val allAggregateExpressions = aggregateExpressions testFallbackStartsAt match { case None => - val keyString = Utils.truncatedString(groupingExpressions, "[", ", ", "]") - val functionString = Utils.truncatedString(allAggregateExpressions, "[", ", ", "]") - val outputString = Utils.truncatedString(output, "[", ", ", "]") + val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields) + val functionString = + truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) + val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { s"HashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 66955b8ef723..00b89554f57d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.util.Utils /** * A hash-based aggregate operator that supports [[TypedImperativeAggregate]] functions that may @@ 
-137,15 +137,15 @@ case class ObjectHashAggregateExec( } } - override def verboseString: String = toString(verbose = true) + override def verboseString(maxFields: Option[Int]): String = toString(verbose = true, maxFields) - override def simpleString: String = toString(verbose = false) + override def simpleString(maxFields: Option[Int]): String = toString(verbose = false, maxFields) - private def toString(verbose: Boolean): String = { + private def toString(verbose: Boolean, maxFields: Option[Int]): String = { val allAggregateExpressions = aggregateExpressions - val keyString = Utils.truncatedString(groupingExpressions, "[", ", ", "]") - val functionString = Utils.truncatedString(allAggregateExpressions, "[", ", ", "]") - val outputString = Utils.truncatedString(output, "[", ", ", "]") + val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields) + val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) + val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { s"ObjectHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index fc87de2c52e4..02c477b76299 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.util.Utils /** * Sort-based aggregate operator. 
@@ -107,16 +107,16 @@ case class SortAggregateExec( } } - override def simpleString: String = toString(verbose = false) + override def simpleString(maxFields: Option[Int]): String = toString(verbose = false, maxFields) - override def verboseString: String = toString(verbose = true) + override def verboseString(maxFields: Option[Int]): String = toString(verbose = true, maxFields) - private def toString(verbose: Boolean): String = { + private def toString(verbose: Boolean, maxFields: Option[Int]): String = { val allAggregateExpressions = aggregateExpressions - val keyString = Utils.truncatedString(groupingExpressions, "[", ", ", "]") - val functionString = Utils.truncatedString(allAggregateExpressions, "[", ", ", "]") - val outputString = Utils.truncatedString(output, "[", ", ", "]") + val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields) + val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) + val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 09effe087e19..1bf6de2e3efd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -586,7 +586,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) } } - override def simpleString: String = s"Range ($start, $end, step=$step, splits=$numSlices)" + override def simpleString(maxFields: Option[Int]): String = { + s"Range ($start, $end, step=$step, splits=$numSlices)" + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 3b6588587c35..d0afa7006448 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -27,9 +27,10 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{LongAccumulator, Utils} +import org.apache.spark.util.LongAccumulator /** @@ -208,6 +209,8 @@ case class InMemoryRelation( override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache) - override def simpleString: String = - s"InMemoryRelation [${Utils.truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}" + override def simpleString(maxFields: Option[Int]): String = { + val outputStr = truncatedString(output, ", ", maxFields) + s"InMemoryRelation [${outputStr}], ${cacheBuilder.storageLevel}" + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index fe27b78bf336..9546c7092891 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -194,7 +194,7 @@ object FileSourceStrategy extends Strategy with Logging { .filter(requiredAttributes.contains) .filterNot(partitionColumns.contains) val outputSchema = readDataColumns.toStructType - logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}") + logInfo(s"Output Data Schema: ${outputSchema.simpleString(Some(5))}") val outputAttributes = readDataColumns ++ partitionColumns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 8d715f634298..0a20f23b758f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.util.Utils @@ -63,7 +64,9 @@ case class LogicalRelation( case _ => // Do nothing. } - override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation" + override def simpleString(maxFields: Option[Int]): String = { + s"Relation[${truncatedString(output, ",", maxFields)}] $relation" + } } object LogicalRelation { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 00b1b5dedb59..5c26a8c9f757 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -48,7 +48,7 @@ case class SaveIntoDataSourceCommand( Seq.empty[Row] } - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { val redacted = SQLConf.get.redactOptions(options) s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index fdc5e85f3c2e..d9219c0a9c41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -68,7 +68,7 @@ case class CreateTempViewUsing( s"Temporary view '$tableIdent' should not have specified a database") } - override def argString: String = { + override def argString(maxFields: Option[Int]): String = { s"[tableIdent:$tableIdent " + userSpecifiedSchema.map(_ + " ").getOrElse("") + s"replace:$replace " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index f15014442e3f..59ba993f1d08 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -27,10 +27,10 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, DateType, NumericType, StructType, TimestampType} -import org.apache.spark.util.Utils /** * Instructions on how to partition the table among workers. @@ -160,7 +160,7 @@ private[sql] object JDBCRelation extends Logging { resolver(f.name, columnName) || resolver(dialect.quoteIdentifier(f.name), columnName) }.getOrElse { throw new AnalysisException(s"User-defined partition column $columnName not " + - s"found in the JDBC relation: ${schema.simpleString(Utils.maxNumToStringFields)}") + s"found in the JDBC relation: ${schema.simpleString(maxFields = None)}") } column.dataType match { case _: NumericType | DateType | TimestampType => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index f7e29593a635..1026225c6492 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -56,7 +56,9 @@ case class DataSourceV2Relation( override def pushedFilters: Seq[Expression] = Seq.empty - override def simpleString: String = "RelationV2 " + metadataString + override def simpleString(maxFields: Option[Int]): String = { + "RelationV2 " + metadataString(maxFields) + } def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) @@ -90,7 +92,9 @@ case class StreamingDataSourceV2Relation( override def isStreaming: Boolean = true - override def simpleString: String = "Streaming RelationV2 " + metadataString + override def simpleString(maxFields: Option[Int]): String = { + "Streaming RelationV2 " + metadataString(maxFields) + } override def pushedFilters: Seq[Expression] = Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 25f86a66a826..deb6399f4cc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -40,7 +40,9 @@ case class DataSourceV2ScanExec( @transient scanConfig: ScanConfig) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { - override def simpleString: String = "ScanV2 " + metadataString + override def simpleString(maxFields: Option[Int]): String = { + "ScanV2 " + metadataString(maxFields) + } // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
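// Aside (illustrative comment, not part of the patch): the Option[Int] argument lets each call
// site pick its own truncation policy, assuming None disables truncation. The two
// schema.simpleString call sites changed above show the intended split:
//   logInfo(s"Output Data Schema: ${outputSchema.simpleString(Some(5))}")            // bounded, log-friendly
//   s"... not found in the JDBC relation: ${schema.simpleString(maxFields = None)}"  // full schema in the error
// Routine INFO logging stays short even for very wide schemas, while the user-facing
// AnalysisException keeps every field so the missing partition column can be located.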
override def equals(other: Any): Boolean = other match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala index 97e6c6d702ac..d2ea08f7ee9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.DataSourceV2 import org.apache.spark.util.Utils @@ -58,7 +59,7 @@ trait DataSourceV2StringFormat { case _ => Utils.getSimpleName(source.getClass) } - def metadataString: String = { + def metadataString(maxFields: Option[Int]): String = { val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)] if (pushedFilters.nonEmpty) { @@ -72,12 +73,12 @@ trait DataSourceV2StringFormat { }.mkString("[", ",", "]") } - val outputStr = Utils.truncatedString(output, "[", ", ", "]") + val outputStr = truncatedString(output, "[", ", ", "]", maxFields) val entriesStr = if (entries.nonEmpty) { - Utils.truncatedString(entries.map { + truncatedString(entries.map { case (key, value) => key + ": " + StringUtils.abbreviate(value, 100) - }, " (", ", ", ")") + }, " (", ", ", ")", maxFields) } else { "" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 366e1fe6a4aa..905f89efe2c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.execution +import java.io._ import java.util.Collections import scala.collection.JavaConverters._ +import org.apache.commons.io.output.StringBuilderWriter + import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -30,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} -import org.apache.spark.sql.execution.streaming.continuous.WriteToContinuousDataSourceExec import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{AccumulatorV2, LongAccumulator} @@ -70,15 +72,25 @@ package object debug { * @return single String containing all WholeStageCodegen subtrees and corresponding codegen */ def codegenString(plan: SparkPlan): String = { + val writer = new StringBuilderWriter() + + try { + writeCodegen(writer, plan) + writer.toString + } finally { + writer.close() + } + } + + def writeCodegen(writer: Writer, plan: SparkPlan): Unit = { val codegenSeq = codegenStringSeq(plan) - var output = s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n" + writer.write(s"Found ${codegenSeq.size} WholeStageCodegen subtrees.\n") for (((subtree, code), i) <- codegenSeq.zipWithIndex) { - output += s"== Subtree ${i + 1} / ${codegenSeq.size} 
==\n" - output += subtree - output += "\nGenerated code:\n" - output += s"${code}\n" + writer.write(s"== Subtree ${i + 1} / ${codegenSeq.size} ==\n") + writer.write(subtree) + writer.write("\nGenerated code:\n") + writer.write(s"${code}\n") } - output } /** @@ -204,7 +216,7 @@ package object debug { val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new ColumnMetrics()) def dumpStats(): Unit = { - debugPrint(s"== ${child.simpleString} ==") + debugPrint(s"== ${child.simpleString(maxFields = None)} ==") debugPrint(s"Tuples output: ${tupleCount.value}") child.output.zip(columnStats).foreach { case (attr, metric) => // This is called on driver. All accumulator updates have a fixed value. So it's safe to use diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 9bfe1a79fc1e..160ef3fffa8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, LazilyGeneratedOrdering} import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec -import org.apache.spark.util.Utils /** * Take the first `limit` elements and collect them to a single partition. @@ -176,9 +176,9 @@ case class TakeOrderedAndProjectExec( override def outputPartitioning: Partitioning = SinglePartition - override def simpleString: String = { - val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]") - val outputString = Utils.truncatedString(output, "[", ",", "]") + override def simpleString(maxFields: Option[Int]): String = { + val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields) + val outputString = truncatedString(output, "[", ",", "]", maxFields) s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 2cac86599ef1..42e96a60ad17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -24,13 +24,14 @@ import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2} import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} -import org.apache.spark.util.{Clock, Utils} 
+import org.apache.spark.util.Clock class MicroBatchExecution( sparkSession: SparkSession, @@ -475,8 +476,8 @@ class MicroBatchExecution( case StreamingExecutionRelation(source, output) => newData.get(source).map { dataPlan => assert(output.size == dataPlan.output.size, - s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + - s"${Utils.truncatedString(dataPlan.output, ",")}") + s"Invalid batch: ${truncatedString(output, ",", maxFields = None)} != " + + s"${truncatedString(dataPlan.output, ",", maxFields = None)}") val aliases = output.zip(dataPlan.output).map { case (to, from) => Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index f009c52449ad..4c073777b113 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -28,6 +28,7 @@ import org.apache.spark.SparkEnv import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, StreamingDataSourceV2Relation} import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} @@ -35,7 +36,7 @@ import org.apache.spark.sql.sources.v2 import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} -import org.apache.spark.util.{Clock, Utils} +import org.apache.spark.util.Clock class ContinuousExecution( sparkSession: SparkSession, @@ -164,8 +165,8 @@ class ContinuousExecution( val newOutput = readSupport.fullSchema().toAttributes assert(output.size == newOutput.size, - s"Invalid reader: ${Utils.truncatedString(output, ",")} != " + - s"${Utils.truncatedString(newOutput, ",")}") + s"Invalid reader: ${truncatedString(output, ",", maxFields = None)} != " + + s"${truncatedString(newOutput, ",", maxFields = None)}") replacements ++= output.zip(newOutput) val loggedOffset = offsets.offsets(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index adf52aba21a0..79918379fb70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -31,11 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ +import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.sources.v2.reader._ import 
org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils object MemoryStream { protected val currentBlockId = new AtomicInteger(0) @@ -117,7 +117,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" + override def toString: String = { + s"MemoryStream[${truncatedString(output, ",", maxFields = None)}]" + } override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 310ebcdf6768..6c9d2f422060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -51,7 +51,7 @@ case class ScalarSubquery( override def dataType: DataType = plan.schema.fields.head.dataType override def children: Seq[Expression] = Nil override def nullable: Boolean = true - override def toString: String = plan.simpleString + override def toString: String = plan.simpleString(maxFields = None) override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query) override def semanticEquals(other: Expression): Boolean = other match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 964440346deb..5c09d0d18cb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -16,11 +16,108 @@ */ package org.apache.spark.sql.execution +import scala.io.Source + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext +case class QueryExecutionTestRecord( + c0: Int, c1: Int, c2: Int, c3: Int, c4: Int, + c5: Int, c6: Int, c7: Int, c8: Int, c9: Int, + c10: Int, c11: Int, c12: Int, c13: Int, c14: Int, + c15: Int, c16: Int, c17: Int, c18: Int, c19: Int, + c20: Int, c21: Int, c22: Int, c23: Int, c24: Int, + c25: Int, c26: Int) + class QueryExecutionSuite extends SharedSQLContext { + import testImplicits._ + def checkDumpedPlans(path: String, expected: Int): Unit = { + assert(Source.fromFile(path).getLines.toList + .takeWhile(_ != "== Whole Stage Codegen ==") == List( + "== Parsed Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Analyzed Logical Plan ==", + "id: bigint", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Optimized Logical Plan ==", + s"Range (0, $expected, step=1, splits=Some(2))", + "", + "== Physical Plan ==", + s"*(1) Range (0, $expected, step=1, splits=2)", + "")) + } + test("dumping query execution info to a file") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/plans.txt" + val df = spark.range(0, 10) + df.queryExecution.debug.toFile(path) + + checkDumpedPlans(path, expected = 10) + } + } + + test("dumping query execution info to an existing file") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/plans.txt" + val df = spark.range(0, 10) + 
df.queryExecution.debug.toFile(path) + + val df2 = spark.range(0, 1) + df2.queryExecution.debug.toFile(path) + checkDumpedPlans(path, expected = 1) + } + } + + test("dumping query execution info to non-existing folder") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/newfolder/plans.txt" + val df = spark.range(0, 100) + df.queryExecution.debug.toFile(path) + checkDumpedPlans(path, expected = 100) + } + } + + test("dumping query execution info by invalid path") { + val path = "1234567890://plans.txt" + val exception = intercept[IllegalArgumentException] { + spark.range(0, 100).queryExecution.debug.toFile(path) + } + + assert(exception.getMessage.contains("Illegal character in scheme name")) + } + + test("check maximum fields restriction") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/plans.txt" + val ds = spark.createDataset(Seq(QueryExecutionTestRecord( + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) + ds.queryExecution.debug.toFile(path) + val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation")) + + assert(!localRelations.exists(_.contains("more fields"))) + } + } + + test("limit number of fields by sql config") { + def relationPlans: String = { + val ds = spark.createDataset(Seq(QueryExecutionTestRecord( + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) + ds.queryExecution.toString + } + withSQLConf(SQLConf.MAX_TO_STRING_FIELDS.key -> "26") { + assert(relationPlans.contains("more fields")) + } + withSQLConf(SQLConf.MAX_TO_STRING_FIELDS.key -> "27") { + assert(!relationPlans.contains("more fields")) + } + } + test("toString() exception/error handling") { spark.experimental.extraStrategies = Seq( new SparkStrategy { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index aa573b54a2b6..ac73bbdcae5f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -95,7 +95,7 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } - override def argString: String = { + override def argString(maxFields: Option[Int]): String = { s"[Database:${tableDesc.database}}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]"
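The new QueryExecutionSuite tests above exercise the two user-facing pieces of this change: dumping every plan for a query to a file, and capping how many fields appear in plan strings via SQLConf.MAX_TO_STRING_FIELDS. A usage sketch (illustrative, not part of the patch; it assumes the conf can be set at runtime, as the withSQLConf-based test suggests):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object PlanDumpExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("plan-dump").getOrCreate()
    val df = spark.range(0, 10)

    // Write the parsed/analyzed/optimized/physical plans (plus whole-stage codegen)
    // to a file instead of building one potentially huge string on the driver.
    df.queryExecution.debug.toFile("/tmp/plans.txt")

    // Bound the number of fields rendered per plan node; anything beyond the limit
    // shows up as a "... N more fields" placeholder in the plan string.
    spark.conf.set(SQLConf.MAX_TO_STRING_FIELDS.key, "26")
    println(df.queryExecution.toString)

    spark.stop()
  }
}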