
Conversation

@sujith71955 (Contributor) commented Oct 17, 2018

What changes were proposed in this pull request?

Problem:
A sort merge join is selected instead of a broadcast hash join after restarting spark-shell/the Spark JDBC server for the Hive provider.

spark.sql("create table x1(name string,age int) stored as parquet ")
spark.sql("insert into x1 select 'a',29")
spark.sql("create table x2 (name string,age int) stored as parquet '")
spark.sql("insert into x2_ex select 'a',29")
scala> spark.sql("select * from x1 t1 ,x2 t2 where t1.name=t2.name").explain

Query Plan:

== Physical Plan ==
*(2) BroadcastHashJoin [name#101], [name#103], Inner, BuildRight
:- *(2) Project [name#101, age#102]
:  +- *(2) Filter isnotnull(name#101)
:     +- *(2) FileScan parquet default.x1_ex[name#101,age#102] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark_release/spark/bin/spark-warehouse/x1], PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string,age:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(1) Project [name#103, age#104]
      +- *(1) Filter isnotnull(name#103)
         +- *(1) FileScan parquet default.x2_ex[name#103,age#104] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark_release/spark/bin/spark-warehouse/x2], PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string,age:int>
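
(A hypothetical way to inspect the size estimate behind this plan — not part of the original report; the table name follows the example above. A value close to the on-disk file size keeps the table under spark.sql.autoBroadcastJoinThreshold, which is why the broadcast hash join is chosen here:)

  scala> spark.table("x1").queryExecution.optimizedPlan.stats.sizeInBytes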

Now restart spark-shell (or use spark-submit), or restart the JDBC server, and run the same select query again:

scala> spark.sql("select * from x1 t1, x2 t2 where t1.name = t2.name").explain

The query plan changes to the following:

== Physical Plan ==
*(5) SortMergeJoin [name#43], [name#45], Inner
:- *(2) Sort [name#43 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(name#43, 200)
:     +- *(1) Project [name#43, age#44]
:        +- *(1) Filter isnotnull(name#43)
:           +- *(1) FileScan parquet default.x1[name#43,age#44] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark_release/spark/bin/spark-warehouse/x1], PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string,age:int>
+- *(4) Sort [name#45 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(name#45, 200)
      +- *(3) Project [name#45, age#46]
         +- *(3) Filter isnotnull(name#45)
            +- *(3) FileScan parquet default.x2[name#45,age#46] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:/spark_release/spark/bin/spark-warehouse/x2], PartitionFilters: [], PushedFilters: [IsNotNull(name)], ReadSchema: struct<name:string,age:int>

Solution:

The root cause is that the Hive stats are not recorded after an insert command. This is because of the condition "if (table.stats.nonEmpty)" in updateTableStats(), which is executed as part of the InsertIntoHadoopFsRelationCommand command. Since the CatalogTable stats are never initialized, this condition is never met, so when the same query is fired in a new context/session the plan changes to SortMergeJoin.


So as part of the fix, we initialize a default value for the CatalogTable stats if there is no cached LogicalRelation for the table.

With this change, the Hive stats are expected to be updated after each insert; please correct me if I am missing something.
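
For reference, a simplified paraphrase of the guard in CommandUtils.updateTableStats that this refers to (based on the Spark 2.3/2.4-era code; imports and details abbreviated):

  def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
    // The whole body is skipped when the table has never had stats recorded,
    // which is the case right after CREATE TABLE.
    if (table.stats.nonEmpty) {
      val catalog = sparkSession.sessionState.catalog
      if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
        // spark.sql.statistics.size.autoUpdate.enabled=true: recompute the size from the files
        val newTable = catalog.getTableMetadata(table.identifier)
        val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
        catalog.alterTableStats(table.identifier, Some(CatalogStatistics(sizeInBytes = newSize)))
      } else {
        // otherwise the now-stale stats are simply dropped
        catalog.alterTableStats(table.identifier, None)
      }
    }
  }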

How was this patch tested?

Manually tested; attaching the snapshots.
After the fix:
Step 1: Log in to spark-shell => create 2 tables => run the insert command => run explain and check the plan => the plan contains a broadcast join => exit.

(screenshot: step-1-afterfix-spark-25332)

Step 2:
Relaunch spark-shell => run the explain command for the same select statement => verify the plan => the plan still contains a broadcast join, since after the fix the Hive stats are available for the table.
(screenshot: step-2-afterfix-spark-25332)

Step 3:
Run the insert command again => run the explain command for the same select statement => verify the plan.
We can observe that the plan still retains the broadcast join; from now on the results are always consistent.

@sujith71955 (Contributor Author):

@srowen @cloud-fan @HyukjinKwon @felixcheung
@wangyum I think this PR should also solve the problem mentioned in SPARK-25403.
Please review and share any suggestions. Thanks, all.

Contributor:

This is to test when the conversion is on and off. We shouldn't change it.

Contributor Author:

Right, I think there was some misunderstanding; I will recheck this. Thanks.

Contributor:

I don't quite understand why the table must have stats. For both file sources and Hive tables, we estimate the data size from the files if the table doesn't have stats.

Contributor Author:

Thanks for your valuable feedback.
My observations:

  1. In the insert flow we always try to update the Hive stats, via the statement below in InsertIntoHadoopFsRelationCommand:
      if (catalogTable.nonEmpty) {
        CommandUtils.updateTableStats(sparkSession, catalogTable.get)
      }

However, after a create table command, when we run an insert command within the same session the Hive statistics are not updated, because of the validation below, which expects the stats to already be non-empty:

def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
    if (table.stats.nonEmpty) {  // never true for a table whose stats were never initialized

But if we relaunch spark-shell and run the insert command again, the Hive statistics will be saved, and from then on the stats will be taken from the Hive stats; the flow will never try to estimate the data size from files.

  2. Currently the system does not always try to estimate the data size from files when executing the insert command; as described above, if we issue the query from a new context, the system will try to read the stats from Hive. I think there is a consistency problem in this behaviour, and also, if we can always get the stats from Hive, do we need to recalculate the stats from files every time?

I think we may need to update the flow so that it either always estimates the data size from files and never depends on the Hive stats,
or, if we are recording the Hive stats, always reads the Hive stats.
Please let me know whether I am going in the right direction, and let me know if any clarification is needed.

Contributor:

but after a create table command, when we run an insert command within the same session the Hive statistics are not updated

This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the DetermineTableStats rule.

Member:

@sujith71955 What if spark.sql.statistics.size.autoUpdate.enabled=false or hive.stats.autogather=false? Does it still update the stats?

Contributor Author:

Yes, that is the default setting, which means false, but I think it should be fine to keep the default setting in this scenario.

Contributor Author:

but after a create table command, when we run an insert command within the same session the Hive statistics are not updated

This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the DetermineTableStats rule.

Right, but DetermineTableStats will only return a default value for the stats. I think this rule always returns default stats unless session.sessionState.conf.fallBackToHdfsForStatsEnabled is set to true; I will reconfirm this behaviour.


Member:

The table created in the current session does not have stats. In this situation it gets sizeInBytes from:

  // LogicalRelation: use catalog stats if present, otherwise fall back to the relation's size
  override def computeStats(): Statistics = {
    catalogTable
      .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
      .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
  }

  // HadoopFsRelation: size derived from the files at the table location
  override def sizeInBytes: Long = {
    val compressionFactor = sqlContext.conf.fileCompressionFactor
    (location.sizeInBytes * compressionFactor).toLong
  }

It's the real size, which is why it's a broadcast join. In fact, we should invalidate this table to let DetermineTableStats take effect. I am doing that here: #22721
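
(For context, one way to force that invalidation manually from a session is the public catalog API — illustrative only, the table name follows the example above, while #22721 proposes handling it inside Spark itself:)

  spark.catalog.refreshTable("default.x2")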

Contributor Author:

You are right, it's about considering the default size, but I am not sure whether we should invalidate the cache; I will explain my understanding below.

Contributor Author:

but after a create table command, when we run an insert command within the same session the Hive statistics are not updated

This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the DetermineTableStats rule.

@cloud-fan DetermineTableStats only initializes the stats when they are not set, and only if session.sessionState.conf.fallBackToHdfsForStatsEnabled is true does the rule derive the stats from the file system and update them, as shown in the code snippet below. In the insert flow this condition is never executed, so the stats remain None.
(screenshot of the DetermineTableStats code)
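
A paraphrase of the relevant branch of DetermineTableStats (simplified from the Spark 2.3/2.4-era HiveStrategies source; imports and error handling abbreviated):

  case relation: HiveTableRelation if relation.tableMeta.stats.isEmpty =>
    val conf = session.sessionState.conf
    val sizeInBytes = if (conf.fallBackToHdfsForStatsEnabled) {
      // spark.sql.statistics.fallBackToHdfs=true: scan the table location on the file system
      try {
        val tablePath = new Path(relation.tableMeta.location)
        val fs = tablePath.getFileSystem(session.sessionState.newHadoopConf())
        fs.getContentSummary(tablePath).getLength
      } catch {
        case _: IOException => conf.defaultSizeInBytes
      }
    } else {
      // default path: spark.sql.defaultSizeInBytes, which defaults to Long.MaxValue,
      // so the relation never qualifies for a broadcast join
      conf.defaultSizeInBytes
    }
    relation.copy(tableMeta = relation.tableMeta.copy(
      stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))))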

@sujith71955 (Contributor Author) commented Oct 19, 2018

@cloud-fan
One solution I can think of: in the DetermineTableStats flow we can add one more condition so that stats are not filled in for convertible relations, since for convertible relations we always get the stats from the Hadoop file system. This should solve all the inconsistency problems we are facing. Please let me know your suggestions and I will update this PR with this logic. Thanks.
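
(A minimal sketch of what such a check could look like — hypothetical; the helper name and its wiring into DetermineTableStats are assumptions, not this PR's actual change:)

  // Hypothetical helper: true when RelationConversions would rewrite this Hive relation
  // into a data-source relation, whose size is then derived from the files on disk anyway.
  def isConvertible(relation: HiveTableRelation, conf: SQLConf): Boolean = {
    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase
    (serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
      (serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
  }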

@wangyum (Member) commented Oct 19, 2018

I think the cost of getting the stats from the Hadoop file system may be quite high.

@sujith71955 (Contributor Author):

I think the cost of getting the stats from the Hadoop file system may be quite high.

Then we should always depend on the Hive stats to get the statistics, which is already partially happening, and I think this PR solves that problem. But what I proposed is based on @cloud-fan's expectation.


@sujith71955 (Contributor Author) commented Oct 19, 2018

In order to make this flow consistent, either:
a) we record Hive stats in the insert command flow and always use those stats when computing; this eliminates the need to estimate the data size from files again, since Hive stats will always be available,
OR
b) as mentioned in the snapshot above, we always estimate the data size from files for convertible relations.

@sujith71955 (Contributor Author):

In order to make this flow consistent, either:
a) we record Hive stats in the insert command flow and always use those stats when computing,
OR
b) as mentioned in the snapshot above, we always estimate the data size from files for convertible relations.

Just a suggestion; let me know your thoughts. :) Thanks all for your valuable time.

@sujith71955 (Contributor Author) commented Oct 22, 2018

@cloud-fan Shall I update this PR based on the second approach? Will that be fine? I tested with the second approach as well and the use cases mentioned in this JIRA work fine. Please let me know your view, or whether we are going to continue with the approach mentioned in #22721.

If any clarification is required regarding this flow, please let me know; I will try my best to explain, as the scenarios are confusing.

@sujith71955 sujith71955 changed the title [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join has selected when restart spark-shell/spark-JDBC for hive provider [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join has selected for the join queries in a new spark session/context Oct 28, 2018
@sujith71955 sujith71955 changed the title [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join has selected for the join queries in a new spark session/context [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join node is added in the plan for the join queries in a new spark session/context Oct 28, 2018
@sujith71955 sujith71955 changed the title [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join node is added in the plan for the join queries in a new spark session/context [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join node is added in the plan for the join queries executed in a new spark session/context Oct 28, 2018
…s selected when restart spark-shell/spark-JDBC for hive provider

## What changes were proposed in this pull request?
Problem:
Below steps in sequence reproduce the issue.
a. Create a parquet table with a stored as clause.
b. Run an insert statement => this will not update the Hive stats.
c. Run a select query that needs to calculate stats, or an explain cost select statement (see the illustrative snippet after these steps) -> this will evaluate the stats from HadoopFsRelation.
d. Since correct stats (sizeInBytes) are available, the plan will select a broadcast node if joined with any table.
e. Exit (come out of the shell).

f. Now run the step (c) query again. This gives wrong stats (sizeInBytes gets the default value) in the plan, because they are calculated from the Hive CatalogTable, which has no stats since none were recorded in step (b).
g. Since incorrect stats (default sizeInBytes) are available, the plan will select a SortMergeJoin node if joined with any table.
h. Now run an insert statement => this will update the Hive stats.
i. Now run the step (c) query again. This gives correct stats (sizeInBytes) in the plan, because it can read the Hive stats updated in step (h).
j. From now on, stats are always available, so correct stats are shown in the plan, which always picks a broadcast join node (based on the threshold size).
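
An illustrative way to run the step (c)/(f) check and see the sizeInBytes the optimizer uses (the query just mirrors the example in the PR description):

  spark.sql("explain cost select * from x1 t1, x2 t2 where t1.name = t2.name").show(false)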

Solution:
Here the main problem is that the Hive stats are not recorded after an insert command, because of the condition "if (table.stats.nonEmpty)" in updateTableStats(), which is executed as part of the InsertIntoHadoopFsRelationCommand command.
So as part of the fix we initialize a default value for the CatalogTable stats if there is no cached LogicalRelation for the table.

It is also observed in the test case "test statistics of LogicalRelation converted from Hive serde tables" in StatisticsSuite that orc and parquet are both convertible, but the test expects only orc (and not parquet) to get/record Hive stats. Since both relations are convertible, we should have the same expectation for both; this is corrected in this PR.

How was this patch tested:
Manually tested; adding the test snapshots. The UT is also corrected to verify the PR scenario.
@sujith71955 (Contributor Author) commented Oct 28, 2018

@cloud-fan @HyukjinKwon @srowen
As a result of my observations above:
a) I have a doubt: if we expect the stats to be estimated from the data files, why does the insert flow contain a statement for updating the Hive stats?

b) If we have a mechanism to read the stats from Hive, why should we estimate the data size from files?

Please share your suggestions; I feel there is an inconsistency in this flow.
I updated the description, please have a look. If more clarity on the flow is required, let me know and I will elaborate. Thanks.

@sujith71955 (Contributor Author):

@cloud-fan @HyukjinKwon @wangyum Any suggestions on this issue? Because of this defect we are facing performance issues in a customer environment. Requesting you all to please have another look and provide any suggestions so that I can handle them.
@wangyum I am not sure whether invalidating the cache will solve my problem.

@sujith71955 (Contributor Author) commented Nov 1, 2018

cc @srowen @dongjoon-hyun

@srowen (Member) commented Nov 1, 2018

I don't know this code well enough to review. I think there is skepticism from people who know this code about whether this change is correct and beneficial. If there's doubt, I think it should be closed.

@sujith71955 (Contributor Author) commented Nov 1, 2018 via email

@sujith71955 (Contributor Author):

Do we need to handle this scenario? Do we have any PR for handling this issue?

@sujith71955 (Contributor Author) commented Dec 1, 2018

cc @wzhfy

@sujith71955 (Contributor Author):

@srowen @cloud-fan @wangyum @dongjoon-hyun @wzhfy Any update on this? How shall we move forward to handle this scenario? The problem still exists.

@HyukjinKwon (Member):

Can you make the PR description concise...? It's super hard to read and follow.

// Initialize the catalogTable stats if they are not defined. An initial value has to be defined
// so that the hive statistics will be updated after each insert command.
val withStats = {
if (updatedTable.stats == None) {
Member:

@wangyum, so this is basically a subset of #22721? It's funny that Hive tables should set the initial stats alone here, when that is supposed to be set somewhere else.

@sujith71955 (Contributor Author) commented Mar 18, 2019:

@wangyum, so this is basically a subset of #22721? It's funny that Hive tables should set the initial stats alone here, when that is supposed to be set somewhere else.

This is a bit of an old PR :); I will go through the problem once again and give you more concise details. I remember I struggled a lot handling this issue. :) Please let me know if this way of handling it is wrong.

@sujith71955 sujith71955 changed the title [SPARK-25332][SQL] Instead of broadcast hash join ,Sort merge join node is added in the plan for the join queries executed in a new spark session/context [SPARK-25332][SQL] Selecting broadcast join for the small size table even query fired via new session/context Mar 16, 2019
@sujith71955 (Contributor Author):

Can you make the PR description concise...? It's super hard to read and follow.
Done.

@sujith71955 sujith71955 changed the title [SPARK-25332][SQL] Selecting broadcast join for the small size table even query fired via new session/context [SPARK-25332][SQL] select broadcast join instead of sortMergeJoin for the small size table even query fired via new session/context Mar 18, 2019
@sujith71955 (Contributor Author):

@wangyum Please test my scenario with your fix and check whether this issue is addressed by your PR; I have doubts. Let me know of any suggestions. Thanks.

@AmplabJenkins:

Can one of the admins verify this patch?

@github-actions bot commented Jan 5, 2020

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Jan 5, 2020
@srowen srowen closed this Jan 5, 2020