@@ -18,10 +18,8 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.UsingJoin
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

@@ -592,7 +592,12 @@ class SessionCatalog(
child = parser.parsePlan(viewText))
SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db))))
} else {
SubqueryAlias(table, SimpleCatalogRelation(metadata), None)
val tableRelation = CatalogRelation(
metadata,
// we assume all the columns are nullable.
metadata.dataSchema.asNullable.toAttributes,
metadata.partitionSchema.asNullable.toAttributes)
SubqueryAlias(table, tableRelation, None)
}
} else {
SubqueryAlias(table, tempTables(table), None)
@@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.catalog

import java.util.Date

import scala.collection.mutable
import com.google.common.base.Objects

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -349,36 +350,43 @@ object CatalogTypes {


/**
* An interface that is implemented by logical plans to return the underlying catalog table.
* If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should
* probably remove this interface.
* A [[LogicalPlan]] that represents a table.
*/
trait CatalogRelation {
def catalogTable: CatalogTable
def output: Seq[Attribute]
}
case class CatalogRelation(
tableMeta: CatalogTable,
dataCols: Seq[Attribute],
partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
Review comment (Member):
dataCols and partitionCols are needed only because CatalogRelation extends MultiInstanceRelation?

Author reply (Contributor):
Yes, and at least we need a Seq[Attribute] as output.
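
To illustrate the reply above: a leaf relation that carries its own output attributes must be able to re-mint them with fresh IDs when the same table appears more than once in a plan (e.g. a self-join), which is what MultiInstanceRelation.newInstance() provides. A minimal, self-contained sketch of that requirement in plain Scala (not the Spark implementation; Attr, Relation and freshId are illustrative names only):

  // A self-contained illustration (not Spark code): a relation that carries its own
  // output attributes must be able to re-create them with fresh IDs, otherwise two
  // occurrences of the same table in one plan (a self-join) would share attribute
  // identities and column references would become ambiguous.
  object MultiInstanceSketch {
    private var counter = 0L
    private def freshId(): Long = { counter += 1; counter }

    case class Attr(name: String, id: Long)

    case class Relation(table: String, output: Seq[Attr]) {
      // Analogous to MultiInstanceRelation.newInstance(): same schema, fresh attribute IDs.
      def newInstance(): Relation = copy(output = output.map(_.copy(id = freshId())))
    }

    def main(args: Array[String]): Unit = {
      val left  = Relation("t", Seq(Attr("id", freshId()), Attr("name", freshId())))
      val right = left.newInstance()   // second occurrence of the same table
      // The two sides now expose distinct attribute IDs.
      assert(left.output.map(_.id).intersect(right.output.map(_.id)).isEmpty)
    }
  }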

assert(tableMeta.identifier.database.isDefined)
Review comment (gatorsmile, Member, Feb 22, 2017):
Add two more asserts?

  assert(tableMeta.partitionSchema == StructType.fromAttributes(partitionCols))
  assert(tableMeta.dataSchema.asNullable == StructType.fromAttributes(dataCols))

assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType))
assert(tableMeta.dataSchema.sameType(dataCols.toStructType))
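
The committed asserts above use a sameType-style check rather than the strict equality suggested in the review: the relation's attributes are built with asNullable (see the SessionCatalog change earlier in this diff), so their nullability can legitimately differ from the metastore schema while field names and data types must still line up. A minimal, self-contained sketch of that distinction in plain Scala (not the Spark API; Field, strictEqual and sameType below are illustrative only):

  // A self-contained illustration (not Spark code) of strict equality vs. a
  // "same type" comparison that ignores nullability.
  object SameTypeSketch {
    case class Field(name: String, dataType: String, nullable: Boolean)

    // Strict equality: names, data types and nullability must all match.
    def strictEqual(a: Seq[Field], b: Seq[Field]): Boolean = a == b

    // sameType-style: compare only names and data types, ignore nullability.
    def sameType(a: Seq[Field], b: Seq[Field]): Boolean =
      a.map(f => (f.name, f.dataType)) == b.map(f => (f.name, f.dataType))

    def main(args: Array[String]): Unit = {
      val fromMetastore = Seq(Field("id", "int", nullable = false), Field("name", "string", nullable = true))
      val fromAttributes = fromMetastore.map(_.copy(nullable = true)) // attributes are built asNullable

      assert(!strictEqual(fromMetastore, fromAttributes)) // differs only in nullability
      assert(sameType(fromMetastore, fromAttributes))     // structure still matches
    }
  }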

// The partition column should always appear after data columns.
override def output: Seq[Attribute] = dataCols ++ partitionCols

def isPartitioned: Boolean = partitionCols.nonEmpty

override def equals(relation: Any): Boolean = relation match {
case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output
case _ => false
}

override def hashCode(): Int = {
Objects.hashCode(tableMeta.identifier, output)
}

/**
* A [[LogicalPlan]] that wraps [[CatalogTable]].
*
* Note that in the future we should consolidate this and HiveCatalogRelation.
*/
case class SimpleCatalogRelation(
metadata: CatalogTable)
extends LeafNode with CatalogRelation {

override def catalogTable: CatalogTable = metadata

override lazy val resolved: Boolean = false

override val output: Seq[Attribute] = {
val (partCols, dataCols) = metadata.schema.toAttributes
// Since data can be dumped in randomly with no validation, everything is nullable.
.map(_.withNullability(true).withQualifier(Some(metadata.identifier.table)))
.partition { a =>
metadata.partitionColumnNames.contains(a.name)
}
dataCols ++ partCols
/** Only compare table identifier. */
override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier)

override def computeStats(conf: CatalystConf): Statistics = {
// For data source tables, we will create a `LogicalRelation` and won't call this method; for
// Hive serde tables, we will always generate statistics.
// TODO: unify the table stats generation.
tableMeta.stats.map(_.toPlanStats(output)).getOrElse {
throw new IllegalStateException("table stats must be specified.")
}
}

override def newInstance(): LogicalPlan = copy(
dataCols = dataCols.map(_.newInstance()),
partitionCols = partitionCols.map(_.newInstance()))
}
@@ -63,7 +63,9 @@ case class TableIdentifier(table: String, database: Option[String])
}

/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
case class QualifiedTableName(database: String, name: String) {
override def toString: String = s"$database.$name"
}

object TableIdentifier {
def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
@@ -433,15 +433,15 @@ class SessionCatalogSuite extends PlanTest {
sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
== SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head
.asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, None))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head
.asInstanceOf[CatalogRelation].tableMeta == metastoreTable1)
}

test("look up view relation") {
@@ -349,8 +349,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// Get all input data source or hive relations of the query.
val srcRelations = df.logicalPlan.collect {
case LogicalRelation(src: BaseRelation, _, _) => src
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) =>
relation.catalogTable.identifier
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) =>
relation.tableMeta.identifier
}

val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
@@ -360,8 +360,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
// check hive table relation when overwrite mode
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable)
&& srcRelations.contains(relation.catalogTable.identifier) =>
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta)
&& srcRelations.contains(relation.tableMeta.identifier) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
case _ => // OK
@@ -102,8 +102,8 @@ case class OptimizeMetadataOnlyQuery(
LocalRelation(partAttrs, partitionData.map(_.values))

case relation: CatalogRelation =>
val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
InternalRow.fromSeq(partAttrs.map { attr =>
// TODO: use correct timezone for partition values.
Cast(Literal(p.spec(attr.name)), attr.dataType,
@@ -135,8 +135,8 @@ case class OptimizeMetadataOnlyQuery(
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
Some(AttributeSet(partAttrs), l)

case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty =>
val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
Some(AttributeSet(partAttrs), relation)

case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
@@ -17,15 +17,12 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.LogicalRelation


/**
@@ -40,60 +37,40 @@ case class AnalyzeColumnCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val relation =
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)

// Compute total size
val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
case catalogRel: CatalogRelation =>
// This is a Hive serde format table
(catalogRel.catalogTable,
AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable))

case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
// This is a data source format table
(logicalRel.catalogTable.get,
AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))

case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
s"${otherRelation.nodeName}.")
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)

// Compute stats for each column
val (rowCount, newColStats) =
AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)

// We also update table-level stats in order to keep them consistent with column-level stats.
val statistics = CatalogStatistics(
sizeInBytes = sizeInBytes,
rowCount = Some(rowCount),
// Newly computed column stats should override the existing ones.
colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)

sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics)))
sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics)))

// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)

Seq.empty[Row]
}
}

object AnalyzeColumnCommand extends Logging {

/**
* Compute stats for the given columns.
* @return (row count, map from column name to ColumnStats)
*
* This is visible for testing.
*/
def computeColumnStats(
private def computeColumnStats(
sparkSession: SparkSession,
tableName: String,
relation: LogicalPlan,
tableIdent: TableIdentifier,
columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {

val relation = sparkSession.table(tableIdent).logicalPlan
// Resolve the column names and dedup using AttributeSet
val resolver = sparkSession.sessionState.conf.resolver
val attributesToAnalyze = AttributeSet(columnNames.map { col =>
@@ -105,7 +82,7 @@ object AnalyzeColumnCommand extends Logging {
attributesToAnalyze.foreach { attr =>
if (!ColumnStat.supportsType(attr.dataType)) {
throw new AnalysisException(
s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " +
s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
"and Spark does not support statistics collection on this column type.")
}
}
Expand All @@ -116,7 +93,7 @@ object AnalyzeColumnCommand extends Logging {
// The layout of each struct follows the layout of the ColumnStats.
val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError
val expressions = Count(Literal(1)).toAggregateExpression() +:
attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))
attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr))

val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head()
@@ -22,11 +22,9 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
import org.apache.spark.sql.internal.SessionState


@@ -41,53 +39,39 @@ case class AnalyzeTableCommand(
val sessionState = sparkSession.sessionState
val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val relation =
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)

relation match {
case relation: CatalogRelation =>
updateTableStats(relation.catalogTable,
AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable))

// data source tables have been converted into LogicalRelations
case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined =>
updateTableStats(logicalRel.catalogTable.get,
AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get))

case otherRelation =>
throw new AnalysisException("ANALYZE TABLE is not supported for " +
s"${otherRelation.nodeName}.")
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta)

def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
var newStats: Option[CatalogStatistics] = None
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
// 1. when total size is not changed, we don't need to alter the table;
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (!noscan) {
val newRowCount = Dataset.ofRows(sparkSession, relation).count()
if (newRowCount >= 0 && newRowCount != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
} else {
Some(CatalogStatistics(
sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
}
val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
var newStats: Option[CatalogStatistics] = None
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
// 1. when total size is not changed, we don't need to alter the table;
// 2. when total size is changed, `oldRowCount` becomes invalid.
// This is to make sure that we only record the right statistics.
if (!noscan) {
val newRowCount = sparkSession.table(tableIdentWithDB).count()
if (newRowCount >= 0 && newRowCount != oldRowCount) {
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
} else {
Some(CatalogStatistics(
sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
}
}
// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
if (newStats.isDefined) {
sessionState.catalog.alterTable(catalogTable.copy(stats = newStats))
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
}
}
// Update the metastore if the above statistics of the table are different from those
// recorded in the metastore.
if (newStats.isDefined) {
sessionState.catalog.alterTable(tableMeta.copy(stats = newStats))
// Refresh the cached data source table in the catalog.
sessionState.catalog.refreshTable(tableIdentWithDB)
}

Seq.empty[Row]