diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
index 0cf0b395f139b..9d2c58b7e4351 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -21,6 +21,7 @@ import org.scalactic.source.Position
 import org.scalatest.Tag
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.internal.SQLConf
@@ -184,4 +185,21 @@ trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
         "The spec (part0) must match the partition spec (part0, part1)"))
     }
   }
+
+  test("partition already exists") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index 295ce1d3da13f..b29564e1d81b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.command.v1
 
-import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
@@ -44,21 +43,4 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
   }
 }
 
-class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-}
+class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession
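With the duplicated test hoisted into the shared AlterTableAddPartitionSuiteBase trait, each catalog-specific suite shrinks to its configuration hooks. A minimal sketch of how another suite could plug in, assuming the shared trait exposes the same version/defaultUsing members the Hive suite below overrides (the suite name and USING clause here are illustrative, not part of this patch):

    // Hypothetical suite reusing the shared "partition already exists" test.
    class MyAlterTableAddPartitionSuite
        extends org.apache.spark.sql.execution.command.AlterTableAddPartitionSuiteBase
        with SharedSparkSession {
      override def version: String = "My catalog"          // illustrative
      override def defaultUsing: String = "USING parquet"  // illustrative
    }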
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index b15235d17671a..09921c8d8a5eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.v2
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, ResolvePartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
@@ -60,23 +60,6 @@ class AlterTableAddPartitionSuite
     assert(partMetadata.get("location") === expected)
   }
 
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[PartitionsAlreadyExistException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("The following partitions already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
-
   test("SPARK-33650: add partition into a table which doesn't support partition management") {
     withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b4ebf153fc178..0b19e5e6e8c84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
+import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -48,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -598,7 +599,17 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
-    shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    def replaceExistException(e: Throwable): Unit = e match {
+      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
+        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+      case _ => throw e
+    }
+    try {
+      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    } catch {
+      case e: InvocationTargetException => replaceExistException(e.getCause)
+      case e: Throwable => replaceExistException(e)
+    }
   }
 
   override def dropPartitions(
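The two catch arms above exist because the Hive shims dispatch into the metastore through Java reflection, which wraps the real failure in InvocationTargetException, while non-reflective paths can surface the HiveException directly. A self-contained sketch of this unwrap-then-translate pattern (the exception classes are stand-ins for the Hive metastore types, and the RuntimeException stands in for PartitionsAlreadyExistException):

    import java.lang.reflect.InvocationTargetException

    object ReflectiveErrorTranslation {
      // Stand-ins for org.apache.hadoop.hive.ql.metadata.HiveException and
      // org.apache.hadoop.hive.metastore.api.AlreadyExistsException.
      class AlreadyExistsException(msg: String) extends Exception(msg)
      class HiveException(cause: Throwable) extends Exception(cause)

      def withTranslation(body: => Unit): Unit = {
        // Map the Hive-specific failure to a stable, catalog-agnostic exception.
        def translate(e: Throwable): Unit = e match {
          case he: HiveException if he.getCause.isInstanceOf[AlreadyExistsException] =>
            throw new RuntimeException("partitions already exist") // stand-in
          case other => throw other
        }
        try body catch {
          // Method.invoke wraps the target's exception, so unwrap it first.
          case e: InvocationTargetException => translate(e.getCause)
          case e: Throwable => translate(e)
        }
      }
    }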
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 684529aa330a7..b5500eaf47158 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -594,6 +594,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }
 
+    test(s"$version: createPartitions if already exists") {
+      val partitions = Seq(CatalogTablePartition(
+        Map("key1" -> "101", "key2" -> "102"),
+        storageFormat))
+      try {
+        client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        val errMsg = intercept[PartitionsAlreadyExistException] {
+          client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        }.getMessage
+        assert(errMsg.contains("partitions already exists"))
+      } finally {
+        client.dropPartitions(
+          "default",
+          "src_part",
+          partitions.map(_.spec),
+          ignoreIfNotExists = true,
+          purge = false,
+          retainData = false)
+      }
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // Function related API
     ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
index ef0ec8d9bd69f..73776c3ef79fa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.command.v1
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -26,21 +25,4 @@ class AlterTableAddPartitionSuite
     with TestHiveSingleton {
   override def version: String = "Hive V1"
   override def defaultUsing: String = "USING HIVE"
-
-  test("partition already exists") {
-    withNsTable("ns", "tbl") { t =>
-      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
-      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      val errMsg = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'")
-      }.getMessage
-      assert(errMsg.contains("already exists"))
-
-      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
-    }
-  }
 }
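Taken together, the patch makes the user-visible behavior uniform: the Hive path previously surfaced a generic AnalysisException (see the removed test above), and now all three command paths raise PartitionsAlreadyExistException. A rough session-level sketch of what the unified tests assert (assumes a SparkSession named spark; table and location names are illustrative):

    import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException

    spark.sql("CREATE TABLE tbl (id BIGINT, data STRING) USING parquet PARTITIONED BY (id)")
    spark.sql("ALTER TABLE tbl ADD PARTITION (id=2) LOCATION 'loc1'")

    // Without IF NOT EXISTS, a duplicate spec now fails with
    // PartitionsAlreadyExistException on every catalog implementation.
    try {
      spark.sql("ALTER TABLE tbl ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'")
    } catch {
      case e: PartitionsAlreadyExistException => println(e.getMessage)
    }

    // With IF NOT EXISTS, the duplicate is skipped and the new partition is added.
    spark.sql("ALTER TABLE tbl ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'")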