diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 805b3171cdaa..3592b8f4846d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -189,12 +189,12 @@ case class RelationConversions(
   private def convert(relation: HiveTableRelation): LogicalRelation = {
     val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     if (serde.contains("parquet")) {
-      val options = Map(ParquetOptions.MERGE_SCHEMA ->
+      val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
     } else {
-      val options = Map[String, String]()
+      val options = relation.tableMeta.storage.properties
       sessionCatalog.metastoreCatalog
         .convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 668da5fb4732..02e26bbe876a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -23,6 +23,8 @@ import java.net.URI
 import scala.language.existentials
 
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.ParquetFileReader
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
@@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAl
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -1455,12 +1458,8 @@ class HiveDDLSuite
         sql("INSERT INTO t SELECT 1")
         checkAnswer(spark.table("t"), Row(1))
         // Check if this is compressed as ZLIB.
-        val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
-        assert(maybeOrcFile.isDefined)
-        val orcFilePath = maybeOrcFile.get.toPath.toString
-        val expectedCompressionKind =
-          OrcFileOperator.getFileReader(orcFilePath).get.getCompression
-        assert("ZLIB" === expectedCompressionKind.name())
+        val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
+        assertCompression(maybeOrcFile, "orc", "ZLIB")
 
         sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
         val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
@@ -2009,4 +2008,47 @@ class HiveDDLSuite
       }
     }
   }
+
+  private def assertCompression(maybeFile: Option[File], format: String, compression: String) = {
+    assert(maybeFile.isDefined)
+
+    val actualCompression = format match {
+      case "orc" =>
+        OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name
+
+      case "parquet" =>
+        val footer = ParquetFileReader.readFooter(
+          sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER)
+        footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
+    }
+
+    assert(compression === actualCompression)
+  }
+
+  Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
+    test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
+      withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
+        withTable("t") {
+          withTempPath { path =>
+            sql(
+              s"""
+                |CREATE TABLE t(id int) USING hive
+                |OPTIONS(fileFormat '$fileFormat', compression '$compression')
+                |LOCATION '${path.toURI}'
+              """.stripMargin)
+            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+            assert(DDLUtils.isHiveTable(table))
+            assert(table.storage.serde.get.contains(fileFormat))
+            assert(table.storage.properties.get("compression") == Some(compression))
+            assert(spark.table("t").collect().isEmpty)
+
+            sql("INSERT INTO t SELECT 1")
+            checkAnswer(spark.table("t"), Row(1))
+            val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
+            assertCompression(maybeFile, fileFormat, compression)
+          }
+        }
+      }
+    }
+  }
 }
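
Note on the production change (a minimal sketch, not part of the patch): `RelationConversions.convert` now seeds the data source options with the Hive table's storage properties instead of an empty map, so a table-level `compression` option survives the metastore conversion and is visible to the Parquet/ORC file formats. The stand-alone Scala sketch below illustrates only the options merge; the map values are hypothetical stand-ins for `relation.tableMeta.storage.properties` and the schema-merging conf.

    // Sketch of the options merge introduced above (illustrative values only).
    object OptionsMergeSketch extends App {
      // Stand-in for relation.tableMeta.storage.properties on a table created with
      // OPTIONS(fileFormat 'parquet', compression 'GZIP').
      val storageProps = Map("compression" -> "GZIP")
      // Stand-in for conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString.
      val mergeSchema = "false"

      // Before the patch: options = Map("mergeSchema" -> mergeSchema), dropping "compression".
      // After the patch: the table properties are kept, with the mergeSchema flag layered on top.
      val options = storageProps + ("mergeSchema" -> mergeSchema)

      assert(options("compression") == "GZIP") // the table property now reaches the file format
      println(options)
    }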