
Conversation

**@dilipbiswal** (Contributor) commented Oct 7, 2019

### What changes were proposed in this pull request?

The subquery expressions introduced by DPP are not printed in the newer explain command.
This PR fixes the code that computes the list of subqueries in the plan.

**Before**
```
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- BatchScan (1)

(1) BatchScan
Output [2]: [value#7, id#8]
Arguments: [value#7, id#8], ParquetScan(org.apache.spark.sql.test.TestSparkSession@6a299b9d,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, spark_hadoop_conf.xml,org.apache.spark.sql.execution.datasources.InMemoryFileIndex@a51294fc,StructType(StructField(value,IntegerType,true)),StructType(StructField(value,IntegerType,true)),StructType(StructField(id,IntegerType,true)),[Lorg.apache.spark.sql.sources.Filter;@363fe35a,org.apache.spark.sql.util.CaseInsensitiveStringMap@7682ad70,Vector(isnotnull(id#8), (id#8 > 1)),List(isnotnull(value#7), (value#7 > 2)))
(2) ...
(3) ...
(4) ...
```

**After**

```
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- BatchScan (1)

(1) BatchScan
Output [2]: [value#7, id#8]
DataFilters: [isnotnull(value#7), (value#7 > 2)]
Format: parquet
Location: InMemoryFileIndex[.....]
PartitionFilters: [isnotnull(id#8), (id#8 > 1)]
PushedFilers: [IsNotNull(id), IsNotNull(value), GreaterThan(id,1), GreaterThan(value,2)]
ReadSchema: struct<value:int>
(2) ...
(3) ...
(4) ...
```
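
For context, a minimal sketch of how to produce this formatted output. Assumptions: an active SparkSession `spark` and a Parquet table `df1` with columns `value` and `id`; these names are illustrative, not from the PR's test.

```scala
// EXPLAIN FORMATTED is the "newer explain command" referenced above.
// Running it through spark.sql and printing the single result row shows
// the numbered-operator plan; table and column names are hypothetical.
spark.sql("EXPLAIN FORMATTED SELECT * FROM df1 WHERE value > 2 AND id > 1")
  .show(truncate = false)
```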

### Why are the changes needed?
Makes the plan more readable.

### Does this PR introduce any user-facing change?
Yes. The explain output will be different.

### How was this patch tested?
Added a test case in ExplainSuite.

**@SparkQA** commented Oct 7, 2019

Test build #111829 has finished for PR 26039 at commit 0f23194.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

**@dilipbiswal** (Contributor Author)

cc @cloud-fan @gatorsmile

**@HeartSaVioR** (Contributor) commented Oct 7, 2019

Just leaving a general comment, as I'm not familiar with this change:

> The subquery expressions introduced by DPP are not printed in the newer explain command.
> This PR fixes the code that computes the list of subqueries in the plan.

> Without the fix, the subqueries are printed in the explain plan.

Sounds like a contradiction: the former says subquery expressions are "not" printed for now, while the latter says subqueries are printed without the fix.

> Yes. The explain output will be different.

Shall we provide an example query and the actual change of the explain string (before applying vs. after applying)? It would be helpful to determine the effect of the patch visually.

And normally we tend not to describe the PR title as a "problem statement". We tend to describe in the title what the patch will change/fix.

**@cloud-fan** (Contributor)

Shall we also print the pushed filters in the scan node?

**@dilipbiswal** (Contributor Author) commented Oct 7, 2019

@cloud-fan Actually, I have a PR almost ready that implements verboseString for DataSourceScanExec to fix SPARK-29092. I am currently testing it and will open the PR very soon. Then we can correlate the subqueries better.
Here is the branch for your perusal:
https://github.com/dilipbiswal/spark/pull/new/verbose_string_datasrc_scanexec

**@dilipbiswal** (Contributor Author)

cc @cloud-fan FYI - I have raised the PR here: scan-verbose-pr

```scala
case s: BaseSubqueryExec =>
  subqueries += ((p, e, s))
  getSubqueries(s, subqueries)
case _ =>
```
Contributor:

Since this method puts its result in the parameter `subqueries`, I think we don't need to call flatMap and collect, just foreach.

Contributor Author:

@cloud-fan We need the collect to traverse the expression tree. I have changed flatMap to foreach.

Contributor:

> We need the collect to traverse the expression tree

Hmm, can we use foreach to traverse the expression tree? We have TreeNode.foreach.

Contributor Author:

@cloud-fan We have it in this form:

```scala
p.expressions.foreach(_.collect {
  ...
})
```

You are suggesting to do:

```scala
p.expressions.foreach(_.foreach {
  ...
})
```

?

Contributor:

yup

Contributor Author:

@cloud-fan Got it... I will send a small follow-up. Thank you.
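
For readers following the thread, here is a hedged sketch of the foreach-based traversal being agreed on above. It is assembled from the snippet under review, not the verbatim Spark source; the method signature and surrounding shape are assumptions.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.execution.{BaseSubqueryExec, SparkPlan}

// Walk every expression of every operator; whenever a subquery expression
// is found, record (host plan, host expression, subquery plan) and recurse
// into the subquery's own plan. Results accumulate in `subqueries`, so a
// side-effecting foreach suffices and no collected Seq is ever used.
def getSubqueries(
    plan: SparkPlan,
    subqueries: ArrayBuffer[(SparkPlan, Expression, BaseSubqueryExec)]): Unit = {
  plan.foreach { p =>
    p.expressions.foreach(_.foreach {
      case e: PlanExpression[_] =>
        e.plan match {
          case s: BaseSubqueryExec =>
            subqueries += ((p, e, s))
            getSubqueries(s, subqueries)
          case _ =>
        }
      case _ =>
    })
  }
}
```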

**@dilipbiswal** (Contributor Author)

@gatorsmile @cloud-fan
I was trying to find out why using flatMap causes the issue of missing subqueries. It turns out that p.expressions returns an immutable stream, and the result of foreach is different from flatMap. Here is what I tried.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

def printExpressionsFlatMap(p: SparkPlan, exprs: Seq[Expression]): Unit = {
  if (p.isInstanceOf[FileSourceScanExec]) {
    println("flatmap")
    exprs.flatMap { e =>
      println(s"SQL : ${e.sql} toString= ${e.toString}")
      e :: Nil
    }
  }
}

def printExpressionsForEach(p: SparkPlan, exprs: Seq[Expression]): Unit = {
  if (p.isInstanceOf[FileSourceScanExec]) {
    println("foreach")
    exprs.foreach(e => println(s"SQL : ${e.sql} toString= ${e.toString}"))
  }
}
```

When I call the above methods with the same input p.expressions, I get different output. But if I materialize the stream by calling p.expressions.toList first, both methods produce the same output. (This is consistent with Stream's laziness: flatMap on a Stream builds a lazy result, so the println side effects beyond the head never run unless the result is forced, whereas foreach is eager.) It is not related to this PR, but I just wanted to describe my observation.
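
A standalone sketch of the laziness at play (plain Scala 2.12, where Stream is the lazy sequence type; no Spark needed):

```scala
// Stream is lazy in its tail: flatMap applies the function to the head to
// produce the first element of the result, but the rest of the mapped
// stream is only evaluated when forced. foreach, in contrast, is eager.
val s = Stream(1, 2, 3)

s.foreach(e => println(s"foreach saw $e"))
// prints: foreach saw 1, foreach saw 2, foreach saw 3

val lazyResult = s.flatMap { e => println(s"flatMap saw $e"); e :: Nil }
// prints only: flatMap saw 1   (the tail is never forced)

s.toList.flatMap { e => println(s"flatMap saw $e"); e :: Nil }
// materialized first, so all three print, matching foreach
```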

**@SparkQA** commented Oct 8, 2019

Test build #111856 has finished for PR 26039 at commit 316e074.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
      getSubqueries(s, subqueries)
    case _ =>
  }
case other =>
```
Member:
We can remove it because it is useless: collect accepts partial functions.
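
A quick illustration of the point (plain Scala; values are illustrative):

```scala
// collect takes a PartialFunction, so elements that match no case are
// silently skipped -- no `case other =>` fallthrough is required.
val mixed: Seq[Any] = Seq(1, "two", 3)
val ints = mixed.collect { case i: Int => i * 10 }   // Seq(10, 30)

// foreach, by contrast, needs a total function, so a pattern-match
// lambda passed to it must include a catch-all case.
mixed.foreach {
  case i: Int => println(i)
  case _ =>
}
```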

**@gatorsmile** (Member) left a comment:

LGTM

Thanks! Merged to master.

@gatorsmile closed this in ef1e849 Oct 8, 2019
atronchi pushed a commit to atronchi/spark that referenced this pull request Oct 23, 2019
…AIN FORMATTED

### What changes were proposed in this pull request?
The subquery expressions introduced by DPP are not printed in the newer explain command.
This PR fixes the code that computes the list of subqueries in the plan.

**SQL**
df1 and df2 are partitioned on k.
```
SELECT df1.id, df2.k
FROM df1 JOIN df2 ON df1.k = df2.k AND df2.id < 2
```

**Before**
```
== Physical Plan ==
* Project (9)
+- * BroadcastHashJoin Inner BuildRight (8)
   :- * ColumnarToRow (2)
   :  +- Scan parquet default.df1 (1)
   +- BroadcastExchange (7)
      +- * Project (6)
         +- * Filter (5)
            +- * ColumnarToRow (4)
               +- Scan parquet default.df2 (3)

(1) Scan parquet default.df1
Output: [id#19L, k#20L]

(2) ColumnarToRow [codegen id : 2]
Input: [id#19L, k#20L]

(3) Scan parquet default.df2
Output: [id#21L, k#22L]

(4) ColumnarToRow [codegen id : 1]
Input: [id#21L, k#22L]

(5) Filter [codegen id : 1]
Input     : [id#21L, k#22L]
Condition : (isnotnull(id#21L) AND (id#21L < 2))

(6) Project [codegen id : 1]
Output    : [k#22L]
Input     : [id#21L, k#22L]

(7) BroadcastExchange
Input: [k#22L]

(8) BroadcastHashJoin [codegen id : 2]
Left keys: List(k#20L)
Right keys: List(k#22L)
Join condition: None

(9) Project [codegen id : 2]
Output    : [id#19L, k#22L]
Input     : [id#19L, k#20L, k#22L]
```
**After**
```
== Physical Plan ==
* Project (9)
+- * BroadcastHashJoin Inner BuildRight (8)
   :- * ColumnarToRow (2)
   :  +- Scan parquet default.df1 (1)
   +- BroadcastExchange (7)
      +- * Project (6)
         +- * Filter (5)
            +- * ColumnarToRow (4)
               +- Scan parquet default.df2 (3)

(1) Scan parquet default.df1
Output: [id#19L, k#20L]

(2) ColumnarToRow [codegen id : 2]
Input: [id#19L, k#20L]

(3) Scan parquet default.df2
Output: [id#21L, k#22L]

(4) ColumnarToRow [codegen id : 1]
Input: [id#21L, k#22L]

(5) Filter [codegen id : 1]
Input     : [id#21L, k#22L]
Condition : (isnotnull(id#21L) AND (id#21L < 2))

(6) Project [codegen id : 1]
Output    : [k#22L]
Input     : [id#21L, k#22L]

(7) BroadcastExchange
Input: [k#22L]

(8) BroadcastHashJoin [codegen id : 2]
Left keys: List(k#20L)
Right keys: List(k#22L)
Join condition: None

(9) Project [codegen id : 2]
Output    : [id#19L, k#22L]
Input     : [id#19L, k#20L, k#22L]

===== Subqueries =====

Subquery:1 Hosting operator id = 1 Hosting Expression = k#20L IN subquery25
* HashAggregate (16)
+- Exchange (15)
   +- * HashAggregate (14)
      +- * Project (13)
         +- * Filter (12)
            +- * ColumnarToRow (11)
               +- Scan parquet default.df2 (10)

(10) Scan parquet default.df2
Output: [id#21L, k#22L]

(11) ColumnarToRow [codegen id : 1]
Input: [id#21L, k#22L]

(12) Filter [codegen id : 1]
Input     : [id#21L, k#22L]
Condition : (isnotnull(id#21L) AND (id#21L < 2))

(13) Project [codegen id : 1]
Output    : [k#22L]
Input     : [id#21L, k#22L]

(14) HashAggregate [codegen id : 1]
Input: [k#22L]

(15) Exchange
Input: [k#22L]

(16) HashAggregate [codegen id : 2]
Input: [k#22L]
```
### Why are the changes needed?
Without the fix, the subqueries are not printed in the explain plan.

### Does this PR introduce any user-facing change?
Yes. The explain output will be different.

### How was this patch tested?
Added a test case in ExplainSuite.

Closes apache#26039 from dilipbiswal/explain_subquery_issue.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>