|
17 | 17 |
|
18 | 18 | package org.apache.spark.sql.execution |
19 | 19 |
|
20 | | -import java.io.{BufferedWriter, OutputStreamWriter} |
| 20 | +import java.io.{OutputStreamWriter, StringWriter, Writer} |
21 | 21 | import java.nio.charset.StandardCharsets |
22 | 22 | import java.sql.{Date, Timestamp} |
23 | 23 |
|
@@ -102,7 +102,6 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { |
102 | 102 | protected def stringOrError[A](f: => A): String = |
103 | 103 | try f.toString catch { case e: AnalysisException => e.toString } |
104 | 104 |
|
105 | | - |
106 | 105 | /** |
107 | 106 | * Returns the result as a hive compatible sequence of strings. This is used in tests and |
108 | 107 | * `SparkSQLDriver` for CLI applications. |
@@ -193,23 +192,32 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { |
193 | 192 | """.stripMargin.trim |
194 | 193 | } |
195 | 194 |
|
| 195 | + private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = |
| 196 | + try f(writer) catch { case e: AnalysisException => writer.write(e.toString) } |
| 197 | + |
| 198 | + private def writePlans(writer: Writer): Unit = { |
| 199 | + writer.write("== Parsed Logical Plan ==\n") |
| 200 | + writeOrError(writer)(logical.treeString(_, verbose = true, addSuffix = false)) |
| 201 | + writer.write("== Analyzed Logical Plan ==\n") |
| 202 | + writeOrError(writer) { writer => |
| 203 | + analyzed.output.foreach(o => writer.write(s"${o.name}: ${o.dataType.simpleString}")) |
| 204 | + } |
| 205 | + writer.write("\n") |
| 206 | + writeOrError(writer)(analyzed.treeString(_, verbose = true, addSuffix = false)) |
| 207 | + writer.write("== Optimized Logical Plan ==\n") |
| 208 | + writeOrError(writer)(optimizedPlan.treeString(_, verbose = true, addSuffix = false)) |
| 209 | + writer.write("== Physical Plan ==\n") |
| 210 | + writeOrError(writer)(executedPlan.treeString(_, verbose = true, addSuffix = false)) |
| 211 | + } |
| 212 | + |
196 | 213 | override def toString: String = withRedaction { |
197 | | - def output = Utils.truncatedString( |
198 | | - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ") |
199 | | - val analyzedPlan = Seq( |
200 | | - stringOrError(output), |
201 | | - stringOrError(analyzed.treeString(verbose = true)) |
202 | | - ).filter(_.nonEmpty).mkString("\n") |
203 | | - |
204 | | - s"""== Parsed Logical Plan == |
205 | | - |${stringOrError(logical.treeString(verbose = true))} |
206 | | - |== Analyzed Logical Plan == |
207 | | - |$analyzedPlan |
208 | | - |== Optimized Logical Plan == |
209 | | - |${stringOrError(optimizedPlan.treeString(verbose = true))} |
210 | | - |== Physical Plan == |
211 | | - |${stringOrError(executedPlan.treeString(verbose = true))} |
212 | | - """.stripMargin.trim |
| 214 | + val writer = new StringWriter() |
| 215 | + try { |
| 216 | + writePlans(writer) |
| 217 | + writer.toString |
| 218 | + } finally { |
| 219 | + writer.close() |
| 220 | + } |
213 | 221 | } |
214 | 222 |
|
215 | 223 | def stringWithStats: String = withRedaction { |
@@ -267,16 +275,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { |
267 | 275 |
|
268 | 276 | try { |
269 | 277 | SparkEnv.get.conf.set(Utils.MAX_TO_STRING_FIELDS, Int.MaxValue.toString) |
270 | | - writer.write("== Parsed Logical Plan ==\n") |
271 | | - logical.treeString(writer, verbose = true, addSuffix = false) |
272 | | - writer.write("== Analyzed Logical Plan ==\n") |
273 | | - analyzed.output.foreach(o => writer.write(s"${o.name}: ${o.dataType.simpleString}")) |
274 | | - writer.write("\n") |
275 | | - analyzed.treeString(writer, verbose = true, addSuffix = false) |
276 | | - writer.write("== Optimized Logical Plan ==\n") |
277 | | - optimizedPlan.treeString(writer, verbose = true, addSuffix = false) |
278 | | - writer.write("== Physical Plan ==\n") |
279 | | - executedPlan.treeString(writer, verbose = true, addSuffix = false) |
| 278 | + writePlans(writer) |
280 | 279 | writer.write("== Whole Stage Codegen ==\n") |
281 | 280 | org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan) |
282 | 281 | } finally { |
|
0 commit comments