From e2fa2ca5a35ccfb26f146bcfa28e609dacdfa6ec Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Wed, 16 Mar 2022 21:36:54 +0800 Subject: [PATCH 1/7] literal_dynamic_partition --- .../dynamicpruning/PartitionPruning.scala | 2 +- .../sql/DynamicPartitionPruningSuite.scala | 49 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index 3b5fc4aea5d8..7f4cd1233dbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -65,7 +65,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { case fs: HadoopFsRelation => val partitionColumns = AttributeSet( l.resolve(fs.partitionSchema, fs.sparkSession.sessionState.analyzer.resolver)) - if (resExp.references.subsetOf(partitionColumns)) { + if (!resExp.references.isEmpty && resExp.references.subsetOf(partitionColumns)) { return Some(l) } else { None diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 61885169ece4..8739396b4cfc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1528,6 +1528,55 @@ abstract class DynamicPartitionPruningSuiteBase } } } + + test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { + withTable("fact1", "fact2", "dim") { + val fact1 = Seq[(Int, String, String)]( + (1, "a1", "part1"), + (3, "a3", "part1"), + (5, "a5", "part1") + ) + fact1.toDF("joinCol", "otherCol", 
"partCol") + .write + .partitionBy("partCol") + .format(tableFormat) + .saveAsTable("fact1") + + val fact2 = Seq[(Int, String, String)]( + (2, "b2", "part1"), + (4, "b4", "part1"), + (6, "b6", "part1") + ) + fact2.toDF("joinCol", "otherCol", "partCol") + .write + .partitionBy("partCol") + .format(tableFormat) + .saveAsTable("fact2") + + val dim = Seq[(String, Int, Int)]( + ("type1", 1, 100), + ("type2", 2, 200) + ) + dim.toDF("type", "joinCol", "score") + .write + .format(tableFormat) + .saveAsTable("dim") + + val df = sql( + """ + |SELECT a.type,a.joinCol,a.otherCol,b.score FROM + |(SELECT 'type1' as type,joinCol,otherCol FROM fact1 WHERE partCol='part1' + |UNION ALL + |SELECT 'type2' as type,joinCol,otherCol FROM fact2 WHERE partCol='part1') a + |Join dim b ON a.type=b.type AND a.joinCol=b.joinCol; + |""".stripMargin) + + checkPartitionPruningPredicate(df, false, withBroadcast = false) + checkAnswer(df, Row("type1", 1, "a1", 100) :: Row("type2", 2, "b2", 200) :: Nil) + } + } + } } abstract class DynamicPartitionPruningDataSourceSuiteBase From 29766bd7ec56386f8e97b6a69034c05b0bdaedb4 Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Thu, 17 Mar 2022 11:38:24 +0800 Subject: [PATCH 2/7] fix communication --- .../spark/sql/execution/dynamicpruning/PartitionPruning.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index 7f4cd1233dbd..bef06cae2b71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -59,13 +59,15 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { */ def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = { val srcInfo: Option[(Expression, 
LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan) + // filter out literal + .filter(_._1.references.nonEmpty) srcInfo.flatMap { case (resExp, l: LogicalRelation) => l.relation match { case fs: HadoopFsRelation => val partitionColumns = AttributeSet( l.resolve(fs.partitionSchema, fs.sparkSession.sessionState.analyzer.resolver)) - if (!resExp.references.isEmpty && resExp.references.subsetOf(partitionColumns)) { + if (resExp.references.subsetOf(partitionColumns)) { return Some(l) } else { None From 74053cfed4f21c73a353368338f640445f4d73af Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Thu, 24 Mar 2022 11:03:03 +0800 Subject: [PATCH 3/7] fix --- .../sql/catalyst/expressions/predicates.scala | 3 + .../dynamicpruning/PartitionPruning.scala | 2 - .../sql/DynamicPartitionPruningSuite.scala | 65 +++++++------------ 3 files changed, 25 insertions(+), 45 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index a2fd668f495e..8af4ad3d80ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -128,6 +128,9 @@ trait PredicateHelper extends AliasHelper with Logging { def findExpressionAndTrackLineageDown( exp: Expression, plan: LogicalPlan): Option[(Expression, LogicalPlan)] = { + if (exp.references.isEmpty) { + return None + } plan match { case p: Project => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala index bef06cae2b71..3b5fc4aea5d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala @@ -59,8 +59,6 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper { */ def getFilterableTableScan(a: Expression, plan: LogicalPlan): Option[LogicalPlan] = { val srcInfo: Option[(Expression, LogicalPlan)] = findExpressionAndTrackLineageDown(a, plan) - // filter out literal - .filter(_._1.references.nonEmpty) srcInfo.flatMap { case (resExp, l: LogicalRelation) => l.relation match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 8739396b4cfc..d9b8c3ff8089 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1531,50 +1531,29 @@ abstract class DynamicPartitionPruningSuiteBase test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") { - withTable("fact1", "fact2", "dim") { - val fact1 = Seq[(Int, String, String)]( - (1, "a1", "part1"), - (3, "a3", "part1"), - (5, "a5", "part1") - ) - fact1.toDF("joinCol", "otherCol", "partCol") - .write - .partitionBy("partCol") - .format(tableFormat) - .saveAsTable("fact1") - - val fact2 = Seq[(Int, String, String)]( - (2, "b2", "part1"), - (4, "b4", "part1"), - (6, "b6", "part1") - ) - fact2.toDF("joinCol", "otherCol", "partCol") - .write - .partitionBy("partCol") - .format(tableFormat) - .saveAsTable("fact2") - - val dim = Seq[(String, Int, Int)]( - ("type1", 1, 100), - ("type2", 2, 200) - ) - dim.toDF("type", "joinCol", "score") - .write - .format(tableFormat) - .saveAsTable("dim") - - val df = sql( - """ - |SELECT a.type,a.joinCol,a.otherCol,b.score FROM - |(SELECT 'type1' as type,joinCol,otherCol FROM fact1 WHERE partCol='part1' - |UNION ALL - |SELECT 'type2' as 
type,joinCol,otherCol FROM fact2 WHERE partCol='part1') a - |Join dim b ON a.type=b.type AND a.joinCol=b.joinCol; - |""".stripMargin) + val df = sql( + """ + |SELECT f.store_id, + | f.date_id, + | s.state_province + |FROM (SELECT 4 AS store_id, + | date_id, + | product_id + | FROM fact_sk + | WHERE date_id >= 1300 + | UNION ALL + | SELECT 5 AS store_id, + | date_id, + | product_id + | FROM fact_stats + | WHERE date_id <= 1000) f + |JOIN dim_store s + |ON f.store_id = s.store_id + |WHERE s.country = 'US' + |""".stripMargin) - checkPartitionPruningPredicate(df, false, withBroadcast = false) - checkAnswer(df, Row("type1", 1, "a1", 100) :: Row("type2", 2, "b2", 200) :: Nil) - } + checkPartitionPruningPredicate(df, false, withBroadcast = false) + checkAnswer(df, Row(4, 1300, "California") :: Row(5, 1000, "Texas") :: Nil) } } } From ea6046fb381ce1cdf98cab7d162d958ad8822a09 Mon Sep 17 00:00:00 2001 From: mcdull_zhang Date: Thu, 24 Mar 2022 11:21:06 +0800 Subject: [PATCH 4/7] Update sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala code format Co-authored-by: Hyukjin Kwon --- .../apache/spark/sql/catalyst/expressions/predicates.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index b5bb6d14320e..949ce9741165 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -128,9 +128,7 @@ trait PredicateHelper extends AliasHelper with Logging { def findExpressionAndTrackLineageDown( exp: Expression, plan: LogicalPlan): Option[(Expression, LogicalPlan)] = { - if (exp.references.isEmpty) { - return None - } + if (exp.references.isEmpty) return None plan match { case p: Project => From 
4a863d94cb062f10f06ed53c53d0087f41140ac6 Mon Sep 17 00:00:00 2001 From: mcdull_zhang Date: Thu, 24 Mar 2022 13:51:51 +0800 Subject: [PATCH 5/7] Update sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala code style Co-authored-by: Wenchen Fan --- .../org/apache/spark/sql/DynamicPartitionPruningSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index e09c3fbe87bb..cfdd2e08a79e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1552,7 +1552,7 @@ abstract class DynamicPartitionPruningSuiteBase |WHERE s.country = 'US' |""".stripMargin) - checkPartitionPruningPredicate(df, false, withBroadcast = false) + checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false) checkAnswer(df, Row(4, 1300, "California") :: Row(5, 1000, "Texas") :: Nil) } } From 704af7dcfc0f7d0d8db4684915362dccfbae6f18 Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Thu, 24 Mar 2022 22:07:04 +0800 Subject: [PATCH 6/7] trigger flow From 408934f0ae68ba6f7143634dc5ff3626aaedf526 Mon Sep 17 00:00:00 2001 From: mcdull-zhang Date: Thu, 24 Mar 2022 22:42:06 +0800 Subject: [PATCH 7/7] trigger flow