From 06e115ca886809a7b1fcd16e96bd1e9f493add79 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 25 Jun 2016 16:05:19 -0700
Subject: [PATCH 1/3] fix

---
 .../spark/sql/execution/SparkSqlParser.scala  | 22 ++++++++--
 .../apache/spark/sql/internal/HiveSerDe.scala | 40 +++++++++++++++---
 .../sql/execution/command/DDLSuite.scala      | 41 ++++++++++++++++++-
 .../sql/hive/execution/HiveDDLSuite.scala     | 35 ++++++++++++++++
 4 files changed, 128 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 066ff57721a3..932f21bf40af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -999,8 +999,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       operationNotAllowed(errorMessage, ctx)
     }
 
-    val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
-    if (conf.convertCTAS && !hasStorageProperties) {
+    val convertableFormat: Option[String] =
+      if (ctx.createFileFormat == null && ctx.rowFormat == null) {
+        // When no storage properties, use the default data source format
+        Option(conf.defaultDataSourceName)
+      } else {
+        val inputFormat = tableDesc.storage.inputFormat
+        val outputFormat = tableDesc.storage.outputFormat
+        val serde = tableDesc.storage.serde
+
+        if (HiveSerDe.isParquet(inputFormat, outputFormat, serde)) {
+          Option("parquet")
+        } else if (HiveSerDe.isOrc(inputFormat, outputFormat, serde)) {
+          Option("orc")
+        } else {
+          None
+        }
+      }
+    if (conf.convertCTAS && convertableFormat.nonEmpty) {
       val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
       // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
       // are empty Maps.
@@ -1011,7 +1027,7 @@
       }
       CreateTableUsingAsSelect(
         tableIdent = tableDesc.identifier,
-        provider = conf.defaultDataSourceName,
+        provider = convertableFormat.get,
         partitionColumns = tableDesc.partitionColumnNames.toArray,
         bucketSpec = None,
         mode = mode,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index ad69137f7401..d2876611982b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -17,12 +17,22 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.hadoop.mapred.InputFormat
+
 case class HiveSerDe(
     inputFormat: Option[String] = None,
     outputFormat: Option[String] = None,
     serde: Option[String] = None)
 
 object HiveSerDe {
+  val parquetInputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+  val parquetOutputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+  val parquetSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+
+  val orcInputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"
+  val orcOutputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"
+  val orcSerde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+
   /**
    * Get the Hive SerDe information from the data source abbreviation string or classname.
    *
@@ -47,15 +57,15 @@ object HiveSerDe {
 
       "orc" ->
         HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
+          inputFormat = Option(orcInputFormat),
+          outputFormat = Option(orcOutputFormat),
+          serde = Option(orcSerde)),
 
       "parquet" ->
         HiveSerDe(
-          inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-          outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-          serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")),
+          inputFormat = Option(parquetInputFormat),
+          outputFormat = Option(parquetOutputFormat),
+          serde = Option(parquetSerde)),
 
       "textfile" ->
         HiveSerDe(
@@ -79,4 +89,22 @@
 
     serdeMap.get(key)
   }
+
+  def isParquet(
+      inputFormat: Option[String],
+      outputFormat: Option[String],
+      serde: Option[String]): Boolean = {
+    inputFormat == Option(parquetInputFormat) &&
+      outputFormat == Option(parquetOutputFormat) &&
+      serde == Option(parquetSerde)
+  }
+
+  def isOrc(
+      inputFormat: Option[String],
+      outputFormat: Option[String],
+      serde: Option[String]): Boolean = {
+    inputFormat == Option(orcInputFormat) &&
+      outputFormat == Option(orcOutputFormat) &&
+      serde == Option(orcSerde)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 47d8a28f4992..789cea3128d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, Catal
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -1264,6 +1264,45 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("Create Cataloged Table As Select - Convert to Data Source Table") {
+    import testImplicits._
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTable("t", "t1", "t2", "t3", "t4") {
+        val df1 = sql("CREATE TABLE t STORED AS parquet SELECT 1 as a, 1 as b")
+        assert(df1.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
+        val analyzedDf1 = df1.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
+        assert(analyzedDf1.provider == "parquet")
+        checkAnswer(spark.table("t"), Row(1, 1) :: Nil)
+
+        spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+        val df2 = sql("CREATE TABLE t2 STORED AS parquet SELECT a, b from t1")
+        assert(df2.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
+        val analyzedDf2 = df2.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
+        assert(analyzedDf2.provider == "parquet")
+        checkAnswer(spark.table("t2"), spark.table("t1"))
+
+        val df3 = sql(
+          """
+            |CREATE TABLE t3
+            |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+            |SELECT 1 as a, 1 as b
+          """.stripMargin)
+        assert(df3.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
+        val analyzedDf3 = df3.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
+        assert(analyzedDf3.provider == "parquet")
+        checkAnswer(spark.table("t3"), Row(1, 1) :: Nil)
+
+        val e = intercept[AnalysisException] {
+          sql("CREATE TABLE t4 STORED AS orc SELECT 1 as a, 1 as b")
+        }.getMessage
+        assert(e.contains("The ORC data source must be used with Hive support enabled"))
+      }
+    }
+  }
+
   test("create table with datasource properties (not allowed)") {
     assertUnsupported("CREATE TABLE my_tab TBLPROPERTIES ('spark.sql.sources.me'='anything')")
     assertUnsupported("CREATE TABLE my_tab ROW FORMAT SERDE 'serde' " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 89f69c8e4d7f..fb165066882c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -141,6 +142,40 @@ class HiveDDLSuite
     }
   }
 
+  test("Create Cataloged Table As Select - Convert to Data Source Table") {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withTable("t", "t1") {
+        val df1 = sql("CREATE TABLE t STORED AS orc SELECT 1 as a, 1 as b")
+        assert(df1.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
+        val analyzedDf1 = df1.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
+        assert(analyzedDf1.provider == "orc")
+        checkAnswer(spark.table("t"), Row(1, 1) :: Nil)
+
+        val df2 = sql(
+          """
+            |CREATE TABLE t1
+            |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
+            |STORED AS
+            |INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
+            |OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
+            |SELECT 1 as a, 1 as b
+          """.stripMargin)
+        assert(df2.queryExecution.analyzed.isInstanceOf[CreateTableUsingAsSelect])
+        val analyzedDf2 = df2.queryExecution.analyzed.asInstanceOf[CreateTableUsingAsSelect]
+        assert(analyzedDf2.provider == "orc")
+        checkAnswer(spark.table("t1"), Row(1, 1) :: Nil)
+      }
+    }
+
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
+      withTable("t", "t1") {
+        val df1 = sql("CREATE TABLE t STORED AS orc SELECT 1 as a, 1 as b")
+        assert(df1.queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
+        checkAnswer(spark.table("t"), Row(1, 1) :: Nil)
+      }
+    }
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>

From c4bde0217a5e6a31da15cc29dc552a198ed6ef21 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 25 Jun 2016 16:08:00 -0700
Subject: [PATCH 2/3] clean

---
 .../main/scala/org/apache/spark/sql/internal/HiveSerDe.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index d2876611982b..7599a272ba8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.hadoop.mapred.InputFormat
-
 case class HiveSerDe(
     inputFormat: Option[String] = None,
     outputFormat: Option[String] = None,

From a9ce0d8342a2c3768823b4dd120fda0997b1c313 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 25 Jun 2016 21:32:17 -0700
Subject: [PATCH 3/3] test case fix

---
 .../spark/sql/hive/execution/SQLQuerySuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9c1f21825315..f6acb087a43f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -395,7 +395,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
     def checkRelation(
         tableName: String,
-        isDataSourceParquet: Boolean,
+        isDataSourceTable: Boolean,
         format: String,
         userSpecifiedLocation: Option[String] = None): Unit = {
       val relation = EliminateSubqueryAliases(
@@ -404,7 +404,7 @@
         sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
       relation match {
         case LogicalRelation(r: HadoopFsRelation, _, _) =>
-          if (!isDataSourceParquet) {
+          if (!isDataSourceTable) {
            fail(
              s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
                s"${HadoopFsRelation.getClass.getCanonicalName}.")
@@ -418,7 +418,7 @@
            catalogTable.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER) === format)
 
        case r: MetastoreRelation =>
-          if (isDataSourceParquet) {
+          if (isDataSourceTable) {
            fail(
              s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
                s"${classOf[MetastoreRelation].getCanonicalName}.")
@@ -479,11 +479,11 @@
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as orc AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", false, "orc")
+      checkRelation("ctas1", true, "orc")
       sql("DROP TABLE ctas1")
 
       sql("CREATE TABLE ctas1 stored as parquet AS SELECT key k, value FROM src ORDER BY k, value")
-      checkRelation("ctas1", false, "parquet")
+      checkRelation("ctas1", true, "parquet")
       sql("DROP TABLE ctas1")
     } finally {
       setConf(SQLConf.CONVERT_CTAS, originalConf)