@@ -315,15 +315,27 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    // Pass case insensitivity config param to ParquetReadSupport.
+    val caseInsensitive = sparkSession.sessionState.conf.parquetCaseInsensitiveResolution
+    hadoopConf.setBoolean(
+      SQLConf.PARQUET_CASE_INSENSITIVE_RESOLUTION.key,
+      caseInsensitive)
+
     // Try to push down filters when filter push-down is enabled.
     val pushed =
       if (sparkSession.sessionState.conf.parquetFilterPushDown) {
-        filters
-          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-          // is used here.
-          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
-          .reduceOption(FilterApi.and)
+        if (caseInsensitive) {
+          logWarning("Parquet filter push-down cannot be used with case-insensitive field " +
+            "resolution; disabling push-down.")
+          None
+        } else {
+          filters
+            // Collects all converted Parquet filter predicates. Notice that not all predicates
+            // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why a
+            // `flatMap` is used here.
+            .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+            .reduceOption(FilterApi.and)
+        }
       } else {
        None
       }
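A minimal usage sketch of the flag threaded through above (not part of the diff; the session name `spark`, the schema, and the path are illustrative):

    import org.apache.spark.sql.types._

    // Enable case-insensitive Parquet field resolution for the session.
    // Per the gating above, scans planned with the flag on get `pushed = None`,
    // so Parquet row-group filtering is skipped and filters are evaluated by Spark.
    spark.conf.set("spark.sql.parquet.caseInsensitiveResolution", "true")

    val lowercaseSchema = StructType(
      StructField("columna", StringType) ::
      StructField("columnb", IntegerType) :: Nil)

    // `columna`/`columnb` now resolve against camelCase fields in the files.
    val df = spark.read.schema(lowercaseSchema).parquet("/tmp/example")  // illustrative path
    df.where("columnb > 2").count()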
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -56,15 +57,19 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
    * readers. Responsible for figuring out Parquet requested schema used for column pruning.
    */
   override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
     catalystRequestedSchema = {
-      val conf = context.getConfiguration
       val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
-    val parquetRequestedSchema =
-      ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
+    val caseInsensitive = conf.get(SQLConf.PARQUET_CASE_INSENSITIVE_RESOLUTION.key)
+    assert(caseInsensitive != null, "Parquet case insensitivity param not set.")
+    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
+      context.getFileSchema,
+      catalystRequestedSchema,
+      caseInsensitive.toBoolean)
 
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
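For clarity on the null assert and the `.toBoolean` above: the flag crosses from planning into the task-side reader as a Hadoop configuration entry, set as a boolean on one side and read back as a string on the other. A standalone sketch of that round trip (the conf object here is illustrative):

    import org.apache.hadoop.conf.Configuration

    val hadoopConf = new Configuration()
    // ParquetFileFormat stores the flag as a boolean...
    hadoopConf.setBoolean("spark.sql.parquet.caseInsensitiveResolution", true)
    // ...but Configuration.get returns it as a string (or null if unset),
    // hence the assert and the explicit `.toBoolean` parse in init().
    val raw = hadoopConf.get("spark.sql.parquet.caseInsensitiveResolution")
    assert(raw != null && raw.toBoolean)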
@@ -108,8 +113,14 @@ private[parquet] object ParquetReadSupport {
    * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
    * exist in `catalystSchema`, and adding those that exist only in `catalystSchema`.
    */
-  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+  def clipParquetSchema(
+      parquetSchema: MessageType,
+      catalystSchema: StructType,
+      caseInsensitive: Boolean = false): MessageType = {
+    val clippedParquetFields = clipParquetGroupFields(
+      parquetSchema.asGroupType(),
+      catalystSchema,
+      caseInsensitive)
     if (clippedParquetFields.isEmpty) {
       ParquetSchemaConverter.EMPTY_MESSAGE
     } else {
@@ -120,20 +131,23 @@
     }
   }
 
-  private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
+  private def clipParquetType(
+      parquetType: Type,
+      catalystType: DataType,
+      caseInsensitive: Boolean): Type = {
     catalystType match {
       case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
         // Only clips array types with nested type as element type.
-        clipParquetListType(parquetType.asGroupType(), t.elementType)
+        clipParquetListType(parquetType.asGroupType(), t.elementType, caseInsensitive)
 
       case t: MapType
           if !isPrimitiveCatalystType(t.keyType) ||
             !isPrimitiveCatalystType(t.valueType) =>
         // Only clips map types with nested key type or value type
-        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseInsensitive)
 
       case t: StructType =>
-        clipParquetGroup(parquetType.asGroupType(), t)
+        clipParquetGroup(parquetType.asGroupType(), t, caseInsensitive)
 
       case _ =>
         // UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -159,14 +173,17 @@ private[parquet] object ParquetReadSupport {
    * of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
    * [[StructType]].
    */
-  private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
+  private def clipParquetListType(
+      parquetList: GroupType,
+      elementType: DataType,
+      caseInsensitive: Boolean): Type = {
     // Precondition of this method, should only be called for lists with nested element types.
     assert(!isPrimitiveCatalystType(elementType))
 
     // Unannotated repeated group should be interpreted as required list of required element, so
     // list element type is just the group itself. Clip it.
     if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
-      clipParquetType(parquetList, elementType)
+      clipParquetType(parquetList, elementType, caseInsensitive)
     } else {
       assert(
         parquetList.getOriginalType == OriginalType.LIST,
@@ -198,7 +215,7 @@
       Types
         .buildGroup(parquetList.getRepetition)
         .as(OriginalType.LIST)
-        .addField(clipParquetType(repeatedGroup, elementType))
+        .addField(clipParquetType(repeatedGroup, elementType, caseInsensitive))
         .named(parquetList.getName)
     } else {
       // Otherwise, the repeated field's type is the element type with the repeated field's
@@ -209,7 +226,7 @@
         .addField(
           Types
             .repeatedGroup()
-            .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+            .addField(clipParquetType(repeatedGroup.getType(0), elementType, caseInsensitive))
             .named(repeatedGroup.getName))
         .named(parquetList.getName)
     }
@@ -222,7 +239,10 @@
    * a [[StructType]].
    */
   private def clipParquetMapType(
-      parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
+      parquetMap: GroupType,
+      keyType: DataType,
+      valueType: DataType,
+      caseInsensitive: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
@@ -234,8 +254,8 @@
       Types
         .repeatedGroup()
         .as(repeatedGroup.getOriginalType)
-        .addField(clipParquetType(parquetKeyType, keyType))
-        .addField(clipParquetType(parquetValueType, valueType))
+        .addField(clipParquetType(parquetKeyType, keyType, caseInsensitive))
+        .addField(clipParquetType(parquetValueType, valueType, caseInsensitive))
         .named(repeatedGroup.getName)
 
     Types
@@ -253,8 +273,11 @@
    * [[MessageType]]. Because it's legal to construct an empty requested schema for column
    * pruning.
    */
-  private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
-    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
+  private def clipParquetGroup(
+      parquetRecord: GroupType,
+      structType: StructType,
+      caseInsensitive: Boolean): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseInsensitive)
     Types
       .buildGroup(parquetRecord.getRepetition)
       .as(parquetRecord.getOriginalType)
@@ -268,13 +291,19 @@
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
-    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
+      parquetRecord: GroupType,
+      structType: StructType,
+      caseInsensitive: Boolean): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map { f =>
+      val name = if (caseInsensitive) f.getName.toLowerCase else f.getName
+      name -> f
+    }.toMap
     val toParquet = new ParquetSchemaConverter(writeLegacyParquetFormat = false)
     structType.map { f =>
+      val name = if (caseInsensitive) f.name.toLowerCase else f.name
       parquetFieldMap
-        .get(f.name)
-        .map(clipParquetType(_, f.dataType))
+        .get(name)
+        .map(clipParquetType(_, f.dataType, caseInsensitive))
         .getOrElse(toParquet.convertField(f))
     }
   }
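The heart of the change is keying `parquetFieldMap` by lower-cased name. The same lookup strategy in isolation, with plain strings standing in for Parquet fields (names illustrative), including one edge case worth knowing:

    val parquetFieldNames = Seq("columnA", "columnB")
    val caseInsensitive = true

    val parquetFieldMap = parquetFieldNames.map { f =>
      val name = if (caseInsensitive) f.toLowerCase else f
      name -> f
    }.toMap

    // A lower-case Catalyst name now finds the camelCase Parquet field.
    assert(parquetFieldMap.get("columna").contains("columnA"))

    // Caveat: if a file had fields differing only by case (e.g. "id" and "ID"),
    // `toMap` keeps the last entry, so resolution would be silently ambiguous.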
@@ -253,6 +253,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_CASE_INSENSITIVE_RESOLUTION =
+    SQLConfigBuilder("spark.sql.parquet.caseInsensitiveResolution")
+      .doc("Whether to use case-insensitive matching when resolving Parquet columns by their " +
+        "field names. When enabled, Parquet filter push-down is disabled. Defaults to false.")
+      .booleanConf
+      .createWithDefault(false)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -742,6 +749,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def parquetCaseInsensitiveResolution: Boolean = getConf(PARQUET_CASE_INSENSITIVE_RESOLUTION)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
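Like any SQLConf entry, the new flag can also be flipped per-session from SQL; a brief sketch:

    // Equivalent SQL toggle for the conf added above.
    spark.sql("SET spark.sql.parquet.caseInsensitiveResolution=true")
    // Read it back through the runtime config API.
    assert(spark.conf.get("spark.sql.parquet.caseInsensitiveResolution") == "true")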
@@ -753,6 +753,41 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(option.compressionCodecClassName == "UNCOMPRESSED")
     }
   }
+
+  // Case class must be defined outside of test scope so TypeTag can be resolved.
+  case class ExampleType(columnA: String, columnB: Integer)
+
+  test("SPARK-19455: Support case insensitive Parquet field resolution") {
+    val lowercaseMetastoreSchema = StructType(
+      StructField("columna", StringType, true) ::
+      StructField("columnb", IntegerType, true) :: Nil)
+
+    val tableName = "temp_external_table"
+    spark.catalog.createExternalTable(
+      tableName,
+      "org.apache.spark.sql.parquet",
+      lowercaseMetastoreSchema,
+      Map.empty[String, String])
+
+    val testData = Seq(
+      ExampleType(columnA = "foo", columnB = 0),
+      ExampleType(columnA = "bar", columnB = 2),
+      ExampleType(columnA = "baz", columnB = 4),
+      ExampleType(columnA = "bat", columnB = 6))
+
+    withSQLConf(
+      SQLConf.PARQUET_CASE_INSENSITIVE_RESOLUTION.key -> "true",
+      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "false") {
+      withParquetFile[ExampleType](testData) { path =>
+        spark.sql(s"""ALTER TABLE $tableName SET LOCATION "$path"""")
+        spark.catalog.refreshTable(tableName)
+        val totalCount = spark.sql(s"SELECT COUNT(columna) FROM $tableName")
+        assert(totalCount.first.getLong(0) == 4L)
+        val filteredCount = spark.sql(s"SELECT COUNT(*) FROM $tableName WHERE columnb > 2")
+        assert(filteredCount.first.getLong(0) == 2L)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
@@ -1051,19 +1051,21 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       testName: String,
       parquetSchema: String,
       catalystSchema: StructType,
-      expectedSchema: String): Unit = {
+      expectedSchema: String,
+      caseInsensitive: Boolean = true): Unit = {
     testSchemaClipping(testName, parquetSchema, catalystSchema,
-      MessageTypeParser.parseMessageType(expectedSchema))
+      MessageTypeParser.parseMessageType(expectedSchema), caseInsensitive)
   }
 
   private def testSchemaClipping(
       testName: String,
       parquetSchema: String,
       catalystSchema: StructType,
-      expectedSchema: MessageType): Unit = {
+      expectedSchema: MessageType,
+      caseInsensitive: Boolean): Unit = {
     test(s"Clipping - $testName") {
       val actual = ParquetReadSupport.clipParquetSchema(
-        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseInsensitive)
 
       try {
         expectedSchema.checkContains(actual)
@@ -1080,6 +1082,36 @@
     }
   }
 
+  testSchemaClipping(
+    "falls back to case insensitive resolution",
+
+    parquetSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val nestedType = new StructType().add("b", IntegerType, nullable = true)
+      new StructType()
+        .add("a", nestedType, nullable = true)
+        .add("c", IntegerType, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group A {
+        |    optional int32 B;
+        |  }
+        |  optional int32 c;
+        |}
+      """.stripMargin,
+
+    caseInsensitive = true)
+
   testSchemaClipping(
     "simple nested struct",
 
@@ -1424,7 +1456,9 @@

     catalystSchema = new StructType(),
 
-    expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
+    expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
+
+    caseInsensitive = false)
 
   testSchemaClipping(
     "disjoint field sets",
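For reference, the clipping entry point these tests drive can be called directly; a sketch under the same `parquet` package (the schemas are illustrative, mirroring the case-insensitive test above):

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.types._

    val fileSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required group A {
        |    optional int32 B;
        |  }
        |}
      """.stripMargin)
    val requested = new StructType()
      .add("a", new StructType().add("b", IntegerType, nullable = true), nullable = true)

    // With caseInsensitive = true, the clipped schema keeps `A.B` exactly as
    // stored in the file, so the reader's column paths line up with the footer.
    val clipped = ParquetReadSupport.clipParquetSchema(fileSchema, requested, caseInsensitive = true)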