From 6f576699c5440d03e56916b5845ecff4797bbfed Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 28 Jul 2015 21:57:48 -0700 Subject: [PATCH 1/5] write schema info to hivemetastore for data source --- .../org/apache/spark/sql/DataFrame.scala | 12 +- .../scala/org/apache/spark/sql/SQLConf.scala | 7 + .../spark/sql/hive/HiveMetastoreCatalog.scala | 129 +++++++++++++++-- .../org/apache/spark/sql/hive/HiveQl.scala | 38 +---- .../spark/sql/hive/orc/OrcRelation.scala | 6 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 135 +++++++++++++++++- .../sql/hive/execution/SQLQuerySuite.scala | 23 +++ 7 files changed, 300 insertions(+), 50 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index e57acec59d327..0a0e95682bd3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1651,7 +1651,7 @@ class DataFrame private[sql]( * be the target of an `insertInto`. * * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * if spark.sql.hive.writeDataSourceSchema=true in configuration. * @group output * @deprecated As of 1.4.0, replaced by `write().saveAsTable(tableName)`. */ @@ -1670,7 +1670,7 @@ class DataFrame private[sql]( * be the target of an `insertInto`. * * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * if spark.sql.hive.writeDataSourceSchema=true in configuration. * @group output * @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`. */ @@ -1690,7 +1690,7 @@ class DataFrame private[sql]( * be the target of an `insertInto`. * * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * if spark.sql.hive.writeDataSourceSchema=true in configuration. * @group output * @deprecated As of 1.4.0, replaced by `write().format(source).saveAsTable(tableName)`. */ @@ -1710,7 +1710,7 @@ class DataFrame private[sql]( * be the target of an `insertInto`. * * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * if spark.sql.hive.writeDataSourceSchema=true in configuration. * @group output * @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`. */ @@ -1729,7 +1729,7 @@ class DataFrame private[sql]( * be the target of an `insertInto`. * * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * if spark.sql.hive.writeDataSourceSchema=true in configuration. * @group output * @deprecated As of 1.4.0, replaced by * `write().format(source).mode(mode).options(options).saveAsTable(tableName)`. @@ -1755,7 +1755,7 @@ class DataFrame private[sql]( * be the target of an `insertInto`. * * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * if spark.sql.hive.writeDataSourceSchema=true in configuration. 
* @group output * @deprecated As of 1.4.0, replaced by * `write().format(source).mode(mode).options(options).saveAsTable(tableName)`. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index f836122b3e0e4..3f3e7eb855ae1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -321,6 +321,11 @@ private[spark] object SQLConf { defaultValue = Some(5 * 60), doc = "Timeout in seconds for the broadcast wait time in broadcast joins.") + val HIVE_WRITE_DATASOURCE_SCHEMA = booleanConf("spark.sql.hive.writeDataSourceSchema", + defaultValue = Some(true), + doc = "When true, will write the metastore information to Hive Metastore for " + + "Spark SQL DataSource Updating/Insertion, so Hive can access the Spark SQL data seamlessly.") + // Options that control which operators can be chosen by the query planner. These should be // considered hints and may be ignored by future versions of Spark SQL. val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort", @@ -470,6 +475,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) + private[spark] def writeSchemaToHiveMetastore = getConf(HIVE_WRITE_DATASOURCE_SCHEMA) + private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT) private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 6b37af99f4677..0e0442066a314 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,6 +23,7 @@ import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.Warehouse import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.ql.metadata._ @@ -40,9 +41,70 @@ import org.apache.spark.sql.execution.datasources import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.parquet.ParquetRelation +import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode} +private[hive] case class HiveSerDe( + inputFormat: Option[String] = None, + outputFormat: Option[String] = None, + serde: Option[String] = None) + +private[hive] object HiveSerDe { + /** + * Get the Hive SerDe information from the data source abbreviation string or classname. + * + * @param source Currently the source abbreviation can be one of the following: + * SequenceFile, RCFile, ORC, PARQUET, and case insensitive. + * @param hiveConf Hive Conf + * @param returnDefaultFormat Will the default format returns if no assocated data source found. 
+ * The default input/output format is: + * InputFormat: org.apache.hadoop.mapred.TextInputFormat + * OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat + * @return HiveSerDe associated with the specified source + */ + def sourceToSerDe(source: String, hiveConf: HiveConf, returnDefaultFormat: Boolean) + : Option[HiveSerDe] = { + val serde = if ("SequenceFile".equalsIgnoreCase(source)) { + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), + outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")) + } else if ("RCFile".equalsIgnoreCase(source)) { + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), + serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))) + } else if ("ORC".equalsIgnoreCase(source) || + "org.apache.spark.sql.hive.orc.DefaultSource" == source) { + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + } else if ("PARQUET".equalsIgnoreCase(source) || + "org.apache.spark.sql.parquet.DefaultSource" == source) { + HiveSerDe( + inputFormat = + Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), + outputFormat = + Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde = + Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) + } else if (returnDefaultFormat) { + // return default file format + HiveSerDe( + inputFormat = + Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = + Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + } else { + // TODO we probably need to provide SerDe for the built-in format, like json. 
+ null + } + + Option(serde) + } +} + private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext) extends Catalog with Logging { @@ -210,15 +272,64 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive ManagedTable } - client.createTable( - HiveTable( - specifiedDatabase = Option(dbName), - name = tblName, - schema = Seq.empty, - partitionColumns = metastorePartitionColumns, - tableType = tableType, - properties = tableProperties.toMap, - serdeProperties = options)) + val hiveTable = HiveSerDe.sourceToSerDe(provider, hive.hiveconf, false) match { + case Some(hiveSerDe) if conf.writeSchemaToHiveMetastore => + // get the schema from the data source + val ds = ResolvedDataSource(hive, userSpecifiedSchema, partitionColumns, provider, options) + ds.relation match { + case fs: HadoopFsRelation if fs.paths.length == 1 => + def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = { + schema.map( field => HiveColumn( + name = field.name, + hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType), + comment = "") + ) + } + // get the partition columns + val parts = schemaToHiveColumn(fs.partitionColumns) + val partSet = parts.toSet + // remove the partition columns from the relation schema + val columns = schemaToHiveColumn(ds.relation.schema).filterNot(partSet.contains) + + HiveTable( + specifiedDatabase = Option(dbName), + name = tblName, + schema = columns, + partitionColumns = parts, + tableType = tableType, + properties = tableProperties.toMap, + serdeProperties = options, + location = Some(fs.paths.head), + viewText = None, // TODO We need to place the SQL string here. + inputFormat = hiveSerDe.inputFormat, + outputFormat = hiveSerDe.outputFormat, + serde = hiveSerDe.serde) + case fs: HadoopFsRelation => + throw new AnalysisException( + """Only a single location support for HadoopFSRelation when write the metadata + |into the Hive Metastore, set spark.sql.hive.writeDataSourceSchema=false + |for not conform to Hive""".stripMargin) + case _ => + throw new AnalysisException( + """Unable to write the Non HadoopFSRelation DataSource meta data + |into the Hive Metastore, set spark.sql.hive.writeDataSourceSchema=false + |for not conform to Hive""".stripMargin) + } + case other => + if (other.isEmpty) { + logWarning(s"Unable to find the SerDe info for $provider") + } + HiveTable( + specifiedDatabase = Option(dbName), + name = tblName, + schema = Seq.empty, + partitionColumns = metastorePartitionColumns, + tableType = tableType, + properties = tableProperties.toMap, + serdeProperties = options) + } + + client.createTable(hiveTable) } def hiveDefaultTableFilePath(tableName: String): String = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index f43e403ce9a9d..ec946819da1ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -604,38 +604,14 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C serde = None, viewText = None) - // default storage type abbriviation (e.g. RCFile, ORC, PARQUET etc.) + // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.) 
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) - // handle the default format for the storage type abbriviation - tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) { - tableDesc.copy( - inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), - outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")) - } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) { - tableDesc.copy( - inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), - serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))) - } else if ("ORC".equalsIgnoreCase(defaultStorageType)) { - tableDesc.copy( - inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) { - tableDesc.copy( - inputFormat = - Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), - serde = - Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) - } else { - tableDesc.copy( - inputFormat = - Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - } + // handle the default format for the storage type abbreviation + val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf, true).get + + hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f))) + hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f))) + hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f))) children.collect { case list @ Token("TOK_TABCOLLIST", _) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 6fa599734892b..4a310ff4e9016 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -291,9 +291,11 @@ private[orc] case class OrcTableScan( // Sets requested columns addColumnIds(attributes, relation, conf) - if (inputPaths.nonEmpty) { - FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*) + if (inputPaths.isEmpty) { + // the input path probably be pruned, return an empty RDD. 
+ return sqlContext.sparkContext.emptyRDD[InternalRow] } + FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*) val inputFormatClass = classOf[OrcInputFormat] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 983c013bcf86a..4a25969dc0d72 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -17,11 +17,22 @@ package org.apache.spark.sql.hive -import org.apache.spark.{Logging, SparkFunSuite} -import org.apache.spark.sql.hive.test.TestHive +import java.io.File + +import org.scalatest.BeforeAndAfterAll +import org.apache.spark.Logging +import org.apache.spark.util.Utils +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, ManagedTable} +import org.apache.spark.sql.{AnalysisException, SQLConf} +import org.apache.spark.sql.sources.DataSourceTest import org.apache.spark.sql.test.ExamplePointUDT import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.TestHive.implicits._ + class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging { @@ -45,3 +56,123 @@ class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging { df.as('a).join(df.as('b), $"a.key" === $"b.key") } } + +class DataSourceWithHiveMetastoreCatalogSuite extends DataSourceTest with BeforeAndAfterAll { + var path: File = null + + override def beforeAll(): Unit = { + path = Utils.createTempDir() + Utils.deleteRecursively(path) + } + + override def afterAll(): Unit = {} + + after { + Utils.deleteRecursively(path) + } + + test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #1") { + import org.apache.spark.sql.SaveMode + val df = Seq((1, "1", 2), (2, "2", 2)).toDF("d1", "d2", "p") + df.write + .mode(SaveMode.Overwrite) + .partitionBy("p") + .format("parquet") + .saveAsTable("datasource_7550_1") + + val hiveTable = catalog.client.getTable("default", "datasource_7550_1") + val columns = hiveTable.schema + assert(columns.length === 2) + assert(columns(0).name === "d1") + assert(columns(0).hiveType === "int") + + assert(columns(1).name === "d2") + assert(columns(1).hiveType === "string") + + assert(hiveTable.inputFormat === + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) + assert(hiveTable.outputFormat === + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + assert(hiveTable.serde === + Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) + + assert(hiveTable.isPartitioned === true) + assert(hiveTable.tableType === ManagedTable) + assert(hiveTable.partitionColumns.length === 1) + assert(hiveTable.partitionColumns.head === HiveColumn("p", "int", "")) + } + + test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #2") { + import org.apache.spark.sql.SaveMode + val df = Seq((1, "1", 2), (2, "2", 2)).toDF("d1", "d2", "p") + + df.write + .mode(SaveMode.ErrorIfExists) + .partitionBy("p") + .format("parquet") + .option("path", path.toString) + .saveAsTable("datasource_7550_2") + + val hiveTable = catalog.client.getTable("default", "datasource_7550_2") + + assert(hiveTable.isPartitioned === true) + assert(hiveTable.tableType === ExternalTable) + // 
hiveTable.location is "file:///tmp/..." while path.toString just be "/tmp/.." + assert(hiveTable.location.get.contains(path.toString)) + } + + test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #3") { + // Ignore to write the meta into Hive Metastore, as we don't support the json for Hive + import org.apache.spark.sql.SaveMode + val df = Seq((1, "1", 2), (2, "2", 2)).toDF("d1", "d2", "p") + df.write + .mode(SaveMode.ErrorIfExists) + .format("json") + .saveAsTable("datasource_7550_3") + assert(2 === sql("SELECT * from datasource_7550_3").count()) + } + + test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #4") { + // Ignore to write the meta into Hive Metastore, as we don't support the DataSource for Hive + sql( + """CREATE TABLE datasource_7550_4 + | USING org.apache.spark.sql.sources.DDLScanSource + | OPTIONS ( + | From '1', + | To '10', + | Table 'test1') + """.stripMargin) + assert(10 === sql("SELECT * from datasource_7550_4").count()) + } + + test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #5") { + sql(s""" + |CREATE TABLE datasource_7550_5 + |USING orc + |OPTIONS ( + | path '${path.getCanonicalPath}' + |) AS + |SELECT 1 as d1, "aa" as d2 + """.stripMargin) + + val hiveTable = catalog.client.getTable("default", "datasource_7550_5") + val columns = hiveTable.schema + assert(columns.length === 2) + assert(columns(0).name === "d1") + assert(columns(0).hiveType === "int") + + assert(columns(1).name === "d2") + assert(columns(1).hiveType === "string") + + assert(hiveTable.inputFormat === + Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(hiveTable.outputFormat === + Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(hiveTable.serde === + Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + + assert(hiveTable.isPartitioned === false) + assert(hiveTable.tableType === ExternalTable) + assert(hiveTable.partitionColumns.length === 0) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1dff07a6de8ad..1a556cc803646 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -93,6 +93,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { checkAnswer(query, Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: Nil) } + test("SPARK-7550: Support setting the right schema & serde when writing to Hive Metastore") { + val df = Seq((1, 1)).toDF("c1", "c2") + + // test writing orc table via data source api, and read it in Hive + df.write.format("orc").saveAsTable("spark_7550_orc") + checkAnswer(sql("select * from spark_7550_orc"), Row(1, 1) :: Nil) // run with spark + val hiveResult1 = runSqlHive("select * from spark_7550_orc limit 2") // run with hive + assert(hiveResult1.length === 1) + assert(hiveResult1(0) === "1\t1") + + // test writing parquet table via data source api, and read it in Hive + df.write.format("parquet").saveAsTable("spark_7550_parquet") + checkAnswer(sql("select * from spark_7550_parquet"), Row(1, 1) :: Nil) // run with spark + val hiveResult2 = runSqlHive("select * from spark_7550_parquet limit 2") // run with hive + assert(hiveResult2.length === 1) + assert(hiveResult2(0) === "1\t1") + + // test writing json table via data source api + // will not throw exception, but will log warning, + 
// as Hive is not able to load data from this table + df.write.format("json").saveAsTable("spark_7550_json") + } + test("SPARK-6851: Self-joined converted parquet tables") { val orders = Seq( Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151), From 3490cdc77a33a0eac7be29a53e26887a81b001c0 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 28 Jul 2015 22:23:28 -0700 Subject: [PATCH 2/5] update the scaladoc --- .../org/apache/spark/sql/DataFrame.scala | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 0a0e95682bd3f..24c0f9cdfec65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1650,8 +1650,10 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * Also note that while this function can persist the table metadata into Hive's metastore, - * if spark.sql.hive.writeDataSourceSchema=true in configuration. + * This function always persists table metadata into Hive's metastore. But the table is + * not accessible from Hive unless the underlying data source is either Parquet or ORC. + * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * * @group output * @deprecated As of 1.4.0, replaced by `write().saveAsTable(tableName)`. */ @@ -1669,8 +1671,10 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * Also note that while this function can persist the table metadata into Hive's metastore, - * if spark.sql.hive.writeDataSourceSchema=true in configuration. + * This function always persists table metadata into Hive's metastore. But the table is + * not accessible from Hive unless the underlying data source is either Parquet or ORC. + * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * * @group output * @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`. */ @@ -1689,8 +1693,10 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * Also note that while this function can persist the table metadata into Hive's metastore, - * if spark.sql.hive.writeDataSourceSchema=true in configuration. + * This function always persists table metadata into Hive's metastore. But the table is + * not accessible from Hive unless the underlying data source is either Parquet or ORC. + * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * * @group output * @deprecated As of 1.4.0, replaced by `write().format(source).saveAsTable(tableName)`. */ @@ -1709,8 +1715,10 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * Also note that while this function can persist the table metadata into Hive's metastore, - * if spark.sql.hive.writeDataSourceSchema=true in configuration. + * This function always persists table metadata into Hive's metastore. But the table is + * not accessible from Hive unless the underlying data source is either Parquet or ORC. 
+ * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * * @group output * @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`. */ @@ -1728,8 +1736,10 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * Also note that while this function can persist the table metadata into Hive's metastore, - * if spark.sql.hive.writeDataSourceSchema=true in configuration. + * This function always persists table metadata into Hive's metastore. But the table is + * not accessible from Hive unless the underlying data source is either Parquet or ORC. + * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * * @group output * @deprecated As of 1.4.0, replaced by * `write().format(source).mode(mode).options(options).saveAsTable(tableName)`. @@ -1754,8 +1764,10 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * Also note that while this function can persist the table metadata into Hive's metastore, - * if spark.sql.hive.writeDataSourceSchema=true in configuration. + * This function always persists table metadata into Hive's metastore. But the table is + * not accessible from Hive unless the underlying data source is either Parquet or ORC. + * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * * @group output * @deprecated As of 1.4.0, replaced by * `write().format(source).mode(mode).options(options).saveAsTable(tableName)`. From 864aceec0479f1b8faa4eb707c7d1dc57a354c15 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 5 Aug 2015 22:00:45 +0800 Subject: [PATCH 3/5] Refactors PR #5733 --- .../scala/org/apache/spark/sql/SQLConf.scala | 7 - .../spark/sql/hive/HiveMetastoreCatalog.scala | 219 +++++++++------- .../org/apache/spark/sql/hive/HiveQl.scala | 11 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 248 ++++++++---------- .../sql/hive/execution/SQLQuerySuite.scala | 23 -- 5 files changed, 242 insertions(+), 266 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 3f3e7eb855ae1..f836122b3e0e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -321,11 +321,6 @@ private[spark] object SQLConf { defaultValue = Some(5 * 60), doc = "Timeout in seconds for the broadcast wait time in broadcast joins.") - val HIVE_WRITE_DATASOURCE_SCHEMA = booleanConf("spark.sql.hive.writeDataSourceSchema", - defaultValue = Some(true), - doc = "When true, will write the metastore information to Hive Metastore for " + - "Spark SQL DataSource Updating/Insertion, so Hive can access the Spark SQL data seamlessly.") - // Options that control which operators can be chosen by the query planner. These should be // considered hints and may be ignored by future versions of Spark SQL. 
val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort", @@ -475,8 +470,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING) - private[spark] def writeSchemaToHiveMetastore = getConf(HIVE_WRITE_DATASOURCE_SCHEMA) - private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT) private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0e0442066a314..1523ebe9d5493 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConversions._ +import scala.collection.mutable import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} @@ -57,51 +58,40 @@ private[hive] object HiveSerDe { * @param source Currently the source abbreviation can be one of the following: * SequenceFile, RCFile, ORC, PARQUET, and case insensitive. * @param hiveConf Hive Conf - * @param returnDefaultFormat Will the default format returns if no assocated data source found. - * The default input/output format is: - * InputFormat: org.apache.hadoop.mapred.TextInputFormat - * OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat * @return HiveSerDe associated with the specified source */ - def sourceToSerDe(source: String, hiveConf: HiveConf, returnDefaultFormat: Boolean) - : Option[HiveSerDe] = { - val serde = if ("SequenceFile".equalsIgnoreCase(source)) { - HiveSerDe( - inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), - outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")) - } else if ("RCFile".equalsIgnoreCase(source)) { - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), - serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))) - } else if ("ORC".equalsIgnoreCase(source) || - "org.apache.spark.sql.hive.orc.DefaultSource" == source) { - HiveSerDe( - inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), - outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), - serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - } else if ("PARQUET".equalsIgnoreCase(source) || - "org.apache.spark.sql.parquet.DefaultSource" == source) { - HiveSerDe( - inputFormat = - Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), - serde = - Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) - } else if (returnDefaultFormat) { - // return default file format - HiveSerDe( - inputFormat = - Option("org.apache.hadoop.mapred.TextInputFormat"), - outputFormat = - Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) - } else { - // TODO we probably need to provide SerDe for the built-in format, like json. 
- null + def sourceToSerDe(source: String, hiveConf: HiveConf): Option[HiveSerDe] = { + val serdeMap = Map( + "sequencefile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"), + outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")), + + "rcfile" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"), + serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))), + + "orc" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")), + + "parquet" -> + HiveSerDe( + inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"), + serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))) + + val key = source.toLowerCase match { + case _ if source.startsWith("org.apache.spark.sql.parquet") => "parquet" + case _ if source.startsWith("org.apache.spark.sql.orc") => "orc" + case _ => source.toLowerCase } - Option(serde) + serdeMap.get(key) } } @@ -226,15 +216,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive processDatabaseAndTableName(database, tableIdent.table) } - val tableProperties = new scala.collection.mutable.HashMap[String, String] + val tableProperties = new mutable.HashMap[String, String] tableProperties.put("spark.sql.sources.provider", provider) // Saves optional user specified schema. Serialized JSON schema string may be too long to be // stored into a single metastore SerDe property. In this case, we split the JSON string and // store each part as a separate SerDe property. - if (userSpecifiedSchema.isDefined) { + userSpecifiedSchema.foreach { schema => val threshold = conf.schemaStringLengthThreshold - val schemaJsonString = userSpecifiedSchema.get.json + val schemaJsonString = schema.json // Split the JSON string. val parts = schemaJsonString.grouped(threshold).toSeq tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString) @@ -256,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // The table does not have a specified schema, which means that the schema will be inferred // when we load the table. So, we are not expecting partition columns and we will discover // partitions when we load the table. However, if there are specified partition columns, - // we simplily ignore them and provide a warning message.. + // we simply ignore them and provide a warning message. logWarning( s"The schema and partitions of table $tableIdent will be inferred when it is loaded. 
" + s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.") @@ -272,61 +262,92 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive ManagedTable } - val hiveTable = HiveSerDe.sourceToSerDe(provider, hive.hiveconf, false) match { - case Some(hiveSerDe) if conf.writeSchemaToHiveMetastore => - // get the schema from the data source - val ds = ResolvedDataSource(hive, userSpecifiedSchema, partitionColumns, provider, options) - ds.relation match { - case fs: HadoopFsRelation if fs.paths.length == 1 => - def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = { - schema.map( field => HiveColumn( - name = field.name, - hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType), - comment = "") - ) - } - // get the partition columns - val parts = schemaToHiveColumn(fs.partitionColumns) - val partSet = parts.toSet - // remove the partition columns from the relation schema - val columns = schemaToHiveColumn(ds.relation.schema).filterNot(partSet.contains) - - HiveTable( - specifiedDatabase = Option(dbName), - name = tblName, - schema = columns, - partitionColumns = parts, - tableType = tableType, - properties = tableProperties.toMap, - serdeProperties = options, - location = Some(fs.paths.head), - viewText = None, // TODO We need to place the SQL string here. - inputFormat = hiveSerDe.inputFormat, - outputFormat = hiveSerDe.outputFormat, - serde = hiveSerDe.serde) - case fs: HadoopFsRelation => - throw new AnalysisException( - """Only a single location support for HadoopFSRelation when write the metadata - |into the Hive Metastore, set spark.sql.hive.writeDataSourceSchema=false - |for not conform to Hive""".stripMargin) - case _ => - throw new AnalysisException( - """Unable to write the Non HadoopFSRelation DataSource meta data - |into the Hive Metastore, set spark.sql.hive.writeDataSourceSchema=false - |for not conform to Hive""".stripMargin) + val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf) + val dataSource = ResolvedDataSource( + hive, userSpecifiedSchema, partitionColumns, provider, options) + + def newSparkSQLSpecificMetastoreTable(): HiveTable = { + HiveTable( + specifiedDatabase = Option(dbName), + name = tblName, + schema = Seq.empty, + partitionColumns = metastorePartitionColumns, + tableType = tableType, + properties = tableProperties.toMap, + serdeProperties = options) + } + + def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = { + def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = { + schema.map { field => + HiveColumn( + name = field.name, + hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType), + comment = "") + } + } + + val partitionColumns = schemaToHiveColumn(relation.partitionColumns) + val dataColumns = schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains) + + HiveTable( + specifiedDatabase = Option(dbName), + name = tblName, + schema = dataColumns, + partitionColumns = partitionColumns, + tableType = tableType, + properties = tableProperties.toMap, + serdeProperties = options, + location = Some(relation.paths.head), + viewText = None, // TODO We need to place the SQL string here. 
+ inputFormat = serde.inputFormat, + outputFormat = serde.outputFormat, + serde = serde.serde) + } + + // TODO: Support persisting partitioned data source relations in Hive compatible format + val hiveTable = (maybeSerDe, dataSource.relation) match { + case (Some(serde), relation: HadoopFsRelation) + if relation.paths.length == 1 && relation.partitionColumns.isEmpty => + logInfo { + "Persisting data source relation with a single input path into Hive metastore in Hive " + + s"compatible format. Input path: ${relation.paths.head}" + } + newHiveCompatibleMetastoreTable(relation, serde) + + case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty => + logWarning { + val paths = relation.paths.mkString(", ") + "Persisting partitioned data source relation into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " + + paths.mkString("\n", "\n", "") } - case other => - if (other.isEmpty) { - logWarning(s"Unable to find the SerDe info for $provider") + newSparkSQLSpecificMetastoreTable() + + case (Some(serde), relation: HadoopFsRelation) => + logWarning { + val paths = relation.paths.mkString(", ") + "Persisting data source relation with multiple input paths into Hive metastore in " + + s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " + + paths.mkString("\n", "\n", "") + } + newSparkSQLSpecificMetastoreTable() + + case (Some(serde), _) => + logWarning { + s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " + + "Persisting it into Hive metastore in Spark SQL specific format, " + + "which is NOT compatible with Hive." + } + newSparkSQLSpecificMetastoreTable() + + case _ => + logWarning { + s"Couldn't find corresponding Hive SerDe for data source provider $provider. " + + "Persisting data source relation into Hive metastore in Spark SQL specific format, " + + "which is NOT compatible with Hive." 
} - HiveTable( - specifiedDatabase = Option(dbName), - name = tblName, - schema = Seq.empty, - partitionColumns = metastorePartitionColumns, - tableType = tableType, - properties = tableProperties.toMap, - serdeProperties = options) + newSparkSQLSpecificMetastoreTable() } client.createTable(hiveTable) @@ -574,7 +595,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive case p: LogicalPlan if !p.childrenResolved => p case p: LogicalPlan if p.resolved => p case p @ CreateTableAsSelect(table, child, allowExisting) => - val schema = if (table.schema.size > 0) { + val schema = if (table.schema.nonEmpty) { table.schema } else { child.output.map { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ec946819da1ac..5f400c26ce04c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.spark.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ @@ -261,8 +262,8 @@ private[hive] object HiveQl extends Logging { /** * Returns the HiveConf */ - private[this] def hiveConf(): HiveConf = { - val ss = SessionState.get() // SessionState is lazy initializaion, it can be null here + private[this] def hiveConf: HiveConf = { + val ss = SessionState.get() // SessionState is lazy initialization, it can be null here if (ss == null) { new HiveConf() } else { @@ -607,7 +608,11 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C // default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.) 
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT) // handle the default format for the storage type abbreviation - val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf, true).get + val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse { + HiveSerDe( + inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"), + outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")) + } hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f))) hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 4a25969dc0d72..332c3ec0c28b8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -19,160 +19,140 @@ package org.apache.spark.sql.hive import java.io.File -import org.scalatest.BeforeAndAfterAll - -import org.apache.spark.Logging -import org.apache.spark.util.Utils -import org.apache.spark.SparkFunSuite import org.apache.spark.sql.hive.client.{ExternalTable, HiveColumn, ManagedTable} -import org.apache.spark.sql.{AnalysisException, SQLConf} -import org.apache.spark.sql.sources.DataSourceTest -import org.apache.spark.sql.test.ExamplePointUDT -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.sql.sources.DataSourceTest +import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.{Logging, SparkFunSuite} class HiveMetastoreCatalogSuite extends SparkFunSuite with Logging { test("struct field should accept underscore in sub-column name") { - val metastr = "struct" - - val datatype = HiveMetastoreTypes.toDataType(metastr) - assert(datatype.isInstanceOf[StructType]) + val hiveTypeStr = "struct" + val dateType = HiveMetastoreTypes.toDataType(hiveTypeStr) + assert(dateType.isInstanceOf[StructType]) } test("udt to metastore type conversion") { val udt = new ExamplePointUDT - assert(HiveMetastoreTypes.toMetastoreType(udt) === - HiveMetastoreTypes.toMetastoreType(udt.sqlType)) + assertResult(HiveMetastoreTypes.toMetastoreType(udt.sqlType)) { + HiveMetastoreTypes.toMetastoreType(udt) + } } test("duplicated metastore relations") { - import TestHive.implicits._ - val df = TestHive.sql("SELECT * FROM src") + val df = sql("SELECT * FROM src") logInfo(df.queryExecution.toString) df.as('a).join(df.as('b), $"a.key" === $"b.key") } } -class DataSourceWithHiveMetastoreCatalogSuite extends DataSourceTest with BeforeAndAfterAll { - var path: File = null - - override def beforeAll(): Unit = { - path = Utils.createTempDir() - Utils.deleteRecursively(path) - } - - override def afterAll(): Unit = {} - - after { - Utils.deleteRecursively(path) - } - - test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #1") { - import org.apache.spark.sql.SaveMode - val df = Seq((1, "1", 2), (2, "2", 2)).toDF("d1", "d2", "p") - df.write - .mode(SaveMode.Overwrite) - .partitionBy("p") - .format("parquet") - .saveAsTable("datasource_7550_1") - - val 
hiveTable = catalog.client.getTable("default", "datasource_7550_1") - val columns = hiveTable.schema - assert(columns.length === 2) - assert(columns(0).name === "d1") - assert(columns(0).hiveType === "int") - - assert(columns(1).name === "d2") - assert(columns(1).hiveType === "string") - - assert(hiveTable.inputFormat === - Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) - assert(hiveTable.outputFormat === - Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) - assert(hiveTable.serde === - Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) - - assert(hiveTable.isPartitioned === true) - assert(hiveTable.tableType === ManagedTable) - assert(hiveTable.partitionColumns.length === 1) - assert(hiveTable.partitionColumns.head === HiveColumn("p", "int", "")) - } - - test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #2") { - import org.apache.spark.sql.SaveMode - val df = Seq((1, "1", 2), (2, "2", 2)).toDF("d1", "d2", "p") - - df.write - .mode(SaveMode.ErrorIfExists) - .partitionBy("p") - .format("parquet") - .option("path", path.toString) - .saveAsTable("datasource_7550_2") - - val hiveTable = catalog.client.getTable("default", "datasource_7550_2") - - assert(hiveTable.isPartitioned === true) - assert(hiveTable.tableType === ExternalTable) - // hiveTable.location is "file:///tmp/..." while path.toString just be "/tmp/.." - assert(hiveTable.location.get.contains(path.toString)) - } - - test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #3") { - // Ignore to write the meta into Hive Metastore, as we don't support the json for Hive - import org.apache.spark.sql.SaveMode - val df = Seq((1, "1", 2), (2, "2", 2)).toDF("d1", "d2", "p") - df.write - .mode(SaveMode.ErrorIfExists) - .format("json") - .saveAsTable("datasource_7550_3") - assert(2 === sql("SELECT * from datasource_7550_3").count()) - } - - test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #4") { - // Ignore to write the meta into Hive Metastore, as we don't support the DataSource for Hive - sql( - """CREATE TABLE datasource_7550_4 - | USING org.apache.spark.sql.sources.DDLScanSource - | OPTIONS ( - | From '1', - | To '10', - | Table 'test1') - """.stripMargin) - assert(10 === sql("SELECT * from datasource_7550_4").count()) - } - - test("SPARK-7550 / SPARK-6923 Support setting the right schema & serde to Hive metastore #5") { - sql(s""" - |CREATE TABLE datasource_7550_5 - |USING orc - |OPTIONS ( - | path '${path.getCanonicalPath}' - |) AS - |SELECT 1 as d1, "aa" as d2 - """.stripMargin) - - val hiveTable = catalog.client.getTable("default", "datasource_7550_5") - val columns = hiveTable.schema - assert(columns.length === 2) - assert(columns(0).name === "d1") - assert(columns(0).hiveType === "int") - - assert(columns(1).name === "d2") - assert(columns(1).hiveType === "string") - - assert(hiveTable.inputFormat === - Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) - assert(hiveTable.outputFormat === - Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - assert(hiveTable.serde === - Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - - assert(hiveTable.isPartitioned === false) - assert(hiveTable.tableType === ExternalTable) - assert(hiveTable.partitionColumns.length === 0) +class DataSourceWithHiveMetastoreCatalogSuite extends DataSourceTest with SQLTestUtils { + override val sqlContext = TestHive + + private val testDF = (1 to 2).map(i => (i, 
s"val_$i")).toDF("d1", "d2").coalesce(1) + + Seq( + "parquet" -> ( + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" + ), + + "orc" -> ( + "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", + "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat", + "org.apache.hadoop.hive.ql.io.orc.OrcSerde" + ) + ).foreach { case (provider, (inputFormat, outputFormat, serde)) => + test(s"Persist non-partitioned $provider relation into metastore as managed table") { + withTable("t") { + testDF + .write + .mode(SaveMode.Overwrite) + .format(provider) + .saveAsTable("t") + + val hiveTable = catalog.client.getTable("default", "t") + assert(hiveTable.inputFormat === Some(inputFormat)) + assert(hiveTable.outputFormat === Some(outputFormat)) + assert(hiveTable.serde === Some(serde)) + + assert(!hiveTable.isPartitioned) + assert(hiveTable.tableType === ManagedTable) + + val columns = hiveTable.schema + assert(columns.map(_.name) === Seq("d1", "d2")) + assert(columns.map(_.hiveType) === Seq("int", "string")) + + checkAnswer(table("t"), testDF) + assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1", "2\tval_2")) + } + } + + test(s"Persist non-partitioned $provider relation into metastore as external table") { + withTempPath { dir => + withTable("t") { + val path = dir.getCanonicalFile + + testDF + .write + .mode(SaveMode.Overwrite) + .format(provider) + .option("path", path.toString) + .saveAsTable("t") + + val hiveTable = catalog.client.getTable("default", "t") + assert(hiveTable.inputFormat === Some(inputFormat)) + assert(hiveTable.outputFormat === Some(outputFormat)) + assert(hiveTable.serde === Some(serde)) + + assert(hiveTable.tableType === ExternalTable) + assert(hiveTable.location.get === path.toURI.toString.stripSuffix(File.separator)) + + val columns = hiveTable.schema + assert(columns.map(_.name) === Seq("d1", "d2")) + assert(columns.map(_.hiveType) === Seq("int", "string")) + + checkAnswer(table("t"), testDF) + assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1", "2\tval_2")) + } + } + } + + test(s"Persist non-partitioned $provider relation into metastore as managed table using CTAS") { + withTempPath { dir => + withTable("t") { + val path = dir.getCanonicalPath + + sql( + s"""CREATE TABLE t USING $provider + |OPTIONS (path '$path') + |AS SELECT 1 AS d1, "val_1" AS d2 + """.stripMargin) + + val hiveTable = catalog.client.getTable("default", "t") + assert(hiveTable.inputFormat === Some(inputFormat)) + assert(hiveTable.outputFormat === Some(outputFormat)) + assert(hiveTable.serde === Some(serde)) + + assert(hiveTable.isPartitioned === false) + assert(hiveTable.tableType === ExternalTable) + assert(hiveTable.partitionColumns.length === 0) + + val columns = hiveTable.schema + assert(columns.map(_.name) === Seq("d1", "d2")) + assert(columns.map(_.hiveType) === Seq("int", "string")) + + checkAnswer(table("t"), Row(1, "val_1")) + assert(runSqlHive("SELECT * FROM t") === Seq("1\tval_1")) + } + } + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1a556cc803646..1dff07a6de8ad 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -93,29 +93,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils { 
checkAnswer(query, Row(1, 1) :: Row(1, 2) :: Row(1, 3) :: Nil) } - test("SPARK-7550: Support setting the right schema & serde when writing to Hive Metastore") { - val df = Seq((1, 1)).toDF("c1", "c2") - - // test writing orc table via data source api, and read it in Hive - df.write.format("orc").saveAsTable("spark_7550_orc") - checkAnswer(sql("select * from spark_7550_orc"), Row(1, 1) :: Nil) // run with spark - val hiveResult1 = runSqlHive("select * from spark_7550_orc limit 2") // run with hive - assert(hiveResult1.length === 1) - assert(hiveResult1(0) === "1\t1") - - // test writing parquet table via data source api, and read it in Hive - df.write.format("parquet").saveAsTable("spark_7550_parquet") - checkAnswer(sql("select * from spark_7550_parquet"), Row(1, 1) :: Nil) // run with spark - val hiveResult2 = runSqlHive("select * from spark_7550_parquet limit 2") // run with hive - assert(hiveResult2.length === 1) - assert(hiveResult2(0) === "1\t1") - - // test writing json table via data source api - // will not throw exception, but will log warning, - // as Hive is not able to load data from this table - df.write.format("json").saveAsTable("spark_7550_json") - } - test("SPARK-6851: Self-joined converted parquet tables") { val orders = Seq( Order(1, "Atlas", "MTB", 234, "2015-01-07", "John D", "Pacifica", "CA", 20151), From 38701669121ce9e6a32d1878ece5efe41cd7e612 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 6 Aug 2015 02:02:33 +0800 Subject: [PATCH 4/5] Fixes build error and comments --- .../org/apache/spark/sql/DataFrame.scala | 45 +++++++++++-------- .../apache/spark/sql/DataFrameWriter.scala | 7 +++ .../org/apache/spark/sql/hive/HiveQl.scala | 2 +- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 24c0f9cdfec65..dd6be8c24fc10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql import java.io.CharArrayWriter import java.util.Properties -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.unsafe.types.UTF8String - import scala.language.implicitConversions import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -42,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} -import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation} +import org.apache.spark.sql.json.JacksonGenerator import org.apache.spark.sql.sources.HadoopFsRelation import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -1671,9 +1668,11 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * This function always persists table metadata into Hive's metastore. But the table is - * not accessible from Hive unless the underlying data source is either Parquet or ORC. - * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. 
+ * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. * * @group output * @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`. @@ -1693,9 +1692,11 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * This function always persists table metadata into Hive's metastore. But the table is - * not accessible from Hive unless the underlying data source is either Parquet or ORC. - * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. * * @group output * @deprecated As of 1.4.0, replaced by `write().format(source).saveAsTable(tableName)`. @@ -1715,9 +1716,11 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * This function always persists table metadata into Hive's metastore. But the table is - * not accessible from Hive unless the underlying data source is either Parquet or ORC. - * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. * * @group output * @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`. @@ -1736,9 +1739,11 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * This function always persists table metadata into Hive's metastore. But the table is - * not accessible from Hive unless the underlying data source is either Parquet or ORC. - * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. * * @group output * @deprecated As of 1.4.0, replaced by @@ -1764,9 +1769,11 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. 
* - * This function always persists table metadata into Hive's metastore. But the table is - * not accessible from Hive unless the underlying data source is either Parquet or ORC. - * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. * * @group output * @deprecated As of 1.4.0, replaced by diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 7e3318cefe62c..2a4992db09bc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource} import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils} +import org.apache.spark.sql.sources.HadoopFsRelation /** @@ -185,6 +186,12 @@ final class DataFrameWriter private[sql](df: DataFrame) { * When `mode` is `Append`, the schema of the [[DataFrame]] need to be * the same as that of the existing table, and format or options will be ignored. * + * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. 
+ * * @since 1.4.0 */ def saveAsTable(tableName: String): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 5f400c26ce04c..7d7b4b9167306 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -889,7 +889,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } (Nil, Some(BaseSemanticAnalyzer.unescapeSQLString(serdeClass)), serdeProps) - case Nil => (Nil, Option(hiveConf().getVar(ConfVars.HIVESCRIPTSERDE)), Nil) + case Nil => (Nil, Option(hiveConf.getVar(ConfVars.HIVESCRIPTSERDE)), Nil) } val (inRowFormat, inSerdeClass, inSerdeProps) = matchSerDe(inputSerdeClause) From 5175ee660705f6594d55a896a9e4b5474d49e121 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 6 Aug 2015 08:46:46 +0800 Subject: [PATCH 5/5] Fixes an oudated comment --- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index dd6be8c24fc10..405b5a4a9a7f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1647,9 +1647,11 @@ class DataFrame private[sql]( * an RDD out to a parquet file, and then register that file as a table. This "table" can then * be the target of an `insertInto`. * - * This function always persists table metadata into Hive's metastore. But the table is - * not accessible from Hive unless the underlying data source is either Parquet or ORC. - * And we can disable that by setting spark.sql.hive.writeDataSourceSchema to false. + * When the DataFrame is created from a non-partitioned [[HadoopFsRelation]] with a single input + * path, and the data source provider can be mapped to an existing Hive builtin SerDe (i.e. ORC + * and Parquet), the table is persisted in a Hive compatible format, which means other systems + * like Hive will be able to read this table. Otherwise, the table is persisted in a Spark SQL + * specific format. * * @group output * @deprecated As of 1.4.0, replaced by `write().saveAsTable(tableName)`.
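
Illustrative usage: a minimal spark-shell sketch of the end-to-end behavior this series enables, assuming a Hive-enabled build that includes these commits, where `sqlContext` is a HiveContext; the table name `hive_compat_t` is illustrative only.

    // Assumes a Hive-enabled spark-shell, where `sqlContext` is a HiveContext.
    import org.apache.spark.sql.SaveMode
    import sqlContext.implicits._

    // A small, non-partitioned DataFrame. With a single output location and a
    // Parquet (or ORC) provider, saveAsTable should now persist real columns plus
    // the matching Hive SerDe/input/output formats into the metastore.
    val df = Seq((1, "val_1"), (2, "val_2")).toDF("d1", "d2")

    df.write
      .mode(SaveMode.Overwrite)
      .format("parquet")
      .saveAsTable("hive_compat_t")

    // Still readable from Spark SQL as before.
    sqlContext.table("hive_compat_t").show()

    // The metastore entry should now show columns d1/d2 and the Parquet SerDe,
    // so the same table is expected to be queryable from the Hive CLI as well.
    sqlContext.sql("DESCRIBE FORMATTED hive_compat_t").collect().foreach(println)

For partitioned relations, multi-path relations, or providers with no matching Hive SerDe (e.g. JSON), the table falls back to the Spark SQL specific metastore format, as logged by the warnings added in HiveMetastoreCatalog.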