@@ -187,7 +187,23 @@ case class FileSourceScanExec(
"InputPaths" -> relation.location.paths.mkString(", "))

private lazy val inputRDD: RDD[InternalRow] = {
val selectedPartitions = relation.location.listFiles(partitionFilters)
val originalPartitions = relation.location.listFiles(partitionFilters)
val filteredPartitions = if (relation.location.paths.isEmpty) {
originalPartitions
} else {
relation.fileFormat.filterPartitions(
dataFilters,
outputSchema,
relation.sparkSession.sparkContext.hadoopConfiguration,
relation.location.allFiles(),
relation.location.paths.head,
originalPartitions)
}
val totalFilesRaw = originalPartitions.map(_.files.size).sum
val totalFilesFiltered = filteredPartitions.map(_.files.size).sum
logInfo(s"Filtered down total number of partitions to ${filteredPartitions.size}"
+ s" from ${originalPartitions.size}, "
+ s"total number of files to ${totalFilesFiltered} from ${totalFilesRaw}")

val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
@@ -201,9 +217,9 @@ case class FileSourceScanExec(

relation.bucketSpec match {
case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
createBucketedReadRDD(bucketing, readFile, filteredPartitions, relation)
case _ =>
createNonBucketedReadRDD(readFile, selectedPartitions, relation)
createNonBucketedReadRDD(readFile, filteredPartitions, relation)
}
}

@@ -219,6 +219,21 @@ trait FileFormat {
false
}

/**
* Allows a FileFormat to plug in its own strategy for using pushed-down filters to eliminate
* partitions before execution. By default no pruning is performed and the original partitions
* are returned unchanged.
*/
def filterPartitions(
filters: Seq[Filter],
schema: StructType,
conf: Configuration,
allFiles: Seq[FileStatus],
root: Path,
partitions: Seq[Partition]): Seq[Partition] = {
partitions
}

/**
* Returns whether a file with `path` could be split or not.
*/
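The filterPartitions hook added above is meant to be overridden by formats that can rule out files from the pushed-down filters alone. The mixin below is an illustrative sketch, not part of this change: the mightMatch helper is hypothetical and stands in for whatever per-file statistics a format maintains, and the imports assume the datasources package layout used on this branch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.execution.datasources.{FileFormat, Partition}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: prune each partition's file list with a user-maintained index.
trait IndexPrunedFileFormat extends FileFormat {

  // Assumed helper: returns false only when the file provably matches none of the filters.
  protected def mightMatch(conf: Configuration, file: FileStatus, filters: Seq[Filter]): Boolean

  override def filterPartitions(
      filters: Seq[Filter],
      schema: StructType,
      conf: Configuration,
      allFiles: Seq[FileStatus],
      root: Path,
      partitions: Seq[Partition]): Seq[Partition] = {
    if (filters.isEmpty) {
      partitions
    } else {
      partitions
        .map(p => Partition(p.values, p.files.filter(f => mightMatch(conf, f, filters))))
        .filterNot(_.files.isEmpty) // drop partitions with no surviving files, as the Parquet impl below does
    }
  }
}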
@@ -27,12 +27,14 @@ import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.compat.{FilterCompat, RowGroupFilter}
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
import org.slf4j.bridge.SLF4JBridgeHandler
@@ -58,6 +60,9 @@ class ParquetFileFormat
with Logging
with Serializable {

// Cached footer of the "_metadata" summary file, read at most once per ParquetFileFormat instance
@transient @volatile private var cachedMetadata: ParquetMetadata = _

override def shortName(): String = "parquet"

override def toString: String = "ParquetFormat"
@@ -423,6 +428,64 @@ class ParquetFileFormat
sqlContext.sessionState.newHadoopConf(),
options)
}

override def filterPartitions(
filters: Seq[Filter],
schema: StructType,
conf: Configuration,
allFiles: Seq[FileStatus],
root: Path,
partitions: Seq[Partition]): Seq[Partition] = {
// Read the "_metadata" summary file if available; it contains the block (row-group) headers
// of every data file. On S3 it is far cheaper to read this single summary than to open each
// data file just to fetch its footer.
allFiles.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE).map { stat =>
val metadata = getOrReadMetadata(conf, stat)
partitions.map { part =>
filterByMetadata(
filters,
schema,
conf,
root,
metadata,
part)
}.filterNot(_.files.isEmpty)
}.getOrElse(partitions)
}

private def filterByMetadata(
filters: Seq[Filter],
schema: StructType,
conf: Configuration,
root: Path,
metadata: ParquetMetadata,
partition: Partition): Partition = {
val blockMetadatas = metadata.getBlocks.asScala
val parquetSchema = metadata.getFileMetaData.getSchema
val conjunctiveFilter = filters
.flatMap(ParquetFilters.createFilter(schema, _))
.reduceOption(FilterApi.and)
conjunctiveFilter.map { conjunction =>
val filteredBlocks = RowGroupFilter.filterRowGroups(
FilterCompat.get(conjunction), blockMetadatas.asJava, parquetSchema).asScala.map { bmd =>
new Path(root, bmd.getPath).toString
}
Partition(partition.values, partition.files.filter { f =>
filteredBlocks.contains(f.getPath.toString)
})
}.getOrElse(partition)
}

Review comments on the RowGroupFilter.filterRowGroups call above:

Reviewer (Member): Do you mind if I ask a question, please? If my understanding is correct, Parquet already filters row groups in both the normal reader and the vectorized reader (#13701). Is this doing the same thing on the Spark side?

HyukjinKwon (Member, Aug 16, 2016): Also, doesn't this try to touch many files on the driver side?

private def getOrReadMetadata(conf: Configuration, stat: FileStatus): ParquetMetadata = {
if (cachedMetadata == null) {
logInfo("Reading summary metadata into cache in ParquetFileFormat")
cachedMetadata = ParquetFileReader.readFooter(conf, stat, ParquetMetadataConverter.NO_FILTER)
} else {
logInfo("Using cached summary metadata")
}
cachedMetadata
}

}

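For context on what the Parquet implementation above leans on (an illustrative standalone sketch, not part of the pull request): when job summaries are enabled, Parquet writes a "_metadata" file under the output root that aggregates one entry per row group of every data file, each holding a path relative to the root together with the statistics that RowGroupFilter evaluates. The snippet below only inspects that file; the accessors come from parquet-hadoop's BlockMetaData.

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}

// Standalone sketch: dump the row-group entries recorded in a "_metadata" summary file.
object InspectParquetSummary {
  def main(args: Array[String]): Unit = {
    val root = new Path(args(0)) // directory previously written with job summaries enabled
    val summary = new Path(root, ParquetFileWriter.PARQUET_METADATA_FILE)
    val footer = ParquetFileReader.readFooter(
      new Configuration(), summary, ParquetMetadataConverter.NO_FILTER)
    footer.getBlocks.asScala.foreach { block =>
      // block.getPath is relative to root, which is why filterByMetadata rebuilds
      // absolute paths with new Path(root, bmd.getPath) before matching files.
      println(s"${block.getPath}: ${block.getRowCount} rows, ${block.getTotalByteSize} bytes")
    }
  }
}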
@@ -703,6 +703,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("SPARK-17059: Allow FileFormat to specify partition pruning strategy") {
withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
withTempPath { path =>
Seq(1, 2, 3).toDF("x").write.parquet(path.getCanonicalPath)
val df = spark.read.parquet(path.getCanonicalPath).where("x = 0")
assert(df.rdd.partitions.length == 0)
}
}
}
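
// A possible companion check, sketched here but not part of this change: without a summary
// file the format falls back to the original partitions, so the scan still schedules at least
// one task and simply returns no rows. The partition-count assertion assumes default split
// settings for a handful of tiny files.
test("SPARK-17059: fall back to original partitions without a summary file (sketch)") {
  withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "false") {
    withTempPath { path =>
      Seq(1, 2, 3).toDF("x").write.parquet(path.getCanonicalPath)
      val df = spark.read.parquet(path.getCanonicalPath).where("x = 0")
      assert(df.rdd.partitions.length >= 1)
      checkAnswer(df, Nil)
    }
  }
}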
}

object TestingUDT {