diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 828949eddc8e..561068f32b9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -315,15 +315,27 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    // Pass case insensitivity config param to ParquetReadSupport.
+    val caseInsensitive = sparkSession.sessionState.conf.parquetCaseInsensitiveResolution
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_CASE_INSENSITIVE_RESOLUTION.key,
+      caseInsensitive)
+
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (sparkSession.sessionState.conf.parquetFilterPushDown) {
-      filters
-        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-        // is used here.
-        .flatMap(ParquetFilters.createFilter(requiredSchema, _))
-        .reduceOption(FilterApi.and)
+      if (caseInsensitive) {
+        logWarning("Parquet filter push-down may not be used when using case-insensitive field " +
+          "resolution; disabling.")
+        None
+      } else {
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can
+          // be converted (`ParquetFilters.createFilter` returns an `Option`). That's why a
+          // `flatMap` is used here.
+          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+          .reduceOption(FilterApi.and)
+      }
     } else {
       None
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index f1a35dd8a620..a3b3137c054a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -56,15 +57,19 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
    * readers. Responsible for figuring out Parquet requested schema used for column pruning.
    */
   override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
     catalystRequestedSchema = {
-      val conf = context.getConfiguration
       val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
-    val parquetRequestedSchema =
-      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+    val caseInsensitive = conf.get(SQLConf.PARQUET_CASE_INSENSITIVE_RESOLUTION.key)
+    assert(caseInsensitive != null, "Parquet case insensitivity param not set.")
+    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
+      context.getFileSchema,
+      catalystRequestedSchema,
+      caseInsensitive.toBoolean)
 
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
@@ -108,8 +113,14 @@ private[parquet] object ParquetReadSupport {
    * Tailors `parquetSchema` according to `catalystSchema` by removing column paths don't exist
    * in `catalystSchema`, and adding those only exist in `catalystSchema`.
    */
-  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+  def clipParquetSchema(
+      parquetSchema: MessageType,
+      catalystSchema: StructType,
+      caseInsensitive: Boolean = false): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(
+      parquetSchema.asGroupType(),
+      catalystSchema,
+      caseInsensitive)
     if (clippedParquetFields.isEmpty) {
       ParquetSchemaConverter.EMPTY_MESSAGE
     } else {
@@ -120,20 +131,23 @@ private[parquet] object ParquetReadSupport {
     }
   }
 
-  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+  private def clipParquetType(
+      parquetType: Type,
+      catalystType: DataType,
+      caseInsensitive: Boolean): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType)
+        clipParquetListType(parquetType.asGroupType(), t.elementType, caseInsensitive)
 
       case t: MapType
         if !isPrimitiveCatalystType(t.keyType) ||
           !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseInsensitive)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t)
+        clipParquetGroup(parquetType.asGroupType(), t, caseInsensitive)
 
       case _ =>
         // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -159,14 +173,17 @@ private[parquet] object ParquetReadSupport {
    * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
    * [[StructType]].
    */
-  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+  private def clipParquetListType(
+      parquetList: GroupType,
+      elementType: DataType,
+      caseInsensitive: Boolean): Type = {
     // Precondition of this method, should only be called for lists with nested element types.
     assert(!isPrimitiveCatalystType(elementType))
 
     // Unannotated repeated group should be interpreted as required list of required element, so
     // list element type is just the group itself. Clip it.
     if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType)
+      clipParquetType(parquetList, elementType, caseInsensitive)
     } else {
       assert(
         parquetList.getOriginalType == OriginalType.LIST,
@@ -198,7 +215,7 @@ private[parquet] object ParquetReadSupport {
         Types
           .buildGroup(parquetList.getRepetition)
           .as(OriginalType.LIST)
-          .addField(clipParquetType(repeatedGroup, elementType))
+          .addField(clipParquetType(repeatedGroup, elementType, caseInsensitive))
           .named(parquetList.getName)
       } else {
         // Otherwise, the repeated field's type is the element type with the repeated field's
@@ -209,7 +226,7 @@ private[parquet] object ParquetReadSupport {
           .addField(
             Types
               .repeatedGroup()
-              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType, caseInsensitive))
               .named(repeatedGroup.getName))
           .named(parquetList.getName)
       }
@@ -222,7 +239,10 @@ private[parquet] object ParquetReadSupport {
    * a [[StructType]].
    */
   private def clipParquetMapType(
-      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+      parquetMap: GroupType,
+      keyType: DataType,
+      valueType: DataType,
+      caseInsensitive: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
@@ -234,8 +254,8 @@ private[parquet] object ParquetReadSupport {
       Types
         .repeatedGroup()
         .as(repeatedGroup.getOriginalType)
-        .addField(clipParquetType(parquetKeyType, keyType))
-        .addField(clipParquetType(parquetValueType, valueType))
+        .addField(clipParquetType(parquetKeyType, keyType, caseInsensitive))
+        .addField(clipParquetType(parquetValueType, valueType, caseInsensitive))
         .named(repeatedGroup.getName)
 
     Types
@@ -253,8 +273,11 @@ private[parquet] object ParquetReadSupport {
    * [[MessageType]]. Because it's legal to construct an empty requested schema for column
    * pruning.
    */
-  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+  private def clipParquetGroup(
+      parquetRecord: GroupType,
+      structType: StructType,
+      caseInsensitive: Boolean): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseInsensitive)
     Types
       .buildGroup(parquetRecord.getRepetition)
       .as(parquetRecord.getOriginalType)
@@ -268,13 +291,19 @@ private[parquet] object ParquetReadSupport {
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+      parquetRecord: GroupType,
+      structType: StructType,
+      caseInsensitive: Boolean): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map { f =>
+      val name = if (caseInsensitive) f.getName.toLowerCase else f.getName
+      name -> f
+    }.toMap
     val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
     structType.map { f =>
+      val name = if (caseInsensitive) f.name.toLowerCase else f.name
       parquetFieldMap
-        .get(f.name)
-        .map(clipParquetType(_, f.dataType))
+        .get(name)
+        .map(clipParquetType(_, f.dataType, caseInsensitive))
         .getOrElse(toParquet.convertField(f))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5ba4192512a5..6074588481df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -253,6 +253,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_CASE_INSENSITIVE_RESOLUTION =
+    SQLConfigBuilder("spark.sql.parquet.caseInsensitiveResolution")
+      .doc("Whether to use case insensitive matching when resolving Parquet columns by " +
+        "their field names. Disables Parquet filter push-down if enabled. Defaults to false.")
+      .booleanConf
+      .createWithDefault(false)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -742,6 +749,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def parquetCaseInsensitiveResolution: Boolean = getConf(PARQUET_CASE_INSENSITIVE_RESOLUTION)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index dbdcd230a4de..39f580647550 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -753,6 +753,41 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(option.compressionCodecClassName == "UNCOMPRESSED")
     }
   }
+
+  // Case class must be defined outside of test scope so TypeTag can be resolved.
+  case class ExampleType(columnA: String, columnB: Integer)
+
+  test("SPARK-19455: Support case insensitive Parquet field resolution") {
+    val lowercaseMetastoreSchema = StructType(
+      StructField("columna", StringType, true) ::
+      StructField("columnb", IntegerType, true) :: Nil)
+
+    val tableName = "temp_external_table"
+    spark.catalog.createExternalTable(
+      tableName,
+      "org.apache.spark.sql.parquet",
+      lowercaseMetastoreSchema,
+      Map.empty[String, String])
+
+    val testData = Seq(
+      ExampleType(columnA = "foo", columnB = 0),
+      ExampleType(columnA = "bar", columnB = 2),
+      ExampleType(columnA = "baz", columnB = 4),
+      ExampleType(columnA = "bat", columnB = 6))
+
+    withSQLConf(
+        SQLConf.PARQUET_CASE_INSENSITIVE_RESOLUTION.key -> "true",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "false") {
+      withParquetFile[ExampleType](testData) { path =>
+        spark.sql(s"""ALTER TABLE $tableName SET LOCATION "$path"""")
+        spark.catalog.refreshTable(tableName)
+        val totalCount = spark.sql(s"SELECT COUNT(columna) FROM $tableName")
+        assert(totalCount.first.getLong(0) == 4L)
+        val filteredCount = spark.sql(s"SELECT COUNT(*) FROM $tableName WHERE columnb > 2")
+        assert(filteredCount.first.getLong(0) == 2L)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8a980a7eb538..f15815b463da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1051,19 +1051,21 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       testName: String,
       parquetSchema: String,
       catalystSchema: StructType,
-      expectedSchema: String): Unit = {
+      expectedSchema: String,
+      caseInsensitive: Boolean = true): Unit = {
     testSchemaClipping(testName, parquetSchema, catalystSchema,
-      MessageTypeParser.parseMessageType(expectedSchema))
+      MessageTypeParser.parseMessageType(expectedSchema), caseInsensitive)
   }
 
   private def testSchemaClipping(
       testName: String,
       parquetSchema: String,
       catalystSchema: StructType,
-      expectedSchema: MessageType): Unit = {
+      expectedSchema: MessageType,
+      caseInsensitive: Boolean): Unit = {
     test(s"Clipping - $testName") {
       val actual = ParquetReadSupport.clipParquetSchema(
-        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseInsensitive)
 
       try {
         expectedSchema.checkContains(actual)
@@ -1080,6 +1082,36 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  testSchemaClipping(
+    "falls back to case insensitive resolution",
+
+    parquetSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val nestedType = new StructType().add("b", IntegerType, nullable = true)
+      new StructType()
+        .add("a", nestedType, nullable = true)
+        .add("c", IntegerType, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin,
+
+    caseInsensitive = true)
+
   testSchemaClipping(
     "simple nested struct",
@@ -1424,7 +1456,9 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
     catalystSchema = new StructType(),
 
-    expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
+    expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
+
+    caseInsensitive = false)
 
   testSchemaClipping(
     "disjoint field sets",
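
For reviewers, a minimal usage sketch of the behavior this patch enables is below. It is illustrative only: the SparkSession setup, output path, and column names are assumptions, and the only thing taken from the patch is the `spark.sql.parquet.caseInsensitiveResolution` key added to SQLConf. Note that enabling the flag disables Parquet filter push-down in ParquetFileFormat, so the filter in the sketch is evaluated by Spark rather than by the Parquet reader.

```scala
// Hypothetical usage sketch (not part of the patch): read a Parquet file whose
// physical column names are mixed-case through a lowercase user-specified schema.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CaseInsensitiveParquetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("case-insensitive-parquet-example")
      // Config key added by this patch; defaults to false.
      .config("spark.sql.parquet.caseInsensitiveResolution", "true")
      .getOrCreate()
    import spark.implicits._

    // Write a file whose Parquet footer schema uses mixed-case field names.
    val path = "/tmp/case_insensitive_parquet_example"  // assumed writable path
    Seq(("foo", 1), ("bar", 2)).toDF("ColumnA", "ColumnB")
      .write.mode("overwrite").parquet(path)

    // Read it back with lowercase field names; with the flag enabled, "columna" and
    // "columnb" are clipped against "ColumnA" and "ColumnB" in the file schema instead
    // of falling through to the convert-missing-field path.
    val lowercaseSchema = StructType(Seq(
      StructField("columna", StringType),
      StructField("columnb", IntegerType)))
    val df = spark.read.schema(lowercaseSchema).parquet(path)

    // Predicate is evaluated by Spark itself; Parquet push-down is off while the flag is on.
    df.filter($"columnb" > 1).show()

    spark.stop()
  }
}
```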