From 54f3bc3ed2ab50b5a0621ca252315017fc5a7c56 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 4 May 2023 19:04:02 +0800 Subject: [PATCH 1/4] Improve reuse subquery with table cache --- .../adaptive/AdaptiveSparkPlanExec.scala | 6 ++- .../adaptive/InsertAdaptiveSparkPlan.scala | 2 +- .../adaptive/ReuseAdaptiveSubquery.scala | 11 +++-- .../apache/spark/sql/CachedTableSuite.scala | 44 ++++++++++++------- .../adaptive/AdaptiveQueryExecSuite.scala | 15 +++++++ 5 files changed, 55 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index fceb9db41120..1b2e802ae939 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -587,8 +587,10 @@ case class AdaptiveSparkPlanExec( BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized) } case i: InMemoryTableScanExec => - // No need to optimize `InMemoryTableScanExec` as it's a leaf node. - TableCacheQueryStageExec(currentStageId, i) + // Apply `queryStageOptimizerRules` so that we can reuse subquery. + // No need to apply `postStageCreationRules` for `InMemoryTableScanExec` + // as it's a leaf node. + TableCacheQueryStageExec(currentStageId, optimizeQueryStage(i, isFinalStage = false)) } currentStageId += 1 setLogicalLinkForNewQueryStage(queryStage, plan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 947a7314142f..1f05adc57a4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -125,7 +125,7 @@ case class InsertAdaptiveSparkPlan( /** * Returns an expression-id-to-execution-plan map for all the sub-queries. * For each sub-query, generate the adaptive execution plan for each sub-query by applying this - * rule, or reuse the execution plan from another sub-query of the same semantics if possible. + * rule. */ private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] = { val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala index c1d0e93e3b97..4222e4e3a8ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala @@ -33,10 +33,13 @@ case class ReuseAdaptiveSubquery( plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { case sub: ExecSubqueryExpression => - val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, sub.plan) - if (newPlan.ne(sub.plan)) { - sub.withNewPlan(ReusedSubqueryExec(newPlan)) - } else { + // `InsertAdaptiveSparkPlan` compiles subquery for each exprId, then the java object + // is always `eq` if two subqueries have same exprId. + // Check if the subquery can be reused manually instead of call `getOrElseUpdate`. + reuseMap.get(sub.plan.canonicalized).map { subquery => + sub.withNewPlan(ReusedSubqueryExec(subquery)) + }.getOrElse { + reuseMap.put(sub.plan.canonicalized, sub.plan) sub } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 5548108b9150..1f2235a10a9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH} import org.apache.spark.sql.catalyst.util.DateTimeConstants import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan} -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation} import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.functions._ @@ -823,21 +823,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils test("SPARK-19993 subquery with cached underlying relation") { withTempView("t1") { - Seq(1).toDF("c1").createOrReplaceTempView("t1") - spark.catalog.cacheTable("t1") - - // underlying table t1 is cached as well as the query that refers to it. - val sqlText = - """ - |SELECT * FROM t1 - |WHERE - |NOT EXISTS (SELECT * FROM t1) - """.stripMargin - val ds = sql(sqlText) - assert(getNumInMemoryRelations(ds) == 2) - - val cachedDs = sql(sqlText).cache() - assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.sparkPlan) == 3) + Seq(false, true).foreach { enabled => + withSQLConf( + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> enabled.toString, + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> + AQEPropagateEmptyRelation.ruleName) { + + Seq(1).toDF("c1").createOrReplaceTempView("t1") + spark.catalog.cacheTable("t1") + + // underlying table t1 is cached as well as the query that refers to it. + val sqlText = + """ + |SELECT * FROM t1 + |WHERE + |NOT EXISTS (SELECT * FROM t1) + """.stripMargin + val ds = sql(sqlText) + assert(getNumInMemoryRelations(ds) == 2) + + val cachedDs = sql(sqlText).cache() + cachedDs.collect() + assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.executedPlan) == 3) + + cachedDs.unpersist() + spark.catalog.uncacheTable("t1") + } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 7d0879c21d5f..58936f5d8dc8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2826,6 +2826,21 @@ class AdaptiveQueryExecSuite .executedPlan.isInstanceOf[LocalTableScanExec]) } } + + test("SPARK-43376: Improve reuse subquery with table cache") { + withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") { + withTable("t1", "t2") { + withCache("t1") { + Seq(1).toDF("c1").cache().createOrReplaceTempView("t1") + Seq(2).toDF("c2").createOrReplaceTempView("t2") + + val (_, adaptive) = runAdaptiveAndVerifyResult( + "SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)") + assert(findReusedSubquery(adaptive).size == 1) + } + } + } + } } /** From 97ddbe744afc0b5967326a725106dc945edefd17 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 5 May 2023 09:54:22 +0800 Subject: [PATCH 2/4] fix --- .../adaptive/ReuseAdaptiveSubquery.scala | 2 +- .../org/apache/spark/sql/SubquerySuite.scala | 24 +++++-------------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala index 4222e4e3a8ad..39e4c5cd38ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala @@ -39,7 +39,7 @@ case class ReuseAdaptiveSubquery( reuseMap.get(sub.plan.canonicalized).map { subquery => sub.withNewPlan(ReusedSubqueryExec(subquery)) }.getOrElse { - reuseMap.put(sub.plan.canonicalized, sub.plan) + reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) sub } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 32d913ca3b42..2425854e3c8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2337,15 +2337,9 @@ class SubquerySuite extends QueryTest case rs: ReusedSubqueryExec => rs.child.id } - if (enableAQE) { - assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan") - assert(reusedSubqueryIds.size == 4, - "Missing or unexpected reused ReusedSubqueryExec in the plan") - } else { - assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan") - assert(reusedSubqueryIds.size == 5, - "Missing or unexpected reused ReusedSubqueryExec in the plan") - } + assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan") + assert(reusedSubqueryIds.size == 5, + "Missing or unexpected reused ReusedSubqueryExec in the plan") } } } @@ -2413,15 +2407,9 @@ class SubquerySuite extends QueryTest case rs: ReusedSubqueryExec => rs.child.id } - if (enableAQE) { - assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan") - assert(reusedSubqueryIds.size == 3, - "Missing or unexpected reused ReusedSubqueryExec in the plan") - } else { - assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan") - assert(reusedSubqueryIds.size == 4, - "Missing or unexpected reused ReusedSubqueryExec in the plan") - } + assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan") + assert(reusedSubqueryIds.size == 4, + "Missing or unexpected reused ReusedSubqueryExec in the plan") } } } From 8a41b7a768c37e1ebb76be4854def1bcdc5d903b Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 5 May 2023 13:34:44 +0800 Subject: [PATCH 3/4] address comment --- .../sql/execution/adaptive/ReuseAdaptiveSubquery.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala index 39e4c5cd38ba..bed22b67de46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala @@ -33,9 +33,9 @@ case class ReuseAdaptiveSubquery( plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { case sub: ExecSubqueryExpression => - // `InsertAdaptiveSparkPlan` compiles subquery for each exprId, then the java object - // is always `eq` if two subqueries have same exprId. - // Check if the subquery can be reused manually instead of call `getOrElseUpdate`. + // The subquery can be already reused (the same Java object) due to filter pushdown + // of table cache. If it happens, we just need to wrap the current subquery with + // `ReusedSubqueryExec` and no need to update the `reuseMap`. reuseMap.get(sub.plan.canonicalized).map { subquery => sub.withNewPlan(ReusedSubqueryExec(subquery)) }.getOrElse { From 99f810c4084576eea19231590f4ddf9a9afaf7b4 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 5 May 2023 16:13:03 +0800 Subject: [PATCH 4/4] address comment --- .../sql/execution/adaptive/ReuseAdaptiveSubquery.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala index bed22b67de46..df6849447215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala @@ -39,8 +39,10 @@ case class ReuseAdaptiveSubquery( reuseMap.get(sub.plan.canonicalized).map { subquery => sub.withNewPlan(ReusedSubqueryExec(subquery)) }.getOrElse { - reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) - sub + reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) match { + case Some(subquery) => sub.withNewPlan(ReusedSubqueryExec(subquery)) + case None => sub + } } } }