diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index bc7f17fd5cb13..18cc579e4f9ea 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -33,6 +33,7 @@ license: | - Valid Base64 string should include symbols from in base64 alphabet (A-Za-z0-9+/), optional padding (`=`), and optional whitespaces. Whitespaces are skipped in conversion except when they are preceded by padding symbol(s). If padding is present it should conclude the string and follow rules described in RFC 4648 ยง 4. - Valid hexadecimal strings should include only allowed symbols (0-9A-Fa-f). - Valid values for `fmt` are case-insensitive `hex`, `base64`, `utf-8`, `utf8`. + - Since Spark 3.4, Spark throws only `PartitionsAlreadyExistException` when it creates partitions but some of them exist already. In Spark 3.3 or earlier, Spark can throw either `PartitionsAlreadyExistException` or `PartitionAlreadyExistsException`. ## Upgrading from Spark SQL 3.2 to 3.3 diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java index e2c693f2d0a92..09b26d8f793f7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsAtomicPartitionManagement.java @@ -22,7 +22,6 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; -import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; /** @@ -50,11 +49,11 @@ public interface SupportsAtomicPartitionManagement extends SupportsPartitionMana default void createPartition( InternalRow ident, Map properties) - throws PartitionAlreadyExistsException, UnsupportedOperationException { + throws PartitionsAlreadyExistException, UnsupportedOperationException { try { createPartitions(new InternalRow[]{ident}, new Map[]{properties}); } catch (PartitionsAlreadyExistException e) { - throw new PartitionAlreadyExistsException(e.getMessage()); + throw new PartitionsAlreadyExistException(e.getMessage()); } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java index ec2b61a766499..4830e193222fc 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java @@ -22,7 +22,7 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException; -import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException; import org.apache.spark.sql.types.StructType; /** @@ -59,13 +59,13 @@ public interface SupportsPartitionManagement extends Table { * * @param ident a new partition identifier * @param properties the metadata of a partition - * @throws PartitionAlreadyExistsException If a partition already exists for the identifier + * @throws PartitionsAlreadyExistException If a partition already exists for the identifier * @throws UnsupportedOperationException If partition property is not supported */ void createPartition( InternalRow ident, Map properties) - throws PartitionAlreadyExistsException, UnsupportedOperationException; + throws PartitionsAlreadyExistException, UnsupportedOperationException; /** * Drop a partition from table. @@ -147,14 +147,14 @@ Map loadPartitionMetadata(InternalRow ident) * @param to new partition identifier * @return true if renaming completes successfully otherwise false * @throws UnsupportedOperationException If partition renaming is not supported - * @throws PartitionAlreadyExistsException If the `to` partition exists already + * @throws PartitionsAlreadyExistException If the `to` partition exists already * @throws NoSuchPartitionException If the `from` partition does not exist * * @since 3.2.0 */ default boolean renamePartition(InternalRow from, InternalRow to) throws UnsupportedOperationException, - PartitionAlreadyExistsException, + PartitionsAlreadyExistException, NoSuchPartitionException { throw new UnsupportedOperationException("Partition renaming is not supported"); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index f65c29a06cc65..c1dd80e3f77e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -59,29 +59,23 @@ class TableAlreadyExistsException(message: String, cause: Option[Throwable] = No class TempTableAlreadyExistsException(table: String) extends TableAlreadyExistsException(s"Temporary view '$table' already exists") -class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) { - def this(db: String, table: String, spec: TablePartitionSpec) = { - this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n")) - } - - def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = { - this(s"Partition already exists in table $tableName:" + - partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name)) - .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")) - } -} - class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) { def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = { - this(s"The following partitions already exists in table '$table' database '$db':\n" + this(s"The following partitions already exist in table '$table' database '$db':\n" + specs.mkString("\n===\n")) } + def this(db: String, table: String, spec: TablePartitionSpec) = + this(db, table, Seq(spec)) + def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = { - this(s"The following partitions already exists in table $tableName:" + + this(s"The following partitions already exist in table $tableName:" + partitionIdents.map(id => partitionSchema.map(_.name).zip(id.toSeq(partitionSchema)) .map( kv => s"${kv._1} -> ${kv._2}").mkString(",")).mkString("\n===\n")) } + + def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = + this(tableName, Seq(partitionIdent), partitionSchema) } class FunctionAlreadyExistsException(db: String, func: String) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 218a342e669bd..90e824284bdbb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -90,7 +90,7 @@ class InMemoryCatalog( specs: Seq[TablePartitionSpec]): Unit = { specs.foreach { s => if (partitionExists(db, table, s)) { - throw new PartitionAlreadyExistsException(db = db, table = table, spec = s) + throw new PartitionsAlreadyExistException(db = db, table = table, spec = s) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala index a48eb04a98806..dd3d77f26cdd3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryAtomicPartitionTable.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector.catalog import java.util import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -39,7 +39,7 @@ class InMemoryAtomicPartitionTable ( ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partitionSchema) + throw new PartitionsAlreadyExistException(name, ident, partitionSchema) } else { createPartitionKey(ident.toSeq(schema)) memoryTablePartitions.put(ident, properties) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala index 660140e282ecb..7280d6a5b0776 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryPartitionTable.scala @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -51,7 +51,7 @@ class InMemoryPartitionTable( ident: InternalRow, properties: util.Map[String, String]): Unit = { if (memoryTablePartitions.containsKey(ident)) { - throw new PartitionAlreadyExistsException(name, ident, partitionSchema) + throw new PartitionsAlreadyExistException(name, ident, partitionSchema) } else { createPartitionKey(ident.toSeq(schema)) memoryTablePartitions.put(ident, properties) @@ -111,7 +111,7 @@ class InMemoryPartitionTable( override def renamePartition(from: InternalRow, to: InternalRow): Boolean = { if (memoryTablePartitions.containsKey(to)) { - throw new PartitionAlreadyExistsException(name, to, partitionSchema) + throw new PartitionsAlreadyExistException(name, to, partitionSchema) } else { val partValue = memoryTablePartitions.remove(from) if (partValue == null) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala index e5aeb90b841a6..7f7c529944501 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SupportsPartitionManagementSuite.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -218,10 +218,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite { test("renamePartition") { val partTable = createMultiPartTable() - val errMsg1 = intercept[PartitionAlreadyExistsException] { + val errMsg1 = intercept[PartitionsAlreadyExistException] { partTable.renamePartition(InternalRow(0, "abc"), InternalRow(1, "abc")) }.getMessage - assert(errMsg1.contains("Partition already exists")) + assert(errMsg1.contains("partitions already exist")) val newPart = InternalRow(2, "xyz") val errMsg2 = intercept[NoSuchPartitionException] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala index 080cd89c4a209..6e67946a557ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableRenamePartitionSuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException} import org.apache.spark.sql.internal.SQLConf /** @@ -75,15 +75,15 @@ trait AlterTableRenamePartitionSuiteBase extends QueryTest with DDLCommandTestUt } } - test("target partition exists") { + test("target partitions exist") { withNamespaceAndTable("ns", "tbl") { t => createSinglePartTable(t) sql(s"INSERT INTO $t PARTITION (id = 2) SELECT 'def'") checkPartitions(t, Map("id" -> "1"), Map("id" -> "2")) - val errMsg = intercept[PartitionAlreadyExistsException] { + val errMsg = intercept[PartitionsAlreadyExistException] { sql(s"ALTER TABLE $t PARTITION (id = 1) RENAME TO PARTITION (id = 2)") }.getMessage - assert(errMsg.contains("Partition already exists")) + assert(errMsg.contains("partitions already exist")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala index 6b2308766f6c8..54287cc6a47bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala @@ -148,7 +148,7 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit " PARTITION (id=2) LOCATION 'loc1'") }.getMessage assert(errMsg === - """The following partitions already exists in table 'tbl' database 'ns': + """The following partitions already exist in table 'tbl' database 'ns': |Map(id -> 2)""".stripMargin) sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala index a238dfcf2dd9c..dc6e5a2909da0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala @@ -111,7 +111,7 @@ class AlterTableAddPartitionSuite sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" + " PARTITION (id=2) LOCATION 'loc1'") }.getMessage - assert(errMsg === s"The following partitions already exists in table $t:id -> 2") + assert(errMsg === s"The following partitions already exist in table $t:id -> 2") sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" + " PARTITION (id=2) LOCATION 'loc1'") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bef320174ecd7..db600bcd3d459 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -50,7 +50,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException} +import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI @@ -707,7 +707,7 @@ private[hive] class HiveClientImpl( hiveTable.setOwner(userName) specs.zip(newSpecs).foreach { case (oldSpec, newSpec) => if (shim.getPartition(client, hiveTable, newSpec.asJava, false) != null) { - throw new PartitionAlreadyExistsException(db, table, newSpec) + throw new PartitionsAlreadyExistException(db, table, newSpec) } val hivePart = getPartitionOption(rawHiveTable, oldSpec) .map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 184e03d088cc5..e6abc7b96c17d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -528,7 +528,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) val errMsg = intercept[PartitionsAlreadyExistException] { client.createPartitions("default", "src_part", partitions, ignoreIfExists = false) }.getMessage - assert(errMsg.contains("partitions already exists")) + assert(errMsg.contains("partitions already exist")) } finally { client.dropPartitions( "default",