diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 49fba6b7f35d..3d922be799a0 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -161,8 +161,8 @@ statement partitionSpecLocation+ #addTablePartition | ALTER TABLE multipartIdentifier from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition - | ALTER (TABLE | VIEW) multipartIdentifier - DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions + | ALTER (TABLE | VIEW) multipartIdentifier DROP (IF EXISTS)? + dropPartitionSpec (',' dropPartitionSpec)* PURGE? #dropTablePartitions | ALTER TABLE multipartIdentifier (partitionSpec)? SET locationSpec #setTableLocation | ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions @@ -328,6 +328,14 @@ database | SCHEMA ; +dropPartitionSpec + : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')' + ; + +dropPartitionVal + : identifier (comparisonOperator constant)? + ; + describeFuncName : qualifiedName | STRING diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index dcc143982a4a..a23b2ab96511 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -188,7 +188,8 @@ trait ExternalCatalog { parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): Unit + retainData: Boolean, + supportBatch: Boolean): Unit /** * Override the specs of one or many existing table partitions, assuming they exist. 
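Note on the SqlBase.g4 change above: the new dropPartitionSpec / dropPartitionVal rules are what allow an arbitrary comparison operator, not only '=', between a partition column and a constant in ALTER TABLE ... DROP PARTITION. A minimal sketch of the statements this grammar is meant to accept, driven through the Scala API; the table and column names are made up for illustration, and the snippet assumes a Hive-backed partitioned table already exists:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    // Assuming a table 'logs' partitioned by a string column 'dt':
    spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (dt < '2020-01-01')")
    // Several dropPartitionVal entries in one PARTITION clause are combined with AND:
    spark.sql("ALTER TABLE logs DROP PARTITION (dt >= '2020-01-01', dt < '2020-02-01')")
    // Per visitDropPartitionSpec below, '<=>' is rejected, and a bare column with no
    // operator, e.g. PARTITION (dt), is reported as an invalid partition spec.

The supportBatch flag added to ExternalCatalog.dropPartitions (and threaded through the implementations that follow) lets the command request that all matching partitions be dropped in a single metastore call instead of one by one.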
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala index 86113d3ec3ea..c5008e1bff33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala @@ -203,8 +203,10 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog) partSpecs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): Unit = { - delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData) + retainData: Boolean, + supportBatch: Boolean): Unit = { + delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData, + supportBatch) } override def renamePartitions( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index abf69939dea1..c0ae71cd55fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -425,7 +425,8 @@ class InMemoryCatalog( partSpecs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): Unit = synchronized { + retainData: Boolean, + supportBatch: Boolean): Unit = synchronized { requireTableExists(db, table) val existingParts = catalog(db).tables(table).partitions if (!ignoreIfNotExists) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index be8526454f9f..a515f95da92f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -918,14 +918,16 @@ class SessionCatalog( specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): Unit = { + retainData: Boolean, + supportBatch: Boolean): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) requireDbExists(db) requireTableExists(TableIdentifier(table, Option(db))) requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName)) requireNonEmptyValueInPartitionSpec(specs) - externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData) + externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData, + supportBatch) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 3362353e2662..42fc71e0e6b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -391,6 +391,35 @@ case class OuterReference(e: NamedExpression) override def newInstance(): NamedExpression = OuterReference(e.newInstance()) } +/** + * A place holder used to hold the name of the partition attributes specified when running commands + * 
involving partitions, e.g. ALTER TABLE ... DROP PARTITIONS. + */ +case class PartitioningAttribute( + name: String, + override val exprId: ExprId = NamedExpression.newExprId) + extends Attribute with Unevaluable { + // We need a dataType to be used during analysis for resolving the expressions (see + // checkInputDataTypes). The String type is used because all the literals in PARTITION operations + // are parsed as strings and eventually cast later. + override def dataType: DataType = StringType + override def nullable: Boolean = false + + override def qualifier: Seq[String] = throw new UnsupportedOperationException + override def withNullability(newNullability: Boolean): Attribute = + throw new UnsupportedOperationException + override def newInstance(): Attribute = throw new UnsupportedOperationException + override def withQualifier(newQualifier: Seq[String]): Attribute = + throw new UnsupportedOperationException + override def withName(newName: String): Attribute = throw new UnsupportedOperationException + override def withMetadata(newMetadata: Metadata): Attribute = + throw new UnsupportedOperationException + + override lazy val canonicalized: Expression = this.copy(exprId = ExprId(0)) + + override def withExprId(newExprId: ExprId): Attribute = throw new UnsupportedOperationException +} + object VirtualColumn { // The attribute name used by Hive, which has different result than Spark, deprecated. val hiveGroupingIdName: String = "grouping__id" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c623b5c4d36a..7b6155695b96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -499,6 +499,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create the partition filter expressions for a DROP PARTITION specification. + */ + override def visitDropPartitionSpec( + ctx: DropPartitionSpecContext): Seq[Expression] = withOrigin(ctx) { + ctx.dropPartitionVal().asScala.map { pFilter => + if (pFilter.constant() == null || pFilter.comparisonOperator() == null) { + throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx) + } + // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when + // running the command. + val partition = PartitioningAttribute(pFilter.identifier().getText) + val value = Literal(visitStringConstant(pFilter.constant())) + val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] + val comparison = buildComparison(partition, value, operator) + if (comparison.isInstanceOf[EqualNullSafe]) { + throw new ParseException( + "'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx) + } + comparison + } + } + /** + * Convert a constant of any type into a string. This is typically used in DDL commands, and its + * main purpose is to prevent slight differences due to back to back conversions i.e.: @@ -1321,6 +1344,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging val left = expression(ctx.left) val right = expression(ctx.right) val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] + buildComparison(left, right, operator) + } + + /** + * Creates a comparison expression.
The following comparison operators are supported: + * - Equal: '=' or '==' + * - Null-safe Equal: '<=>' + * - Not Equal: '<>' or '!=' + * - Less than: '<' + * - Less than or Equal: '<=' + * - Greater than: '>' + * - Greater than or Equal: '>=' + */ + private def buildComparison( + left: Expression, + right: Expression, + operator: TerminalNode): Expression = { operator.getSymbol.getType match { case SqlBaseParser.EQ => EqualTo(left, right) @@ -3153,7 +3193,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } AlterTableDropPartitionStatement( visitMultipartIdentifier(ctx.multipartIdentifier), - ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec), + ctx.dropPartitionSpec.asScala.map(visitDropPartitionSpec), ifExists = ctx.EXISTS != null, purge = ctx.PURGE != null, retainData = false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index ec373d95fad8..911e41057d31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -209,7 +209,7 @@ case class AlterTableRenamePartitionStatement( */ case class AlterTableDropPartitionStatement( tableName: Seq[String], - specs: Seq[TablePartitionSpec], + specs: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) extends ParsedStatement diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 6b1c35094e4a..477b8f7f0e16 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -393,43 +393,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } test("create/drop partitions in managed tables with location") { - val catalog = newBasicCatalog() - val table = CatalogTable( - identifier = TableIdentifier("tbl", Some("db1")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType() - .add("col1", "int") - .add("col2", "string") - .add("partCol1", "int") - .add("partCol2", "string"), - provider = Some(defaultProvider), - partitionColumnNames = Seq("partCol1", "partCol2")) - catalog.createTable(table, ignoreIfExists = false) - - val newLocationPart1 = newUriForDatabase() - val newLocationPart2 = newUriForDatabase() - - val partition1 = - CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), - storageFormat.copy(locationUri = Some(newLocationPart1))) - val partition2 = - CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), - storageFormat.copy(locationUri = Some(newLocationPart2))) - catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false) - catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false) - - assert(exists(newLocationPart1)) - assert(exists(newLocationPart2)) - - // the corresponding directory is dropped.
- catalog.dropPartitions("db1", "tbl", Seq(partition1.spec), - ignoreIfNotExists = false, purge = false, retainData = false) - assert(!exists(newLocationPart1)) + val catalog = newBasicCatalogForBatchOrNotDropPartitionTest() + Seq(true, false).foreach { batchDrop => + val db1 = s"db1_$batchDrop" + val table = CatalogTable( + identifier = TableIdentifier("tbl", Some(s"$db1")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType() + .add("col1", "int") + .add("col2", "string") + .add("partCol1", "int") + .add("partCol2", "string"), + provider = Some(defaultProvider), + partitionColumnNames = Seq("partCol1", "partCol2")) + catalog.createTable(table, ignoreIfExists = false) - // all the remaining directories are dropped. - catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false) - assert(!exists(newLocationPart2)) + val newLocationPart1 = newUriForDatabase() + val newLocationPart2 = newUriForDatabase() + + val partition1 = + CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), + storageFormat.copy(locationUri = Some(newLocationPart1))) + val partition2 = + CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), + storageFormat.copy(locationUri = Some(newLocationPart2))) + catalog.createPartitions(s"$db1", "tbl", Seq(partition1), ignoreIfExists = false) + catalog.createPartitions(s"$db1", "tbl", Seq(partition2), ignoreIfExists = false) + + assert(exists(newLocationPart1)) + assert(exists(newLocationPart2)) + + // the corresponding directory is dropped. + catalog.dropPartitions(s"$db1", "tbl", Seq(partition1.spec), + ignoreIfNotExists = false, purge = false, retainData = false, batchDrop) + assert(!exists(newLocationPart1)) + + // all the remaining directories are dropped. 
+ catalog.dropTable(s"$db1", "tbl", ignoreIfNotExists = false, purge = false) + assert(!exists(newLocationPart2)) + } } test("list partition names") { @@ -519,43 +522,54 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac } test("drop partitions") { - val catalog = newBasicCatalog() - assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) - catalog.dropPartitions( - "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false) - assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2))) - resetState() - val catalog2 = newBasicCatalog() - assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2))) - catalog2.dropPartitions( - "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false, - retainData = false) - assert(catalog2.listPartitions("db2", "tbl2").isEmpty) + val catalog = newBasicCatalogForBatchOrNotDropPartitionTest() + Seq(true, false).foreach { batchDrop => + val db2 = s"db2_$batchDrop" + assert(catalogPartitionsEqual(catalog, s"$db2", "tbl2", Seq(part1, part2))) + catalog.dropPartitions( + s"$db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false, + retainData = false, supportBatch = batchDrop) + assert(catalogPartitionsEqual(catalog, s"$db2", "tbl2", Seq(part2))) + resetState() + val catalog2 = newBasicCatalogForBatchOrNotDropPartitionTest() + assert(catalogPartitionsEqual(catalog2, s"$db2", "tbl2", Seq(part1, part2))) + catalog2.dropPartitions( + s"$db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false, + retainData = false, supportBatch = batchDrop) + assert(catalog2.listPartitions(s"$db2", "tbl2").isEmpty) + } } test("drop partitions when database/table does not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { - catalog.dropPartitions( - "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false, - retainData = false) - } - intercept[AnalysisException] { - catalog.dropPartitions( - "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false, - retainData = false) + val catalog = newBasicCatalogForBatchOrNotDropPartitionTest() + Seq(true, false).foreach { batchDrop => + val db2 = s"db2_$batchDrop" + intercept[AnalysisException] { + catalog.dropPartitions( + "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false, + retainData = false, supportBatch = batchDrop) + } + intercept[AnalysisException] { + catalog.dropPartitions( + db2, "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false, + retainData = false, supportBatch = batchDrop) + } } } test("drop partitions that do not exist") { - val catalog = newBasicCatalog() - intercept[AnalysisException] { + val catalog = newBasicCatalogForBatchOrNotDropPartitionTest() + Seq(true, false).foreach { batchDrop => + val db2 = s"db2_$batchDrop" + intercept[AnalysisException] { + catalog.dropPartitions( + s"$db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false, + retainData = false, supportBatch = batchDrop) + } catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false, - retainData = false) + s"$db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, + retainData = false, supportBatch = batchDrop) } - catalog.dropPartitions( - "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false, retainData = false) } test("get partition") { @@ -857,70 +871,79 @@ abstract class ExternalCatalogSuite extends 
SparkFunSuite with BeforeAndAfterEac } test("create/drop/rename partitions should create/delete/rename the directory") { - val catalog = newBasicCatalog() - val table = CatalogTable( - identifier = TableIdentifier("tbl", Some("db1")), - tableType = CatalogTableType.MANAGED, - storage = CatalogStorageFormat.empty, - schema = new StructType() - .add("col1", "int") - .add("col2", "string") - .add("partCol1", "int") - .add("partCol2", "string"), - provider = Some(defaultProvider), - partitionColumnNames = Seq("partCol1", "partCol2")) - catalog.createTable(table, ignoreIfExists = false) + val catalog = newBasicCatalogForBatchOrNotDropPartitionTest() + Seq(true, false).foreach { batchDrop => + val db1 = s"db1_$batchDrop" + val table = CatalogTable( + identifier = TableIdentifier("tbl", Some(s"$db1")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty, + schema = new StructType() + .add("col1", "int") + .add("col2", "string") + .add("partCol1", "int") + .add("partCol2", "string"), + provider = Some(defaultProvider), + partitionColumnNames = Seq("partCol1", "partCol2")) + catalog.createTable(table, ignoreIfExists = false) - val tableLocation = catalog.getTable("db1", "tbl").location - - val part1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat) - val part2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat) - val part3 = CatalogTablePartition(Map("partCol1" -> "5", "partCol2" -> "6"), storageFormat) - - catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = false) - assert(exists(tableLocation, "partCol1=1", "partCol2=2")) - assert(exists(tableLocation, "partCol1=3", "partCol2=4")) - - catalog.renamePartitions("db1", "tbl", Seq(part1.spec), Seq(part3.spec)) - assert(!exists(tableLocation, "partCol1=1", "partCol2=2")) - assert(exists(tableLocation, "partCol1=5", "partCol2=6")) - - catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false, - purge = false, retainData = false) - assert(!exists(tableLocation, "partCol1=3", "partCol2=4")) - assert(!exists(tableLocation, "partCol1=5", "partCol2=6")) - - val tempPath = Utils.createTempDir() - // create partition with existing directory is OK. - val partWithExistingDir = CatalogTablePartition( - Map("partCol1" -> "7", "partCol2" -> "8"), - CatalogStorageFormat( - Some(tempPath.toURI), - None, None, None, false, Map.empty)) - catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false) - - tempPath.delete() - // create partition with non-existing directory will create that directory. 
- val partWithNonExistingDir = CatalogTablePartition( - Map("partCol1" -> "9", "partCol2" -> "10"), - CatalogStorageFormat( - Some(tempPath.toURI), - None, None, None, false, Map.empty)) - catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false) - assert(tempPath.exists()) + val tableLocation = catalog.getTable(s"$db1", "tbl").location + + val part1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), storageFormat) + val part2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), storageFormat) + val part3 = CatalogTablePartition(Map("partCol1" -> "5", "partCol2" -> "6"), storageFormat) + + catalog.createPartitions(s"$db1", "tbl", Seq(part1, part2), ignoreIfExists = false) + assert(exists(tableLocation, "partCol1=1", "partCol2=2")) + assert(exists(tableLocation, "partCol1=3", "partCol2=4")) + + catalog.renamePartitions(s"$db1", "tbl", Seq(part1.spec), Seq(part3.spec)) + assert(!exists(tableLocation, "partCol1=1", "partCol2=2")) + assert(exists(tableLocation, "partCol1=5", "partCol2=6")) + + catalog.dropPartitions(s"$db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false, + purge = false, retainData = false, supportBatch = batchDrop) + assert(!exists(tableLocation, "partCol1=3", "partCol2=4")) + assert(!exists(tableLocation, "partCol1=5", "partCol2=6")) + + val tempPath = Utils.createTempDir() + // create partition with existing directory is OK. + val partWithExistingDir = CatalogTablePartition( + Map("partCol1" -> "7", "partCol2" -> "8"), + CatalogStorageFormat( + Some(tempPath.toURI), + None, None, None, false, Map.empty)) + catalog.createPartitions(s"$db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false) + + tempPath.delete() + // create partition with non-existing directory will create that directory. 
+ val partWithNonExistingDir = CatalogTablePartition( + Map("partCol1" -> "9", "partCol2" -> "10"), + CatalogStorageFormat( + Some(tempPath.toURI), + None, None, None, false, Map.empty)) + catalog.createPartitions(s"$db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false) + assert(tempPath.exists()) + } } test("drop partition from external table should not delete the directory") { - val catalog = newBasicCatalog() - catalog.createPartitions("db2", "tbl1", Seq(part1), ignoreIfExists = false) + val catalog = newBasicCatalogForBatchOrNotDropPartitionTest() + Seq(true, false).foreach { batchDrop => + val db2 = s"db2_$batchDrop" + catalog.createPartitions(s"$db2", "tbl1", Seq(part1), ignoreIfExists = false) - val partPath = new Path(catalog.getPartition("db2", "tbl1", part1.spec).location) - val fs = partPath.getFileSystem(new Configuration) - assert(fs.exists(partPath)) + val partPath = new Path(catalog.getPartition(s"$db2", "tbl1", part1.spec).location) + val fs = partPath.getFileSystem(new Configuration) + assert(fs.exists(partPath)) - catalog.dropPartitions( - "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false) - assert(fs.exists(partPath)) + catalog.dropPartitions( + s"$db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + assert(fs.exists(partPath)) + } } } @@ -987,6 +1010,21 @@ abstract class CatalogTestUtils { catalog } + def newBasicCatalogForBatchOrNotDropPartitionTest(): ExternalCatalog = { + val catalog = newEmptyCatalog() + Seq(true, false).foreach { batchDrop => + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + catalog.createDatabase(newDb(s"db1_$batchDrop"), ignoreIfExists = false) + catalog.createDatabase(newDb(s"db2_$batchDrop"), ignoreIfExists = false) + catalog.createDatabase(newDb(s"db3_$batchDrop"), ignoreIfExists = false) + catalog.createTable(newTable("tbl1", s"db2_$batchDrop"), ignoreIfExists = false) + catalog.createTable(newTable("tbl2", s"db2_$batchDrop"), ignoreIfExists = false) + catalog.createTable(newView("view1", Some(s"db3_$batchDrop")), ignoreIfExists = false) + catalog.createPartitions(s"db2_$batchDrop", "tbl2", Seq(part1, part2), ignoreIfExists = false) + catalog.createFunction(s"db2_$batchDrop", newFunc("func1", Some(s"db2_$batchDrop"))) + } + catalog + } def newFunc(): CatalogFunction = newFunc("funcName") def newUriForDatabase(): URI = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index f334ba5690af..9a0320639c72 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -59,6 +59,15 @@ abstract class SessionCatalogSuite extends AnalysisTest { } } + private def withBasicCatalogForBatchOrNotDropPartitionTest(f: SessionCatalog => Unit): Unit = { + val catalog = new SessionCatalog(newBasicCatalogForBatchOrNotDropPartitionTest()) + try { + f(catalog) + } finally { + catalog.reset() + } + } + private def withEmptyCatalog(f: SessionCatalog => Unit): Unit = { val catalog = new SessionCatalog(newEmptyCatalog()) catalog.createDatabase(newDb("default"), ignoreIfExists = true) @@ -861,115 +870,134 @@ abstract class SessionCatalogSuite extends AnalysisTest { } test("drop 
partitions") { - withBasicCatalog { catalog => - assert(catalogPartitionsEqual( - catalog.externalCatalog.listPartitions("db2", "tbl2"), part1, part2)) - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part1.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - assert(catalogPartitionsEqual( - catalog.externalCatalog.listPartitions("db2", "tbl2"), part2)) - // Drop partitions without explicitly specifying database - catalog.setCurrentDatabase("db2") - catalog.dropPartitions( - TableIdentifier("tbl2"), - Seq(part2.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - assert(catalog.externalCatalog.listPartitions("db2", "tbl2").isEmpty) - // Drop multiple partitions at once - catalog.createPartitions( - TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false) - assert(catalogPartitionsEqual( - catalog.externalCatalog.listPartitions("db2", "tbl2"), part1, part2)) - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part1.spec, part2.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - assert(catalog.externalCatalog.listPartitions("db2", "tbl2").isEmpty) - } - } - - test("drop partitions when database/table does not exist") { - withBasicCatalog { catalog => - intercept[NoSuchDatabaseException] { + Seq(true, false).foreach { batchDrop => + withBasicCatalogForBatchOrNotDropPartitionTest { catalog => + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions(s"db2_$batchDrop", "tbl2"), part1, part2)) catalog.dropPartitions( - TableIdentifier("tbl1", Some("unknown_db")), - Seq(), + TableIdentifier("tbl2", Some(s"db2_$batchDrop")), + Seq(part1.spec), ignoreIfNotExists = false, purge = false, - retainData = false) - } - intercept[NoSuchTableException] { + retainData = false, + supportBatch = batchDrop) + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions(s"db2_$batchDrop", "tbl2"), part2)) + // Drop partitions without explicitly specifying database + catalog.setCurrentDatabase(s"db2_$batchDrop") catalog.dropPartitions( - TableIdentifier("does_not_exist", Some("db2")), - Seq(), + TableIdentifier("tbl2"), + Seq(part2.spec), ignoreIfNotExists = false, purge = false, - retainData = false) + retainData = false, + supportBatch = batchDrop) + assert(catalog.externalCatalog.listPartitions(s"db2_$batchDrop", "tbl2").isEmpty) + // Drop multiple partitions at once + catalog.createPartitions( + TableIdentifier("tbl2", Some(s"db2_$batchDrop")), + Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual( + catalog.externalCatalog.listPartitions(s"db2_$batchDrop", "tbl2"), part1, part2)) + catalog.dropPartitions( + TableIdentifier("tbl2", Some(s"db2_$batchDrop")), + Seq(part1.spec, part2.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + assert(catalog.externalCatalog.listPartitions(s"db2_$batchDrop", "tbl2").isEmpty) + } + } + } + + test("drop partitions when database/table does not exist") { + Seq(true, false).foreach { batchDrop => + withBasicCatalog { catalog => + intercept[NoSuchDatabaseException] { + catalog.dropPartitions( + TableIdentifier("tbl1", Some("unknown_db")), + Seq(), + ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + } + intercept[NoSuchTableException] { + catalog.dropPartitions( + TableIdentifier("does_not_exist", Some("db2")), + Seq(), + ignoreIfNotExists = false, + purge = false, + retainData = false, + 
supportBatch = batchDrop) + } } } } test("drop partitions that do not exist") { - withBasicCatalog { catalog => - intercept[AnalysisException] { + Seq(true, false).foreach { batchDrop => + withBasicCatalog { catalog => + intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(part3.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + } catalog.dropPartitions( TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), - ignoreIfNotExists = false, + ignoreIfNotExists = true, purge = false, - retainData = false) + retainData = false, + supportBatch = batchDrop) } - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(part3.spec), - ignoreIfNotExists = true, - purge = false, - retainData = false) } } test("drop partitions with invalid partition spec") { - withBasicCatalog { catalog => - var e = intercept[AnalysisException] { - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithMoreColumns.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - } - assert(e.getMessage.contains( - "Partition spec is invalid. The spec (a, b, c) must be contained within " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithUnknownColumns.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) - } - assert(e.getMessage.contains( - "Partition spec is invalid. The spec (a, unknown) must be contained within " + - "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) - e = intercept[AnalysisException] { - catalog.dropPartitions( - TableIdentifier("tbl2", Some("db2")), - Seq(partWithEmptyValue.spec, part1.spec), - ignoreIfNotExists = false, - purge = false, - retainData = false) + Seq(true, false).foreach { batchDrop => + withBasicCatalog { catalog => + var e = intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithMoreColumns.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + } + assert(e.getMessage.contains( + "Partition spec is invalid. The spec (a, b, c) must be contained within " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithUnknownColumns.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + } + assert(e.getMessage.contains( + "Partition spec is invalid. The spec (a, unknown) must be contained within " + + "the partition spec (a, b) defined in table '`db2`.`tbl2`'")) + e = intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), + Seq(partWithEmptyValue.spec, part1.spec), + ignoreIfNotExists = false, + purge = false, + retainData = false, + supportBatch = batchDrop) + } + assert(e.getMessage.contains("Partition spec is invalid. The spec ([a=3, b=]) contains " + + "an empty partition column value")) } - assert(e.getMessage.contains("Partition spec is invalid. 
The spec ([a=3, b=]) contains an " + - "empty partition column value")) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 94171feba2ac..00cba72405b8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} +import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal, PartitioningAttribute} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType} @@ -1458,31 +1458,33 @@ class DDLParserSuite extends AnalysisTest { assertUnsupported(sql1_view) assertUnsupported(sql2_view) + val dtAttr = PartitioningAttribute("dt") + val countryAttr = PartitioningAttribute("country") val expected1_table = AlterTableDropPartitionStatement( Seq("table_name"), - Seq( - Map("dt" -> "2008-08-08", "country" -> "us"), - Map("dt" -> "2009-09-09", "country" -> "uk")), + Seq(Seq(EqualTo(dtAttr, Literal("2008-08-08")), EqualTo(countryAttr, Literal("us"))), + Seq(EqualTo(dtAttr, Literal("2009-09-09")), EqualTo(countryAttr, Literal("uk")))), ifExists = true, purge = false, retainData = false) val expected2_table = expected1_table.copy(ifExists = false) val expected1_purge = expected1_table.copy(purge = true) - comparePlans(parsed1_table, expected1_table) - comparePlans(parsed2_table, expected2_table) - comparePlans(parsed1_purge, expected1_purge) + comparePlans(parsed1_table.canonicalized, expected1_table.canonicalized) + comparePlans(parsed2_table.canonicalized, expected2_table.canonicalized) + comparePlans(parsed1_purge.canonicalized, expected1_purge.canonicalized) + val dsAttr = PartitioningAttribute("ds") val sql3_table = "ALTER TABLE a.b.c DROP IF EXISTS PARTITION (ds='2017-06-10')" val expected3_table = AlterTableDropPartitionStatement( Seq("a", "b", "c"), - Seq(Map("ds" -> "2017-06-10")), + Seq(Seq(EqualTo(dsAttr, Literal("2017-06-10")))), ifExists = true, purge = false, retainData = false) val parsed3_table = parsePlan(sql3_table) - comparePlans(parsed3_table, expected3_table) + comparePlans(parsed3_table.canonicalized, expected3_table.canonicalized) } test("show current namespace") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 3645d38b3b55..df776ff884a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import 
org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -546,7 +546,7 @@ case class AlterTableRenamePartitionCommand( */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, - specs: Seq[TablePartitionSpec], + partitionsFilters: Seq[Seq[Expression]], ifExists: Boolean, purge: Boolean, retainData: Boolean) @@ -555,26 +555,114 @@ case class AlterTableDropPartitionCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) + val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone) DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") - - val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( - spec, - table.partitionColumnNames, - table.identifier.quotedString, - sparkSession.sessionState.conf.resolver) + val partitionColumns = table.partitionColumnNames + val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap + val resolvedSpecs = partitionsFilters.flatMap { filtersSpec => + if (hasComplexFilters(filtersSpec)) { + generatePartitionSpec(filtersSpec, + partitionColumns, + partitionAttributes, + table.identifier, + catalog, + sparkSession.sessionState.conf.resolver, + timeZone, + ifExists) + } else { + val partitionSpec = filtersSpec.map { + case EqualTo(key: Attribute, v: Literal) => + key.name -> v.value.toString + }.toMap + PartitioningUtils.normalizePartitionSpec( + partitionSpec, + partitionColumns, + table.identifier.quotedString, + sparkSession.sessionState.conf.resolver) :: Nil + } } - + // Batch dropping is only used for a single 'DROP [IF EXISTS] PARTITION spec'; it is NOT + // used for 'DROP [IF EXISTS] PARTITION spec1, PARTITION spec2, ...'.
catalog.dropPartitions( - table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge, - retainData = retainData) + table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge, + retainData = retainData, supportBatch = partitionsFilters.size <= 1) CommandUtils.updateTableStats(sparkSession, table) Seq.empty[Row] } + def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = { + partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]) + } + + def generatePartitionSpec( + partitionFilterSpec: Seq[Expression], + partitionColumns: Seq[String], + partitionAttributes: Map[String, Attribute], + tableIdentifier: TableIdentifier, + catalog: SessionCatalog, + resolver: Resolver, + timeZone: Option[String], + ifExists: Boolean): Seq[TablePartitionSpec] = { + val filters = partitionFilterSpec.map { pFilter => + pFilter.transform { + // Resolve the partition attributes + case partitionCol: PartitioningAttribute => + val normalizedPartition = PartitioningUtils.normalizePartitionColumn( + partitionCol.name, + partitionColumns, + tableIdentifier.quotedString, + resolver) + partitionAttributes(normalizedPartition) + }.transform { + // Cast the partition value to the data type of the corresponding partition attribute + case cmp @ BinaryComparison(left, right) => + if (!cmp.isInstanceOf[EqualTo] && left.dataType.isInstanceOf[BooleanType]) { + throw new AnalysisException(s"A partition column of BooleanType can only be " + + s"dropped with an equality comparison.") } + if (!left.dataType.sameType(right.dataType)) { + val dt = left.dataType + val lit = Literal(Cast(right, dt, timeZone).eval(), dt) + cmp.makeCopy(Array(left, lit)) + } else { + cmp + } + } + } + val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters) + if (partitions.isEmpty && !ifExists) { + // To stay compatible with Hive, only log a warning (instead of failing) when the + // filters do not match any existing partition.
+ logWarning(s"There is no partition for ${filters.reduceLeft(And).sql}") + } + partitions.map(_.spec) + } +} + + +object AlterTableDropPartitionCommand { + + def fromSpecs( + tableName: TableIdentifier, + specs: Seq[TablePartitionSpec], + ifExists: Boolean, + purge: Boolean, + retainData: Boolean): AlterTableDropPartitionCommand = { + AlterTableDropPartitionCommand(tableName, + specs.map(tablePartitionToPartitionFilters), + ifExists, + purge, + retainData) + } + + def tablePartitionToPartitionFilters(spec: TablePartitionSpec): Seq[Expression] = { + spec.map { + case (key, value) => EqualTo(PartitioningAttribute(key), Literal(value)) + }.toSeq + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index fbe874b3e8bc..269e994544b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -151,7 +151,7 @@ case class InsertIntoHadoopFsRelationCommand( if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) { val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions if (deletedPartitions.nonEmpty) { - AlterTableDropPartitionCommand( + AlterTableDropPartitionCommand.fromSpecs( catalogTable.get.identifier, deletedPartitions.toSeq, ifExists = true, purge = false, retainData = true /* already deleted */).run(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index fdad43b23c5a..fcc6e7eaa2a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -378,6 +378,16 @@ object PartitioningUtils { normalizedPartSpec.toMap } + def normalizePartitionColumn( + partition: String, + partColNames: Seq[String], + tblName: String, + resolver: Resolver): String = { + partColNames.find(resolver(_, partition)).getOrElse { + throw new AnalysisException(s"$partition is not a valid partition column in table $tblName.") + } + } + /** * Resolves possible type conflicts between partitions by up-casting "lower" types using * [[findWiderTypeForPartitionColumn]]. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index d98f2ca62972..196fb6c92acd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan -import org.apache.spark.sql.catalyst.expressions.JsonTuple +import org.apache.spark.sql.catalyst.expressions.{Expression, JsonTuple, PartitioningAttribute} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTableDropPartitionStatement, Generate, InsertIntoDir, LogicalPlan, Project, ScriptTransformation} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -473,6 +473,39 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { """.stripMargin) } + test("SPARK-23866: Support any comparison operator in ALTER TABLE ... DROP PARTITION") { + val sql1_table = + """ + |ALTER TABLE table_name DROP IF EXISTS PARTITION + |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') + """.stripMargin + Seq((">", (a: Expression, b: Expression) => a > b), + (">=", (a: Expression, b: Expression) => a >= b), + ("<", (a: Expression, b: Expression) => a < b), + ("<=", (a: Expression, b: Expression) => a <= b), + ("<>", (a: Expression, b: Expression) => a =!= b), + ("!=", (a: Expression, b: Expression) => a =!= b)).foreach { case (op, predicateGen) => + val genPlan = parser.parsePlan(sql1_table.replace("=", op)) + val dtAttr = PartitioningAttribute("dt") + val countryAttr = PartitioningAttribute("country") + val expectedPlan = AlterTableDropPartitionStatement( + Seq("table_name"), + Seq( + Seq(predicateGen(dtAttr, "2008-08-08"), predicateGen(countryAttr, "us")), + Seq(predicateGen(dtAttr, "2009-09-09"), predicateGen(countryAttr, "uk"))), + ifExists = true, + purge = false, + retainData = false) + comparePlans(genPlan.canonicalized, expectedPlan.canonicalized) + } + + // SPARK-23866: <=> is not supported + intercept("ALTER TABLE table_name DROP PARTITION (dt <=> 'a')", "operator is not supported in") + + // SPARK-23866: Invalid partition specification + intercept("ALTER TABLE table_name DROP PARTITION (dt)", "Invalid partition spec:") + } + test("alter table: archive partition (not supported)") { assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 03874d005a6e..85ad9b56bd98 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -991,10 +991,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): 
Unit = withClient { + retainData: Boolean, + supportBatch: Boolean): Unit = withClient { requireTableExists(db, table) client.dropPartitions( - db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData) + db, table, parts.map(lowerCasePartitionSpec), ignoreIfNotExists, purge, retainData, + supportBatch) } override def renamePartitions( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index e31dffa4795c..5c211a28cd34 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -149,7 +149,8 @@ private[hive] trait HiveClient { specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, - retainData: Boolean): Unit + retainData: Boolean, + supportBatch: Boolean): Unit /** * Rename one or many existing table partitions, assuming they exist. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 12c9a972c1af..1e01c86c8a9e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -28,21 +28,26 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.hive.common.{ObjectPair, StatsSetupConst} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable} -import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order, SerDeInfo, StorageDescriptor} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, MetaException, NoSuchObjectException, Order, SerDeInfo, StorageDescriptor, Table => MetaStoreApiTable} import org.apache.hadoop.hive.ql.Driver +import org.apache.hadoop.hive.ql.exec.FunctionRegistry import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC +import org.apache.hadoop.hive.ql.parse.SemanticException +import org.apache.hadoop.hive.ql.plan.{ExprNodeColumnDesc, ExprNodeConstantDesc, ExprNodeDesc, ExprNodeGenericFuncDesc} import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters +import org.apache.hadoop.hive.serde2.typeinfo.{PrimitiveTypeInfo, TypeInfoFactory, TypeInfoUtils} import org.apache.hadoop.security.UserGroupInformation +import org.apache.thrift.TException import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging @@ -617,13 +622,42 @@ private[hive] class HiveClientImpl( } override def dropPartitions( + db: String, + table: String, + specs: Seq[TablePartitionSpec], + ignoreIfNotExists: Boolean, + purge: Boolean, + retainData: Boolean, + 
supportBatch: Boolean): Unit = withHiveState { + + // TODO(weixiuli): Spark cannot use the batch dropPartitions call when the HiveShim + // version is < v1_2 or >= v3_0. + + // The dropPartitionsWithBatch method uses the same ObjectPair class as + // HiveMetaStoreClient's dropPartitions, but that class moved from + // 'org.apache.hadoop.hive.common.ObjectPair' to + // 'org.apache.hadoop.hive.metastore.utils.ObjectPair' in HiveShim v3_0 + // (see https://issues.apache.org/jira/browse/HIVE-17980). + // To support the batch dropPartitions call with HiveShim versions >= v3_0, + // 'org.apache.hadoop.hive.metastore.utils.ObjectPair' would have to be used instead of + // 'org.apache.hadoop.hive.common.ObjectPair'. + + val supportVersion = Set(hive.v1_2, hive.v2_0, hive.v2_1, hive.v2_2, hive.v2_3) + .map(_.fullVersion) + if (supportVersion.contains(version.fullVersion) && supportBatch) { + dropPartitionsWithBatch(db, table, specs, ignoreIfNotExists, purge, retainData) + } else { + dropPartitionsOneByOne(db, table, specs, ignoreIfNotExists, purge, retainData) + } + } + + def dropPartitionsOneByOne( db: String, table: String, specs: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit = withHiveState { - // TODO: figure out how to drop multiple partitions in one call val hiveTable = client.getTable(db, table, true /* throw exception */) // do the check at first and collect all the matching partitions val matchingParts = @@ -664,6 +698,47 @@ private[hive] class HiveClientImpl( } } + def dropPartitionsWithBatch( + db: String, + table: String, + specs: Seq[TablePartitionSpec], + ignoreIfNotExists: Boolean, + purge: Boolean, + retainData: Boolean): Unit = withHiveState { + val hiveTable = client.getTable(db, table, true /* throw exception */) + val partExprs = new java.util.ArrayList[ObjectPair[Integer, Array[Byte]]] + val maxBatches = sparkConf.getInt(s"spark.sql.dropPartition.maxBatches", 200) + val serializeObjectToKryoMethod = shim.getSerializeObjectToKryoMethod + serializeObjectToKryoMethod.setAccessible(true) + try { + specs.map { s => + assert(s.values.forall(_.nonEmpty), s"partition spec '$s' is invalid") + s + }.distinct.map { spec => + partExprs.add(new ObjectPair[Integer, Array[Byte]](spec.size, + serializeObjectToKryoMethod.invoke(null, + (new ExpressionBuilder(hiveTable.getTTable, + spec.asJava).build)).asInstanceOf[Array[Byte]])) + if (partExprs.size == maxBatches) { + shim.dropPartitions(client, db, table, partExprs, !retainData, purge, ignoreIfNotExists) + partExprs.clear() + } + } + if (partExprs.size > 0) { + shim.dropPartitions(client, db, table, partExprs, !retainData, purge, ignoreIfNotExists) + } + } catch { + case e: NoSuchObjectException => + throw new AnalysisException( + "NoSuchObjectException while dropping partition." + e.getMessage) + case e: MetaException => + throw new MetaException("MetaException while dropping partition." + e.getMessage) + case e: TException => + throw new TException( + "TException while dropping partition.", e) + } + } + override def renamePartitions( db: String, table: String, @@ -1205,4 +1280,62 @@ private[hive] object HiveClientImpl { StatsSetupConst.RAW_DATA_SIZE, StatsSetupConst.TOTAL_SIZE ) + + /** + * Helper class that builds the ExprNodeDesc tree representing the partitions to be dropped.
+ * Note: At present, the ExpressionBuilder only constructs partition predicates where + * partition-keys equal specific values, and logical-AND expressions. E.g. + * ( dt = '20150310' AND region = 'US' ) + * This only supports the partition-specs specified by the Map argument of: + * org.apache.hive.hcatalog.api.HCatClient#dropPartitions(String, String, Map , boolean) + */ + case class ExpressionBuilder(val table: MetaStoreApiTable, + val partSpecs: java.util.Map[String, String]) { + + private val partColumnTypesMap = + com.google.common.collect.Maps.newHashMap[String, PrimitiveTypeInfo] + // scalastyle:off caselocale + for (partField <- table.getPartitionKeys.asScala) { + partColumnTypesMap.put(partField.getName.toLowerCase, + TypeInfoFactory.getPrimitiveTypeInfo(partField.getType)) + } + + private def getTypeFor(partColumn: String): PrimitiveTypeInfo = + partColumnTypesMap.get(partColumn.toLowerCase) + // scalastyle:on caselocale + private def getTypeAppropriateValueFor(typeInfo: PrimitiveTypeInfo, value: String) = { + val converter = ObjectInspectorConverters.getConverter( + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(TypeInfoFactory.stringTypeInfo), + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfo)) + converter.convert(value) + } + + @throws[SemanticException] + def equalityPredicate(partColumn: String, value: String): ExprNodeGenericFuncDesc = { + val partColumnType = getTypeFor(partColumn) + val partColumnExpr = new ExprNodeColumnDesc(partColumnType, partColumn, null, true) + val valueExpr = new ExprNodeConstantDesc(partColumnType, + getTypeAppropriateValueFor(partColumnType, value)) + binaryPredicate("=", partColumnExpr, valueExpr) + } + + @throws[SemanticException] + def binaryPredicate(function: String, lhs: ExprNodeDesc, rhs: ExprNodeDesc): + ExprNodeGenericFuncDesc = { + new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, + FunctionRegistry.getFunctionInfo(function).getGenericUDF, + com.google.common.collect.Lists.newArrayList(lhs, rhs)) + } + + @throws[SemanticException] + def build: ExprNodeGenericFuncDesc = { + var resultExpr: ExprNodeGenericFuncDesc = null + for (partSpec <- partSpecs.entrySet.asScala) { + val partExpr = equalityPredicate(partSpec.getKey.toString, partSpec.getValue.toString) + resultExpr = if (resultExpr == null) partExpr + else binaryPredicate("and", resultExpr, partExpr) + } + resultExpr + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 586fbbefade4..ba4ae78ff384 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -27,8 +27,9 @@ import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.common.ObjectPair import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.IMetaStoreClient +import org.apache.hadoop.hive.metastore.{IMetaStoreClient, PartitionDropOptions} import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, Function => HiveFunction, FunctionType} import org.apache.hadoop.hive.metastore.api.{MetaException, PrincipalType, ResourceType, ResourceUri} import org.apache.hadoop.hive.ql.Driver @@ -154,6 +155,23 @@ private[client] sealed abstract class Shim { deleteData: Boolean, purge: Boolean): Unit + def getSerializeObjectToKryoMethod(): Method = + throw new 
UnsupportedOperationException("Spark can NOT support " + + "getSerializeObjectToKryoMethod call when HiveShim version is < v1_2.") + + def dropPartitions( + hive: Hive, + dbName: String, + tableName: String, + partExprs: JArrayList[ObjectPair[Integer, Array[Byte]]], + deleteData: Boolean, + purge: Boolean, + ifExists: Boolean, + needResults: Boolean = false): Unit = { + throw new UnsupportedOperationException("Spark can NOT support dropPartitions " + + "call when HiveShim version is < v1_2.") + } + protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { val method = findMethod(klass, name, args: _*) require(Modifier.isStatic(method.getModifiers()), @@ -973,6 +991,8 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions") private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData") private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData") + private lazy val dropOptionsReturnResults = dropOptionsClass.getField("returnResults") + private lazy val dropOptionsIfExists = dropOptionsClass.getField("ifExists") private lazy val dropPartitionMethod = findMethod( classOf[Hive], @@ -995,6 +1015,11 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { txnIdInLoadDynamicPartitions) } + override def getSerializeObjectToKryoMethod(): Method = { + Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities") + .getDeclaredMethod("serializeObjectToKryo", classOf[java.io.Serializable]) + } + override def dropPartition( hive: Hive, dbName: String, @@ -1008,6 +1033,23 @@ private[client] class Shim_v1_2 extends Shim_v1_1 { dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions) } + // TODO(weixiuli): Spark can NOT support dropPartitions call when HiveShim version is < v1_2. 
+ override def dropPartitions( + hive: Hive, + dbName: String, + tableName: String, + partExprs: JArrayList[ObjectPair[Integer, Array[Byte]]], + deleteData: Boolean, + purge: Boolean, + ifExists: Boolean, + needResults: Boolean = false): Unit = { + val dropOptions = dropOptionsClass.newInstance().asInstanceOf[PartitionDropOptions] + dropOptionsDeleteData.setBoolean(dropOptions, deleteData) + dropOptionsIfExists.setBoolean(dropOptions, ifExists) + dropOptionsReturnResults.setBoolean(dropOptions, needResults) + dropOptionsPurge.setBoolean(dropOptions, purge) + hive.getMSC.dropPartitions(dbName, tableName, partExprs, dropOptions) + } } private[client] class Shim_v2_0 extends Shim_v1_2 { @@ -1081,7 +1123,12 @@ private[client] class Shim_v2_0 extends Shim_v1_2 { loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean, numDP: JInteger, listBucketingEnabled: JBoolean, isAcid, txnIdInLoadDynamicPartitions) } - + // Use KryoPool instead of thread-local caching + // since HiveShim version >= v2_0(link: https://issues.apache.org/jira/browse/HIVE-12302) + override def getSerializeObjectToKryoMethod(): Method = { + Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities") + .getDeclaredMethod("serializeObjectToKryo", classOf[java.io.Serializable]) + } } private[client] class Shim_v2_1 extends Shim_v2_0 { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index 46623000405c..09ea239332db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -90,6 +90,55 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { } } + test("create and drop partitions") { + Seq(true, false).foreach { batchDrop => + val catalog = newEmptyCatalog() + val database = s"db_$batchDrop" + val table1 = "tbl1" + val table2 = "tbl2" + catalog.createDatabase(newDb(s"$database"), ignoreIfExists = false) + catalog.createTable(newTable(s"$table1", s"$database"), + ignoreIfExists = false) + catalog.createPartitions(s"$database", s"$table1", + Seq(part1, part2, part3), ignoreIfExists = false) + assert(catalogPartitionsEqual(catalog, s"$database", s"$table1", + Seq(part1, part2, part3))) + + // drop partitions with only one partition + catalog.dropPartitions( + s"$database", s"$table1", Seq(part1.spec), + ignoreIfNotExists = false, purge = false, retainData = false, + supportBatch = batchDrop) + assert(catalogPartitionsEqual(catalog, s"$database", s"$table1", Seq(part2, part3))) + + catalog.dropPartitions( + s"$database", s"$table1", Seq(part2.spec), + ignoreIfNotExists = false, purge = false, retainData = false, supportBatch = batchDrop) + assert(catalogPartitionsEqual(catalog, s"$database", s"$table1", Seq(part3))) + + catalog.dropPartitions( + s"$database", s"$table1", Seq(part3.spec), + ignoreIfNotExists = false, purge = false, retainData = false, supportBatch = batchDrop) + assert(catalog.listPartitions(s"$database", s"$table1").isEmpty) + + catalog.createTable(newTable(s"$table2", s"$database"), ignoreIfExists = false) + catalog.createPartitions(s"$database", s"$table2", + Seq(part1, part2, part3), ignoreIfExists = false) + assert(catalogPartitionsEqual(catalog, s"$database", s"$table2", Seq(part1, part2, part3))) + + // drop partitions with some partitions + catalog.dropPartitions( + s"$database", s"$table2", 
Seq(part1.spec, part2.spec), ignoreIfNotExists = false, + purge = false, retainData = false, supportBatch = batchDrop) + assert(catalogPartitionsEqual(catalog, s"$database", s"$table2", Seq(part3))) + + catalog.dropPartitions( + s"$database", s"$table2", Seq(part3.spec), + ignoreIfNotExists = false, purge = false, retainData = false, supportBatch = batchDrop) + assert(catalog.listPartitions(s"$database", s"$table2").isEmpty) + } + } + test("SPARK-22306: alter table schema should not erase the bucketing metadata at hive side") { val catalog = newBasicCatalog() externalCatalog.client.runSqlHive( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index e3797041883a..391787eaa7b3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -519,13 +519,13 @@ class VersionsSuite extends SparkFunSuite with Logging { // with a version that is older than the minimum (1.2 in this case). try { client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, - purge = true, retainData = false) + purge = true, retainData = false, supportBatch = true) assert(!versionsWithoutPurge.contains(version)) } catch { case _: UnsupportedOperationException => assert(versionsWithoutPurge.contains(version)) client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true, - purge = false, retainData = false) + purge = false, retainData = false, supportBatch = true) } assert(client.getPartitionOption("default", "src_part", spec).isEmpty) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 21d995b3e427..8d5e4bb504c7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.net.URI +import scala.language.existentials + import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER import org.apache.parquet.hadoop.ParquetFileReader @@ -30,8 +32,9 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} +import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, DDLSuite, DDLUtils} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} @@ -701,6 +704,330 @@ class HiveDDLSuite } } + def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = { + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 
$value2)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), + Row(s"p=$value1") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + } + + test("SPARK-14922: Drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)") + for (country <- Seq("AU", "US", "CA", "KR")) { + for (quarter <- 1 to 5) { + sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')") + } + } + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=AU/quarter=1") :: + Row("country=AU/quarter=2") :: + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=KR/quarter=5") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: + Row("country=US/quarter=5") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=CA/quarter=1") :: + Row("country=CA/quarter=2") :: + Row("country=KR/quarter=1") :: + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=1") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=KR/quarter=4") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = '4')") + sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=2") :: + Row("country=KR/quarter=3") :: + Row("country=US/quarter=2") :: + Row("country=US/quarter=4") :: Nil) + sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: Nil) + // According to the declarative partition spec definitions, this drops the union of target + // partitions without exceptions. Hive raises exceptions because it handles them sequentially. 
+ sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')") + checkAnswer(sql("SHOW PARTITIONS sales"), Nil) + } + withTable("tbl_x") { + sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')") + sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')") + sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), + Row(s"p=false") :: Nil) + sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')") + checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil) + } + testDropPartition(IntegerType, 1, 2) + testDropPartition(LongType, 1L, 2L) + testDropPartition(ShortType, 1.toShort, 2.toShort) + testDropPartition(ByteType, 1.toByte, 2.toByte) + testDropPartition(FloatType, 1.0F, 2.0F) + testDropPartition(DoubleType, 1.0, 2.0) + testDropPartition(DecimalType(2, 1), Decimal(1.5), Decimal(2.5)) + } + + test("SPARK-14922: Partition formatted with booleanType " + + "can only be dropped with equal comparison") { + val message = intercept[AnalysisException] { + testDropPartition(BooleanType, false, true) + }.getMessage + assert(message.contains("partition formatted with booleanType can only be" + + " dropped with equal comparison")) + } + + test("SPARK-14922: Drop partitions by only one partition filter") { + val table = "hive_table" + withTable(s"$table") { + sql(s"CREATE TABLE $table (id INT) PARTITIONED BY (name STRING ,quarter INT)") + val num = 100 + for (name <- Seq("AU", "US", "CA", "KR")) { + for (quarter <- 1 to num) { + sql(s"ALTER TABLE $table ADD PARTITION (name = '$name', quarter = $quarter)") + } + } + sql(s"ALTER TABLE $table DROP PARTITION (quarter < $num)") + checkAnswer(sql(s"SHOW PARTITIONS $table"), + Row(s"name=AU/quarter=$num") :: + Row(s"name=CA/quarter=$num") :: + Row(s"name=KR/quarter=$num") :: + Row(s"name=US/quarter=$num") :: Nil) + + sql(s"ALTER TABLE $table DROP PARTITION (name <= 'AU')") + checkAnswer(sql(s"SHOW PARTITIONS $table"), + Row(s"name=CA/quarter=$num") :: + Row(s"name=KR/quarter=$num") :: + Row(s"name=US/quarter=$num") :: Nil) + } + } + + test("Test generatePartitionSpec with a simple Filter") { + val catalog = spark.sessionState.catalog + val timeZone = Option(spark.sessionState.conf.sessionLocalTimeZone) + val table_name = "table_name" + withTable(s"$table_name") { + val num = 10 + val countrySeq = Seq("AU", "US", "CA", "KR") + // note: to avoid some error when we import in the head of the file, so we import here. 
+ import org.apache.spark.sql.catalyst.expressions.{Expression, PartitioningAttribute} + import org.apache.spark.sql.catalyst.dsl.expressions._ + val numAttr = PartitioningAttribute("num") + + Seq((">", (a: Expression, b: Expression) => a > b), + (">=", (a: Expression, b: Expression) => a >= b), + ("<", (a: Expression, b: Expression) => a < b), + ("<=", (a: Expression, b: Expression) => a <= b), + ("<>", (a: Expression, b: Expression) => a =!= b), + ("!=", (a: Expression, b: Expression) => a =!= b)).foreach { case (op, predicateGen) => + sql(s"DROP TABLE IF EXISTS $table_name") + sql(s"CREATE TABLE $table_name (id INT) PARTITIONED BY (num INT ,country STRING)") + val tableIdent = TableIdentifier(s"$table_name", Some("default")) + val alterTableDropPartitionCommand = new AlterTableDropPartitionCommand(tableIdent, + Seq(), + ifExists = true, + purge = false, + retainData = false) + val table = catalog.getTableMetadata(tableIdent) + val partitionColumns = table.partitionColumnNames + val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap + for (country <- countrySeq) { + for (quarter <- 1 to num) { + sql(s"ALTER TABLE $table_name ADD PARTITION (num = $quarter, country = '$country')") + } + } + val tablePartitionSpecSeq = alterTableDropPartitionCommand.generatePartitionSpec( + Seq(predicateGen(numAttr, "2")), + partitionColumns, + partitionAttributes, + tableIdent, + catalog, + spark.sessionState.conf.resolver, + timeZone, + false) + op match { + case ">" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(3, num, countrySeq)).isEmpty) + case ">=" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(2, num, countrySeq)).isEmpty) + case "<" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 1, countrySeq)).isEmpty) + case "<=" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 2, countrySeq)).isEmpty) + case "!=" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 1, countrySeq) + ++ getTablePartitionSpecs(3, num, countrySeq)).isEmpty) + case "<>" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 1, countrySeq) + ++ getTablePartitionSpecs(3, num, countrySeq)).isEmpty) + } + } + } + } + + test("Test generatePartitionSpec with complex Filters ") { + val catalog = spark.sessionState.catalog + val timeZone = Option(spark.sessionState.conf.sessionLocalTimeZone) + val table_name = "table_name" + withTable(s"$table_name") { + val num = 10 + val countrySeq = Seq("AU", "US", "CA", "KR") + val newcountrySeq = Seq("AU", "CA") + // note: to avoid some error when we import in the head of the file, so we import here. 
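+ // This variant exercises a drop spec carrying two comparisons, e.g. PARTITION (num > 2,
+ // country <= 'CA'). The comparisons within one spec are combined conjunctively, so only the
+ // AU and CA partitions (newcountrySeq) whose num value satisfies the numeric comparison are
+ // expected back from generatePartitionSpec, which is what the assertions below check.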
+ import org.apache.spark.sql.catalyst.expressions.{Expression, PartitioningAttribute} + import org.apache.spark.sql.catalyst.dsl.expressions._ + val numAttr = PartitioningAttribute("num") + val countryAttr = PartitioningAttribute("country") + + val predicateCountry = (a: Expression, b: Expression) => a <= b + + Seq((">", (a: Expression, b: Expression) => a > b), + (">=", (a: Expression, b: Expression) => a >= b), + ("<", (a: Expression, b: Expression) => a < b), + ("<=", (a: Expression, b: Expression) => a <= b), + ("<>", (a: Expression, b: Expression) => a =!= b), + ("!=", (a: Expression, b: Expression) => a =!= b)).foreach { case (op, predicateGen) => + sql(s"DROP TABLE IF EXISTS $table_name") + sql(s"CREATE TABLE $table_name (id INT) PARTITIONED BY (num INT ,country STRING)") + val tableIdent = TableIdentifier(s"$table_name", Some("default")) + val alterTableDropPartitionCommand = new AlterTableDropPartitionCommand(tableIdent, + Seq(), + ifExists = true, + purge = false, + retainData = false) + val table = catalog.getTableMetadata(tableIdent) + val partitionColumns = table.partitionColumnNames + val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap + // + for (country <- countrySeq) { + for (quarter <- 1 to num) { + sql(s"ALTER TABLE $table_name ADD PARTITION (num = $quarter, country = '$country')") + } + } + val tablePartitionSpecSeq = alterTableDropPartitionCommand.generatePartitionSpec( + Seq(predicateGen(numAttr, "2"), predicateCountry(countryAttr, "CA")), + partitionColumns, + partitionAttributes, + tableIdent, + catalog, + spark.sessionState.conf.resolver, + timeZone, + false) + + op match { + case ">" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(3, num, + newcountrySeq)).isEmpty) + case ">=" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(2, num, + newcountrySeq)).isEmpty) + case "<" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 1, + newcountrySeq)).isEmpty) + case "<=" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 2, + newcountrySeq)).isEmpty) + case "!=" => + assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 1, + newcountrySeq) + ++ getTablePartitionSpecs(3, num, newcountrySeq)).isEmpty) + case "<>" => assert(tablePartitionSpecSeq.diff(getTablePartitionSpecs(1, 1, + newcountrySeq) ++ getTablePartitionSpecs(3, num, newcountrySeq)).isEmpty) + } + } + } + } + + def getTablePartitionSpecs(start: Int, end: Int, countrySeq: Seq[String]): + Seq[TablePartitionSpec] = { + val tablePartitionSpecs = Seq.newBuilder[TablePartitionSpec] + countrySeq.map { country => + (start to end).map { quarter => + tablePartitionSpecs += Map("num" -> quarter.toString, "country" -> country) + } + } + tablePartitionSpecs.result + } + + test("SPARK-14922: Error handling for drop partitions by filter") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + val m = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')") + }.getMessage + assert(m.contains("unknown is not a valid partition column in table")) + val m2 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')") + }.getMessage + assert(m2.contains("unknown is not a valid partition column in table")) + val m3 = intercept[AnalysisException] { + sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')") + }.getMessage + assert(m3.contains("'<=>' operator is not supported")) + val m4 = intercept[ParseException] { + 
sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))") + }.getMessage + assert(m4.contains("extraneous input")) + val m5 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)") + }.getMessage + assert(m5.contains("Invalid partition spec: quarter")) + + sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')") + sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '5')") + + // support executing `PARTITION (quarter <= '2')` which contains empty + // values to be compatible with Hive. + sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')") + + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=5") :: Nil) + val m7 = intercept[ParseException] { + sql("ALTER TABLE sales DROP PARTITION ( '4' > quarter)") + }.getMessage + assert(m7.contains("mismatched input ''4''")) + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=5") :: Nil) + } + } + + test("SPARK-14922: Partition filter is not allowed in ADD PARTITION") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + val e = intercept[AnalysisException] { + sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')") + } + assert(e.getMessage.contains("extraneous input '<'")) + } + } + test("drop views") { withTable("tab1") { val tabName = "tab1"