@@ -999,8 +999,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
operationNotAllowed(errorMessage, ctx)
}

- val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
- if (conf.convertCTAS && !hasStorageProperties) {
+ val convertableFormat: Option[String] =
+ if (ctx.createFileFormat == null && ctx.rowFormat == null) {
+ // When no storage properties, use the default data source format
+ Option(conf.defaultDataSourceName)
+ } else {
+ val inputFormat = tableDesc.storage.inputFormat
+ val outputFormat = tableDesc.storage.outputFormat
+ val serde = tableDesc.storage.serde
+
+ if (HiveSerDe.isParquet(inputFormat, outputFormat, serde)) {
+ Option("parquet")
+ } else if (HiveSerDe.isOrc(inputFormat, outputFormat, serde)) {
+ Option("orc")
+ } else {
+ None
+ }
+ }
+ if (conf.convertCTAS && convertableFormat.nonEmpty) {
val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
@@ -1011,7 +1027,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
CreateTableUsingAsSelect(
tableIdent = tableDesc.identifier,
- provider = conf.defaultDataSourceName,
+ provider = convertableFormat.get,
partitionColumns = tableDesc.partitionColumnNames.toArray,
bucketSpec = None,
mode = mode,
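For reference, a minimal standalone sketch of the conversion decision above. This is not Spark source: the object name, the hasExplicitStorage flag, and the explicit defaultDataSourceName parameter are illustrative stand-ins for ctx.createFileFormat/ctx.rowFormat and conf.defaultDataSourceName, and the class-name strings are the constants introduced in HiveSerDe below.

object ConvertCtasSketch {
  // Hive storage classes for Parquet and ORC (the same strings as the HiveSerDe constants below).
  private val parquetClasses = Seq(
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
  private val orcClasses = Seq(
    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
    "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
    "org.apache.hadoop.hive.ql.io.orc.OrcSerde")

  /** Data source provider a Hive CTAS could be converted to, if any. */
  def convertableFormat(
      hasExplicitStorage: Boolean,  // was STORED AS / ROW FORMAT given?
      inputFormat: Option[String],
      outputFormat: Option[String],
      serde: Option[String],
      defaultDataSourceName: String): Option[String] = {
    if (!hasExplicitStorage) {
      // No storage clause: fall back to the session's default data source.
      Some(defaultDataSourceName)
    } else if (Seq(inputFormat, outputFormat, serde).flatten == parquetClasses) {
      Some("parquet")
    } else if (Seq(inputFormat, outputFormat, serde).flatten == orcClasses) {
      Some("orc")
    } else {
      None  // e.g. textfile or a custom SerDe: keep the original Hive CTAS path
    }
  }
}

A non-empty result is what allows the CTAS to be rewritten into CreateTableUsingAsSelect with that provider; None preserves the existing Hive CTAS behavior.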
@@ -23,6 +23,14 @@ case class HiveSerDe(
serde: Option[String] = None)

object HiveSerDe {
+ val parquetInputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+ val parquetOutputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+ val parquetSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+
+ val orcInputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"
+ val orcOutputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"
+ val orcSerde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+
/**
* Get the Hive SerDe information from the data source abbreviation string or classname.
*
@@ -47,15 +55,15 @@ object HiveSerDe {

"orc" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
inputFormat = Option(orcInputFormat),
outputFormat = Option(orcOutputFormat),
serde = Option(orcSerde)),

"parquet" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
inputFormat = Option(parquetInputFormat),
outputFormat = Option(parquetOutputFormat),
serde = Option(parquetSerde)),

"textfile" ->
HiveSerDe(
@@ -79,4 +87,22 @@

serdeMap.get(key)
}

+ def isParquet(
+ inputFormat: Option[String],
+ outputFormat: Option[String],
+ serde: Option[String]): Boolean = {
+ inputFormat == Option(parquetInputFormat) &&
+ outputFormat == Option(parquetOutputFormat) &&
+ serde == Option(parquetSerde)
+ }
+
+ def isOrc(
+ inputFormat: Option[String],
+ outputFormat: Option[String],
+ serde: Option[String]): Boolean = {
+ inputFormat == Option(orcInputFormat) &&
+ outputFormat == Option(orcOutputFormat) &&
+ serde == Option(orcSerde)
+ }
}
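A small usage sketch of the new isParquet/isOrc helpers, assuming HiveSerDe is in scope; the descriptor strings are simply the Parquet constants defined above, repeated inline so the snippet stands alone.

val inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
val outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
val serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")

// isParquet only matches when all three descriptors equal the Parquet classes.
assert(HiveSerDe.isParquet(inputFormat, outputFormat, serde))
assert(!HiveSerDe.isOrc(inputFormat, outputFormat, serde))
assert(!HiveSerDe.isParquet(inputFormat, outputFormat, serde = None))

The comparison is exact equality on all three descriptors, so a partially specified or mixed Hive storage descriptor falls through to None in the builder logic above and is not converted.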
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, Catal
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
- import org.apache.spark.sql.execution.datasources.BucketSpec
+ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -1264,6 +1264,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

test("Create Cataloged Table As Select - Convert to Data Source Table") {
import testImplicits._
withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
withTable("t", "t1", "t2", "t3", "t4") {
val df1 = sql("CREATE TABLE t STORED AS parquet SELECT 1 as a, 1 as b")
assert(df1.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
val analyzedDf1 = df1.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
assert(analyzedDf1.provider == "parquet")
checkAnswer(spark.table("t"), Row(1, 1) :: Nil)

spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
val df2 = sql("CREATE TABLE t2 STORED AS parquet SELECT a, b from t1")
assert(df2.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
val analyzedDf2 = df2.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
assert(analyzedDf2.provider == "parquet")
checkAnswer(spark.table("t2"), spark.table("t1"))

val df3 = sql(
"""
|CREATE TABLE t3
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|SELECT 1 as a, 1 as b
""".stripMargin)
assert(df3.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
val analyzedDf3 = df3.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
assert(analyzedDf3.provider == "parquet")
checkAnswer(spark.table("t3"), Row(1, 1) :: Nil)

val e = intercept[AnalysisException] {
sql("CREATE TABLE t4 STORED AS orc SELECT 1 as a, 1 as b")
}.getMessage
assert(e.contains("The ORC data source must be used with Hive support enabled"))
}
}
}

test("create table with datasource properties (not allowed)") {
assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
@@ -26,6 +26,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -141,6 +142,40 @@ class HiveDDLSuite
}
}

test("Create Cataloged Table As Select - Convert to Data Source Table") {
withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
withTable("t", "t1") {
val df1 = sql("CREATE TABLE t STORED AS orc SELECT 1 as a, 1 as b")
assert(df1.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
val analyzedDf1 = df1.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
assert(analyzedDf1.provider == "orc")
checkAnswer(spark.table("t"), Row(1, 1) :: Nil)

val df2 = sql(
"""
|CREATE TABLE t1
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
|SELECT 1 as a, 1 as b
""".stripMargin)
assert(df2.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
val analyzedDf2 = df2.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
assert(analyzedDf2.provider == "orc")
checkAnswer(spark.table("t1"), Row(1, 1) :: Nil)
}
}

withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
withTable("t", "t1") {
val df1 = sql("CREATE TABLE t STORED AS orc SELECT 1 as a, 1 as b")
assert(df1.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
checkAnswer(spark.table("t"), Row(1, 1) :: Nil)
}
}
}

test("add/drop partitions - external table") {
val catalog = spark.sessionState.catalog
withTempDir { tmpDir =>
@@ -395,7 +395,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

def checkRelation(
tableName: String,
- isDataSourceParquet: Boolean,
+ isDataSourceTable: Boolean,
format: String,
userSpecifiedLocation: Option[String] = None): Unit = {
val relation = EliminateSubqueryAliases(
@@ -404,7 +404,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
relation match {
case LogicalRelation(r: HadoopFsRelation, _, _) =>
- if (!isDataSourceParquet) {
+ if (!isDataSourceTable) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
s"${HadoopFsRelation.getClass.getCanonicalName}.")
@@ -418,7 +418,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
catalogTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) === format)

case r: MetastoreRelation =>
- if (isDataSourceParquet) {
+ if (isDataSourceTable) {
fail(
s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
@@ -479,11 +479,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
sql("DROP TABLE ctas1")

sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false, "orc")
checkRelation("ctas1", true, "orc")
sql("DROP TABLE ctas1")

sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
checkRelation("ctas1", false, "parquet")
checkRelation("ctas1", true, "parquet")
sql("DROP TABLE ctas1")
} finally {
setConf(SQLConf.CONVERT_CTAS, originalConf)