1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
@@ -33,6 +33,7 @@ license: |
- Valid Base64 string should include symbols from in base64 alphabet (A-Za-z0-9+/), optional padding (`=`), and optional whitespaces. Whitespaces are skipped in conversion except when they are preceded by padding symbol(s). If padding is present it should conclude the string and follow rules described in RFC 4648 § 4.
- Valid hexadecimal strings should include only allowed symbols (0-9A-Fa-f).
- Valid values for `fmt` are case-insensitive `hex`, `base64`, `utf-8`, `utf8`.
+ - Since Spark 3.4, Spark throws only `PartitionsAlreadyExistException` when it creates partitions but some of them exist already. In Spark 3.3 or earlier, Spark can throw either `PartitionsAlreadyExistException` or `PartitionAlreadyExistsException`.

## Upgrading from Spark SQL 3.2 to 3.3

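Note: for downstream code that catches the removed exception, the migration is a one-line change. A minimal sketch of affected user code (hypothetical, not from this PR; `spark` is assumed to be an active SparkSession), on a Spark 3.4 classpath where only the plural class exists:

    import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException

    // In Spark 3.3 and earlier this statement could surface either exception class;
    // since Spark 3.4 only PartitionsAlreadyExistException can reach user code.
    try {
      spark.sql("ALTER TABLE t PARTITION (id = 1) RENAME TO PARTITION (id = 2)")
    } catch {
      case e: PartitionsAlreadyExistException =>
        // The target partition is already occupied; ignore or surface as appropriate.
        println(s"skipping rename: ${e.getMessage}")
    }
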
@@ -22,7 +22,6 @@
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
- import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;

/**
@@ -50,11 +49,11 @@ public interface SupportsAtomicPartitionManagement extends SupportsPartitionManagement
default void createPartition(
InternalRow ident,
Map<String, String> properties)
- throws PartitionAlreadyExistsException, UnsupportedOperationException {
+ throws PartitionsAlreadyExistException, UnsupportedOperationException {
Contributor:

Does Spark try-catch the errors thrown by these 2 APIs? If yes, then we may have bugs, as the existing data source implementations may still throw PartitionAlreadyExistsException.

Member Author (@MaxGekk), Oct 10, 2022:

In the current implementation, Spark invokes createPartition()/createPartitions() from AddPartitionExec (ALTER TABLE .. ADD PARTITION), where all exceptions from the methods are propagated to users directly. But it doesn't matter in our case because AddPartitionExec.run() checks whether the partitions exist BEFORE the invocations, see:

val (existsParts, notExistsParts) =
  partSpecs.partition(p => table.partitionExists(p.ident))
if (existsParts.nonEmpty && !ignoreIfExists) {
  throw new PartitionsAlreadyExistException(
    table.name(), existsParts.map(_.ident), table.partitionSchema())
}

So, this PR doesn't affect ALTER TABLE .. ADD PARTITION, but it does affect v2 ALTER TABLE .. RENAME PARTITION, which propagates the exception directly to users.

Contributor (@cloud-fan), Oct 10, 2022:

I think in this case it's OK to change the type of the exception that is propagated to users.

try {
createPartitions(new InternalRow[]{ident}, new Map[]{properties});
} catch (PartitionsAlreadyExistException e) {
- throw new PartitionAlreadyExistsException(e.getMessage());
+ throw new PartitionsAlreadyExistException(e.getMessage());
}
}

@@ -22,7 +22,7 @@
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException;
- import org.apache.spark.sql.catalyst.analysis.PartitionAlreadyExistsException;
+ import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException;
import org.apache.spark.sql.types.StructType;

/**
@@ -59,13 +59,13 @@ public interface SupportsPartitionManagement extends Table {
*
* @param ident a new partition identifier
* @param properties the metadata of a partition
- * @throws PartitionAlreadyExistsException If a partition already exists for the identifier
+ * @throws PartitionsAlreadyExistException If a partition already exists for the identifier
* @throws UnsupportedOperationException If partition property is not supported
*/
void createPartition(
InternalRow ident,
Map<String, String> properties)
- throws PartitionAlreadyExistsException, UnsupportedOperationException;
+ throws PartitionsAlreadyExistException, UnsupportedOperationException;

/**
* Drop a partition from table.
@@ -147,14 +147,14 @@ Map<String, String> loadPartitionMetadata(InternalRow ident)
* @param to new partition identifier
* @return true if renaming completes successfully otherwise false
* @throws UnsupportedOperationException If partition renaming is not supported
- * @throws PartitionAlreadyExistsException If the `to` partition exists already
+ * @throws PartitionsAlreadyExistException If the `to` partition exists already
* @throws NoSuchPartitionException If the `from` partition does not exist
*
* @since 3.2.0
*/
default boolean renamePartition(InternalRow from, InternalRow to)
throws UnsupportedOperationException,
- PartitionAlreadyExistsException,
+ PartitionsAlreadyExistException,
NoSuchPartitionException {
throw new UnsupportedOperationException("Partition renaming is not supported");
}
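
Note: under the updated contract, a connector that overrides renamePartition should raise the plural exception for an occupied target. A minimal sketch of such an override (hypothetical connector code that mirrors the InMemoryPartitionTable change further down; `partitions` is an assumed java.util.Map field):

    override def renamePartition(from: InternalRow, to: InternalRow): Boolean = {
      if (partitions.containsKey(to)) {
        // Since Spark 3.4 the single-partition case also uses PartitionsAlreadyExistException.
        throw new PartitionsAlreadyExistException(name, to, partitionSchema)
      }
      val props = partitions.remove(from)
      if (props == null) {
        throw new NoSuchPartitionException(name, from, partitionSchema)
      }
      partitions.put(to, props)
      true
    }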
@@ -59,29 +59,23 @@ class TableAlreadyExistsException(message: String, cause: Option[Throwable] = No
class TempTableAlreadyExistsException(table: String)
extends TableAlreadyExistsException(s"Temporary view '$table' already exists")

- class PartitionAlreadyExistsException(message: String) extends AnalysisException(message) {
-   def this(db: String, table: String, spec: TablePartitionSpec) = {
-     this(s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n"))
-   }
-
-   def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) = {
-     this(s"Partition already exists in table $tableName:" +
-       partitionIdent.toSeq(partitionSchema).zip(partitionSchema.map(_.name))
-         .map( kv => s"${kv._1} -> ${kv._2}").mkString(","))
-   }
- }

class PartitionsAlreadyExistException(message: String) extends AnalysisException(message) {
def this(db: String, table: String, specs: Seq[TablePartitionSpec]) = {
this(s"The following partitions already exists in table '$table' database '$db':\n"
this(s"The following partitions already exist in table '$table' database '$db':\n"
+ specs.mkString("\n===\n"))
}

+ def this(db: String, table: String, spec: TablePartitionSpec) =
+   this(db, table, Seq(spec))

def this(tableName: String, partitionIdents: Seq[InternalRow], partitionSchema: StructType) = {
this(s"The following partitions already exists in table $tableName:" +
this(s"The following partitions already exist in table $tableName:" +
partitionIdents.map(id => partitionSchema.map(_.name).zip(id.toSeq(partitionSchema))
.map( kv => s"${kv._1} -> ${kv._2}").mkString(",")).mkString("\n===\n"))
}

+ def this(tableName: String, partitionIdent: InternalRow, partitionSchema: StructType) =
+   this(tableName, Seq(partitionIdent), partitionSchema)
}

class FunctionAlreadyExistsException(db: String, func: String)
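
Note: the two single-partition constructors added above let former PartitionAlreadyExistsException call sites migrate without building a Seq by hand. A sketch of the two call shapes (values are illustrative, not from this PR):

    // v1 catalog path: a single TablePartitionSpec (a Map[String, String]).
    throw new PartitionsAlreadyExistException("default", "tbl", Map("id" -> "2"))

    // v2 catalog path: a single InternalRow plus the partition schema.
    throw new PartitionsAlreadyExistException(name, ident, partitionSchema)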
@@ -90,7 +90,7 @@ class InMemoryCatalog(
specs: Seq[TablePartitionSpec]): Unit = {
specs.foreach { s =>
if (partitionExists(db, table, s)) {
- throw new PartitionAlreadyExistsException(db = db, table = table, spec = s)
+ throw new PartitionsAlreadyExistException(db = db, table = table, spec = s)
}
}
}
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connector.catalog
import java.util

import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

@@ -39,7 +39,7 @@ class InMemoryAtomicPartitionTable (
ident: InternalRow,
properties: util.Map[String, String]): Unit = {
if (memoryTablePartitions.containsKey(ident)) {
- throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
+ throw new PartitionsAlreadyExistException(name, ident, partitionSchema)
} else {
createPartitionKey(ident.toSeq(schema))
memoryTablePartitions.put(ident, properties)
@@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
+ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -51,7 +51,7 @@ class InMemoryPartitionTable(
ident: InternalRow,
properties: util.Map[String, String]): Unit = {
if (memoryTablePartitions.containsKey(ident)) {
- throw new PartitionAlreadyExistsException(name, ident, partitionSchema)
+ throw new PartitionsAlreadyExistException(name, ident, partitionSchema)
} else {
createPartitionKey(ident.toSeq(schema))
memoryTablePartitions.put(ident, properties)
@@ -111,7 +111,7 @@ class InMemoryPartitionTable(

override def renamePartition(from: InternalRow, to: InternalRow): Boolean = {
if (memoryTablePartitions.containsKey(to)) {
- throw new PartitionAlreadyExistsException(name, to, partitionSchema)
+ throw new PartitionsAlreadyExistException(name, to, partitionSchema)
} else {
val partValue = memoryTablePartitions.remove(from)
if (partValue == null) {
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
+ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -218,10 +218,10 @@ class SupportsPartitionManagementSuite extends SparkFunSuite {
test("renamePartition") {
val partTable = createMultiPartTable()

- val errMsg1 = intercept[PartitionAlreadyExistsException] {
+ val errMsg1 = intercept[PartitionsAlreadyExistException] {
partTable.renamePartition(InternalRow(0, "abc"), InternalRow(1, "abc"))
}.getMessage
assert(errMsg1.contains("Partition already exists"))
assert(errMsg1.contains("partitions already exist"))

val newPart = InternalRow(2, "xyz")
val errMsg2 = intercept[NoSuchPartitionException] {
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
- import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionAlreadyExistsException}
+ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, PartitionsAlreadyExistException}
import org.apache.spark.sql.internal.SQLConf

/**
@@ -75,15 +75,15 @@ trait AlterTableRenamePartitionSuiteBase extends QueryTest with DDLCommandTestUt
}
}

test("target partition exists") {
test("target partitions exist") {
withNamespaceAndTable("ns", "tbl") { t =>
createSinglePartTable(t)
sql(s"INSERT INTO $t PARTITION (id = 2) SELECT 'def'")
checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
- val errMsg = intercept[PartitionAlreadyExistsException] {
+ val errMsg = intercept[PartitionsAlreadyExistException] {
sql(s"ALTER TABLE $t PARTITION (id = 1) RENAME TO PARTITION (id = 2)")
}.getMessage
assert(errMsg.contains("Partition already exists"))
assert(errMsg.contains("partitions already exist"))
}
}

@@ -148,7 +148,7 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
" PARTITION (id=2) LOCATION 'loc1'")
}.getMessage
assert(errMsg ===
"""The following partitions already exists in table 'tbl' database 'ns':
"""The following partitions already exist in table 'tbl' database 'ns':
|Map(id -> 2)""".stripMargin)

sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
@@ -111,7 +111,7 @@ class AlterTableAddPartitionSuite
sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
" PARTITION (id=2) LOCATION 'loc1'")
}.getMessage
assert(errMsg === s"The following partitions already exists in table $t:id -> 2")
assert(errMsg === s"The following partitions already exist in table $t:id -> 2")

sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
" PARTITION (id=2) LOCATION 'loc1'")
@@ -50,7 +50,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.TableIdentifier
- import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+ import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.CatalogUtils.stringToURI
@@ -707,7 +707,7 @@ private[hive] class HiveClientImpl(
hiveTable.setOwner(userName)
specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
if (shim.getPartition(client, hiveTable, newSpec.asJava, false) != null) {
- throw new PartitionAlreadyExistsException(db, table, newSpec)
+ throw new PartitionsAlreadyExistException(db, table, newSpec)
}
val hivePart = getPartitionOption(rawHiveTable, oldSpec)
.map { p => toHivePartition(p.copy(spec = newSpec), hiveTable) }
@@ -528,7 +528,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String])
val errMsg = intercept[PartitionsAlreadyExistException] {
client.createPartitions("default", "src_part", partitions, ignoreIfExists = false)
}.getMessage
assert(errMsg.contains("partitions already exists"))
assert(errMsg.contains("partitions already exist"))
} finally {
client.dropPartitions(
"default",