36 changes: 24 additions & 12 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1643,8 +1643,10 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* This function always persists the table metadata into Hive's metastore, but the table is
* not accessible from Hive unless the underlying data source is either Parquet or ORC.
* This behavior can be disabled by setting spark.sql.hive.writeDataSourceSchema to false.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().saveAsTable(tableName)`.
*/
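As a concrete illustration of the non-deprecated writer API referenced in the `@deprecated` tags in this file (a minimal sketch, not part of the patch; the `sqlContext` value and table name are assumptions):

```scala
import org.apache.spark.sql.SaveMode

// Assumes an existing HiveContext named sqlContext; a HiveContext is required for the
// table metadata to actually be persisted into the Hive metastore as described above.
val df = sqlContext.range(0, 100).toDF("id")

// Replacement for the deprecated saveAsTable overloads. With a Parquet (or ORC) source,
// the resulting metastore table is also readable from Hive itself after this PR.
df.write
  .format("parquet")
  .mode(SaveMode.Overwrite)
  .saveAsTable("ids_parquet")
```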
@@ -1662,8 +1664,10 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* This function always persists the table metadata into Hive's metastore, but the table is
* not accessible from Hive unless the underlying data source is either Parquet or ORC.
* This behavior can be disabled by setting spark.sql.hive.writeDataSourceSchema to false.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`.
*/
@@ -1682,8 +1686,10 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* This function always persists the table metadata into Hive's metastore, but the table is
* not accessible from Hive unless the underlying data source is either Parquet or ORC.
* This behavior can be disabled by setting spark.sql.hive.writeDataSourceSchema to false.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().format(source).saveAsTable(tableName)`.
*/
@@ -1702,8 +1708,10 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* This function always persists the table metadata into Hive's metastore, but the table is
* not accessible from Hive unless the underlying data source is either Parquet or ORC.
* This behavior can be disabled by setting spark.sql.hive.writeDataSourceSchema to false.
*
* @group output
* @deprecated As of 1.4.0, replaced by `write().mode(mode).saveAsTable(tableName)`.
*/
@@ -1721,8 +1729,10 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* This function always persists the table metadata into Hive's metastore, but the table is
* not accessible from Hive unless the underlying data source is either Parquet or ORC.
* This behavior can be disabled by setting spark.sql.hive.writeDataSourceSchema to false.
*
* @group output
* @deprecated As of 1.4.0, replaced by
* `write().format(source).mode(mode).options(options).saveAsTable(tableName)`.
@@ -1747,8 +1757,10 @@ class DataFrame private[sql](
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* This function always persists the table metadata into Hive's metastore, but the table is
* not accessible from Hive unless the underlying data source is either Parquet or ORC.
* This behavior can be disabled by setting spark.sql.hive.writeDataSourceSchema to false.
Contributor: is this still true?

Contributor: Oh, it's not anymore, I forgot to update the comment in my PR...

*
* @group output
* @deprecated As of 1.4.0, replaced by
* `write().format(source).mode(mode).options(options).saveAsTable(tableName)`.
@@ -18,11 +18,13 @@
package org.apache.spark.sql.hive

import scala.collection.JavaConversions._
import scala.collection.mutable

import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.Warehouse
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata._
@@ -40,9 +42,59 @@ import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}

private[hive] case class HiveSerDe(
inputFormat: Option[String] = None,
outputFormat: Option[String] = None,
serde: Option[String] = None)

private[hive] object HiveSerDe {
/**
* Get the Hive SerDe information from the data source abbreviation string or classname.
*
* @param source The data source abbreviation (case insensitive), currently one of
*               SequenceFile, RCFile, ORC, or Parquet, or a data source class name.
* @param hiveConf The Hive configuration (used to resolve the default RCFile SerDe).
* @return HiveSerDe associated with the specified source
*/
def sourceToSerDe(source: String, hiveConf: HiveConf): Option[HiveSerDe] = {
val serdeMap = Map(
"sequencefile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat")),

"rcfile" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))),

"orc" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),

"parquet" ->
HiveSerDe(
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))

val key = source.toLowerCase match {
case _ if source.startsWith("org.apache.spark.sql.parquet") => "parquet"
case _ if source.startsWith("org.apache.spark.sql.orc") => "orc"
case lowerCased => lowerCased
}

serdeMap.get(key)
}
}
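A usage sketch for the new `HiveSerDe.sourceToSerDe` helper (illustrative only; it is `private[hive]`, so this assumes code living in the `org.apache.spark.sql.hive` package, and the `HiveConf` here is a fresh default one rather than the session's):

```scala
import org.apache.hadoop.hive.conf.HiveConf

val hiveConf = new HiveConf()

// Abbreviations are matched case-insensitively...
HiveSerDe.sourceToSerDe("PARQUET", hiveConf)
// => Some(HiveSerDe(Some(...MapredParquetInputFormat), Some(...MapredParquetOutputFormat),
//                   Some(...ParquetHiveSerDe)))

// ...and fully qualified Parquet/ORC data source class names are recognized as well.
HiveSerDe.sourceToSerDe("org.apache.spark.sql.parquet.DefaultSource", hiveConf)

// Anything else (e.g. the JSON data source) has no Hive SerDe mapping, so the callers in
// HiveMetastoreCatalog and HiveQl fall back to a Spark SQL specific (or plain text) layout.
HiveSerDe.sourceToSerDe("json", hiveConf) // => None
```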

private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
extends Catalog with Logging {

@@ -164,15 +216,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
processDatabaseAndTableName(database, tableIdent.table)
}

val tableProperties = new scala.collection.mutable.HashMap[String, String]
val tableProperties = new mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)

// Saves the optional user-specified schema. The serialized JSON schema string may be too long
// to be stored in a single metastore SerDe property. In this case, we split the JSON string and
// store each part as a separate SerDe property.
if (userSpecifiedSchema.isDefined) {
userSpecifiedSchema.foreach { schema =>
val threshold = conf.schemaStringLengthThreshold
val schemaJsonString = userSpecifiedSchema.get.json
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
tableProperties.put("spark.sql.sources.schema.numParts", parts.size.toString)
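To make the splitting above easier to follow, here is a self-contained round-trip sketch. The per-part property key (`spark.sql.sources.schema.part.<i>`) is an assumption on my part, since the loop that writes the individual parts is collapsed in this diff:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val threshold = 16 // tiny value for illustration; Spark reads it from conf.schemaStringLengthThreshold

// Write side: chop the JSON schema into pieces small enough for individual metastore properties.
val parts = schema.json.grouped(threshold).toSeq
val props = Map("spark.sql.sources.schema.numParts" -> parts.size.toString) ++
  parts.zipWithIndex.map { case (part, i) => s"spark.sql.sources.schema.part.$i" -> part }

// Read side: concatenate the parts in order to recover the original JSON string.
// (When the table is later loaded, Spark parses this JSON back into a StructType.)
val numParts = props("spark.sql.sources.schema.numParts").toInt
val json = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
assert(json == schema.json)
```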
@@ -194,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// The table does not have a specified schema, which means that the schema will be inferred
// when we load the table. So, we are not expecting partition columns and we will discover
// partitions when we load the table. However, if there are specified partition columns,
// we simplily ignore them and provide a warning message..
// we simply ignore them and provide a warning message.
logWarning(
s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
@@ -210,15 +262,95 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
ManagedTable
}

client.createTable(
val maybeSerDe = HiveSerDe.sourceToSerDe(provider, hive.hiveconf)
val dataSource = ResolvedDataSource(
hive, userSpecifiedSchema, partitionColumns, provider, options)

def newSparkSQLSpecificMetastoreTable(): HiveTable = {
HiveTable(
specifiedDatabase = Option(dbName),
name = tblName,
schema = Seq.empty,
partitionColumns = metastorePartitionColumns,
tableType = tableType,
properties = tableProperties.toMap,
serdeProperties = options))
serdeProperties = options)
}

def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
schema.map { field =>
HiveColumn(
name = field.name,
hiveType = HiveMetastoreTypes.toMetastoreType(field.dataType),
comment = "")
}
}

val partitionColumns = schemaToHiveColumn(relation.partitionColumns)
val dataColumns = schemaToHiveColumn(relation.schema).filterNot(partitionColumns.contains)

HiveTable(
specifiedDatabase = Option(dbName),
name = tblName,
schema = dataColumns,
partitionColumns = partitionColumns,
tableType = tableType,
properties = tableProperties.toMap,
serdeProperties = options,
location = Some(relation.paths.head),
viewText = None, // TODO We need to place the SQL string here.
inputFormat = serde.inputFormat,
outputFormat = serde.outputFormat,
serde = serde.serde)
}

// TODO: Support persisting partitioned data source relations in Hive compatible format
val hiveTable = (maybeSerDe, dataSource.relation) match {
case (Some(serde), relation: HadoopFsRelation)
if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
logInfo {
"Persisting data source relation with a single input path into Hive metastore in Hive " +
s"compatible format. Input path: ${relation.paths.head}"
}
newHiveCompatibleMetastoreTable(relation, serde)

case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
logWarning {
val paths = relation.paths.mkString(", ")
"Persisting partitioned data source relation into Hive metastore in " +
s"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " +
paths.mkString("\n", "\n", "")
}
newSparkSQLSpecificMetastoreTable()

case (Some(serde), relation: HadoopFsRelation) =>
logWarning {
val paths = relation.paths.mkString(", ")
"Persisting data source relation with multiple input paths into Hive metastore in " +
s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " +
paths.mkString("\n", "\n", "")
}
newSparkSQLSpecificMetastoreTable()

case (Some(serde), _) =>
logWarning {
s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
"Persisting it into Hive metastore in Spark SQL specific format, " +
"which is NOT compatible with Hive."
}
newSparkSQLSpecificMetastoreTable()

case _ =>
logWarning {
s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
"Persisting data source relation into Hive metastore in Spark SQL specific format, " +
"which is NOT compatible with Hive."
}
newSparkSQLSpecificMetastoreTable()
}

client.createTable(hiveTable)
}
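From the user's perspective, the case analysis above roughly plays out as follows. This is a hedged sketch, not part of the patch: the table names are invented and it assumes a `HiveContext` named `sqlContext`:

```scala
import org.apache.spark.sql.SaveMode

val df = sqlContext.range(0, 10).toDF("id")

// Parquet provider, single output path, no partition columns: the first case matches, so the
// metastore entry gets real columns plus ParquetHiveSerDe and should be readable from Hive.
df.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable("t_hive_compatible")

// Partitioned data source relation: second case, Spark SQL specific metadata only
// (the schema lives in table properties and is not visible to plain Hive).
df.withColumn("part", df("id") % 2)
  .write.format("parquet").partitionBy("part").saveAsTable("t_spark_specific")

// Provider with no Hive SerDe mapping (e.g. JSON): falls through to the last case,
// which is also Spark SQL specific.
df.write.format("json").saveAsTable("t_json_spark_specific")
```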

def hiveDefaultTableFilePath(tableName: String): String = {
@@ -464,7 +596,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
case p @ CreateTableAsSelect(table, child, allowExisting) =>
val schema = if (table.schema.size > 0) {
val schema = if (table.schema.nonEmpty) {
table.schema
} else {
child.output.map {
47 changes: 14 additions & 33 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -257,8 +258,8 @@ private[hive] object HiveQl extends Logging {
/**
* Returns the HiveConf
*/
private[this] def hiveConf(): HiveConf = {
val ss = SessionState.get() // SessionState is lazy initializaion, it can be null here
private[this] def hiveConf: HiveConf = {
val ss = SessionState.get() // SessionState is lazy initialization, it can be null here
if (ss == null) {
new HiveConf()
} else {
@@ -605,38 +606,18 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
serde = None,
viewText = None)

// default storage type abbriviation (e.g. RCFile, ORC, PARQUET etc.)
// default storage type abbreviation (e.g. RCFile, ORC, PARQUET etc.)
val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
// handle the default format for the storage type abbriviation
tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
tableDesc.copy(
inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
} else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
} else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
tableDesc.copy(
inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
} else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
tableDesc.copy(
inputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde =
Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
} else {
tableDesc.copy(
inputFormat =
Option("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat =
Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
}
// handle the default format for the storage type abbreviation
val hiveSerDe = HiveSerDe.sourceToSerDe(defaultStorageType, hiveConf).getOrElse {
HiveSerDe(
inputFormat = Option("org.apache.hadoop.mapred.TextInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
}

hiveSerDe.inputFormat.foreach(f => tableDesc = tableDesc.copy(inputFormat = Some(f)))
hiveSerDe.outputFormat.foreach(f => tableDesc = tableDesc.copy(outputFormat = Some(f)))
hiveSerDe.serde.foreach(f => tableDesc = tableDesc.copy(serde = Some(f)))

children.collect {
case list @ Token("TOK_TABCOLLIST", _) =>
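A hedged sketch of the behavior the refactored default-format handling above preserves (the `events` table and `hiveContext` value are assumptions, and this relies on `HiveContext.setConf` propagating the setting to the session's HiveConf, which is what it is meant to do):

```scala
// With hive.default.fileformat set to ORC, a CTAS statement with no explicit STORED AS
// clause should now pick up OrcInputFormat / OrcOutputFormat / OrcSerde via
// HiveSerDe.sourceToSerDe; an unrecognized value still falls back to
// TextInputFormat / IgnoreKeyTextOutputFormat, exactly like the removed if/else chain.
hiveContext.setConf("hive.default.fileformat", "ORC")
hiveContext.sql("CREATE TABLE events_copy AS SELECT * FROM events")
```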
@@ -290,9 +290,11 @@ private[orc] case class OrcTableScan(
// Sets requested columns
addColumnIds(attributes, relation, conf)

if (inputPaths.nonEmpty) {
FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
if (inputPaths.isEmpty) {
// All input paths may have been pruned away; return an empty RDD.
return sqlContext.sparkContext.emptyRDD[InternalRow]
}
FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)

val inputFormatClass =
classOf[OrcInputFormat]