@@ -160,7 +160,8 @@ abstract class ExternalCatalog
*/
def alterTableSchema(db: String, table: String, schema: StructType): Unit

def alterTableStats(db: String, table: String, stats: CatalogStatistics): Unit
/** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit

def getTable(db: String, table: String): CatalogTable

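For quick context on the signature change above: one method now both records and clears statistics, depending on whether `stats` is defined. A minimal sketch of the two call shapes (assuming any `ExternalCatalog` implementation as `catalog` and the `CatalogStatistics` import from the surrounding file; the database and table names are illustrative):

    // Record freshly computed statistics for the table.
    catalog.alterTableStats("db2", "tbl1",
      Some(CatalogStatistics(sizeInBytes = BigInt(1024), rowCount = Some(BigInt(10)))))

    // Remove all existing statistics, e.g. after a data-changing command
    // has made them stale.
    catalog.alterTableStats("db2", "tbl1", None)
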
@@ -315,10 +315,10 @@ class InMemoryCatalog(
override def alterTableStats(
db: String,
table: String,
stats: CatalogStatistics): Unit = synchronized {
stats: Option[CatalogStatistics]): Unit = synchronized {
requireTableExists(db, table)
val origTable = catalog(db).tables(table).table
catalog(db).tables(table).table = origTable.copy(stats = Some(stats))
catalog(db).tables(table).table = origTable.copy(stats = stats)
}

override def getTable(db: String, table: String): CatalogTable = synchronized {
@@ -380,7 +380,7 @@ class SessionCatalog(
* Alter Spark's statistics of an existing metastore table identified by the provided table
* identifier.
*/
def alterTableStats(identifier: TableIdentifier, newStats: CatalogStatistics): Unit = {
def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(identifier.table)
val tableIdentifier = TableIdentifier(table, Some(db))
@@ -260,7 +260,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
val oldTableStats = catalog.getTable("db2", "tbl1").stats
assert(oldTableStats.isEmpty)
val newStats = CatalogStatistics(sizeInBytes = 1)
catalog.alterTableStats("db2", "tbl1", newStats)
catalog.alterTableStats("db2", "tbl1", Some(newStats))
val newTableStats = catalog.getTable("db2", "tbl1").stats
assert(newTableStats.get == newStats)
}
@@ -454,7 +454,7 @@ abstract class SessionCatalogSuite extends AnalysisTest {
val oldTableStats = catalog.getTableMetadata(tableId).stats
assert(oldTableStats.isEmpty)
val newStats = CatalogStatistics(sizeInBytes = 1)
catalog.alterTableStats(tableId, newStats)
catalog.alterTableStats(tableId, Some(newStats))
val newTableStats = catalog.getTableMetadata(tableId).stats
assert(newTableStats.get == newStats)
}
@@ -42,7 +42,7 @@ case class AnalyzeColumnCommand(
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
val sizeInBytes = CommandUtils.calculateTotalSize(sessionState, tableMeta)

// Compute stats for each column
val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
@@ -54,7 +54,7 @@
// Newly computed column stats should override the existing ones.
colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)

sessionState.catalog.alterTableStats(tableIdentWithDB, statistics)
sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))

// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
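A side note on the merge above: `++` on a Scala `Map` keeps the right-hand entries on key collisions, which is what makes the newly computed column stats override the existing ones. A tiny illustration, with placeholder strings standing in for `ColumnStat` values:

    val existing = Map("a" -> "oldStatsA", "b" -> "oldStatsB")
    val fresh = Map("b" -> "newStatsB")
    existing ++ fresh  // Map(a -> oldStatsA, b -> newStatsB): fresh stats win
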
@@ -17,18 +17,10 @@

package org.apache.spark.sql.execution.command

import java.net.URI

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.internal.SessionState


/**
@@ -46,7 +38,7 @@ case class AnalyzeTableCommand(
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)
val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)

val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
@@ -74,73 +66,11 @@ case class AnalyzeTableCommand(
// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
if (newStats.isDefined) {
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats.get)
sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
}

Seq.empty[Row]
}
}

object AnalyzeTableCommand extends Logging {

def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): Long = {
if (catalogTable.partitionColumnNames.isEmpty) {
calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
partitions.map(p =>
calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
).sum
}
}

private def calculateLocationSize(
sessionState: SessionState,
tableId: TableIdentifier,
locationUri: Option[URI]): Long = {
// This method is mainly based on
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
// in Hive 0.13 (except that we do not use fs.getContentSummary).
// TODO: Generalize statistics collection.
// TODO: Why fs.getContentSummary returns wrong size on Jenkins?
// Can we use fs.getContentSummary in future?
// Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
// countFileSize to count the table size.
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

def calculateLocationSize(fs: FileSystem, path: Path): Long = {
val fileStatus = fs.getFileStatus(path)
val size = if (fileStatus.isDirectory) {
fs.listStatus(path)
.map { status =>
if (!status.getPath.getName.startsWith(stagingDir)) {
calculateLocationSize(fs, status.getPath)
} else {
0L
}
}.sum
} else {
fileStatus.getLen
}

size
}

locationUri.map { p =>
val path = new Path(p)
try {
val fs = path.getFileSystem(sessionState.newHadoopConf())
calculateLocationSize(fs, path)
} catch {
case NonFatal(e) =>
logWarning(
s"Failed to get the size of table ${tableId.table} in the " +
s"database ${tableId.database} because of ${e.toString}", e)
0L
}
}.getOrElse(0L)
}
}
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import java.net.URI

import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
import org.apache.spark.sql.internal.SessionState


object CommandUtils extends Logging {

/** Change statistics after changing data by commands. */
def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
if (table.stats.nonEmpty) {
val catalog = sparkSession.sessionState.catalog
catalog.alterTableStats(table.identifier, None)
}
}

def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
if (catalogTable.partitionColumnNames.isEmpty) {
calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
partitions.map { p =>
calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
}.sum
}
}

def calculateLocationSize(
sessionState: SessionState,
identifier: TableIdentifier,
locationUri: Option[URI]): Long = {
// This method is mainly based on
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
// in Hive 0.13 (except that we do not use fs.getContentSummary).
// TODO: Generalize statistics collection.
// TODO: Why fs.getContentSummary returns wrong size on Jenkins?
// Can we use fs.getContentSummary in future?
// Seems fs.getContentSummary returns wrong table size on Jenkins. So we use
// countFileSize to count the table size.
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

def getPathSize(fs: FileSystem, path: Path): Long = {
val fileStatus = fs.getFileStatus(path)
val size = if (fileStatus.isDirectory) {
fs.listStatus(path)
.map { status =>
if (!status.getPath.getName.startsWith(stagingDir)) {
getPathSize(fs, status.getPath)
} else {
0L
}
}.sum
} else {
fileStatus.getLen
}

size
}

locationUri.map { p =>
val path = new Path(p)
try {
val fs = path.getFileSystem(sessionState.newHadoopConf())
getPathSize(fs, path)
} catch {
case NonFatal(e) =>
logWarning(
s"Failed to get the size of table ${identifier.table} in the " +
s"database ${identifier.database} because of ${e.toString}", e)
0L
}
}.getOrElse(0L)
}

}
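
As a rough sketch of how the new helpers are meant to compose with the `Option`-based `alterTableStats` (mirroring the `AnalyzeTableCommand` flow above; `sparkSession` and `tableIdentWithDB` are assumed to be in scope, and this is illustrative rather than the exact PR code):

    val sessionState = sparkSession.sessionState
    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)

    // Scan the table (or the sum of its visible partitions) for its current size.
    val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)

    // Record the new size; row count and column stats are left for ANALYZE to fill in.
    sessionState.catalog.alterTableStats(
      tableIdentWithDB, Some(CatalogStatistics(sizeInBytes = newTotalSize)))

    // Conversely, a data-changing command can simply invalidate whatever is recorded.
    CommandUtils.updateTableStats(sparkSession, tableMeta)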
@@ -433,9 +433,11 @@ case class AlterTableAddPartitionCommand(
sparkSession.sessionState.conf.resolver)
// inherit table storage format (possibly except for location)
CatalogTablePartition(normalizedSpec, table.storage.copy(
locationUri = location.map(CatalogUtils.stringToURI(_))))
locationUri = location.map(CatalogUtils.stringToURI)))
}
catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)

CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
}

@@ -519,6 +521,9 @@ case class AlterTableDropPartitionCommand(
catalog.dropPartitions(
table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
retainData = retainData)

CommandUtils.updateTableStats(sparkSession, table)

Seq.empty[Row]
}

@@ -768,6 +773,8 @@ case class AlterTableSetLocationCommand(
// No partition spec is specified, so we set the location for the table itself
catalog.alterTable(table.withNewStorage(locationUri = Some(locUri)))
}

CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
}
}
@@ -400,6 +400,7 @@ case class LoadDataCommand(
// Refresh the metadata cache to ensure the data visible to the users
catalog.refreshTable(targetTable.identifier)

CommandUtils.updateTableStats(sparkSession, targetTable)
Seq.empty[Row]
}
}
@@ -487,6 +488,12 @@ case class TruncateTableCommand(
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table $tableIdentWithDB", e)
}

if (table.stats.nonEmpty) {
// empty table after truncation
val newStats = CatalogStatistics(sizeInBytes = 0, rowCount = Some(0))
catalog.alterTableStats(tableName, Some(newStats))
}
Seq.empty[Row]
}
}
@@ -161,6 +161,11 @@ case class InsertIntoHadoopFsRelationCommand(
fileIndex.foreach(_.refresh())
// refresh data cache if table is cached
sparkSession.catalog.refreshByPath(outputPath.toString)

if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
}

} else {
logInfo("Skipping insertion into a relation that already exists.")
}