Skip to content

Commit e27f13c

Browse files
committed
[SPARK-14922][SPARK-17732][SPARK-23866][SQL] Support partition filters in ALTER TABLE DROP PARTITION and batch dropping PARTITIONS
- Support partition filters in ALTER TABLE DROP PARTITION. - Support batch dropping PARTITIONS by 'ExpressionBuilder'.
1 parent 39596b9 commit e27f13c

File tree

22 files changed

+1064
-256
lines changed

22 files changed

+1064
-256
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ statement
161161
partitionSpecLocation+ #addTablePartition
162162
| ALTER TABLE multipartIdentifier
163163
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
164-
| ALTER (TABLE | VIEW) multipartIdentifier
165-
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
164+
| ALTER (TABLE | VIEW) multipartIdentifier DROP (IF EXISTS)?
165+
dropPartitionSpec (',' dropPartitionSpec)* PURGE? #dropTablePartitions
166166
| ALTER TABLE multipartIdentifier
167167
(partitionSpec)? SET locationSpec #setTableLocation
168168
| ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions
@@ -328,6 +328,14 @@ database
328328
| SCHEMA
329329
;
330330

331+
dropPartitionSpec
332+
: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
333+
;
334+
335+
dropPartitionVal
336+
: identifier (comparisonOperator constant)?
337+
;
338+
331339
describeFuncName
332340
: qualifiedName
333341
| STRING

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ trait ExternalCatalog {
188188
parts: Seq[TablePartitionSpec],
189189
ignoreIfNotExists: Boolean,
190190
purge: Boolean,
191-
retainData: Boolean): Unit
191+
retainData: Boolean,
192+
supportBatch: Boolean): Unit
192193

193194
/**
194195
* Override the specs of one or many existing table partitions, assuming they exist.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,10 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
203203
partSpecs: Seq[TablePartitionSpec],
204204
ignoreIfNotExists: Boolean,
205205
purge: Boolean,
206-
retainData: Boolean): Unit = {
207-
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData)
206+
retainData: Boolean,
207+
supportBatch: Boolean): Unit = {
208+
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData,
209+
supportBatch)
208210
}
209211

210212
override def renamePartitions(

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ class InMemoryCatalog(
425425
partSpecs: Seq[TablePartitionSpec],
426426
ignoreIfNotExists: Boolean,
427427
purge: Boolean,
428-
retainData: Boolean): Unit = synchronized {
428+
retainData: Boolean,
429+
supportBatch: Boolean): Unit = synchronized {
429430
requireTableExists(db, table)
430431
val existingParts = catalog(db).tables(table).partitions
431432
if (!ignoreIfNotExists) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -918,14 +918,16 @@ class SessionCatalog(
918918
specs: Seq[TablePartitionSpec],
919919
ignoreIfNotExists: Boolean,
920920
purge: Boolean,
921-
retainData: Boolean): Unit = {
921+
retainData: Boolean,
922+
supportBatch: Boolean): Unit = {
922923
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
923924
val table = formatTableName(tableName.table)
924925
requireDbExists(db)
925926
requireTableExists(TableIdentifier(table, Option(db)))
926927
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
927928
requireNonEmptyValueInPartitionSpec(specs)
928-
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
929+
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData,
930+
supportBatch)
929931
}
930932

931933
/**

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,35 @@ case class OuterReference(e: NamedExpression)
391391
override def newInstance(): NamedExpression = OuterReference(e.newInstance())
392392
}
393393

394+
/**
395+
* A place holder used to hold the name of the partition attributes specified when running commands
396+
* involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
397+
*/
398+
case class PartitioningAttribute(
399+
name: String,
400+
override val exprId: ExprId = NamedExpression.newExprId)
401+
extends Attribute with Unevaluable {
402+
// We need a dataType to be used during analysis for resolving the expressions (see
403+
// checkInputDataTypes). The String type is used because all the literals in PARTITION operations
404+
// are parsed as strings and eventually casted later.
405+
override def dataType: DataType = StringType
406+
override def nullable: Boolean = false
407+
408+
override def qualifier: Seq[String] = throw new UnsupportedOperationException
409+
override def withNullability(newNullability: Boolean): Attribute =
410+
throw new UnsupportedOperationException
411+
override def newInstance(): Attribute = throw new UnsupportedOperationException
412+
override def withQualifier(newQualifier: Seq[String]): Attribute =
413+
throw new UnsupportedOperationException
414+
override def withName(newName: String): Attribute = throw new UnsupportedOperationException
415+
override def withMetadata(newMetadata: Metadata): Attribute =
416+
throw new UnsupportedOperationException
417+
418+
override lazy val canonicalized: Expression = this.copy(exprId = ExprId(0))
419+
420+
override def withExprId(newExprId: ExprId): Attribute = throw new UnsupportedOperationException
421+
}
422+
394423
object VirtualColumn {
395424
// The attribute name used by Hive, which has different result than Spark, deprecated.
396425
val hiveGroupingIdName: String = "grouping__id"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
499499
}
500500
}
501501

502+
/**
503+
* Create a partition specification map with filters.
504+
*/
505+
override def visitDropPartitionSpec(
506+
ctx: DropPartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
507+
ctx.dropPartitionVal().asScala.map { pFilter =>
508+
if (pFilter.constant() == null || pFilter.comparisonOperator() == null) {
509+
throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
510+
}
511+
// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
512+
// running the command.
513+
val partition = PartitioningAttribute(pFilter.identifier().getText)
514+
val value = Literal(visitStringConstant(pFilter.constant()))
515+
val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
516+
val comparison = buildComparison(partition, value, operator)
517+
if (comparison.isInstanceOf[EqualNullSafe]) {
518+
throw new ParseException(
519+
"'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
520+
}
521+
comparison
522+
}
523+
}
524+
502525
/**
503526
* Convert a constant of any type into a string. This is typically used in DDL commands, and its
504527
* main purpose is to prevent slight differences due to back to back conversions i.e.:
@@ -1321,6 +1344,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
13211344
val left = expression(ctx.left)
13221345
val right = expression(ctx.right)
13231346
val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
1347+
buildComparison(left, right, operator)
1348+
}
1349+
1350+
/**
1351+
* Creates a comparison expression. The following comparison operators are supported:
1352+
* - Equal: '=' or '=='
1353+
* - Null-safe Equal: '<=>'
1354+
* - Not Equal: '<>' or '!='
1355+
* - Less than: '<'
1356+
* - Less then or Equal: '<='
1357+
* - Greater than: '>'
1358+
* - Greater then or Equal: '>='
1359+
*/
1360+
private def buildComparison(
1361+
left: Expression,
1362+
right: Expression,
1363+
operator: TerminalNode): Expression = {
13241364
operator.getSymbol.getType match {
13251365
case SqlBaseParser.EQ =>
13261366
EqualTo(left, right)
@@ -3153,7 +3193,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
31533193
}
31543194
AlterTableDropPartitionStatement(
31553195
visitMultipartIdentifier(ctx.multipartIdentifier),
3156-
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
3196+
ctx.dropPartitionSpec.asScala.map(visitDropPartitionSpec),
31573197
ifExists = ctx.EXISTS != null,
31583198
purge = ctx.PURGE != null,
31593199
retainData = false)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ case class AlterTableRenamePartitionStatement(
209209
*/
210210
case class AlterTableDropPartitionStatement(
211211
tableName: Seq[String],
212-
specs: Seq[TablePartitionSpec],
212+
specs: Seq[Seq[Expression]],
213213
ifExists: Boolean,
214214
purge: Boolean,
215215
retainData: Boolean) extends ParsedStatement

0 commit comments

Comments
 (0)