From 1c0ba50edc2ea6c09af634ede4068bd9879abff0 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Thu, 24 Sep 2015 22:51:32 -0700
Subject: [PATCH 1/2] Scan DataSource with predicate expression combining
 partition key and attributes doesn't work

---
 .../datasources/DataSourceStrategy.scala      | 42 +++++++++++++------
 .../parquet/ParquetFilterSuite.scala          | 17 ++++++++
 2 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c58213155daa..f01a761ecddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -62,7 +62,30 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+      // We divide the filter expressions into 3 parts
+      val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+      val filterMap = filters.groupBy { f =>
+        // TODO this is case-sensitive
+        val referencedColumnNames = f.references.map(_.name).toSet
+        if (referencedColumnNames.subsetOf(partitionColumnNames)) {
+          // References only partition keys
+          0
+        } else if (referencedColumnNames.intersect(partitionColumnNames).isEmpty) {
+          // Does not reference any partition key at all; can be pushed down
+          1
+        } else {
+          // References both partition keys and attributes
+          2
+        }
+      }
+      // Only pruning the partition keys
+      val partitionFilters = filterMap.getOrElse(0, Nil)
+      // Only pushes down predicates that do not reference partition keys.
+      val pushedFilters = filterMap.getOrElse(1, Nil)
+      // Predicates with both partition keys and attributes
+      val combineFilters = filterMap.getOrElse(2, Nil)
+
+      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
 
       logInfo {
         val total = t.partitionSpec.partitions.length
@@ -71,21 +94,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }
 
-      // Only pushes down predicates that do not reference partition columns.
-      val pushedFilters = {
-        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
-        filters.filter { f =>
-          val referencedColumnNames = f.references.map(_.name).toSet
-          referencedColumnNames.intersect(partitionColumnNames).isEmpty
-        }
-      }
-
-      buildPartitionedTableScan(
+      val scan = buildPartitionedTableScan(
         l,
         projects,
         pushedFilters,
         t.partitionSpec.partitionColumns,
-        selectedPartitions) :: Nil
+        selectedPartitions)
+
+      combineFilters
+        .reduceLeftOption(expressions.And)
+        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
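The hunk above classifies each predicate by the columns it references. As a
standalone illustration, here is a minimal sketch of the same three-way split;
it is not part of the patch, and Predicate is a hypothetical stand-in for
Catalyst's Expression that records only the column names a predicate touches:

    case class Predicate(sql: String, references: Set[String])

    def classify(
        filters: Seq[Predicate],
        partitionColumns: Set[String]): (Seq[Predicate], Seq[Predicate], Seq[Predicate]) = {
      // Bucket 0: references only partition keys, usable for partition pruning
      val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns))
      // Bucket 1: references no partition key, safe to push down to the source
      val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
      // Bucket 2: mixes partition keys and data attributes, kept as a residual Filter
      val combineFilters =
        filters.filterNot(f => partitionFilters.contains(f) || pushedFilters.contains(f))
      (partitionFilters, pushedFilters, combineFilters)
    }

    // With partition column "part": "a > 0" lands in bucket 1 and is pushed down,
    // while "part = 0 or a > 1" mixes both kinds of columns and stays residual.
    val (prune, push, residual) = classify(
      Seq(Predicate("a > 0", Set("a")), Predicate("part = 0 or a > 1", Set("part", "a"))),
      partitionColumns = Set("part"))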
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f067112cfca9..93cd2a1c0cd5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -297,4 +297,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-10829: Filter combine partition key and attribute doesn't work in DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+        // If the "part = 0 or a > 1" filter gets pushed down, this query will throw an
+        // exception since "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+          (2 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }
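The expected answer in the new test can be checked by hand. A minimal sketch,
assuming the layout the test writes; Row3 is a hypothetical stand-in for
Spark's Row, and reading the partitioned path appends the partition value
part=1 as a third column:

    case class Row3(a: Int, b: String, part: Int)

    // Rows written under part=1: (1, "1"), (2, "2"), (3, "3")
    val rows = (1 to 3).map(i => Row3(i, i.toString, part = 1))

    // "a > 0" is safe to push into the Parquet reader, but "part = 0 or a > 1"
    // references the partition column, which does not exist inside the Parquet
    // files, so it must run as a residual filter on top of the scan.
    val expected = rows.filter(r => r.a > 0 && (r.part == 0 || r.a > 1))
    // expected == Vector(Row3(2, "2", 1), Row3(3, "3", 1)), matching checkAnswer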
From f5705a57fafd174655634e21807867f2b7c46927 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Mon, 12 Oct 2015 19:53:06 -0700
Subject: [PATCH 2/2] Update the code as suggested

---
 .../datasources/DataSourceStrategy.scala      | 26 +++++++------------
 1 file changed, 9 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index f01a761ecddf..5d27e274457c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -64,26 +64,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         if t.partitionSpec.partitionColumns.nonEmpty =>
       // We divide the filter expressions into 3 parts
       val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
-      val filterMap = filters.groupBy { f =>
-        // TODO this is case-sensitive
-        val referencedColumnNames = f.references.map(_.name).toSet
-        if (referencedColumnNames.subsetOf(partitionColumnNames)) {
-          // References only partition keys
-          0
-        } else if (referencedColumnNames.intersect(partitionColumnNames).isEmpty) {
-          // Does not reference any partition key at all; can be pushed down
-          1
-        } else {
-          // References both partition keys and attributes
-          2
-        }
-      }
+
+      // TODO this is case-sensitive
       // Only pruning the partition keys
-      val partitionFilters = filterMap.getOrElse(0, Nil)
+      val partitionFilters =
+        filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+
       // Only pushes down predicates that do not reference partition keys.
-      val pushedFilters = filterMap.getOrElse(1, Nil)
+      val pushedFilters =
+        filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+
       // Predicates with both partition keys and attributes
-      val combineFilters = filterMap.getOrElse(2, Nil)
+      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
 
       val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
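For completeness, the residual predicates computed here are still folded back
on top of the scan by the reduceLeftOption(expressions.And) call introduced in
patch 1. A toy model of that folding, with Expr and Plan as hypothetical
stand-ins for Catalyst's Expression and SparkPlan:

    sealed trait Expr
    case class Pred(sql: String) extends Expr
    case class And(left: Expr, right: Expr) extends Expr

    sealed trait Plan
    case object Scan extends Plan
    case class Filter(condition: Expr, child: Plan) extends Plan

    def withResidual(combineFilters: Seq[Expr], scan: Plan): Plan =
      combineFilters
        .reduceLeftOption(And(_, _))   // conjoin all residual predicates into one
        .map(Filter(_, scan))          // wrap the scan in a Filter node
        .getOrElse(scan)               // no residual predicates: just the bare scan

    // withResidual(Nil, Scan) == Scan
    // withResidual(Seq(Pred("part = 0 or a > 1")), Scan)
    //   == Filter(Pred("part = 0 or a > 1"), Scan)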