sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

@@ -193,14 +193,14 @@ object ExplainUtils {
      subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
    plan.foreach {
      case p: SparkPlan =>
-       p.expressions.flatMap(_.collect {
+       p.expressions.foreach (_.collect {
          case e: PlanExpression[_] =>
            e.plan match {
              case s: BaseSubqueryExec =>
                subqueries += ((p, e, s))
                getSubqueries(s, subqueries)
              case _ =>
Contributor:

Since this method puts its result in the parameter subqueries, I think we don't need to call flatMap and collect, just foreach.

Contributor Author:

@cloud-fan We need the collect to traverse the expression tree. I have changed flatMap to foreach.

Contributor:

> We need the collect to traverse the expression tree

hmm, can we use foreach to traverse the expression tree? We have TreeNode.foreach.

Contributor Author:

@cloud-fan We have it in this form:

p.expressions.foreach(_.collect {
  ...
})

You are suggesting to do:

p.expressions.foreach(_.foreach {
  ...
})

?

Contributor:

yup

Contributor Author:

@cloud-fan Got it... I will send a small follow-up. Thank you.
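
To make the trade-off concrete, here is a self-contained sketch. The Node/Leaf/Branch types and the two helpers are invented for illustration and only loosely mirror TreeNode's contract: collect traverses the tree and allocates a sequence of results from the matching cases, while foreach performs the same traversal purely for side effects, which is all getSubqueries needs since it writes into the subqueries buffer.

```scala
import scala.collection.mutable.ArrayBuffer

// Invented miniature tree, standing in for Spark's TreeNode hierarchy.
sealed trait Node { def children: Seq[Node] }
case class Leaf(value: Int) extends Node { def children: Seq[Node] = Nil }
case class Branch(children: Node*) extends Node

object TraversalDemo extends App {
  // Pre-order traversal for side effects, analogous to TreeNode.foreach.
  def foreachNode(n: Node)(f: Node => Unit): Unit = {
    f(n)
    n.children.foreach(foreachNode(_)(f))
  }

  // Traversal that also builds a result, analogous to TreeNode.collect.
  def collectNodes[B](n: Node)(pf: PartialFunction[Node, B]): Seq[B] = {
    val buf = ArrayBuffer.empty[B]
    foreachNode(n) { node => if (pf.isDefinedAt(node)) buf += pf(node) }
    buf.toSeq
  }

  val tree = Branch(Leaf(1), Branch(Leaf(2), Leaf(3)))

  // collect: traverses and allocates a result sequence.
  println(collectNodes(tree) { case Leaf(v) => v }) // 1, 2, 3

  // foreach: same traversal, side effects only, nothing allocated.
  val seen = ArrayBuffer.empty[Int]
  foreachNode(tree) {
    case Leaf(v) => seen += v
    case _ => // foreach takes a total function, so a catch-all is required
  }
  println(seen) // 1, 2, 3
}
```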

            }
-         case other =>
Member:

The reason we removed it is that it is useless: collect accepts partial functions.
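
A tiny standard-library illustration of that point (not PR code): Seq.collect takes a PartialFunction, so unmatched elements are skipped silently, whereas a pattern-match literal passed to foreach must be total.

```scala
val xs: Seq[Any] = Seq(1, "a", 2)

// collect accepts a PartialFunction: "a" is simply skipped,
// so no `case other =>` fallback is needed.
val ints = xs.collect { case i: Int => i } // Seq(1, 2)

// foreach wants a total Function1: without the catch-all below,
// this pattern-match literal would throw a MatchError on "a".
xs.foreach {
  case i: Int => println(i)
  case _ => // required for totality
}
```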

        })
    }
  }

sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala (49 additions, 0 deletions)
@@ -17,6 +17,7 @@

package org.apache.spark.sql

+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@@ -36,6 +37,19 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
    f(normalizedOutput)
  }

  /**
   * Get the explain output by running the SQL. The explain mode should be part of the
   * SQL text itself.
   */
  private def withNormalizedExplain(queryText: String)(f: String => Unit) = {
    val output = new java.io.ByteArrayOutputStream()
    Console.withOut(output) {
      sql(queryText).show(false)
    }
    val normalizedOutput = output.toString.replaceAll("#\\d+", "#x")
    f(normalizedOutput)
  }
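
A quick usage sketch of this helper (hypothetical query, not part of this diff): the EXPLAIN mode travels inside the SQL text, and expression IDs such as #123 come back normalized to #x, so assertions stay stable across runs.

```scala
withNormalizedExplain("EXPLAIN FORMATTED SELECT 1 AS one") { output =>
  assert(output.contains("Project"))
}
```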

  /**
   * Runs the plan and makes sure the plan contains all of the keywords.
   */
@@ -200,6 +214,41 @@ class ExplainSuite extends QueryTest with SharedSparkSession {
      }
    }
  }

test("explain formatted - check presence of subquery in case of DPP") {
withTable("df1", "df2") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST.key -> "false") {
withTable("df1", "df2") {
spark.range(1000).select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format("parquet")
.mode("overwrite")
.saveAsTable("df1")

spark.range(100)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format("parquet")
.mode("overwrite")
.saveAsTable("df2")

val sqlText =
"""
|EXPLAIN FORMATTED SELECT df1.id, df2.k
|FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id < 2
|""".stripMargin

val expected_pattern = "Subquery:1 Hosting operator id = 1 Hosting Expression = k#x"
withNormalizedExplain(sqlText) { normalizedOutput =>
assert(expected_pattern.r.findAllMatchIn(normalizedOutput).length == 1)
}
}
}
}
}
}

case class ExplainSingleData(id: Int)