
Commit 87a8be5

[SPARK-19987][SQL] Pass all filters into FileIndex
1 parent 2ea214d commit 87a8be5

File tree: 7 files changed (+33, -26 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 12 additions & 8 deletions
@@ -23,18 +23,18 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

 trait DataSourceScanExec extends LeafExecNode with CodegenSupport {

@@ -135,15 +135,15 @@ case class RowDataSourceScanExec(
  * @param output Output attributes of the scan.
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
- * @param dataFilters Data source filters to use for filtering data within partitions.
+ * @param dataFilters Filters on non-partition columns.
  * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
     output: Seq[Attribute],
     outputSchema: StructType,
     partitionFilters: Seq[Expression],
-    dataFilters: Seq[Filter],
+    dataFilters: Seq[Expression],
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan {

@@ -156,7 +156,8 @@ case class FileSourceScanExec(
     false
   }

-  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+  @transient private lazy val selectedPartitions =
+    relation.location.listFiles(partitionFilters, dataFilters)

   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {

@@ -249,13 +250,16 @@ case class FileSourceScanExec(
   }

   private lazy val inputRDD: RDD[InternalRow] = {
+    val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+    logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
+
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
         dataSchema = relation.dataSchema,
         partitionSchema = relation.partitionSchema,
         requiredSchema = outputSchema,
-        filters = dataFilters,
+        filters = pushedDownFilters,
         options = relation.options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

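For orientation, a minimal spark-shell-style sketch (not part of this commit; the attribute name "id" and the literal value are assumptions) of the two filter representations the change above moves between. FileSourceScanExec.dataFilters now carries Catalyst Expressions, and the translation to data source Filters for the file format reader happens lazily inside the scan node:

// Sketch only: "id" and the literal 1 are illustrative assumptions.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, EqualTo => CatalystEqualTo}
import org.apache.spark.sql.sources.{EqualTo => SourceEqualTo, Filter}
import org.apache.spark.sql.types.IntegerType

// A Catalyst predicate on a non-partition column: the form now held in
// FileSourceScanExec.dataFilters (Seq[Expression]).
val id = AttributeReference("id", IntegerType)()
val dataFilter = CatalystEqualTo(id, Literal(1))

// The data source form handed to the reader. This is what
// dataFilters.flatMap(DataSourceStrategy.translateFilter) yields for the predicate above;
// predicates that cannot be translated are simply dropped by the flatMap.
val pushed: Filter = SourceEqualTo("id", 1)

Dropping untranslatable predicates is safe because the execution engine still evaluates the original dataFilters on the rows read back (see the FileIndex contract below).
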
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ case class OptimizeMetadataOnlyQuery(
       relation match {
         case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
           val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-          val partitionData = fsRelation.location.listFiles(filters = Nil)
+          val partitionData = fsRelation.location.listFiles(Nil, Nil)
           LocalRelation(partAttrs, partitionData.map(_.values))

         case relation: CatalogRelation =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala

Lines changed: 3 additions & 2 deletions
@@ -54,8 +54,9 @@ class CatalogFileIndex(

   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq

-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
-    filterPartitions(filters).listFiles(Nil)
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
   }

   override def refresh(): Unit = fileStatusCache.invalidateAll()

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala

Lines changed: 10 additions & 5 deletions
@@ -46,12 +46,17 @@ trait FileIndex {
   * Returns all valid files grouped into partitions when the data is partitioned. If the data is
   * unpartitioned, this will return a single partition with no partition values.
   *
-  * @param filters The filters used to prune which partitions are returned. These filters must
-  *                only refer to partition columns and this method will only return files
-  *                where these predicates are guaranteed to evaluate to `true`. Thus, these
-  *                filters will not need to be evaluated again on the returned data.
+  * @param partitionFilters The filters used to prune which partitions are returned. These filters
+  *                         must only refer to partition columns and this method will only return
+  *                         files where these predicates are guaranteed to evaluate to `true`.
+  *                         Thus, these filters will not need to be evaluated again on the
+  *                         returned data.
+  * @param dataFilters Filters that can be applied on non-partitioned columns. The implementation
+  *                    does not need to guarantee these filters are applied, i.e. the execution
+  *                    engine will ensure these filters are still applied on the returned files.
   */
-  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+  def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]

   /**
    * Returns the list of files that will be read when scanning this relation. This call may be

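To make the new contract concrete, here is a minimal sketch of a FileIndex implementation against the two-argument listFiles; the class StaticFileIndex and its constructor are hypothetical, not part of Spark or this commit. partitionFilters must be honored by an index that prunes partitions, while dataFilters are advisory hints that an index with per-file statistics could use to skip files:

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

// Hypothetical unpartitioned index over a fixed set of files.
class StaticFileIndex(root: Path, files: Seq[FileStatus]) extends FileIndex {

  override def rootPaths: Seq[Path] = Seq(root)

  // No partition columns here, so partitionFilters have nothing to prune, and dataFilters
  // may be ignored outright: Spark still evaluates them on the rows read from these files.
  override def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    PartitionDirectory(InternalRow.empty, files) :: Nil
  }

  override def inputFiles: Array[String] = files.map(_.getPath.toString).toArray

  override def refresh(): Unit = ()

  override def sizeInBytes: Long = files.map(_.getLen).sum

  override def partitionSchema: StructType = StructType(Nil)
}

Ignoring dataFilters, as above, never changes query results; it only forgoes the chance to skip files early.
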
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 1 addition & 4 deletions
@@ -100,9 +100,6 @@ object FileSourceStrategy extends Strategy with Logging {
       val outputSchema = readDataColumns.toStructType
       logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

-      val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
-      logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
-
       val outputAttributes = readDataColumns ++ partitionColumns

       val scan =

@@ -111,7 +108,7 @@ object FileSourceStrategy extends Strategy with Logging {
           outputAttributes,
           outputSchema,
           partitionKeyFilters.toSeq,
-          pushedDownFilters,
+          dataFilters,
           table.map(_.identifier))

       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 5 additions & 3 deletions
@@ -54,17 +54,19 @@ abstract class PartitioningAwareFileIndex(

   override def partitionSchema: StructType = partitionSpec().partitionColumns

-  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+  protected val hadoopConf: Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(parameters)

   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]

   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
-      prunePartitions(filters, partitionSpec()).map {
+      prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 1 addition & 3 deletions
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.hive

-import java.net.URI
-
 import scala.util.control.NonFatal

 import com.google.common.util.concurrent.Striped

@@ -248,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         .inferSchema(
           sparkSession,
           options,
-          fileIndex.listFiles(Nil).flatMap(_.files))
+          fileIndex.listFiles(Nil, Nil).flatMap(_.files))
         .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))

       inferredSchema match {
