@@ -493,11 +493,26 @@ class InMemoryCatalog(
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized {
     requireTableExists(db, table)
-    if (partialSpec.nonEmpty) {
-      throw new UnsupportedOperationException(
-        "listPartition with partial partition spec is not implemented")
+
+    partialSpec match {
+      case None => catalog(db).tables(table).partitions.values.toSeq
+      case Some(partial) =>
+        catalog(db).tables(table).partitions.toSeq.collect {
+          case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition
+        }
     }
-    catalog(db).tables(table).partitions.values.toSeq
   }
+
+  /**
+   * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
+   * partial partition spec w.r.t. PARTITION (a=1,b=2).
+   */
+  private def isPartialPartitionSpec(
+      spec1: TablePartitionSpec,
+      spec2: TablePartitionSpec): Boolean = {
+    spec1.forall {
+      case (partitionColumn, value) => spec2(partitionColumn) == value
+    }
+  }
 
   override def listPartitionsByFilter(
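The matching rule above is simply map containment: a partial spec matches a full spec when every column/value pair of the partial spec appears in the full spec with the same value. A self-contained sketch of that rule in plain Scala (not Spark's actual types; note it uses a safe `get(...).contains(...)` lookup, whereas the PR's `spec2(partitionColumn)` direct apply assumes the column is always present, which holds because callers only pass valid partition columns):

object PartialSpecMatchDemo {
  // In Spark, TablePartitionSpec is a Map[String, String] from partition
  // column name to value; we model it the same way here.
  type TablePartitionSpec = Map[String, String]

  // `partial` matches `full` iff every (column -> value) pair of `partial`
  // appears in `full` with the same value.
  def isPartialPartitionSpec(partial: TablePartitionSpec, full: TablePartitionSpec): Boolean =
    partial.forall { case (col, value) => full.get(col).contains(value) }

  def main(args: Array[String]): Unit = {
    val full = Map("a" -> "1", "b" -> "2")
    assert(isPartialPartitionSpec(Map("a" -> "1"), full))   // PARTITION (a=1) matches
    assert(isPartialPartitionSpec(full, full))              // a spec matches itself
    assert(!isPartialPartitionSpec(Map("a" -> "0"), full))  // wrong value
    assert(!isPartialPartitionSpec(Map("c" -> "3"), full))  // column not in the full spec
  }
}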
@@ -300,6 +300,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
   }
 
+  test("list partitions with partial partition spec") {
+    val catalog = newBasicCatalog()
+    val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
+    assert(parts.length == 1)
+    assert(parts.head.spec == part1.spec)
+
+    // if no partition is matched for the given partition spec, an empty list should be returned.
+    assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty)
+    assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
+  }
+
   test("drop partitions") {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
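For orientation: the test expects exactly one match because, in the upstream suite's fixture, `newBasicCatalog()` registers `tbl2` with two partitions whose specs are (as I read the fixture, treat these values as assumptions) `a=1, b=2` and `a=3, b=4`. A quick sanity check of the expectation using the containment rule sketched earlier:

object FixtureMatchDemo {
  // Fixture specs assumed from the upstream ExternalCatalogSuite (part1/part2).
  val part1Spec = Map("a" -> "1", "b" -> "2")
  val part2Spec = Map("a" -> "3", "b" -> "4")

  def matches(partial: Map[String, String], full: Map[String, String]): Boolean =
    partial.forall { case (k, v) => full.get(k).contains(v) }

  def main(args: Array[String]): Unit = {
    val partial = Map("a" -> "1")
    assert(matches(partial, part1Spec))   // part1 is returned
    assert(!matches(partial, part2Spec))  // part2 is filtered out
    // Hence parts.length == 1 in the test above.
    assert(Seq(part1Spec, part2Spec).count(matches(partial, _)) == 1)
  }
}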
@@ -343,13 +343,19 @@ case class TruncateTableCommand(
       DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
     }
     val locations =
-      // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
-      if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
-        Seq(table.storage.locationUri)
-      } else if (table.partitionColumnNames.isEmpty) {
+      if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
-        catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
+        // Here we diverge from Hive when the given partition spec contains all partition columns
+        // but no partition is matched: Hive will throw an exception and we just do nothing.
+        val normalizedSpec = partitionSpec.map { spec =>
+          PartitioningUtils.normalizePartitionSpec(
+            spec,
+            table.partitionColumnNames,
+            table.identifier.quotedString,
+            spark.sessionState.conf.resolver)
+        }
+        catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
       }
     val hadoopConf = spark.sessionState.newHadoopConf()
     locations.foreach { location =>
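The normalization step matters because a user may write PARTITION (WIDTH=1) while the table's partition column is `width`. A minimal sketch of what normalization does, assuming a case-insensitive resolver; this is an illustration of the idea, not the actual `PartitioningUtils.normalizePartitionSpec` implementation:

object NormalizeSpecDemo {
  type Resolver = (String, String) => Boolean

  val caseInsensitiveResolver: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Map each user-supplied key in the spec to the table's canonical partition
  // column name, failing when a key resolves to no partition column.
  // (Spark raises AnalysisException; we use IllegalArgumentException to stay
  // dependency-free in this sketch.)
  def normalizePartitionSpec(
      spec: Map[String, String],
      partitionColumns: Seq[String],
      tableName: String,
      resolver: Resolver): Map[String, String] = {
    spec.map { case (key, value) =>
      val normalizedKey = partitionColumns.find(col => resolver(col, key)).getOrElse {
        throw new IllegalArgumentException(
          s"$key is not a valid partition column in table $tableName")
      }
      normalizedKey -> value
    }
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("width", "length")
    // "WIDTH" resolves to the canonical column name "width".
    assert(normalizePartitionSpec(Map("WIDTH" -> "1"), cols, "t", caseInsensitiveResolver)
      == Map("width" -> "1"))
    // An unknown key fails, matching the test expectation further down.
    try {
      normalizePartitionSpec(Map("unknown" -> "1"), cols, "t", caseInsensitiveResolver)
      assert(false, "expected an exception")
    } catch {
      case e: IllegalArgumentException =>
        assert(e.getMessage.contains("unknown is not a valid partition column"))
    }
  }
}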
@@ -1628,29 +1628,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   test("truncate table - datasource table") {
     import testImplicits._
+
     val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
 
     // Test both a Hive compatible and incompatible code path.
     Seq("json", "parquet").foreach { format =>
       withTable("rectangles") {
         data.write.format(format).saveAsTable("rectangles")
         assume(spark.table("rectangles").collect().nonEmpty,
           "bad test; table was empty to begin with")
 
         sql("TRUNCATE TABLE rectangles")
         assert(spark.table("rectangles").collect().isEmpty)
-      }
-    }
 
-    withTable("rectangles", "rectangles2") {
-      data.write.saveAsTable("rectangles")
-      data.write.partitionBy("length").saveAsTable("rectangles2")
-
-      // not supported since the table is not partitioned
-      assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
-
-      // supported since partitions are stored in the metastore
-      sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
-      assert(spark.table("rectangles2").collect().isEmpty)
+        // not supported since the table is not partitioned
+        assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+      }
     }
   }
+
+  test("truncate partitioned table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // supported since partitions are stored in the metastore
+      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
+      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // support partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
+      assert(spark.table("partTable").collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // do nothing if no partition is matched for the given partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // do nothing if no partition is matched for the given non-partial partition spec
+      // TODO: This behaviour is different from Hive, we should decide whether we need to follow
+      // Hive's behaviour or stick with our existing behaviour later.
[Inline review thread on the TODO comment above]

Contributor: What's our and Hive's behavior?

Contributor Author: Our behaviour: do nothing if no partition is matched, no matter whether the given partition spec is partial or not. Hive's behaviour: when no partition is matched, do nothing if the given partition spec is partial, but throw an exception if it is non-partial.

Member: Could we document the behavior difference in the code comments, if we decide to stick with our existing behavior?

Contributor: I actually think Hive's behavior makes sense here. If I'm giving you an exact match, you should warn me if there is an issue.

Contributor: The alternative, of course, is to provide "truncate if exists", which doesn't throw exceptions if that's desired.

sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
assert(spark.table("partTable").count() == data.count())

// throw exception if the column in partition spec is not a partition column.
val e = intercept[AnalysisException] {
sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
}
assert(e.message.contains("unknown is not a valid partition column"))
}
}

@@ -1098,4 +1098,68 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    // Test both a Hive compatible and incompatible code path.
+    Seq("json", "parquet").foreach { format =>
+      withTable("rectangles") {
+        data.write.format(format).saveAsTable("rectangles")
+        assume(spark.table("rectangles").collect().nonEmpty,
+          "bad test; table was empty to begin with")
+
+        sql("TRUNCATE TABLE rectangles")
+        assert(spark.table("rectangles").collect().isEmpty)
+
+        // not supported since the table is not partitioned
+        val e = intercept[AnalysisException] {
+          sql("TRUNCATE TABLE rectangles PARTITION (width=1)")
+        }
+        assert(e.message.contains("Operation not allowed"))
+      }
+    }
+  }
+
+  test("truncate partitioned table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // supported since partitions are stored in the metastore
+      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
+      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // support partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
+      assert(spark.table("partTable").collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // do nothing if no partition is matched for the given partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // do nothing if no partition is matched for the given non-partial partition spec
+      // TODO: This behaviour is different from Hive, we should decide whether we need to follow
+      // Hive's behaviour or stick with our existing behaviour later.
+      sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // throw exception if the column in partition spec is not a partition column.
+      val e = intercept[AnalysisException] {
+        sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
+      }
+      assert(e.message.contains("unknown is not a valid partition column"))
+    }
+  }
 }