HiveClientImpl.scala

@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
 import java.lang.{Iterable => JIterable}
+import java.lang.reflect.InvocationTargetException
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.{Locale, Map => JMap}
 import java.util.concurrent.TimeUnit._
@@ -48,7 +49,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
@@ -605,7 +606,17 @@ private[hive] class HiveClientImpl(
       table: String,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = withHiveState {
-    shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    def replaceExistException(e: Throwable): Unit = e match {
+      case _: HiveException if e.getCause.isInstanceOf[AlreadyExistsException] =>
+        throw new PartitionsAlreadyExistException(db, table, parts.map(_.spec))
+      case _ => throw e
+    }
+    try {
+      shim.createPartitions(client, db, table, parts, ignoreIfExists)
+    } catch {
+      case e: InvocationTargetException => replaceExistException(e.getCause)
+      case e: Throwable => replaceExistException(e)
+    }
   }
 
   override def dropPartitions(
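Note: the shim layer reaches Hive partly through Java reflection, so an AlreadyExistsException raised inside the metastore can surface wrapped in a java.lang.reflect.InvocationTargetException; the catch block above unwraps one level before matching on the HiveException cause. A minimal, self-contained sketch of this unwrap-and-translate pattern follows; the exception types (LegacyStoreException, DuplicateEntryException, FriendlyException) are hypothetical stand-ins, not the real Hive or Spark classes.

import java.lang.reflect.InvocationTargetException

// Hypothetical stand-ins for HiveException, AlreadyExistsException and
// PartitionsAlreadyExistException.
class DuplicateEntryException extends RuntimeException("duplicate entry")
class LegacyStoreException(cause: Throwable) extends RuntimeException(cause)
class FriendlyException(msg: String) extends RuntimeException(msg)

object UnwrapDemo {
  // Map a low-level duplicate-entry failure to a user-facing exception;
  // anything else is rethrown untouched.
  private def translate(e: Throwable): Unit = e match {
    case _: LegacyStoreException if e.getCause.isInstanceOf[DuplicateEntryException] =>
      throw new FriendlyException("entry already exists")
    case _ => throw e
  }

  def main(args: Array[String]): Unit = {
    val target = new AnyRef {
      def failingCall(): Unit = throw new LegacyStoreException(new DuplicateEntryException)
    }
    try {
      try {
        // Method.invoke wraps exceptions thrown by the target method in
        // InvocationTargetException, so unwrap one level before matching.
        target.getClass.getMethod("failingCall").invoke(target)
      } catch {
        case e: InvocationTargetException => translate(e.getCause)
        case e: Throwable => translate(e)
      }
    } catch {
      case e: FriendlyException => println(s"translated: ${e.getMessage}")
    }
  }
}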
VersionsSuite.scala

@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -593,6 +593,27 @@ class VersionsSuite extends SparkFunSuite with Logging {
       assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
     }
 
+    test(s"$version: createPartitions if already exists") {
+      val partitions = Seq(CatalogTablePartition(
+        Map("key1" -> "101", "key2" -> "102"),
+        storageFormat))
+      try {
+        client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        val errMsg = intercept[PartitionsAlreadyExistException] {
+          client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
+        }.getMessage
+        assert(errMsg.contains("partitions already exists"))
+      } finally {
+        client.dropPartitions(
+          "default",
+          "src_part",
+          partitions.map(_.spec),
+          ignoreIfNotExists = true,
+          purge = false,
+          retainData = false)
+      }
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     // Function related API
     ///////////////////////////////////////////////////////////////////////////
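Usage note (a sketch against the same HiveClient API the test exercises, not part of the change): with ignoreIfExists = true, repeating the call is a silent no-op rather than raising PartitionsAlreadyExistException.

// First call creates the partitions; the repeated call succeeds silently.
client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
client.createPartitions("default", "src_part", partitions, ignoreIfExists = true)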
HiveDDLSuite.scala

@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -2733,4 +2733,21 @@ class HiveDDLSuite
       assert(sql("SELECT * FROM t2 WHERE c = 'A'").collect().isEmpty)
     }
   }
+
+  test("SPARK-33742: partition already exists") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (data string) PARTITIONED BY (id bigint)")
+      sql(s"ALTER TABLE t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkAnswer(sql("SHOW PARTITIONS t"), Seq(Row("id=1"), Row("id=2")))
+    }
+  }
 }
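For user code, a hedged sketch of what the change means in practice, assuming a running SparkSession named spark and a partitioned table t as in the test above: PartitionsAlreadyExistException can be caught directly, or the IF NOT EXISTS form can be used to keep the statement idempotent.

import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException

try {
  spark.sql("ALTER TABLE t ADD PARTITION (id=2) LOCATION 'loc1'")
} catch {
  case _: PartitionsAlreadyExistException =>
    // Recover with the idempotent form instead of failing.
    spark.sql("ALTER TABLE t ADD IF NOT EXISTS PARTITION (id=2) LOCATION 'loc1'")
}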