diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1d3c4663c339..2faf5d0e98a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -100,6 +100,33 @@ private[sql] class OrcFileFormat
     true
   }
 
+  def mapRequiredColumns(
+      conf: Configuration,
+      dataSchema: StructType,
+      physicalSchema: StructType,
+      requiredSchema: StructType): StructType = {
+
+    // requiredSchema names might not match the physical schema names.
+    // This is especially true when data is generated via Hive, wherein
+    // ORC files would have column names such as _col0, _col1, etc. This is
+    // fixed in Hive 2.0, wherein the physical column names match those
+    // of the metastore. To stay backward compatible, the requiredSchema
+    // names need to be mapped to the physical names.
+
+    // For requiredSchema, get the ordinal from dataSchema.
+    val ids = requiredSchema.map(a => dataSchema.fieldIndex(a.name): Integer).sorted
+
+    // For each ordinal, get the corresponding name from physicalSchema (e.g. _col1
+    // for Hive 1.x files; otherwise it matches the metastore name).
+    val names = ids.map(i => physicalSchema.fieldNames(i))
+    HiveShim.appendReadColumns(conf, ids, names)
+
+    val mappedReqPhysicalSchemaStruct =
+      StructType(physicalSchema.filter(struct => names.contains(struct.name)))
+
+    mappedReqPhysicalSchemaStruct
+  }
+
   override def buildReader(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -130,7 +157,9 @@ private[sql] class OrcFileFormat
         Iterator.empty
       } else {
         val physicalSchema = maybePhysicalSchema.get
-        OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
+        // Get StructType for newly mapped schema
+        val mappedReqPhysicalSchema =
+          mapRequiredColumns(conf, dataSchema, physicalSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -151,7 +180,7 @@ private[sql] class OrcFileFormat
         // Unwraps `OrcStruct`s to `UnsafeRow`s
         OrcRelation.unwrapOrcStructs(
           conf,
-          requiredSchema,
+          mappedReqPhysicalSchema,
           Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
           new RecordReaderIterator[OrcStruct](orcRecordReader))
       }
@@ -306,11 +335,4 @@ private[orc] object OrcRelation extends HiveInspectors {
 
     maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
   }
-
-  def setRequiredColumns(
-      conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
-    val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
-    val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
-  }
 }
diff --git a/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc
new file mode 100644
index 000000000000..fb35cb0df83c
Binary files /dev/null and b/sql/hive/src/test/resources/data/files/hive_1.x_orc/dummy_orc.orc differ
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 49e963ee1294..8642a81a18d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.io.File
 import java.nio.charset.StandardCharsets
 
 import org.scalatest.BeforeAndAfterAll
@@ -401,6 +402,31 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  def getHiveFile(path: String): File = {
+    new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile)
+  }
+
+  test("Verify ORC conversion parameter: CONVERT_METASTORE_ORC with Hive-1.x files") {
+    val singleRowDF = Seq((2415022, "AAAAAAAAOKJNECAA")).toDF("key", "value")
+    Seq("true", "false").foreach { orcConversion =>
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> orcConversion) {
+        withTable("dummy_orc") {
+          // Hive 1.x can have virtual columns as follows in ORC files
+          // Type: struct<_col0:int,_col1:string> in hive_1.x_orc
+          spark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE dummy_orc(key INT, value STRING)
+               |STORED AS ORC
+               |LOCATION '${getHiveFile("data/files/hive_1.x_orc/")}'
+             """.stripMargin)
+
+          val df = spark.sql("SELECT key, value FROM dummy_orc LIMIT 1")
+          checkAnswer(df, singleRowDF)
+        }
+      }
+    }
+  }
+
   test("Verify the ORC conversion parameter: CONVERT_METASTORE_ORC") {
     withTempView("single") {
       val singleRowDF = Seq((0, "foo")).toDF("key", "value")
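
For reference, below is a minimal, self-contained sketch (not part of the patch; the object and variable names are illustrative) of the ordinal-based mapping that mapRequiredColumns performs. It assumes a Hive 1.x ORC file whose physical schema uses positional names (_col0, _col1) while the metastore schema uses the logical names (key, value). The HiveShim.appendReadColumns call is omitted here, since it only records the same ids and names in the Hadoop configuration.

// Editor's sketch of the column-name mapping, under the assumptions stated above.
import org.apache.spark.sql.types._

object Hive1xColumnMappingSketch {
  def main(args: Array[String]): Unit = {
    // Metastore (logical) schema vs. the schema physically stored in the ORC file.
    val dataSchema     = StructType(Seq(StructField("key", IntegerType), StructField("value", StringType)))
    val physicalSchema = StructType(Seq(StructField("_col0", IntegerType), StructField("_col1", StringType)))
    // The query requests only `value`, so requiredSchema is a subset of dataSchema.
    val requiredSchema = StructType(Seq(StructField("value", StringType)))

    // Resolve ordinals against the metastore schema...
    val ids = requiredSchema.map(f => dataSchema.fieldIndex(f.name)).sorted
    // ...then translate each ordinal into the name physically stored in the file.
    val names = ids.map(physicalSchema.fieldNames(_))
    assert(names == Seq("_col1"))

    // The schema handed to the ORC unwrapping path now refers to the physical names,
    // so rows read from a Hive 1.x file resolve correctly.
    val mapped = StructType(physicalSchema.filter(f => names.contains(f.name)))
    assert(mapped.fieldNames.sameElements(Array("_col1")))
  }
}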