@@ -309,69 +309,68 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }

     // TODO: Support persisting partitioned data source relations in Hive compatible format
-    val hiveTable = (maybeSerDe, dataSource.relation) match {
+    val qualifiedTableName = tableIdent.quotedString
+    val (hiveCompatibleTable, logMessage) = (maybeSerDe, dataSource.relation) match {
       case (Some(serde), relation: HadoopFsRelation)
-          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
-        // Hive ParquetSerDe doesn't support decimal type until 1.2.0.
-        val isParquetSerDe = serde.inputFormat.exists(_.toLowerCase.contains("parquet"))
-        val hasDecimalFields = relation.schema.existsRecursively(_.isInstanceOf[DecimalType])
-
-        val hiveParquetSupportsDecimal = client.version match {
-          case org.apache.spark.sql.hive.client.hive.v1_2 => true
-          case _ => false
-        }
-
-        if (isParquetSerDe && !hiveParquetSupportsDecimal && hasDecimalFields) {
-          // If Hive version is below 1.2.0, we cannot save Hive compatible schema to
-          // metastore when the file format is Parquet and the schema has DecimalType.
-          logWarning {
-            "Persisting Parquet relation with decimal field(s) into Hive metastore in Spark SQL " +
-              "specific format, which is NOT compatible with Hive. Because ParquetHiveSerDe in " +
-              s"Hive ${client.version.fullVersion} doesn't support decimal type. See HIVE-6384."
-          }
-          newSparkSQLSpecificMetastoreTable()
-        } else {
-          logInfo {
-            "Persisting data source relation with a single input path into Hive metastore in " +
-              s"Hive compatible format. Input path: ${relation.paths.head}"
-          }
-          newHiveCompatibleMetastoreTable(relation, serde)
-        }
+          if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
+        val message =
+          s"Persisting data source relation $qualifiedTableName with a single input path " +
+            s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}."
+        (Some(hiveTable), message)

       case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
-        logWarning {
-          "Persisting partitioned data source relation into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive. Input path(s): " +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting partitioned data source relation $qualifiedTableName into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            "Input path(s): " + relation.paths.mkString("\n", "\n", "")
+        (None, message)

       case (Some(serde), relation: HadoopFsRelation) =>
-        logWarning {
-          "Persisting data source relation with multiple input paths into Hive metastore in " +
-            s"Spark SQL specific format, which is NOT compatible with Hive. Input paths: " +
-            relation.paths.mkString("\n", "\n", "")
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Persisting data source relation $qualifiedTableName with multiple input paths into " +
+            "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " +
+            s"Input paths: " + relation.paths.mkString("\n", "\n", "")
+        (None, message)

       case (Some(serde), _) =>
-        logWarning {
-          s"Data source relation is not a ${classOf[HadoopFsRelation].getSimpleName}. " +
-            "Persisting it into Hive metastore in Spark SQL specific format, " +
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+        val message =
+          s"Data source relation $qualifiedTableName is not a " +
+            s"${classOf[HadoopFsRelation].getSimpleName}. Persisting it into Hive metastore " +
+            "in Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)

       case _ =>
-        logWarning {
+        val message =
           s"Couldn't find corresponding Hive SerDe for data source provider $provider. " +
-            "Persisting data source relation into Hive metastore in Spark SQL specific format, " +
-            "which is NOT compatible with Hive."
-        }
-        newSparkSQLSpecificMetastoreTable()
+            s"Persisting data source relation $qualifiedTableName into Hive metastore in " +
+            s"Spark SQL specific format, which is NOT compatible with Hive."
+        (None, message)
     }

-    client.createTable(hiveTable)
+    (hiveCompatibleTable, logMessage) match {
+      case (Some(table), message) =>
+        // We first try to save the metadata of the table in a Hive compatible way.
+        // If Hive throws an error, we fall back to save its metadata in the Spark SQL
+        // specific way.
+        try {
+          logInfo(message)
+          client.createTable(table)
+        } catch {
+          case throwable: Throwable =>
+            val warningMessage =
+              s"Could not persist $qualifiedTableName in a Hive compatible way. Persisting " +
+                s"it into Hive metastore in Spark SQL specific format."
+            logWarning(warningMessage, throwable)
+            val sparkSqlSpecificTable = newSparkSQLSpecificMetastoreTable()
+            client.createTable(sparkSqlSpecificTable)
+        }
+
+      case (None, message) =>
+        logWarning(message)
+        val hiveTable = newSparkSQLSpecificMetastoreTable()
+        client.createTable(hiveTable)
+    }
   }

   def hiveDefaultTableFilePath(tableName: String): String = {
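
For reviewers who want the new control flow in isolation, below is a minimal, self-contained Scala sketch of the decide-then-fall-back pattern this patch introduces. RelationSpec, TableSpec, decide, persist, and saveToMetastore are hypothetical stand-ins invented for illustration, not Spark APIs; only the shape of the logic mirrors the patched createDataSourceTable.

object PersistFallbackSketch {
  // Hypothetical stand-ins for the catalog types (not Spark APIs).
  case class RelationSpec(paths: Seq[String], partitionColumns: Seq[String], serde: Option[String])
  case class TableSpec(name: String, hiveCompatible: Boolean)

  // Mirrors the patched match: a Hive compatible layout is attempted only when
  // a SerDe is known, there is a single input path, and the data is unpartitioned.
  def decide(name: String, relation: RelationSpec): (Option[TableSpec], String) =
    (relation.serde, relation) match {
      case (Some(_), r) if r.paths.length == 1 && r.partitionColumns.isEmpty =>
        (Some(TableSpec(name, hiveCompatible = true)),
          s"Persisting $name in Hive compatible format. Input path: ${r.paths.head}.")
      case (Some(_), r) if r.partitionColumns.nonEmpty =>
        (None, s"Persisting partitioned relation $name in Spark SQL specific format.")
      case (Some(_), _) =>
        (None, s"Persisting multi-path relation $name in Spark SQL specific format.")
      case (None, _) =>
        (None, s"No SerDe known for $name; persisting in Spark SQL specific format.")
    }

  // Stub metastore call; a real client may reject schemas the target Hive
  // version cannot represent (e.g. decimal columns before HIVE-6384 was fixed).
  def saveToMetastore(table: TableSpec): Unit =
    if (table.hiveCompatible && table.name.contains("decimal")) {
      throw new RuntimeException(s"metastore rejected ${table.name}")
    }

  // Try the Hive compatible table first; on any error, fall back to the
  // Spark SQL specific format, as the patched createDataSourceTable does.
  def persist(name: String, relation: RelationSpec): Unit =
    decide(name, relation) match {
      case (Some(table), message) =>
        try {
          println(message)
          saveToMetastore(table)
        } catch {
          case e: Throwable =>
            println(s"Could not persist $name in a Hive compatible way " +
              s"(${e.getMessage}); falling back to Spark SQL specific format.")
            saveToMetastore(TableSpec(name, hiveCompatible = false))
        }
      case (None, message) =>
        println(message)
        saveToMetastore(TableSpec(name, hiveCompatible = false))
    }

  def main(args: Array[String]): Unit = {
    persist("t_plain", RelationSpec(Seq("/data/t"), Nil, Some("parquet")))
    // The stub metastore rejects this compatible table (name contains
    // "decimal"), exercising the fall-back branch.
    persist("t_decimal", RelationSpec(Seq("/data/d"), Nil, Some("parquet")))
  }
}

The design point the sketch isolates: the classification only decides whether a Hive compatible layout is attempted, while the try/catch guarantees that any metastore rejection degrades the table to the Spark SQL specific format instead of failing the CREATE TABLE.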