diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 701f108181f0..c983fb3d5a03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
+import java.lang.reflect.InvocationTargetException
 import java.util.{Locale, Map => JMap}
 
 import scala.collection.JavaConverters._
@@ -30,10 +31,10 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
-import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Order}
+import org.apache.hadoop.hive.metastore.api.{AlreadyExistsException, Database => HiveDatabase, FieldSchema, Order}
 import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
 import org.apache.hadoop.hive.ql.Driver
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
@@ -43,7 +44,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -541,7 +542,17 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
-    shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    def replaceExistException(e: Throwable): Unit = e match {
+      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
+        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+      case _ => throw e
+    }
+    try {
+      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    } catch {
+      case e: InvocationTargetException => replaceExistException(e.getCause)
+      case e: Throwable => replaceExistException(e)
+    }
   }
 
   override def dropPartitions(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 7c66ff699f70..2a4ef11f1082 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -458,6 +458,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }
 
+    test(s"$version: createPartitions if already exists") {
+      val partitions = Seq(CatalogTablePartition(
+        Map("key1" -> "101", "key2" -> "102"),
+        storageFormat))
+      try {
+        client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        val errMsg = intercept[PartitionsAlreadyExistException] {
+          client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        }.getMessage
+        assert(errMsg.contains("partitions already exists"))
+      } finally {
+        client.dropPartitions(
+          "default",
+          "src_part",
+          partitions.map(_.spec),
+          ignoreIfNotExists = true,
+          purge = false,
+          retainData = false)
+      }
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // Function related API
     ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 90915e0b4f21..35e83ae27667 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.functions._
@@ -2351,4 +2351,21 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("SPARK-33742: partition already exists") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (data string) PARTITIONED BY (id bigint)")
+      sql(s"ALTER TABLE t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkAnswer(sql("SHOW PARTITIONS t"), Seq(Row("id=1"), Row("id=2")))
+    }
+  }
 }
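For reference, the heart of the change is the unwrap-and-translate pattern added to HiveClientImpl.createPartitions: the version-specific shim invokes Hive through reflection, so a metastore failure reaches Spark as an InvocationTargetException whose cause is a HiveException wrapping the metastore's AlreadyExistsException. Below is a minimal, self-contained sketch of that pattern; the three exception classes are simplified stand-ins for the real Hive/Spark types (not their actual definitions), and the message text merely mimics what the tests above assert against.

import java.lang.reflect.InvocationTargetException

// Simplified stand-ins for the real Hive/Spark exception types (illustrative only).
class HiveException(cause: Throwable) extends Exception(cause)
class AlreadyExistsException(msg: String) extends Exception(msg)
class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[Map[String, String]])
  extends Exception(
    s"The following partitions already exists in table '$table' database '$db': " +
      specs.mkString(", "))

object CreatePartitionsSketch {
  // Mirrors the control flow added to HiveClientImpl.createPartitions: unwrap the
  // reflective InvocationTargetException once, then translate a HiveException whose
  // cause is the metastore's AlreadyExistsException into the typed Spark exception.
  def withExistTranslation[T](
      db: String,
      table: String,
      specs: Seq[Map[String, String]])(body: => T): T = {
    def replaceExistException(e: Throwable): Nothing = e match {
      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
        throw new PartitionsAlreadyExistException(db, table, specs)
      case other => throw other
    }
    try {
      body
    } catch {
      case e: InvocationTargetException => replaceExistException(e.getCause)
      case e: Throwable => replaceExistException(e)
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      withExistTranslation("default", "t", Seq(Map("id" -> "2"))) {
        // Simulate how a duplicate-partition error arrives through the reflective shim.
        throw new InvocationTargetException(
          new HiveException(new AlreadyExistsException("Partition already exists")))
      }
    } catch {
      // Prints the translated, user-facing exception message.
      case e: PartitionsAlreadyExistException => println(e.getMessage)
    }
  }
}

The one-level unwrap is the key detail: matching on the reflective wrapper itself would miss the HiveException underneath, while unwrapping more aggressively could mask unrelated metastore errors, so the patch peels exactly one InvocationTargetException before inspecting the cause chain.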