diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index a04486d36a640..e49ae04e6af6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -224,7 +225,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
     }
 
-    plan match {
+    EliminateSubqueryAliases(plan) match {
       case LogicalRelationWithTable(_, Some(catalogTable)) =>
         isSameName(catalogTable.identifier.nameParts)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 3cdf906cfd59e..178e1aead43a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 
 import org.apache.spark.{CleanerListener, SparkRuntimeException}
+import org.apache.spark.SparkConf
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
+import org.apache.spark.sql.connector.catalog.InMemoryCatalog
 import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan, SparkPlanInfo}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
 import org.apache.spark.sql.execution.columnar._
@@ -57,6 +59,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   with AdaptiveSparkPlanHelper {
   import testImplicits._
 
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
+
   setupTestData()
 
   override def afterEach(): Unit = {
@@ -1853,4 +1858,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(!spark.catalog.tableExists("SPARK_52684"))
     }
   }
+
+  test("uncache DSv2 table via cache manager correctly uncaches views with logical plans") {
+    val t = "testcat.tbl"
+    withTable(t, "v") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // cache the table
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
+
+      // create and cache a view on top of the table
+      spark.table(t).select("id").createOrReplaceTempView("v")
+      sql("SELECT * FROM v").cache()
+      assertCached(sql("SELECT * FROM v"))
+      checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+      // must invalidate only the table; the view must remain cached (cascade = false)
+      spark.sharedState.cacheManager.uncacheTableOrView(
+        spark,
+        Seq("testcat", "tbl"),
+        cascade = false)
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertCached(sql("SELECT * FROM v"))
+
+      // must also invalidate the dependent view (cascade = true)
+      spark.sharedState.cacheManager.uncacheTableOrView(
+        spark,
+        Seq("testcat", "tbl"),
+        cascade = true)
+
+      // verify the view is no longer cached
+      assertNotCached(sql("SELECT * FROM v"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 37233dbd7b5c9..515a60228405e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -232,6 +232,10 @@ abstract class QueryTest extends PlanTest {
         s"level $storageLevel, but it doesn't.")
   }
 
+  def assertNotCached(query: Dataset[_]): Unit = {
+    assertCached(query, numCachedTables = 0)
+  }
+
   /**
    * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
    */