@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
None)
val logicalRelation = cached.getOrElse {
val updatedTable = inferIfNeeded(relation, options, fileFormat)
// Initialize the catalogTable stats if it is not defined. An initial value has to be defined
Contributor:
I don't quite understand why the table must have stats. For both file sources and Hive tables, we will estimate the data size from the files if the table doesn't have stats.

Contributor Author:
Thanks for your valuable feedback.
My observations:

  1. In the insert flow we always try to update the Hive stats, as per the statement below in InsertIntoHadoopFsRelationCommand:
      if (catalogTable.nonEmpty) {
        CommandUtils.updateTableStats(sparkSession, catalogTable.get)
      }

But after a create table command, when we run an insert command within the same session, the Hive statistics are not updated because of the validation below, where the condition expects the stats to be non-empty:

def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
    if (table.stats.nonEmpty) { 

But if we relaunch spark-shell and run the insert command, the Hive statistics will be saved, and from then on the stats will be taken from the Hive stats; the flow will never try to estimate the data size from files.

  2. Currently the system does not always try to estimate the data size from files when executing the insert command; as I said above, if we launch the query from a new context, the system will try to read the stats from Hive. I think there is a consistency problem in this behavior, and also, if we can always get the stats from Hive, do we need to recompute the stats from files every time?

I think we may need to update the flow so that it always tries to read the data size from files and never depends on the Hive stats,
or, if we are recording the Hive stats, then it should read the Hive stats every time.
Please let me know whether I am going in the right direction, and let me know if any clarification is needed.
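
For illustration, a minimal spark-shell sketch of the scenario described above (the table name, storage format, and data are assumptions; the comments restate the observed behaviour rather than guaranteed output):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Assumed repro: create a Hive table and insert into it within the same session.
    spark.sql("CREATE TABLE t1 (id INT) STORED AS PARQUET")
    spark.sql("INSERT INTO t1 VALUES (1), (2), (3)")

    // Within this session the resolved CatalogTable still has no stats, so the
    // table.stats.nonEmpty check in CommandUtils.updateTableStats skips the metastore update.
    println(spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")).stats)

    // Per the observation above, after relaunching spark-shell the insert finds stats
    // recorded in the metastore and updates them, hence the inconsistency.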

Contributor:

> But after a create table command, when we run an insert command within the same session, the Hive statistics are not updated.

This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the DetermineTableStats rule.

Member:

@sujith71955 What if spark.sql.statistics.size.autoUpdate.enabled=false or hive.stats.autogather=false? Does it still update the stats?

Contributor Author:

Yes, that is the default setting, which means false, but I think it should be fine to keep the default setting in this scenario.
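
For reference, a minimal sketch of setting the flags being discussed (spark.sql.statistics.size.autoUpdate.enabled defaults to false on the Spark side; hive.stats.autogather is a Hive-side property, and passing it through the spark.hadoop.* prefix is one assumed way to supply it):

    // Spark-side flag; false is the default referred to above.
    spark.conf.set("spark.sql.statistics.size.autoUpdate.enabled", "false")
    // Hive-side flag, e.g. at launch:
    //   ./bin/spark-shell --conf spark.hadoop.hive.stats.autogather=false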

Contributor Author:

> But after a create table command, when we run an insert command within the same session, the Hive statistics are not updated.

> This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the DetermineTableStats rule.

Right, but the rule will return the default value for the stats. I think this rule will always return the default stats unless we set session.sessionState.conf.fallBackToHdfsForStatsEnabled to true; I will reconfirm this behaviour.

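For completeness, fallBackToHdfsForStatsEnabled referred to above is backed by a SQL conf; a minimal sketch of enabling it (false is the default):

    spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")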

Member:

The table created in the current session does not have stats. In this situation, it gets sizeInBytes from:

override def computeStats(): Statistics = {
  catalogTable
    .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
    .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
}

override def sizeInBytes: Long = {
  val compressionFactor = sqlContext.conf.fileCompressionFactor
  (location.sizeInBytes * compressionFactor).toLong
}

It's the real size; that's why it's a broadcast join. In fact, we should invalidate this table so that the DetermineTableStats rule can take effect. I am doing that here: #22721
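
As a rough illustration of why the real size leads to a broadcast join here (all numbers below are assumed example values; 10 MB is the default spark.sql.autoBroadcastJoinThreshold):

    val locationSizeInBytes = 5L * 1024 * 1024    // on-disk size of the inserted files (assumed ~5 MB)
    val compressionFactor = 1.0                   // spark.sql.sources.fileCompressionFactor default
    val sizeInBytes = (locationSizeInBytes * compressionFactor).toLong

    val broadcastThreshold = 10L * 1024 * 1024    // spark.sql.autoBroadcastJoinThreshold default (10 MB)
    assert(sizeInBytes <= broadcastThreshold)     // the relation qualifies for a broadcast join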

Contributor Author:

You are right, it's about considering the default size, but I am not very sure whether we should invalidate the cache; I will explain my understanding below.

Contributor Author:

> But after a create table command, when we run an insert command within the same session, the Hive statistics are not updated.

> This is the thing I don't understand. Like I said before, even if the table has no stats, Spark will still get stats via the DetermineTableStats rule.

@cloud-fan DetermineTableStats just initializes the stats if they are not set; only if session.sessionState.conf.fallBackToHdfsForStatsEnabled is true does the rule derive the stats from the file system and update them, as shown in the code snippet below. In the insert flow this condition never gets executed, so the stats will still be None.
[screenshot of the DetermineTableStats code snippet]

// so that the Hive statistics will be updated after each insert command.
val withStats = {
if (updatedTable.stats == None) {
Member:

@wangyum, so it is basically a subset of #22721? It's funny that Hive tables should set the initial stats alone here, when that is supposed to be done somewhere else.

sujith71955 (Contributor Author), Mar 18, 2019:

> @wangyum, so it is basically a subset of #22721? It's funny that Hive tables should set the initial stats alone here, when that is supposed to be done somewhere else.

Bit of an old PR :), I will go through the problem once again and give you more concise details. I remember I struggled a lot with handling this issue :). Please let me know if this way of handling it is wrong.

val sizeInBytes = HiveUtils.getSizeInBytes(updatedTable, sparkSession)
updatedTable.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
} else {
updatedTable
}
}
val created =
LogicalRelation(
DataSource(
@@ -202,7 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec = None,
options = options,
className = fileType).resolveRelation(),
table = updatedTable)
table = withStats)

catalogProxy.cacheTable(tableIdentifier, created)
created
@@ -118,21 +118,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
val table = relation.tableMeta
val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = session.sessionState.newHadoopConf()
val tablePath = new Path(table.location)
val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
fs.getContentSummary(tablePath).getLength
} catch {
case e: IOException =>
logWarning("Failed to get table size from hdfs.", e)
session.sessionState.conf.defaultSizeInBytes
}
} else {
session.sessionState.conf.defaultSizeInBytes
}

val sizeInBytes = HiveUtils.getSizeInBytes(table, session)
val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
relation.copy(tableMeta = withStats)
}
25 changes: 25 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.File
import java.io.IOException
import java.net.{URL, URLClassLoader}
import java.nio.charset.StandardCharsets
import java.sql.Timestamp
@@ -29,6 +30,7 @@ import scala.collection.mutable.HashMap
import scala.language.implicitConversions

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -499,4 +501,27 @@ private[spark] object HiveUtils extends Logging {
table.copy(schema = StructType(dataCols ++ partCols))
}
}

/**
 * Returns the size in bytes for the given CatalogTable. If the
 * session.sessionState.conf.fallBackToHdfsForStatsEnabled property is enabled, the size is
 * computed from the files at the table location; otherwise the default size is returned.
 */
def getSizeInBytes(table: CatalogTable, session: SparkSession): Long = {
  val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
    try {
      val hadoopConf = session.sessionState.newHadoopConf()
      val tablePath = new Path(table.location)
      val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
      fs.getContentSummary(tablePath).getLength
    } catch {
      case e: IOException =>
        logWarning("Failed to get table size from hdfs.", e)
        session.sessionState.conf.defaultSizeInBytes
    }
  } else {
    session.sessionState.conf.defaultSizeInBytes
  }
  sizeInBytes
}
}
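
For reference, a minimal usage sketch of the new helper, mirroring the two call sites in this change (it assumes a resolved CatalogTable value named table and an active SparkSession named session; the wrapper name is hypothetical):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}

    // Hypothetical wrapper: initialize the stats only when they are missing.
    def withInitializedStats(table: CatalogTable, session: SparkSession): CatalogTable = {
      if (table.stats.isEmpty) {
        val sizeInBytes = HiveUtils.getSizeInBytes(table, session)
        table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
      } else {
        table
      }
    }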