From d559a6cdcdfda3c30303b839a460d989dd5b8152 Mon Sep 17 00:00:00 2001
From: Patrick Woody
Date: Wed, 9 Nov 2016 19:14:17 +0000
Subject: [PATCH 1/2] SPARK-17059: Allow FileFormat to specify partition pruning strategy

---
 .../sql/execution/DataSourceScanExec.scala         |  86 +++++++----
 .../execution/datasources/FileFormat.scala         |  13 ++
 .../parquet/ParquetFileFormat.scala                |  74 +++++++++
 .../parquet/ParquetFileSplitter.scala              | 146 ++++++++++++++++++
 .../apache/spark/sql/internal/SQLConf.scala        |   8 +
 .../parquet/ParquetQuerySuite.scala                |  85 +++++++++-
 .../spark/sql/sources/BucketedReadSuite.scala      |  17 +-
 7 files changed, 397 insertions(+), 32 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e485b52b43f7..f76218ca7c4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
@@ -436,23 +436,38 @@ case class FileSourceScanExec(
       selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
-    val bucketed =
-      selectedPartitions.flatMap { p =>
-        p.files.map { f =>
-          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
-          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+    val session = fsRelation.sparkSession
+    val hadoopConf = session.sessionState.newHadoopConf()
+    val partitionFiles = selectedPartitions.flatMap { partition =>
+      partition.files.map((_, partition.values))
+    }
+    val bucketed = partitionFiles.flatMap { case (file, values) =>
+      val blockLocations = getBlockLocations(file)
+      val filePath = file.getPath.toUri.toString
+      val format = fsRelation.fileFormat
+
+      if (format.isSplitable(session, fsRelation.options, file.getPath)) {
+        val splitter =
+          format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
+        val validSplits = splitter(file)
+        validSplits.map { split =>
+          val hosts = getBlockHosts(blockLocations, split.getStart, split.getLength)
+          PartitionedFile(values, filePath, split.getStart, split.getLength, hosts)
         }
-      }.groupBy { f =>
-        BucketingUtils
-          .getBucketId(new Path(f.filePath).getName)
-          .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+      } else {
+        val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+        Seq(PartitionedFile(values, filePath, 0, file.getLen, hosts))
       }
-
+    }.groupBy { f =>
+      BucketingUtils
+        .getBucketId(new Path(f.filePath).getName)
+        .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+    }
     val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
       FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
     }
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    new FileScanRDD(session, readFile, filePartitions)
   }
 
   /**
@@ -467,10 +482,11 @@ case class FileSourceScanExec(
       readFile: (PartitionedFile) => Iterator[InternalRow],
       selectedPartitions: Seq[PartitionDirectory],
       fsRelation: HadoopFsRelation): RDD[InternalRow] = {
-    val defaultMaxSplitBytes =
-      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+    val session = fsRelation.sparkSession
+    val hadoopConf = session.sessionState.newHadoopConf()
+    val defaultMaxSplitBytes = session.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = session.sessionState.conf.filesOpenCostInBytes
+    val defaultParallelism = session.sparkContext.defaultParallelism
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / defaultParallelism
 
@@ -478,23 +494,33 @@ case class FileSourceScanExec(
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
-    val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
-        val blockLocations = getBlockLocations(file)
-        if (fsRelation.fileFormat.isSplitable(
-            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
-          (0L until file.getLen by maxSplitBytes).map { offset =>
-            val remaining = file.getLen - offset
+    val partitionFiles = selectedPartitions.flatMap { partition =>
+      partition.files.map((_, partition.values))
+    }
+    val splitFiles = partitionFiles.flatMap { case (file, values) =>
+      val blockLocations = getBlockLocations(file)
+      val filePath = file.getPath.toUri.toString
+      val format = fsRelation.fileFormat
+
+      // If the format is splittable, attempt to split and filter the file.
+      if (format.isSplitable(session, fsRelation.options, file.getPath)) {
+        val splitter =
+          format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
+        val validSplits = splitter(file)
+        validSplits.flatMap { split =>
+          val splitOffset = split.getStart
+          val end = splitOffset + split.getLength
+          (splitOffset until end by maxSplitBytes).map { offset =>
+            val remaining = end - offset
             val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
             val hosts = getBlockHosts(blockLocations, offset, size)
-            PartitionedFile(
-              partition.values, file.getPath.toUri.toString, offset, size, hosts)
+            PartitionedFile(values, filePath, offset, size, hosts)
           }
-        } else {
-          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
-          Seq(PartitionedFile(
-            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
         }
+      } else {
+        // Take the entire file as one partition.
+        val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+        Seq(PartitionedFile(values, filePath, 0, file.getLen, hosts))
       }
     }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
@@ -527,7 +553,7 @@ case class FileSourceScanExec(
     }
     closePartition()
 
-    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+    new FileScanRDD(session, readFile, partitions)
   }
 
   private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 4f4aaaa5026f..4919bcc00574 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,6 +75,18 @@ trait FileFormat {
     false
   }
 
+  /**
+   * Allow a splittable FileFormat to produce a function to split individual files.
+   */
+  def buildSplitter(
+      sparkSession: SparkSession,
+      fileIndex: FileIndex,
+      filters: Seq[Filter],
+      schema: StructType,
+      hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
+    stat => Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
+  }
+
   /**
    * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 031a0fe57893..8186f9b4520d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,12 +17,15 @@ package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.FileNotFoundException
 import java.net.URI
+import java.util.concurrent.{Callable, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}
 
+import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce._
@@ -30,8 +33,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.schema.MessageType
 
@@ -278,6 +283,68 @@ class ParquetFileFormat
     true
   }
 
+  override def buildSplitter(
+      sparkSession: SparkSession,
+      fileIndex: FileIndex,
+      filters: Seq[Filter],
+      schema: StructType,
+      hadoopConf: Configuration): (FileStatus => Seq[FileSplit]) = {
+    val pruningEnabled = sparkSession.sessionState.conf.parquetPartitionPruningEnabled
+    val defaultSplitter = super.buildSplitter(sparkSession, fileIndex, filters, schema, hadoopConf)
+    if (!pruningEnabled || filters.isEmpty) {
+      // Return immediately to save FileSystem overhead
+      defaultSplitter
+    } else {
+      val splitters = fileIndex.rootPaths.map { root =>
+        val splits = ParquetFileFormat.fileSplits.get(root,
+          new Callable[ParquetFileSplitter] {
+            override def call(): ParquetFileSplitter =
+              createParquetFileSplits(root, hadoopConf, schema)
+          })
+        root -> splits.buildSplitter(filters)
+      }.toMap
+      val compositeSplitter: (FileStatus => Seq[FileSplit]) = { stat =>
+        val filePath = stat.getPath
+        val rootOption: Option[Path] = fileIndex.rootPaths
+          .find(root => filePath.toString.startsWith(root.toString))
+        val splitterForPath = rootOption.flatMap { root =>
+          splitters.get(root)
+        }.getOrElse(defaultSplitter)
+        splitterForPath(stat)
+      }
+      compositeSplitter
+    }
+  }
+
+  private def createParquetFileSplits(
+      root: Path,
+      hadoopConf: Configuration,
+      schema: StructType): ParquetFileSplitter = {
+    getMetadataForPath(root, hadoopConf)
+      .map(meta => new ParquetMetadataFileSplitter(root, meta.getBlocks.asScala, schema))
+      .getOrElse(ParquetDefaultFileSplitter)
+  }
+
+  private def getMetadataForPath(
+      rootPath: Path,
+      conf: Configuration): Option[ParquetMetadata] = {
+    val fs = rootPath.getFileSystem(conf)
+    try {
+      val stat = fs.getFileStatus(rootPath)
+      // Mimic Parquet behavior. If given a directory, find the underlying _metadata file
+      // If given a single file, check the parent directory for a _metadata file
+      val directory = if (stat.isDirectory) stat.getPath else stat.getPath.getParent
+      val metadataFile = new Path(directory, ParquetFileWriter.PARQUET_METADATA_FILE)
+      val metadata =
+        ParquetFileReader.readFooter(conf, metadataFile, ParquetMetadataConverter.NO_FILTER)
+      Option(metadata)
+    } catch {
+      case notFound: FileNotFoundException =>
+        log.debug(s"No _metadata file found in root $rootPath")
+        None
+    }
+  }
+
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -424,6 +491,13 @@ class ParquetFileFormat
 }
 
 object ParquetFileFormat extends Logging {
+
+  @transient private val fileSplits: Cache[Path, ParquetFileSplitter] =
+    CacheBuilder.newBuilder()
+      .expireAfterAccess(4, TimeUnit.HOURS)
+      .concurrencyLevel(1)
+      .build()
+
   private[parquet] def readSchema(
       footers: Seq[Footer], sparkSession: SparkSession): Option[StructType] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala
new file mode 100644
index 000000000000..e0cb458b483e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileSplitter.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.google.common.cache.{Cache, CacheBuilder}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
+import org.apache.parquet.filter2.statisticslevel.StatisticsFilter
+import org.apache.parquet.hadoop.metadata.BlockMetaData
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+
+abstract class ParquetFileSplitter {
+  def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit])
+
+  def singleFileSplit(stat: FileStatus): Seq[FileSplit] = {
+    Seq(new FileSplit(stat.getPath, 0, stat.getLen, Array.empty))
+  }
+}
+
+object ParquetDefaultFileSplitter extends ParquetFileSplitter {
+  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
+    stat => singleFileSplit(stat)
+  }
+}
+
+class ParquetMetadataFileSplitter(
+    val root: Path,
+    val blocks: Seq[BlockMetaData],
+    val schema: StructType)
+  extends ParquetFileSplitter
+  with Logging {
+
+  private val referencedFiles = blocks.map(bmd => new Path(root, bmd.getPath)).toSet
+
+  private val filterSets: Cache[Filter, RoaringBitmap] =
+    CacheBuilder.newBuilder()
+      .expireAfterAccess(4, TimeUnit.HOURS)
+      .concurrencyLevel(1)
+      .build()
+
+  override def buildSplitter(filters: Seq[Filter]): (FileStatus => Seq[FileSplit]) = {
+    val (applied, unapplied, filteredBlocks) = this.synchronized {
+      val (applied, unapplied) = filters.partition(filterSets.getIfPresent(_) != null)
+      val filteredBlocks = filterSets.getAllPresent(applied.asJava).values().asScala
+        .reduceOption(RoaringBitmap.and)
+        .map { bitmap =>
+          blocks.zipWithIndex.filter { case(block, index) =>
+            bitmap.contains(index)
+          }.map(_._1)
+        }.getOrElse(blocks)
+      (applied, unapplied, filteredBlocks)
+    }
+
+    val eligible = parquetFilter(unapplied, filteredBlocks).map { bmd =>
+      val blockPath = new Path(root, bmd.getPath)
+      new FileSplit(blockPath, bmd.getStartingPos, bmd.getTotalByteSize, Array.empty)
+    }
+
+    val statFilter: (FileStatus => Seq[FileSplit]) = { stat =>
+      if (!referencedFiles.contains(stat.getPath)) {
+        log.warn(s"Found _metadata file for $root," +
+          s" but no entries for blocks in ${stat.getPath}. Retaining whole file.")
+        singleFileSplit(stat)
+      } else {
+        eligible.filter(_.getPath == stat.getPath)
+      }
+    }
+    statFilter
+  }
+
+  private def parquetFilter(
+      filters: Seq[Filter],
+      blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = {
+    if (filters.nonEmpty) {
+      // Asynchronously build bitmaps
+      Future {
+        buildFilterBitMaps(filters)
+      }(ParquetMetadataFileSplitter.executionContext)
+
+      val predicate = filters.flatMap {
+        ParquetFilters.createFilter(schema, _)
+      }.reduce(FilterApi.and)
+      blocks.filter(bmd => !StatisticsFilter.canDrop(predicate, bmd.getColumns))
+    } else {
+      blocks
+    }
+  }
+
+  private def buildFilterBitMaps(filters: Seq[Filter]): Unit = {
+    this.synchronized {
+      // Only build bitmaps for filters that don't exist.
+      val sets = filters.flatMap { filter =>
+        val bitmap = new RoaringBitmap
+        ParquetFilters.createFilter(schema, filter)
+          .map((filter, _, bitmap))
+      }
+      var i = 0
+      val blockLen = blocks.size
+      while (i < blockLen) {
+        val bmd = blocks(i)
+        sets.foreach { case (filter, parquetFilter, bitmap) =>
+          if (!StatisticsFilter.canDrop(parquetFilter, bmd.getColumns)) {
+            bitmap.add(i)
+          }
+        }
+        i += 1
+      }
+      val mapping = sets.map { case (filter, _, bitmap) =>
+        bitmap.runOptimize()
+        filter -> bitmap
+      }.toMap.asJava
+      filterSets.putAll(mapping)
+    }
+  }
+}
+object ParquetMetadataFileSplitter {
+  private val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("parquet-metadata-filter", 1))
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71f3a67d0d5a..c5e0c9107fac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -253,6 +253,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_PARTITION_PRUNING_ENABLED =
+    SQLConfigBuilder("spark.sql.parquet.enablePartitionPruning")
+      .doc("Enables driver-side partition pruning for Parquet.")
+      .booleanConf
+      .createWithDefault(true)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -689,6 +695,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
 
+  def parquetPartitionPruningEnabled: Boolean = getConf(PARQUET_PARTITION_PRUNING_ENABLED)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c4a7d86f2bd..ea9c825d5c50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.net.URI
 
-import org.apache.hadoop.fs.{FileSystem, Path}
+import com.google.common.collect.{HashMultiset, Multiset}
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, RawLocalFileSystem}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
@@ -703,6 +705,87 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  // In order to make intent more readable for partition pruning tests, we increase
+  // openCostInBytes to disable file merging.
+  test("SPARK-17059: Allow FileFormat to specify partition pruning strategy") {
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (128 * 1024 * 1024).toString) {
+      withTempPath { path =>
+        spark.sparkContext.parallelize(Seq(1, 2, 3), 3)
+          .toDF("x").write.parquet(path.getCanonicalPath)
+
+        val zeroPartitions = spark.read.parquet(path.getCanonicalPath).where("x = 0")
+        assert(zeroPartitions.rdd.partitions.length == 0)
+
+        val onePartition = spark.read.parquet(path.getCanonicalPath).where("x = 1")
+        assert(onePartition.rdd.partitions.length == 1)
+      }
+    }
+  }
+
+  test("Do not filter out parquet file when missing in _metadata file") {
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> (128 * 1024 * 1024).toString) {
+      withTempPath { path =>
+        spark.sparkContext.parallelize(Seq(1, 2, 3), 3)
+          .toDF("x").write.parquet(path.getCanonicalPath)
+        withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "false") {
+          spark.sparkContext.parallelize(Seq(4), 1)
+            .toDF("x").write.mode(SaveMode.Append).parquet(path.getCanonicalPath)
+        }
+        val twoPartitions = spark.read.parquet(path.getCanonicalPath).where("x = 1")
+        assert(twoPartitions.rdd.partitions.length == 2)
+      }
+    }
+  }
+
+  test("Only read _metadata file once for a given root path") {
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true",
+      "fs.count.impl" -> classOf[CountingFileSystem].getName,
+      "fs.count.impl.disable.cache" -> "true") {
+      withTempPath { path =>
+        val mockedPath = s"count://some-bucket/${path.getCanonicalPath}"
+        val metadataPath: Path = new Path(s"$mockedPath/_metadata")
+        spark.sparkContext.parallelize(Seq(1, 2, 3), 3)
+          .toDF("x").write.parquet(mockedPath)
+
+        val onePartition = spark.read.parquet(mockedPath).where("x = 1")
+        assert(onePartition.rdd.partitions.length == 1)
+        assert(Counter.count(metadataPath) == 1)
+        Counter.reset()
+      }
+    }
+  }
+}
+
+class CountingFileSystem extends RawLocalFileSystem {
+  override def getScheme: String = "count"
+
+  override def getUri: URI = {
+    URI.create("count://some-bucket")
+  }
+
+  override def open(path: Path, bufferSize: Int): FSDataInputStream = {
+    Counter.increment(path)
+    super.open(path, bufferSize)
+  }
+}
+
+object Counter {
+  var counts: Multiset[Path] = HashMultiset.create()
+
+  def increment(path: Path): Unit = {
+    counts.add(path)
+  }
+
+  def count(path: Path): Int = {
+    counts.count(path)
+  }
+
+  def reset(): Unit = {
+    counts = HashMultiset.create()
+  }
 }
 
 object TestingUDT {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index d9ddcbd57ca8..6be5cecc2cb0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, SortExec}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FilePartition}
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
@@ -514,4 +516,17 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet
         df1.groupBy("j").agg(max("k")))
     }
   }
+
+  test("prune files when not passing filters") {
+    withTable("bucketed_table") {
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        df.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
+        // All partitions should have empty files.
+        hiveContext.table("bucketed_table").filter("j == -1").rdd.partitions.foreach { partition =>
+          val filePartition = partition.asInstanceOf[FilePartition]
+          assert(filePartition.files.isEmpty)
+        }
+      }
+    }
+  }
 }

From 77dccf5e71552cf92fe5c093ebcdff2ee0f43eee Mon Sep 17 00:00:00 2001
From: Patrick Woody
Date: Thu, 17 Nov 2016 15:46:14 -0500
Subject: [PATCH 2/2] Move splitter creation above file loop

---
 .../spark/sql/execution/DataSourceScanExec.scala | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f76218ca7c4f..c3201d72dde3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -441,14 +441,13 @@ case class FileSourceScanExec(
     val partitionFiles = selectedPartitions.flatMap { partition =>
       partition.files.map((_, partition.values))
     }
+    val format = fsRelation.fileFormat
+    val splitter =
+      format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
     val bucketed = partitionFiles.flatMap { case (file, values) =>
       val blockLocations = getBlockLocations(file)
       val filePath = file.getPath.toUri.toString
-      val format = fsRelation.fileFormat
-
       if (format.isSplitable(session, fsRelation.options, file.getPath)) {
-        val splitter =
-          format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
         val validSplits = splitter(file)
         validSplits.map { split =>
           val hosts = getBlockHosts(blockLocations, split.getStart, split.getLength)
@@ -497,15 +496,14 @@ case class FileSourceScanExec(
     val partitionFiles = selectedPartitions.flatMap { partition =>
       partition.files.map((_, partition.values))
     }
+    val format = fsRelation.fileFormat
+    val splitter =
+      format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
     val splitFiles = partitionFiles.flatMap { case (file, values) =>
       val blockLocations = getBlockLocations(file)
       val filePath = file.getPath.toUri.toString
-      val format = fsRelation.fileFormat
-
       // If the format is splittable, attempt to split and filter the file.
       if (format.isSplitable(session, fsRelation.options, file.getPath)) {
-        val splitter =
-          format.buildSplitter(session, fsRelation.location, dataFilters, schema, hadoopConf)
         val validSplits = splitter(file)
         validSplits.flatMap { split =>
           val splitOffset = split.getStart