diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index c7f47d2eaaaad..2e295200e9ce9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import java.util.Locale
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.internal.LogKeys.OPTIONS
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -28,8 +30,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.classic.Dataset
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-import org.apache.spark.sql.execution.command.CreateViewCommand
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.command.{CreateViewCommand, DropTempViewCommand}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 trait BaseCacheTableExec extends LeafV2CommandExec {
   def relationName: String
@@ -53,7 +57,16 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
 
     if (!isLazy) {
       // Performs eager caching.
-      df.count()
+      try {
+        df.count()
+      } catch {
+        case NonFatal(e) =>
+          // If the query fails, we should remove the cached table.
+          Utils.tryLogNonFatalError {
+            session.sharedState.cacheManager.uncacheQuery(session, planToCache, cascade = false)
+          }
+          throw e
+      }
     }
 
     Seq.empty
@@ -99,7 +112,15 @@ case class CacheTableAsSelectExec(
       isAnalyzed = true,
       referredTempFunctions = referredTempFunctions
     ).run(session)
-    super.run()
+    try {
+      super.run()
+    } catch {
+      case NonFatal(e) =>
+        Utils.tryLogNonFatalError {
+          DropTempViewCommand(Identifier.of(Array.empty, tempViewName)).run(session)
+        }
+        throw e
+    }
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 68299804ea877..53844be265c52 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -27,7 +27,7 @@ import scala.concurrent.duration._
 
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.CleanerListener
+import org.apache.spark.{CleanerListener, SparkRuntimeException}
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
@@ -1833,4 +1833,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  test("SPARK-52684: Atomicity of cache table on error") {
+    withTempView("SPARK_52684") {
+      intercept[SparkRuntimeException] {
+        spark.sql("CACHE TABLE SPARK_52684 AS SELECT raise_error('SPARK-52684') AS c1")
+      }
+      assert(!spark.catalog.tableExists("SPARK_52684"))
+    }
+  }
 }