@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 
 
 /**
@@ -49,10 +51,16 @@ case class HadoopFsRelation(
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
+    val getColName: (StructField => String) =
+      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
+    val overlappedPartCols = mutable.Map.empty[String, StructField]
+    partitionSchema.foreach { partitionField =>
+      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+        overlappedPartCols += getColName(partitionField) -> partitionField
+      }
+    }
+    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
+      partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
   }
 
   def partitionSchemaOption: Option[StructType] =
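
Note: as a standalone sketch of the merge rule above (the schemas below are hypothetical, made up for illustration, and it assumes case-insensitive analysis), an overlapped column keeps its position in the data schema but takes the partition side's field, so the type inferred from a directory name like `a=1` wins over the type read from the data files:

```scala
import org.apache.spark.sql.types._

// Hypothetical inputs: the data files store "a" as LongType, while partition
// discovery inferred "a" (from a directory such as a=1) as IntegerType.
val dataSchema = StructType(Seq(
  StructField("a", LongType),
  StructField("b", DoubleType)))
val partitionSchema = StructType(Seq(StructField("a", IntegerType)))

// Mirrors the logic in HadoopFsRelation.schema, with names lowercased
// because we assume case-insensitive analysis here.
val getColName: StructField => String = _.name.toLowerCase
val overlappedPartCols = partitionSchema
  .filter(p => dataSchema.exists(getColName(_) == getColName(p)))
  .map(p => getColName(p) -> p)
  .toMap

val merged = StructType(
  dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
    partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))

// merged == StructType(Seq(StructField("a", IntegerType), StructField("b", DoubleType))):
// "a" stays in the data-schema position but carries the partition column's type,
// so the schema the reader reports agrees with the rows it produces.
```
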
@@ -969,4 +969,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       ))
     }
   }
+
+  test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = Seq((1L, 2.0)).toDF("a", "b")
+        df.write.parquet(s"$path/a=1")
+        checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1, 2.0)))
+      }
+    }
+  }
 }
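
For context, a minimal spark-shell session mirroring the regression test (the /tmp path is made up and the printed output is approximate): the file stores `a` as a long, the directory name `a=1` makes partition discovery infer it as an int, and with this patch the read succeeds with the partition column's type instead of failing in the vectorized Parquet reader.

```scala
// spark-shell session; toDF works there because spark.implicits are pre-imported.
val df = Seq((1L, 2.0)).toDF("a", "b")
df.write.parquet("/tmp/spark-18108/a=1")   // hypothetical scratch path

val readBack = spark.read.parquet("/tmp/spark-18108")
readBack.printSchema()
// root
//  |-- a: integer (nullable = true)   <- partition side's type, per the merge rule
//  |-- b: double (nullable = true)
readBack.show()
// +---+---+
// |  a|  b|
// +---+---+
// |  1|2.0|
// +---+---+
```
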