diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 4d92a6704437..2ec48656d322 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -109,6 +109,33 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     true
   }
 
+  def mapRequiredColumns(
+      conf: Configuration,
+      dataSchema: StructType,
+      physicalSchema: StructType,
+      requiredSchema: StructType): StructType = {
+
+    // requiredSchema names might not match the physical schema names.
+    // This is especially true when data is generated via Hive, whose
+    // ORC files can have column names like _col0, _col1, etc. This is
+    // fixed in Hive 2.0, where physical column names match those in
+    // the metastore. For backward compatibility, the requiredSchema
+    // columns have to be mapped to their physical column names.
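+    //
+    // For example, for a metastore schema <key: int, value: string> written by
+    // Hive 1.x with physical schema <_col0: int, _col1: string>, a requiredSchema
+    // of <value: string> maps to ordinal 1 and physical name "_col1", so this
+    // method returns a struct containing only <_col1: string>.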
+
+    // For each requiredSchema column, get its ordinal in dataSchema
+    val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted
+
+    // For each ordinal, get the corresponding name from physicalSchema
+    // (e.g. _col1 for Hive 1.x files; otherwise it matches the metastore name)
+    val names = ids.map(i => physicalSchema.fieldNames(i))
+    HiveShim.appendReadColumns(conf, ids, names)
+
+    val mappedReqPhysicalSchemaStruct =
+      StructType(physicalSchema.filter(struct => names.contains(struct.name)))
+
+    mappedReqPhysicalSchemaStruct
+  }
+
   override def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -139,7 +166,9 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        // Get the StructType for the required columns mapped to physical column names
+        val mappedReqPhysicalSchema =
+          mapRequiredColumns(conf, dataSchema, physicalSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -163,7 +192,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
-          requiredSchema,
+          mappedReqPhysicalSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           recordsIterator)
       }
diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc
new file mode 100644
index 000000000000..fb35cb0df83c
Binary files /dev/null and b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 60ccd996d6d5..8c02c9524f61 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.io.File
 import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
@@ -439,6 +440,31 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  def getHiveFile(path: String): File = {
+    new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile)
+  }
+
+  test("Verify ORC conversion parameter: CONVERT_METASTORE_ORC with Hive-1.x files") {
+    val singleRowDF = Seq((2415022, "AAAAAAAAOKJNECAA")).toDF("key", "value")
+    Seq("true", "false").foreach { orcConversion =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) {
+        withTable("dummy_orc") {
+          // ORC files written by Hive 1.x can have internal column names, e.g.
+          // Type: struct<_col0:int,_col1:string> in hive_1.x_orc
+          spark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE dummy_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '${getHiveFile("data/files/hive_1.x_orc/")}'
+             """.stripMargin)
+
+          val df = spark.sql("SELECT key, value FROM dummy_orc LIMIT 1")
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
+
   test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") {
     withTempView("single") {
       val singleRowDF = Seq((0, "foo")).toDF("key", "value")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 781de6631f32..ef7e140255a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -187,8 +188,12 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
            |STORED AS orc
            |LOCATION '$uri'""".stripMargin)
       val result = Row("a", "b ", "c", Seq("d "))
-      checkAnswer(spark.table("hive_orc"), result)
-      checkAnswer(spark.table("spark_orc"), result)
+      Seq("false", "true").foreach { value =>
+        withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> value) {
+          checkAnswer(spark.table("hive_orc"), result)
+          checkAnswer(spark.table("spark_orc"), result)
+        }
+      }
     } finally {
       hiveClient.runSqlHive("DROP TABLE IF EXISTS hive_orc")
       hiveClient.runSqlHive("DROP TABLE IF EXISTS spark_orc")