From d16dbab1a7a4b010fcad63317d9a51a74fe5e92b Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 4 Apr 2019 10:33:45 +0800 Subject: [PATCH 1/4] prune partition values --- .../v2/FilePartitionReaderFactory.scala | 13 ------ .../execution/datasources/v2/FileScan.scala | 40 +++++++++++++++++-- .../datasources/v2/FileScanBuilder.scala | 40 ++++++++++++++++--- .../datasources/v2/TextBasedFileScan.scala | 5 ++- .../v2/csv/CSVPartitionReaderFactory.scala | 5 +-- .../datasources/v2/csv/CSVScan.scala | 13 +++--- .../datasources/v2/csv/CSVScanBuilder.scala | 5 ++- .../v2/orc/OrcPartitionReaderFactory.scala | 35 ++++++++-------- .../datasources/v2/orc/OrcScan.scala | 11 +++-- .../datasources/v2/orc/OrcScanBuilder.scala | 5 ++- 10 files changed, 112 insertions(+), 60 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala index 1daf8ae72b63..d053ea98f8b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReaderFactory.scala @@ -46,19 +46,6 @@ abstract class FilePartitionReaderFactory extends PartitionReaderFactory { def buildColumnarReader(partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = { throw new UnsupportedOperationException("Cannot create columnar reader.") } - - protected def getReadDataSchema( - readSchema: StructType, - partitionSchema: StructType, - isCaseSensitive: Boolean): StructType = { - val partitionNameSet = - partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet - val fields = readSchema.fields.filterNot { field => - partitionNameSet.contains(PartitioningUtils.getColName(field, isCaseSensitive)) - } - - StructType(fields) - } } // A compound class for combining file and its corresponding reader. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index e971fd762efe..c4c27ba9f2af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -16,9 +16,13 @@ */ package org.apache.spark.sql.execution.datasources.v2 +import java.util.Locale + import org.apache.hadoop.fs.Path -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan} @@ -28,7 +32,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class FileScan( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, - readSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, options: CaseInsensitiveStringMap) extends Scan with Batch { /** * Returns whether a file with `path` could be split or not. 
@@ -40,7 +45,23 @@ abstract class FileScan( protected def partitions: Seq[FilePartition] = { val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) + val partitionAttributes = fileIndex.partitionSchema.toAttributes + val attributeMap = partitionAttributes.map(a => getAttributeName(a) -> a).toMap + val readPartitionAttributes = readPartitionSchema.toAttributes.map { readAttr => + attributeMap.get(getAttributeName(readAttr)).getOrElse { + throw new AnalysisException(s"Can't find required partition column ${readAttr.name} " + + s"in partition schema ${fileIndex.partitionSchema}") + } + } + lazy val partitionValueProject = + GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes) val splitFiles = selectedPartitions.flatMap { partition => + // Prune partition values if part of the partition columns are not required. + val partitionValues = if (readPartitionAttributes != partitionAttributes) { + partitionValueProject(partition.values).copy() + } else { + partition.values + } partition.files.flatMap { file => val filePath = file.getPath PartitionedFileUtil.splitFiles( @@ -49,7 +70,7 @@ abstract class FileScan( filePath = filePath, isSplitable = isSplitable(filePath), maxSplitBytes = maxSplitBytes, - partitionValues = partition.values + partitionValues = partitionValues ) }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) } @@ -61,4 +82,17 @@ abstract class FileScan( } override def toBatch: Batch = this + + override def readSchema(): StructType = + StructType(readDataSchema.fields ++ readPartitionSchema.fields) + + private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + + private def getAttributeName(a: AttributeReference): String = { + if (isCaseSensitive) { + a.name + } else { + a.name.toLowerCase(Locale.ROOT) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala index d4e55a50307d..ca6c767b80d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -16,15 +16,43 @@ */ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, PartitioningUtils} +import org.apache.spark.sql.sources.v2.reader.{ScanBuilder, SupportsPushDownRequiredColumns} import org.apache.spark.sql.types.StructType -abstract class FileScanBuilder(schema: StructType) - extends ScanBuilder - with SupportsPushDownRequiredColumns { - protected var readSchema = schema +abstract class FileScanBuilder( + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns { + private val partitionSchema = fileIndex.partitionSchema + private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis + protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields) override def pruneColumns(requiredSchema: StructType): Unit = { - this.readSchema = requiredSchema + this.requiredSchema = requiredSchema } + + protected def 
readDataSchema: StructType = { + val fields = dataSchema.fields.filter { field => + val colName = PartitioningUtils.getColName(field, isCaseSensitive) + requiredNameSet.contains(colName) && !partitionNameSet.contains(colName) + } + StructType(fields) + } + + protected def readPartitionSchema: StructType = { + val fields = partitionSchema.fields.filter { field => + val colName = PartitioningUtils.getColName(field, isCaseSensitive) + requiredNameSet.contains(colName) + } + StructType(fields) + } + + // Define as method instead of value, since `requiredSchema` is mutable. + private def requiredNameSet: Set[String] = + requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet + + private val partitionNameSet: Set[String] = + partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala index 8d9cc68417ef..8d5ebeeda6e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala @@ -29,9 +29,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class TextBasedFileScan( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, - readSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScan(sparkSession, fileIndex, readSchema, options) { + extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) { private var codecFactory: CompressionCodecFactory = _ override def isSplitable(path: Path): Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index e2d50282e9cb..bea4a6ff15db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -41,16 +41,13 @@ case class CSVPartitionReaderFactory( sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], dataSchema: StructType, + readDataSchema: StructType, partitionSchema: StructType, - readSchema: StructType, parsedOptions: CSVOptions) extends FilePartitionReaderFactory { private val columnPruning = sqlConf.csvColumnPruning - private val readDataSchema = - getReadDataSchema(readSchema, partitionSchema, sqlConf.caseSensitiveAnalysis) override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value - val parser = new UnivocityParser( StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)), StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala index 8f2f8f256731..5bc8029b4068 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala @@ -35,9 +35,10 @@ case class CSVScan( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, dataSchema: StructType, - readSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, options: CaseInsensitiveStringMap) - extends TextBasedFileScan(sparkSession, fileIndex, readSchema, options) { + extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) { private lazy val parsedOptions: CSVOptions = new CSVOptions( options.asScala.toMap, @@ -53,8 +54,8 @@ case class CSVScan( // Check a field requirement for corrupt records here to throw an exception in a driver side ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord) - if (readSchema.length == 1 && - readSchema.head.name == parsedOptions.columnNameOfCorruptRecord) { + if (readDataSchema.length == 1 && + readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) { throw new AnalysisException( "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" + "referenced columns only include the internal corrupt record column\n" + @@ -72,7 +73,9 @@ case class CSVScan( val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) + // The partition values are already truncated in `FileScan.partitions`. + // We should use `readPartitionSchema` as the partition schema here. CSVPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, fileIndex.partitionSchema, readSchema, parsedOptions) + dataSchema, readDataSchema, readPartitionSchema, parsedOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala index dbb3c03ca981..7e856e63822b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala @@ -29,9 +29,10 @@ case class CSVScanBuilder( fileIndex: PartitioningAwareFileIndex, schema: StructType, dataSchema: StructType, - options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) { + options: CaseInsensitiveStringMap) + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { override def build(): Scan = { - CSVScan(sparkSession, fileIndex, dataSchema, readSchema, options) + CSVScan(sparkSession, fileIndex, dataSchema, readDataSchema, readPartitionSchema, options) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 1da9469909f1..ef19e16cca99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -52,24 +52,24 @@ import org.apache.spark.util.SerializableConfiguration case class OrcPartitionReaderFactory( sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], + resultSchema: StructType, dataSchema: StructType, - partitionSchema: StructType, - readSchema: StructType) extends 
FilePartitionReaderFactory { + readDataSchema: StructType, + partitionSchema: StructType) extends FilePartitionReaderFactory { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val capacity = sqlConf.orcVectorizedReaderBatchSize override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && - readSchema.length <= sqlConf.wholeStageMaxNumFields && - readSchema.forall(_.dataType.isInstanceOf[AtomicType]) + resultSchema.length <= sqlConf.wholeStageMaxNumFields && + resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) } override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = { val conf = broadcastedConf.value.value - val readDataSchema = getReadDataSchema(readSchema, partitionSchema, isCaseSensitive) - val readDataSchemaString = OrcUtils.orcTypeDescriptionString(readDataSchema) - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, readDataSchemaString) + val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive) val filePath = new Path(new URI(file.filePath)) @@ -113,8 +113,8 @@ case class OrcPartitionReaderFactory( override def buildColumnarReader(file: PartitionedFile): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value - val readSchemaString = OrcUtils.orcTypeDescriptionString(readSchema) - OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, readSchemaString) + val resultSchemaString = OrcUtils.orcTypeDescriptionString(resultSchema) + OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive) val filePath = new Path(new URI(file.filePath)) @@ -124,13 +124,13 @@ case class OrcPartitionReaderFactory( val reader = OrcFile.createReader(filePath, readerOptions) val requestedColIdsOrEmptyFile = OrcUtils.requestedColumnIds( - isCaseSensitive, dataSchema, readSchema, reader, conf) + isCaseSensitive, dataSchema, readDataSchema, reader, conf) if (requestedColIdsOrEmptyFile.isEmpty) { new EmptyPartitionReader } else { - val requestedColIds = requestedColIdsOrEmptyFile.get - assert(requestedColIds.length == readSchema.length, + val requestedColIds = requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1) + assert(requestedColIds.length == resultSchema.length, "[BUG] requested column IDs do not match required schema") val taskConf = new Configuration(conf) @@ -140,15 +140,12 @@ case class OrcPartitionReaderFactory( val batchReader = new OrcColumnarBatchReader(capacity) batchReader.initialize(fileSplit, taskAttemptContext) - val columnNameMap = partitionSchema.fields.map( - PartitioningUtils.getColName(_, isCaseSensitive)).zipWithIndex.toMap - val requestedPartitionColIds = readSchema.fields.map { field => - columnNameMap.getOrElse(PartitioningUtils.getColName(field, isCaseSensitive), -1) - } + val requestedPartitionColIds = + Array.fill(readDataSchema.length)(-1) ++ Range(0, partitionSchema.length) batchReader.initBatch( - TypeDescription.fromString(readSchemaString), - readSchema.fields, + TypeDescription.fromString(resultSchemaString), + resultSchema.fields, requestedColIds, requestedPartitionColIds, file.partitionValues) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 
fc8a682b226c..2c4661618315 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -32,15 +32,18 @@ case class OrcScan( hadoopConf: Configuration, fileIndex: PartitioningAwareFileIndex, dataSchema: StructType, - readSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScan(sparkSession, fileIndex, readSchema, options) { + extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) { override def isSplitable(path: Path): Boolean = true override def createReaderFactory(): PartitionReaderFactory = { val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) - OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, fileIndex.partitionSchema, readSchema) + // The partition values are already truncated in `FileScan.partitions`. + // We should use `readPartitionSchema` as the partition schema here. + OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readSchema(), + dataSchema, readDataSchema, readPartitionSchema) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 8ac56aa5f64b..144bfcfff0c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -36,7 +36,7 @@ case class OrcScanBuilder( schema: StructType, dataSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScanBuilder(schema) with SupportsPushDownFilters { + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters { lazy val hadoopConf = { val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap // Hadoop Configurations are case sensitive. 
@@ -44,7 +44,8 @@ case class OrcScanBuilder( } override def build(): Scan = { - OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options) + OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, + readDataSchema, readPartitionSchema, options) } private var _pushedFilters: Array[Filter] = Array.empty From f9c9986eb84d4eff20f9d0c63fe1fe673d642651 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 5 Apr 2019 14:59:02 +0800 Subject: [PATCH 2/4] address comments --- .../sql/execution/datasources/v2/FileScan.scala | 17 ++++++++--------- .../datasources/v2/TextBasedFileScan.scala | 2 +- .../v2/csv/CSVPartitionReaderFactory.scala | 2 +- .../v2/orc/OrcPartitionReaderFactory.scala | 4 ++-- .../execution/datasources/v2/orc/OrcScan.scala | 4 ++-- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index c4c27ba9f2af..337aac9ea651 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -33,8 +33,7 @@ abstract class FileScan( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, readDataSchema: StructType, - readPartitionSchema: StructType, - options: CaseInsensitiveStringMap) extends Scan with Batch { + readPartitionSchema: StructType) extends Scan with Batch { /** * Returns whether a file with `path` could be split or not. */ @@ -46,10 +45,10 @@ abstract class FileScan( val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) val partitionAttributes = fileIndex.partitionSchema.toAttributes - val attributeMap = partitionAttributes.map(a => getAttributeName(a) -> a).toMap - val readPartitionAttributes = readPartitionSchema.toAttributes.map { readAttr => - attributeMap.get(getAttributeName(readAttr)).getOrElse { - throw new AnalysisException(s"Can't find required partition column ${readAttr.name} " + + val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap + val readPartitionAttributes = readPartitionSchema.map { readField => + attributeMap.get(normalizeName(readField.name)).getOrElse { + throw new AnalysisException(s"Can't find required partition column ${readField.name} " + s"in partition schema ${fileIndex.partitionSchema}") } } @@ -88,11 +87,11 @@ abstract class FileScan( private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis - private def getAttributeName(a: AttributeReference): String = { + private def normalizeName(name: String): String = { if (isCaseSensitive) { - a.name + name } else { - a.name.toLowerCase(Locale.ROOT) + name.toLowerCase(Locale.ROOT) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala index 8d5ebeeda6e1..d6b84dcdfd15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala @@ -32,7 +32,7 @@ abstract class TextBasedFileScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScan(sparkSession, fileIndex, readDataSchema, 
readPartitionSchema, options) { + extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) { private var codecFactory: CompressionCodecFactory = _ override def isSplitable(path: Path): Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index bea4a6ff15db..28e310489cd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -33,8 +33,8 @@ import org.apache.spark.util.SerializableConfiguration * @param sqlConf SQL configuration. * @param broadcastedConf Broadcasted serializable Hadoop Configuration. * @param dataSchema Schema of CSV files. + * @param readDataSchema Required data schema in the batch scan. * @param partitionSchema Schema of partitions. - * @param readSchema Required schema in the batch scan. * @param parsedOptions Options for parsing CSV files. */ case class CSVPartitionReaderFactory( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index ef19e16cca99..ec923797e269 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -46,16 +46,16 @@ import org.apache.spark.util.SerializableConfiguration * @param sqlConf SQL configuration. * @param broadcastedConf Broadcast serializable Hadoop Configuration. * @param dataSchema Schema of orc files. + * @param readDataSchema Required data schema in the batch scan. * @param partitionSchema Schema of partitions. - * @param readSchema Required schema in the batch scan. */ case class OrcPartitionReaderFactory( sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], - resultSchema: StructType, dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType) extends FilePartitionReaderFactory { + private val resultSchema = StructType(readDataSchema.fields ++ partitionSchema.fields) private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val capacity = sqlConf.orcVectorizedReaderBatchSize diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala index 2c4661618315..dc6b67ceb7e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala @@ -35,7 +35,7 @@ case class OrcScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) { + extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) { override def isSplitable(path: Path): Boolean = true override def createReaderFactory(): PartitionReaderFactory = { @@ -43,7 +43,7 @@ case class OrcScan( new SerializableConfiguration(hadoopConf)) // The partition values are already truncated in `FileScan.partitions`. 
// We should use `readPartitionSchema` as the partition schema here. - OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readSchema(), + OrcPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema) } } From a6db4d8a99cd471650d53c161f1bd54b32ed51ae Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 5 Apr 2019 16:23:45 +0800 Subject: [PATCH 3/4] address comment --- .../spark/sql/execution/datasources/v2/FileScanBuilder.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala index ca6c767b80d6..a33c7ab58f3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -34,6 +34,7 @@ abstract class FileScanBuilder( } protected def readDataSchema: StructType = { + val requiredNameSet = createRequiredNameSet val fields = dataSchema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) requiredNameSet.contains(colName) && !partitionNameSet.contains(colName) @@ -42,6 +43,7 @@ abstract class FileScanBuilder( } protected def readPartitionSchema: StructType = { + val requiredNameSet = createRequiredNameSet val fields = partitionSchema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) requiredNameSet.contains(colName) @@ -50,7 +52,7 @@ abstract class FileScanBuilder( } // Define as method instead of value, since `requiredSchema` is mutable. - private def requiredNameSet: Set[String] = + private def createRequiredNameSet: Set[String] = requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet private val partitionNameSet: Set[String] = From b6dab15db00ecfb11f23e6c138ee1d23ec942c49 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Fri, 5 Apr 2019 16:29:46 +0800 Subject: [PATCH 4/4] revise --- .../execution/datasources/v2/FileScanBuilder.scala | 11 +++++------ .../execution/datasources/v2/csv/CSVScanBuilder.scala | 2 +- .../execution/datasources/v2/orc/OrcScanBuilder.scala | 2 +- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala index a33c7ab58f3d..3b236be90e6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala @@ -33,8 +33,8 @@ abstract class FileScanBuilder( this.requiredSchema = requiredSchema } - protected def readDataSchema: StructType = { - val requiredNameSet = createRequiredNameSet + protected def readDataSchema(): StructType = { + val requiredNameSet = createRequiredNameSet() val fields = dataSchema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) requiredNameSet.contains(colName) && !partitionNameSet.contains(colName) @@ -42,8 +42,8 @@ abstract class FileScanBuilder( StructType(fields) } - protected def readPartitionSchema: StructType = { - val requiredNameSet = createRequiredNameSet + protected def readPartitionSchema(): StructType = { + val requiredNameSet = 
createRequiredNameSet() val fields = partitionSchema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) requiredNameSet.contains(colName) @@ -51,8 +51,7 @@ abstract class FileScanBuilder( StructType(fields) } - // Define as method instead of value, since `requiredSchema` is mutable. - private def createRequiredNameSet: Set[String] = + private def createRequiredNameSet(): Set[String] = requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet private val partitionNameSet: Set[String] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala index 7e856e63822b..28c5b3d81a3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala @@ -33,6 +33,6 @@ case class CSVScanBuilder( extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { override def build(): Scan = { - CSVScan(sparkSession, fileIndex, dataSchema, readDataSchema, readPartitionSchema, options) + CSVScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index 144bfcfff0c4..4c1ec520c6ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -45,7 +45,7 @@ case class OrcScanBuilder( override def build(): Scan = { OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, - readDataSchema, readPartitionSchema, options) + readDataSchema(), readPartitionSchema(), options) } private var _pushedFilters: Array[Filter] = Array.empty
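
For reference, the heart of this patch series is that `FileScanBuilder` now splits the pruned required schema into `readDataSchema` (columns read from the files) and `readPartitionSchema` (partition columns that are actually required), and `FileScan.partitions` then projects the directory-derived partition values down to `readPartitionSchema`. Below is a minimal standalone sketch of that splitting step, not the actual Spark internals: it assumes case-insensitive name resolution instead of consulting `SQLConf`, uses plain field names rather than `PartitioningUtils.getColName`, and the object/field names are illustrative only.

// Sketch of the readDataSchema / readPartitionSchema split, assuming
// case-insensitive resolution. Requires spark-sql (or spark-catalyst)
// on the classpath for StructType.
import java.util.Locale

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PrunedSchemasSketch {

  // Split the pruned `requiredSchema` into the data columns to read from the
  // files and the partition columns whose values come from the directory layout.
  def splitRequiredSchema(
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType): (StructType, StructType) = {
    def norm(name: String): String = name.toLowerCase(Locale.ROOT)

    val requiredNames = requiredSchema.fields.map(f => norm(f.name)).toSet
    val partitionNames = partitionSchema.fields.map(f => norm(f.name)).toSet

    // Data columns that are required and are not partition columns.
    val readDataSchema = StructType(dataSchema.fields.filter { f =>
      requiredNames.contains(norm(f.name)) && !partitionNames.contains(norm(f.name))
    })
    // Partition columns that are actually required by the query.
    val readPartitionSchema = StructType(partitionSchema.fields.filter { f =>
      requiredNames.contains(norm(f.name))
    })
    (readDataSchema, readPartitionSchema)
  }

  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("id", IntegerType), StructField("value", StringType)))
    val partitionSchema = StructType(Seq(
      StructField("year", IntegerType), StructField("month", IntegerType)))
    // The query only needs `id` and `year`, so `month` is pruned here and,
    // in FileScan.partitions, the partition values row is projected down to
    // the remaining partition columns (via GenerateUnsafeProjection in the patch).
    val requiredSchema = StructType(Seq(
      StructField("id", IntegerType), StructField("year", IntegerType)))

    val (readData, readPartition) =
      splitRequiredSchema(dataSchema, partitionSchema, requiredSchema)
    println(readData.simpleString)       // struct<id:int>
    println(readPartition.simpleString)  // struct<year:int>
  }
}

The split is what lets the ORC and CSV reader factories receive `readDataSchema` and the pruned partition schema separately, instead of recomputing the data columns from a combined `readSchema` inside each factory.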