From 77c011a162565549e7c0952d977e654395ae2e0a Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Wed, 9 Jul 2025 15:52:14 -0700
Subject: [PATCH 1/4] Cleanup shuffle for non-adaptive

---
 .../spark/sql/execution/SQLExecution.scala    | 10 +++++----
 .../sql/execution/QueryExecutionSuite.scala   | 21 ++++++++++++-------
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 9dcb38f8ff10e..dc59235ee4c0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, ExecutorService}
 import java.util.concurrent.atomic.AtomicLong
-
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
-
 import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, SparkEnv, SparkException, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
 import org.apache.spark.internal.Logging
@@ -30,6 +28,7 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.SQL_EVENT_TRUNCATE_LENGTH
@@ -178,8 +177,11 @@ object SQLExecution extends Logging {
       val shuffleIds = queryExecution.executedPlan match {
         case ae: AdaptiveSparkPlanExec =>
           ae.context.shuffleIds.asScala.keys
-        case _ =>
-          Iterable.empty
+        case nonAdaptivePlan =>
+          nonAdaptivePlan.collect {
+            case exec: ShuffleExchangeLike =>
+              exec.shuffleId
+          }
       }
       shuffleIds.foreach { shuffleId =>
         queryExecution.shuffleCleanupMode match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 1aab2a855bb4a..3e27beaf46f9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -350,14 +350,19 @@ class QueryExecutionSuite extends SharedSparkSession {
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - RemoveShuffleFiles mode") {
-    val plan = spark.range(100).repartition(10).logicalPlan
-    val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
-    df.collect()
-
-    val blockManager = spark.sparkContext.env.blockManager
-    assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
-    assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
-    cleanupShuffles()
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, RemoveShuffleFiles)
+        df.collect()
+
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
+        cleanupShuffles()
+    }
+  }
+  }
   }
 
   test("SPARK-35378: Return UnsafeRow in CommandResultExecCheck execute methods") {

From 4ae3f85ccca1eca887e0534c197ad17850e982da Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Wed, 9 Jul 2025 17:37:16 -0700
Subject: [PATCH 2/4] Fix scalastyle violation

---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index dc59235ee4c0c..c5c2f9bb6a6f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -19,8 +19,10 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap, ExecutorService}
 import java.util.concurrent.atomic.AtomicLong
+
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
+
 import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkContext, SparkEnv, SparkException, SparkThrowable, SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
 import org.apache.spark.internal.Logging

From bcd1ee314aa2e2249d6818b82c3f4b4aad32b379 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Wed, 9 Jul 2025 21:04:55 -0700
Subject: [PATCH 3/4] Fix indent

---
 .../org/apache/spark/sql/execution/QueryExecutionSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 3e27beaf46f9a..cf59c1a867a0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -360,9 +360,9 @@ class QueryExecutionSuite extends SharedSparkSession {
         assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
         assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
         cleanupShuffles()
+      }
     }
   }
-  }
   }
 
   test("SPARK-35378: Return UnsafeRow in CommandResultExecCheck execute methods") {

From cdb7dc74f3f1497a63fd45df0b12803aab3204dc Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Wed, 16 Jul 2025 17:42:17 -0700
Subject: [PATCH 4/4] Address review comments: run relevant tests in both
 adaptive and non-adaptive modes

---
 .../sql/execution/QueryExecutionSuite.scala | 38 ++++++++++++------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index cf59c1a867a0f..0d14faaf8144a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -328,25 +328,35 @@ class QueryExecutionSuite extends SharedSparkSession {
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") {
-    val plan = spark.range(100).repartition(10).logicalPlan
-    val df = Dataset.ofRows(spark, plan, DoNotCleanup)
-    df.collect()
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, DoNotCleanup)
+        df.collect()
 
-    val blockManager = spark.sparkContext.env.blockManager
-    assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty)
-    assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
-    cleanupShuffles()
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().nonEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
+        cleanupShuffles()
+      }
+    }
+  }
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - SkipMigration mode") {
-    val plan = spark.range(100).repartition(10).logicalPlan
-    val df = Dataset.ofRows(spark, plan, SkipMigration)
-    df.collect()
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan, SkipMigration)
+        df.collect()
 
-    val blockManager = spark.sparkContext.env.blockManager
-    assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
-    assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
-    cleanupShuffles()
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().nonEmpty)
+        cleanupShuffles()
+      }
+    }
+  }
   }
 
   test("SPARK-47764: Cleanup shuffle dependencies - RemoveShuffleFiles mode") {