diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index d131a6bef06e..93ef5d447b07 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
+import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -48,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -605,7 +606,17 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
-    shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    def replaceExistException(e: Throwable): Unit = e match {
+      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
+        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+      case _ => throw e
+    }
+    try {
+      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    } catch {
+      case e: InvocationTargetException => replaceExistException(e.getCause)
+      case e: Throwable => replaceExistException(e)
+    }
   }
 
   override def dropPartitions(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5ddaaf3c9a77..41b90db3b2e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -593,6 +593,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }
 
+    test(s"$version: createPartitions if already exists") {
+      val partitions = Seq(CatalogTablePartition(
+        Map("key1" -> "101", "key2" -> "102"),
+        storageFormat))
+      try {
+        client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        val errMsg = intercept[PartitionsAlreadyExistException] {
+          client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        }.getMessage
+        assert(errMsg.contains("partitions already exists"))
+      } finally {
+        client.dropPartitions(
+          "default",
+          "src_part",
+          partitions.map(_.spec),
+          ignoreIfNotExists = true,
+          purge = false,
+          retainData = false)
+      }
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // Function related API
     ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 86385f0f56b5..718b6f276c62 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -2733,4 +2733,21 @@ class HiveDDLSuite
       assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty)
     }
   }
+
+  test("SPARK-33742: partition already exists") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (data string) PARTITIONED BY (id bigint)")
+      sql(s"ALTER TABLE t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkAnswer(sql("SHOW PARTITIONS t"), Seq(Row("id=1"), Row("id=2")))
+    }
+  }
 }