From 8941e105596219cfbbcb46edb8f852f6f23425ff Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 5 Nov 2016 14:21:05 +0800 Subject: [PATCH 1/2] data source tables should support truncating partition --- .../catalyst/catalog/InMemoryCatalog.scala | 23 ++++++-- .../catalog/ExternalCatalogSuite.scala | 11 ++++ .../spark/sql/execution/command/tables.scala | 14 +++-- .../sql/execution/command/DDLSuite.scala | 43 +++++++++++--- .../sql/hive/execution/HiveDDLSuite.scala | 58 +++++++++++++++++++ 5 files changed, 132 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index ea675b76607d..19bb7b5d3e10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -493,11 +493,26 @@ class InMemoryCatalog( table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized { requireTableExists(db, table) - if (partialSpec.nonEmpty) { - throw new UnsupportedOperationException( - "listPartition with partial partition spec is not implemented") + + partialSpec match { + case None => catalog(db).tables(table).partitions.values.toSeq + case Some(partial) => + catalog(db).tables(table).partitions.toSeq.collect { + case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition + } + } + } + + /** + * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a + * partial partition spec w.r.t. PARTITION (a=1,b=2). + */ + private def isPartialPartitionSpec( + spec1: TablePartitionSpec, + spec2: TablePartitionSpec): Boolean = { + spec1.forall { + case (partitionColumn, value) => spec2(partitionColumn) == value } - catalog(db).tables(table).partitions.values.toSeq } override def listPartitionsByFilter( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index f283f4287c5b..d13875af7fe3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -300,6 +300,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true) } + test("list partitions with partial partition spec") { + val catalog = newBasicCatalog() + val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1"))) + assert(parts.length == 1) + assert(parts.head.spec == part1.spec) + + // if no partition is matched for the given partition spec, an empty list should be returned. 
+ assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty) + assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty) + } + test("drop partitions") { val catalog = newBasicCatalog() assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 00c646b9185b..77ba58494183 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -343,13 +343,17 @@ case class TruncateTableCommand( DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION") } val locations = - // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec. - if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") { - Seq(table.storage.locationUri) - } else if (table.partitionColumnNames.isEmpty) { + if (table.partitionColumnNames.isEmpty) { Seq(table.storage.locationUri) } else { - catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri) + val normalizedSpec = partitionSpec.map { spec => + PartitioningUtils.normalizePartitionSpec( + spec, + table.partitionColumnNames, + table.identifier.quotedString, + spark.sessionState.conf.resolver) + } + catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri) } val hadoopConf = spark.sessionState.newHadoopConf() locations.foreach { location => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 52b09c54464e..369f2fde0dd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1628,29 +1628,56 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("truncate table - datasource table") { import testImplicits._ - val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") + val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") // Test both a Hive compatible and incompatible code path. 
Seq("json", "parquet").foreach { format => withTable("rectangles") { data.write.format(format).saveAsTable("rectangles") assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with") + sql("TRUNCATE TABLE rectangles") assert(spark.table("rectangles").collect().isEmpty) + + // not supported since the table is not partitioned + assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)") } } + } - withTable("rectangles", "rectangles2") { - data.write.saveAsTable("rectangles") - data.write.partitionBy("length").saveAsTable("rectangles2") + test("truncate partitioned table - datasource table") { + import testImplicits._ - // not supported since the table is not partitioned - assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)") + val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height") + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") // supported since partitions are stored in the metastore - sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)") - assert(spark.table("rectangles2").collect().isEmpty) + sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)") + assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // support partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=1)") + assert(spark.table("partTable").collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // do nothing if no partition is matched for the given partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=100)") + assert(spark.table("partTable").count() == data.count()) + + // do nothing if no partition is matched for the given non-partial partition spec + // TODO: This behaviour is different from Hive, we should decide whether we need to follow + // Hive's behaviour or stick with our existing behaviour later. + sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") + assert(spark.table("partTable").count() == data.count()) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 682d7d4b163d..0b5fbf43895e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1098,4 +1098,62 @@ class HiveDDLSuite } } } + + test("truncate table - datasource table") { + import testImplicits._ + + val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") + // Test both a Hive compatible and incompatible code path. 
+ Seq("json", "parquet").foreach { format => + withTable("rectangles") { + data.write.format(format).saveAsTable("rectangles") + assume(spark.table("rectangles").collect().nonEmpty, + "bad test; table was empty to begin with") + + sql("TRUNCATE TABLE rectangles") + assert(spark.table("rectangles").collect().isEmpty) + + // not supported since the table is not partitioned + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE rectangles PARTITION (width=1)") + } + assert(e.message.contains("Operation not allowed")) + } + } + } + + test("truncate partitioned table - datasource table") { + import testImplicits._ + + val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height") + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // supported since partitions are stored in the metastore + sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)") + assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // support partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=1)") + assert(spark.table("partTable").collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // do nothing if no partition is matched for the given partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=100)") + assert(spark.table("partTable").count() == data.count()) + + // do nothing if no partition is matched for the given non-partial partition spec + // TODO: This behaviour is different from Hive, we should decide whether we need to follow + // Hive's behaviour or stick with our existing behaviour later. + sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") + assert(spark.table("partTable").count() == data.count()) + } + } } From 47262284ac174aa42d3456cfa493146b01d36013 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 6 Nov 2016 22:07:00 +0800 Subject: [PATCH 2/2] minor improve --- .../org/apache/spark/sql/execution/command/tables.scala | 2 ++ .../org/apache/spark/sql/execution/command/DDLSuite.scala | 6 ++++++ .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 6 ++++++ 3 files changed, 14 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 77ba58494183..3cfa639a2fc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -346,6 +346,8 @@ case class TruncateTableCommand( if (table.partitionColumnNames.isEmpty) { Seq(table.storage.locationUri) } else { + // Here we diverge from Hive when the given partition spec contains all partition columns + // but no partition is matched: Hive will throw an exception and we just do nothing. 
val normalizedSpec = partitionSpec.map { spec => PartitioningUtils.normalizePartitionSpec( spec, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 369f2fde0dd6..864af8d578b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1678,6 +1678,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // Hive's behaviour or stick with our existing behaviour later. sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") assert(spark.table("partTable").count() == data.count()) + + // throw exception if the column in partition spec is not a partition column. + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE partTable PARTITION (unknown=1)") + } + assert(e.message.contains("unknown is not a valid partition column")) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 0b5fbf43895e..4150e649bef8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1154,6 +1154,12 @@ class HiveDDLSuite // Hive's behaviour or stick with our existing behaviour later. sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") assert(spark.table("partTable").count() == data.count()) + + // throw exception if the column in partition spec is not a partition column. + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE partTable PARTITION (unknown=1)") + } + assert(e.message.contains("unknown is not a valid partition column")) } } }
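
For readers who want to see the end-to-end effect of these two patches, below is a minimal usage sketch. It is not part of either patch; the session setup is illustrative, and the table and column names simply mirror the `partTable` fixture used in the added tests.

```scala
// Hypothetical stand-alone sketch of the behaviour enabled by this change:
// truncating one partition, or several via a partial spec, of a partitioned
// data source table. Assumes a metastore-backed (Hive-enabled) session.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("truncate-partition-demo")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

val data = (1 to 10).map(i => (i % 3, i % 5, i)).toDF("width", "length", "height")
data.write.partitionBy("width", "length").saveAsTable("partTable")

// Full partition spec: only the (width=1, length=1) partition is emptied.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")

// Partial partition spec: every partition with width=1 is emptied.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=1)")

// A spec that matches no partition is a no-op rather than an error;
// this is where the behaviour intentionally diverges from Hive.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
```

As the DDLSuite and HiveDDLSuite additions show, a spec that names a non-partition column (for example `PARTITION (unknown=1)`) still fails with an `AnalysisException` after the spec is normalized, and `TRUNCATE TABLE ... PARTITION` on an unpartitioned table remains unsupported.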