diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 15687ddd728a..b2146cff479b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -22,8 +22,9 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
@@ -33,11 +34,16 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
  * used in query optimizations.
  */
 case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extends RunnableCommand {
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val sessionState = sparkSession.sessionState
+    val catalog = sessionState.catalog
     val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
-    val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
+    val relation = try {
+      EliminateSubqueryAliases(catalog.lookupRelation(tableIdent))
+    } catch {
+      case _: NoSuchTableException =>
+        throw new AnalysisException(s"Target table in ANALYZE TABLE does not exist: $tableIdent")
+    }
 
     relation match {
       case relation: CatalogRelation =>
@@ -66,7 +72,6 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
           } else {
             fileStatus.getLen
           }
-
           size
         }
 
@@ -91,6 +96,11 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
       case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
         updateTableStats(logicalRel.catalogTable.get, logicalRel.relation.sizeInBytes)
 
+      // temporary table which is a [[LogicalRelation]].
+      case logicalRel: LogicalRelation if catalog.isTemporaryTable(tableIdent) =>
+        val catalogTable = logicalRel.catalogTable.getOrElse(catalog.getTableMetadata(tableIdent))
+        updateTableStats(catalogTable, logicalRel.relation.sizeInBytes)
+
       case otherRelation =>
         throw new AnalysisException(s"ANALYZE TABLE is not supported for " +
           s"${otherRelation.nodeName}.")
@@ -120,9 +130,17 @@ case class AnalyzeTableCommand(tableName: String, noscan: Boolean = true) extend
       // Update the metastore if the above statistics of the table are different from those
       // recorded in the metastore.
       if (newStats.isDefined) {
-        sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
-        // Refresh the cached data source table in the catalog.
-        sessionState.catalog.refreshTable(tableIdent)
+        val newCatalogTable = catalogTable.copy(stats = newStats)
+        if (catalogTable.tableType == CatalogTableType.VIEW) {
+          assert(relation.isInstanceOf[LogicalRelation])
+          val newRelation =
+            relation.asInstanceOf[LogicalRelation].copy(catalogTable = Some(newCatalogTable))
+          catalog.createTempView(tableIdent.table, newRelation, overrideIfExists = true)
+        } else {
+          catalog.alterTable(newCatalogTable)
+          // Refresh the cached data source table in the catalog.
+          catalog.refreshTable(tableIdent)
+        }
       }
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index fd35c987cab5..14dc979546e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogStorageFormat}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
@@ -1573,6 +1573,46 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("Run analyze table command on temporary view which is analyzable") {
+    withTempPath { path =>
+      val dataPath = path.getCanonicalPath
+      spark.range(10).write.format("parquet").save(dataPath)
+      withView("view1") {
+        // Create a temporary view which is a [[LogicalRelation]] wrapping a datasource relation.
+        sql(s"CREATE TEMPORARY VIEW view1 USING parquet OPTIONS(path '$dataPath')")
+
+        // Before ANALYZE TABLE, HadoopFsRelation calculates statistics from file sizes.
+        val relBefore = spark.sessionState.catalog.lookupRelation(TableIdentifier("view1"))
+        val statsBefore = EliminateSubqueryAliases(relBefore).statistics
+        val sizeBefore = statsBefore.sizeInBytes
+        assert(sizeBefore > 0)
+        assert(statsBefore.rowCount.isEmpty)
+
+        sql("ANALYZE TABLE view1 COMPUTE STATISTICS")
+
+        val relAfter = spark.sessionState.catalog.lookupRelation(TableIdentifier("view1"))
+        val statsAfter = EliminateSubqueryAliases(relAfter).statistics
+        val sizeAfter = statsAfter.sizeInBytes
+        assert(sizeAfter == sizeBefore)
+        assert(statsAfter.rowCount.isDefined && statsAfter.rowCount.get == 10)
+      }
+    }
+  }
+
+  test("Run analyze table command on temporary view which is not analyzable") {
+    withTable("tab1") {
+      spark.range(10).write.saveAsTable("tab1")
+      withView("view1") {
+        // Create a temporary view which is represented as a query plan.
+        sql("CREATE TEMPORARY VIEW view1 (col1) AS SELECT * FROM tab1")
+        val df = spark.sessionState.catalog.lookupRelation(TableIdentifier("view1"))
+        intercept[AnalysisException] {
+          sql("ANALYZE TABLE view1 COMPUTE STATISTICS")
+        }
+      }
+    }
+  }
+
   test("create temporary view with mismatched schema") {
     withTable("tab1") {
       spark.range(10).write.saveAsTable("tab1")