diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index ea675b76607d..19bb7b5d3e10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -493,11 +493,26 @@ class InMemoryCatalog( table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized { requireTableExists(db, table) - if (partialSpec.nonEmpty) { - throw new UnsupportedOperationException( - "listPartition with partial partition spec is not implemented") + + partialSpec match { + case None => catalog(db).tables(table).partitions.values.toSeq + case Some(partial) => + catalog(db).tables(table).partitions.toSeq.collect { + case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition + } + } + } + + /** + * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a + * partial partition spec w.r.t. PARTITION (a=1,b=2). + */ + private def isPartialPartitionSpec( + spec1: TablePartitionSpec, + spec2: TablePartitionSpec): Boolean = { + spec1.forall { + case (partitionColumn, value) => spec2(partitionColumn) == value } - catalog(db).tables(table).partitions.values.toSeq } override def listPartitionsByFilter( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index f283f4287c5b..d13875af7fe3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -300,6 +300,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true) } + test("list partitions with partial partition spec") { + val catalog = newBasicCatalog() + val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1"))) + assert(parts.length == 1) + assert(parts.head.spec == part1.spec) + + // if no partition is matched for the given partition spec, an empty list should be returned. + assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty) + assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty) + } + test("drop partitions") { val catalog = newBasicCatalog() assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 00c646b9185b..3cfa639a2fc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -343,13 +343,19 @@ case class TruncateTableCommand( DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION") } val locations = - // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec. 
- if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") { - Seq(table.storage.locationUri) - } else if (table.partitionColumnNames.isEmpty) { + if (table.partitionColumnNames.isEmpty) { Seq(table.storage.locationUri) } else { - catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri) + // Here we diverge from Hive when the given partition spec contains all partition columns + // but no partition is matched: Hive will throw an exception and we just do nothing. + val normalizedSpec = partitionSpec.map { spec => + PartitioningUtils.normalizePartitionSpec( + spec, + table.partitionColumnNames, + table.identifier.quotedString, + spark.sessionState.conf.resolver) + } + catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri) } val hadoopConf = spark.sessionState.newHadoopConf() locations.foreach { location => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 52b09c54464e..864af8d578b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1628,29 +1628,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("truncate table - datasource table") { import testImplicits._ - val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") + val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") // Test both a Hive compatible and incompatible code path. Seq("json", "parquet").foreach { format => withTable("rectangles") { data.write.format(format).saveAsTable("rectangles") assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with") + sql("TRUNCATE TABLE rectangles") assert(spark.table("rectangles").collect().isEmpty) + + // not supported since the table is not partitioned + assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)") } } + } - withTable("rectangles", "rectangles2") { - data.write.saveAsTable("rectangles") - data.write.partitionBy("length").saveAsTable("rectangles2") + test("truncate partitioned table - datasource table") { + import testImplicits._ - // not supported since the table is not partitioned - assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)") + val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height") + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") // supported since partitions are stored in the metastore - sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)") - assert(spark.table("rectangles2").collect().isEmpty) + sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)") + assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // support partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=1)") + assert(spark.table("partTable").collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // do nothing if no partition is matched for the given partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=100)") + 
assert(spark.table("partTable").count() == data.count()) + + // do nothing if no partition is matched for the given non-partial partition spec + // TODO: This behaviour is different from Hive, we should decide whether we need to follow + // Hive's behaviour or stick with our existing behaviour later. + sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") + assert(spark.table("partTable").count() == data.count()) + + // throw exception if the column in partition spec is not a partition column. + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE partTable PARTITION (unknown=1)") + } + assert(e.message.contains("unknown is not a valid partition column")) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 682d7d4b163d..4150e649bef8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1098,4 +1098,68 @@ class HiveDDLSuite } } } + + test("truncate table - datasource table") { + import testImplicits._ + + val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") + // Test both a Hive compatible and incompatible code path. + Seq("json", "parquet").foreach { format => + withTable("rectangles") { + data.write.format(format).saveAsTable("rectangles") + assume(spark.table("rectangles").collect().nonEmpty, + "bad test; table was empty to begin with") + + sql("TRUNCATE TABLE rectangles") + assert(spark.table("rectangles").collect().isEmpty) + + // not supported since the table is not partitioned + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE rectangles PARTITION (width=1)") + } + assert(e.message.contains("Operation not allowed")) + } + } + } + + test("truncate partitioned table - datasource table") { + import testImplicits._ + + val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height") + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // supported since partitions are stored in the metastore + sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)") + assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // support partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=1)") + assert(spark.table("partTable").collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // do nothing if no partition is matched for the given partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=100)") + assert(spark.table("partTable").count() == data.count()) + + // do nothing if no partition is matched for the given non-partial partition spec + // TODO: This behaviour is different from Hive, we should decide whether we need to follow + // Hive's behaviour or stick with our existing behaviour later. + sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") + assert(spark.table("partTable").count() == data.count()) + + // throw exception if the column in partition spec is not a partition column. 
+ val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE partTable PARTITION (unknown=1)") + } + assert(e.message.contains("unknown is not a valid partition column")) + } + } }
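
Note (illustrative, not part of the patch): the partial-spec matching that InMemoryCatalog.listPartitions now performs treats a partial spec as matching a partition whenever every column/value pair in the partial spec also appears in the partition's full spec; TRUNCATE TABLE ... PARTITION then simply receives an empty partition list, and does nothing, when nothing matches. The sketch below restates that rule as a small standalone Scala program. Here `matchesPartial` is a hypothetical stand-in for the private `isPartialPartitionSpec` helper added above (it uses `get(...).contains(...)` rather than the patch's `spec2(partitionColumn) == value`, so it also returns false for columns missing from the full spec), and `TablePartitionSpec` is assumed to be the usual `Map[String, String]` alias from CatalogTypes.

object PartialSpecSketch {
  // Assumed to mirror org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec.
  type TablePartitionSpec = Map[String, String]

  // A partial spec matches a partition when all of its column/value pairs
  // appear in that partition's full spec.
  def matchesPartial(partial: TablePartitionSpec, full: TablePartitionSpec): Boolean =
    partial.forall { case (col, value) => full.get(col).contains(value) }

  def main(args: Array[String]): Unit = {
    val fullSpec = Map("a" -> "1", "b" -> "2")  // a partition stored in the catalog
    println(matchesPartial(Map("a" -> "1"), fullSpec))              // true: partition is listed
    println(matchesPartial(Map("a" -> "1", "b" -> "2"), fullSpec))  // true: an exact spec matches too
    println(matchesPartial(Map("a" -> "unknown"), fullSpec))        // false: listPartitions returns Nil
  }
}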