@@ -263,8 +263,7 @@ class SessionCatalog(
CatalogColumn(
name = c.name,
dataType = c.dataType.catalogString,
nullable = c.nullable,
comment = Option(c.name)
nullable = c.nullable
)
@gatorsmile (Member, Author) commented on Sep 6, 2016:
As in the existing master build, we removed this unused comment attribute. The main reason is that the schema comparison also checks the comment; this was introduced in PR #14114.
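
For context, a minimal sketch of why a populated comment defeats equality-based schema comparison. The CatalogColumn re-declaration below is an illustrative stand-in for the catalyst class, assuming it is a plain case class whose generated equals covers every field:

// Illustrative stand-in for the catalyst CatalogColumn case class (not the real definition).
case class CatalogColumn(
    name: String,
    dataType: String,
    nullable: Boolean = true,
    comment: Option[String] = None)

object SchemaComparisonSketch extends App {
  val fromMetastore = CatalogColumn("key", "int")
  val fromPlan      = CatalogColumn("key", "int", comment = Some("key"))

  // Case-class equality includes the comment field, so these columns (and any
  // schemas built from them) no longer compare equal once a comment is set.
  println(fromMetastore == fromPlan) // false
}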

},
properties = Map(),
@@ -33,8 +33,10 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, Catal
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -56,7 +58,12 @@ case class CreateHiveTableAsSelectLogicalPlan(
}

/**
* A command to create a table with the same definition of the given existing table.
* A command to create a MANAGED table with the same definition of the given existing table.
* In the target table definition, the table comment is always empty but the column comments
* are identical to the ones defined in the source table.
*
* The CatalogTable attributes copied from the source table are storage(inputFormat, outputFormat,
* serde, compressed, properties), schema, provider, partitionColumnNames, bucketSpec.
*
* The syntax of using this command in SQL is:
* {{{
@@ -75,18 +82,54 @@ case class CreateTableLikeCommand(
throw new AnalysisException(
s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
}
if (catalog.isTemporaryTable(sourceTable)) {
throw new AnalysisException(
s"Source table in CREATE TABLE LIKE cannot be temporary: '$sourceTable'")
val sourceTableDesc = catalog.getTableMetadata(sourceTable)

if (DDLUtils.isDatasourceTable(sourceTableDesc) ||
sourceTableDesc.tableType == CatalogTableType.VIEW) {
val outputSchema =
StructType(sourceTableDesc.schema.map { c =>
val builder = new MetadataBuilder
c.comment.map(comment => builder.putString("comment", comment))
StructField(
c.name,
CatalystSqlParser.parseDataType(c.dataType),
c.nullable,
metadata = builder.build())
})
val (schema, provider) = if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
(DDLUtils.getSchemaFromTableProperties(sourceTableDesc).getOrElse(outputSchema),
sourceTableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER))
} else { // VIEW
(outputSchema, sparkSession.sessionState.conf.defaultDataSourceName)
}
createDataSourceTable(
sparkSession = sparkSession,
tableIdent = targetTable,
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
bucketSpec = None,
provider = provider,
options = Map("path" -> catalog.defaultTablePath(targetTable)),
isExternal = false)
} else {
val newStorage =
sourceTableDesc.storage.copy(
locationUri = None,
serdeProperties = sourceTableDesc.storage.serdeProperties)
val newTableDesc =
CatalogTable(
identifier = targetTable,
tableType = CatalogTableType.MANAGED,
storage = newStorage,
schema = sourceTableDesc.schema,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
sortColumnNames = sourceTableDesc.sortColumnNames,
bucketColumnNames = sourceTableDesc.bucketColumnNames,
numBuckets = sourceTableDesc.numBuckets)

catalog.createTable(newTableDesc, ifNotExists)
}

val tableToCreate = catalog.getTableMetadata(sourceTable).copy(
identifier = targetTable,
tableType = CatalogTableType.MANAGED,
createTime = System.currentTimeMillis,
lastAccessTime = -1).withNewStorage(locationUri = None)

catalog.createTable(tableToCreate, ifNotExists)
Seq.empty[Row]
}
}
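
As a usage illustration of the behavior documented in the command's scaladoc above, here is a hedged sketch; the table names and the local SparkSession setup are assumptions for this example, not part of the patch:

import org.apache.spark.sql.SparkSession

object CreateTableLikeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create-table-like-sketch")
      .master("local[2]")
      .enableHiveSupport() // Hive support so the source can be a Hive serde table
      .getOrCreate()

    // Source: a Hive serde table with a table comment and a column comment.
    spark.sql(
      "CREATE TABLE src_tab (key INT COMMENT 'the key', value STRING) COMMENT 'source table'")

    // Target: always created as a MANAGED table; the table comment is dropped,
    // the column comments are kept, and no data is copied.
    spark.sql("CREATE TABLE tgt_tab LIKE src_tab")

    spark.sql("DESC FORMATTED tgt_tab").show(truncate = false)
    spark.stop()
  }
}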
@@ -412,7 +412,9 @@ private[hive] class HiveClientImpl(
serdeProperties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
.map(_.asScala.toMap).orNull
),
properties = properties.filter(kv => kv._1 != "comment"),
// For EXTERNAL_TABLE, the table properties contain a particular field "EXTERNAL". This is added
// in the function toHiveTable.
properties = properties.filter(kv => kv._1 != "comment" && kv._1 != "EXTERNAL"),
comment = properties.get("comment"),
viewOriginalText = Option(h.getViewOriginalText),
viewText = Option(h.getViewExpandedText),
@@ -24,8 +24,11 @@ import org.scalatest.BeforeAndAfterEach

import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils}
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -651,6 +654,233 @@ class HiveDDLSuite
}
}


test("CREATE TABLE LIKE a temporary view") {
val sourceViewName = "tab1"
val targetTabName = "tab2"
withTempView(sourceViewName) {
withTable(targetTabName) {
spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.createTempView(sourceViewName)
sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")

val sourceTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(sourceViewName, None))
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))

checkCreateTableLike(sourceTable, targetTable)
}
}
}

test("CREATE TABLE LIKE a data source table") {
val sourceTabName = "tab1"
val targetTabName = "tab2"
withTable(sourceTabName, targetTabName) {
spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write.format("json").saveAsTable(sourceTabName)
sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")

val sourceTable =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
val targetTable =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))
// The table type of the source table should be a Hive-managed data source table
assert(DDLUtils.isDatasourceTable(sourceTable))
assert(sourceTable.tableType == CatalogTableType.MANAGED)

checkCreateTableLike(sourceTable, targetTable)
}
}

test("CREATE TABLE LIKE an external data source table") {
val sourceTabName = "tab1"
val targetTabName = "tab2"
withTable(sourceTabName, targetTabName) {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write.format("parquet").save(path)
sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")

// The source table should be an external data source table
val sourceTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(sourceTabName, Some("default")))
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))
// The table type of the source table should be an external data source table
assert(DDLUtils.isDatasourceTable(sourceTable))
assert(sourceTable.tableType == CatalogTableType.EXTERNAL)

checkCreateTableLike(sourceTable, targetTable)
}
}
}

test("CREATE TABLE LIKE a managed Hive serde table") {
val catalog = spark.sessionState.catalog
val sourceTabName = "tab1"
val targetTabName = "tab2"
withTable(sourceTabName, targetTabName) {
sql(s"CREATE TABLE $sourceTabName TBLPROPERTIES('prop1'='value1') AS SELECT 1 key, 'a'")
sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")

val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
assert(sourceTable.tableType == CatalogTableType.MANAGED)
assert(sourceTable.properties.get("prop1").nonEmpty)
val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))

checkCreateTableLike(sourceTable, targetTable)
}
}

test("CREATE TABLE LIKE an external Hive serde table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>
val basePath = tmpDir.getCanonicalPath
val sourceTabName = "tab1"
val targetTabName = "tab2"
withTable(sourceTabName, targetTabName) {
assert(tmpDir.listFiles.isEmpty)
sql(
s"""
|CREATE EXTERNAL TABLE $sourceTabName (key INT comment 'test', value STRING)
|COMMENT 'Apache Spark'
|PARTITIONED BY (ds STRING, hr STRING)
|LOCATION '$basePath'
""".stripMargin)
for (ds <- Seq("2008-04-08", "2008-04-09"); hr <- Seq("11", "12")) {
sql(
s"""
|INSERT OVERWRITE TABLE $sourceTabName
|partition (ds='$ds',hr='$hr')
|SELECT 1, 'a'
""".stripMargin)
}
sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")

val sourceTable = catalog.getTableMetadata(TableIdentifier(sourceTabName, Some("default")))
assert(sourceTable.tableType == CatalogTableType.EXTERNAL)
assert(sourceTable.comment == Option("Apache Spark"))
val targetTable = catalog.getTableMetadata(TableIdentifier(targetTabName, Some("default")))

checkCreateTableLike(sourceTable, targetTable)
}
}
}

test("CREATE TABLE LIKE a view") {
val sourceTabName = "tab1"
val sourceViewName = "view"
val targetTabName = "tab2"
withTable(sourceTabName, targetTabName) {
withView(sourceViewName) {
spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write.format("json").saveAsTable(sourceTabName)
sql(s"CREATE VIEW $sourceViewName AS SELECT * FROM $sourceTabName")
sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")

val sourceView = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(sourceViewName, Some("default")))
// The original source should be a VIEW with an empty path
assert(sourceView.tableType == CatalogTableType.VIEW)
assert(sourceView.viewText.nonEmpty && sourceView.viewOriginalText.nonEmpty)
val targetTable = spark.sessionState.catalog.getTableMetadata(
TableIdentifier(targetTabName, Some("default")))

checkCreateTableLike(sourceView, targetTable)
}
}
}

private def getTablePath(table: CatalogTable): Option[String] = {
if (DDLUtils.isDatasourceTable(table)) {
new CaseInsensitiveMap(table.storage.serdeProperties).get("path")
} else {
table.storage.locationUri
}
}

private def checkCreateTableLike(sourceTable: CatalogTable, targetTable: CatalogTable): Unit = {
// The created table should be a MANAGED table with empty view text and original text.
assert(targetTable.tableType == CatalogTableType.MANAGED,
"the created table must be a Hive managed table")
assert(targetTable.viewText.isEmpty && targetTable.viewOriginalText.isEmpty,
"the view text and original text in the created table must be empty")
assert(targetTable.comment.isEmpty,
"the comment in the created table must be empty")
assert(targetTable.unsupportedFeatures.isEmpty,
"the unsupportedFeatures in the create table must be empty")

val metastoreGeneratedProperties = Seq(
"CreateTime",
"transient_lastDdlTime",
"grantTime",
"lastUpdateTime",
"last_modified_by",
"last_modified_time",
"Owner:",
"COLUMN_STATS_ACCURATE",
"numFiles",
"numRows",
"rawDataSize",
"totalSize",
"totalNumberFiles",
"maxFileSize",
"minFileSize"
)
assert(targetTable.properties.filterKeys { key =>
!metastoreGeneratedProperties.contains(key) && !key.startsWith(DATASOURCE_PREFIX)
}.isEmpty,
"the table properties of source tables should not be copied in the created table")

if (DDLUtils.isDatasourceTable(sourceTable) ||
sourceTable.tableType == CatalogTableType.VIEW) {
assert(DDLUtils.isDatasourceTable(targetTable),
"the target table should be a data source table")
} else {
assert(!DDLUtils.isDatasourceTable(targetTable),
"the target table should be a Hive serde table")
}

if (sourceTable.tableType == CatalogTableType.VIEW) {
// Source table is a temporary/permanent view, which does not have a provider. The created
// target table uses the default data source format
assert(targetTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) ==
spark.sessionState.conf.defaultDataSourceName)
} else if (DDLUtils.isDatasourceTable(sourceTable)) {
assert(targetTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) ==
sourceTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER))
}

val sourceTablePath = getTablePath(sourceTable)
val targetTablePath = getTablePath(targetTable)
assert(targetTablePath.nonEmpty, "target table path should not be empty")
assert(sourceTablePath != targetTablePath,
"source table/view path should be different from target table path")

// The source table contents should not be seen in the target table.
assert(spark.table(sourceTable.identifier).count() != 0, "the source table should be nonempty")
assert(spark.table(targetTable.identifier).count() == 0, "the target table should be empty")

// Their schema should be identical
checkAnswer(
sql(s"DESC ${sourceTable.identifier}").select("col_name", "data_type"),
sql(s"DESC ${targetTable.identifier}").select("col_name", "data_type"))

withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
// Check whether the new table can be inserted using the data from the original table
sql(s"INSERT INTO TABLE ${targetTable.identifier} SELECT * FROM ${sourceTable.identifier}")
}

// After insertion, the data should be identical
checkAnswer(
sql(s"SELECT * FROM ${sourceTable.identifier}"),
sql(s"SELECT * FROM ${targetTable.identifier}"))
}

test("Analyze data source tables(LogicalRelation)") {
withTable("t1") {
withTempPath { dir =>