AnalyzeTableCommand.scala
@@ -22,8 +22,9 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.datasources.LogicalRelation

@@ -33,11 +34,16 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
* used in query optimizations.
*/
case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionState = sparkSession.sessionState
val catalog = sessionState.catalog
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
val relation = try {
EliminateSubqueryAliases(catalog.lookupRelation(tableIdent))
} catch {
case _: NoSuchTableException =>
throw new AnalysisException(s"Target table in ANALYZE TABLE does not exist: $tableIdent")
}

relation match {
case relation: CatalogRelation =>
@@ -66,7 +72,6 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
} else {
fileStatus.getLen
}

size
}

@@ -91,6 +96,11 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)

// temporary table which is a [[LogicalRelation]].
case logicalRel: LogicalRelation if catalog.isTemporaryTable(tableIdent) =>
val catalogTable = logicalRel.catalogTable.getOrElse(catalog.getTableMetadata(tableIdent))
updateTableStats(catalogTable, logicalRel.relation.sizeInBytes)

case otherRelation =>
throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
s"${otherRelation.nodeName}.")
@@ -120,9 +130,17 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
if (newStats.isDefined) {
sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdent)
val newCatalogTable = catalogTable.copy(stats = newStats)
if (catalogTable.tableType == CatalogTableType.VIEW) {
assert(relation.isInstanceOf[LogicalRelation])
val newRelation =
relation.asInstanceOf[LogicalRelation].copy(catalogTable = Some(newCatalogTable))
catalog.createTempView(tableIdent.table, newRelation, overrideIfExists = true)
} else {
catalog.alterTable(newCatalogTable)
// Refresh the cached data source table in the catalog.
catalog.refreshTable(tableIdent)
}
}
}
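For context, here is a rough, self-contained sketch of the user-facing behavior the change above enables. The object name, local SparkSession setup, and scratch path are illustrative assumptions, not part of the patch; the SQL statements mirror the tests added below.

// Illustrative sketch only: a temporary view defined directly over a data source
// resolves to a LogicalRelation, so ANALYZE TABLE can now record statistics for it.
import org.apache.spark.sql.SparkSession

object AnalyzeTempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("analyze-temp-view").getOrCreate()

    // Hypothetical scratch location; any writable path works.
    val dataPath = "/tmp/analyze_temp_view_parquet"
    spark.range(10).write.mode("overwrite").parquet(dataPath)

    // LogicalRelation-backed temporary view: ANALYZE TABLE succeeds and records a row count.
    spark.sql(s"CREATE TEMPORARY VIEW view1 USING parquet OPTIONS (path '$dataPath')")
    spark.sql("ANALYZE TABLE view1 COMPUTE STATISTICS")

    // A temporary view defined as a query plan is still rejected.
    spark.sql("CREATE TEMPORARY VIEW view2 AS SELECT * FROM view1")
    // spark.sql("ANALYZE TABLE view2 COMPUTE STATISTICS")  // throws AnalysisException

    spark.stop()
  }
}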

DDLSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -1573,6 +1573,46 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

test("Run analyze table command on temporary view which is analyzable") {
withTempPath { path =>
val dataPath = path.getCanonicalPath
spark.range(10).write.format("parquet").save(dataPath)
withView("view1") {
// Create a temporary view which is a [[LogicalRelation]] wrapping a datasource relation.
sql(s"CREATE TEMPORARY VIEW view1 USING parquet OPTIONS(path '$dataPath')")

// Before ANALYZE TABLE, HadoopFsRelation calculates statistics from file sizes.
val relBefore = spark.sessionState.catalog.lookupRelation(TableIdentifier("view1"))
val statsBefore = EliminateSubqueryAliases(relBefore).statistics
val sizeBefore = statsBefore.sizeInBytes
assert(sizeBefore > 0)
assert(statsBefore.rowCount.isEmpty)

sql("ANALYZE TABLE view1 COMPUTE STATISTICS")

val relAfter = spark.sessionState.catalog.lookupRelation(TableIdentifier("view1"))
val statsAfter = EliminateSubqueryAliases(relAfter).statistics
val sizeAfter = statsAfter.sizeInBytes
assert(sizeAfter == sizeBefore)
assert(statsAfter.rowCount.isDefined && statsAfter.rowCount.get == 10)
}
}
}

test("Run analyze table command on temporary view which is not analyzable") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")
withView("view1") {
// Create a temporary view which is represented as a query plan.
sql("CREATE TEMPORARY VIEW view1 (col1) AS SELECT * FROM tab1")
val df = spark.sessionState.catalog.lookupRelation(TableIdentifier("view1"))
intercept[AnalysisException] {
sql("ANALYZE TABLE view1 COMPUTE STATISTICS")
}
}
}
}

test("create temporary view with mismatched schema") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")
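As a follow-up to the first test above, a hedged sketch of how the recorded statistics could be inspected outside the suite, reusing the SparkSession and view1 from the earlier sketch. This code base exposes statistics on the logical plan as `statistics` (later Spark versions rename it to `stats`), and reading them from the optimized plan is an assumption here, not something the patch prescribes.

// Illustrative only: read back the statistics that ANALYZE TABLE recorded for view1.
val optimizedPlan = spark.table("view1").queryExecution.optimizedPlan
val viewStats = optimizedPlan.statistics
println(s"sizeInBytes = ${viewStats.sizeInBytes}, rowCount = ${viewStats.rowCount}")
// Expected after ANALYZE TABLE view1 COMPUTE STATISTICS: rowCount = Some(10)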