Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ statement
partitionSpecLocation+ #addTablePartition
| ALTER TABLE multipartIdentifier
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
| ALTER (TABLE | VIEW) multipartIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER (TABLE | VIEW) multipartIdentifier DROP (IF EXISTS)?
dropPartitionSpec (',' dropPartitionSpec)* PURGE? #dropTablePartitions
| ALTER TABLE multipartIdentifier
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions
Expand Down Expand Up @@ -328,6 +328,14 @@ database
| SCHEMA
;

dropPartitionSpec
: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
;

dropPartitionVal
: identifier (comparisonOperator constant)?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One suggestion, can we replace constant with expression? In our PROD environment, there are some requirements like 'ALTER TABLE xxx DROP PARTITION (col < CURRENT_DATE)'.

;

describeFuncName
: qualifiedName
| STRING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ trait ExternalCatalog {
parts: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit
retainData: Boolean,
supportBatch: Boolean): Unit

/**
* Override the specs of one or many existing table partitions, assuming they exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,10 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = {
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData)
retainData: Boolean,
supportBatch: Boolean): Unit = {
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData,
supportBatch)
}

override def renamePartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ class InMemoryCatalog(
partSpecs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = synchronized {
retainData: Boolean,
supportBatch: Boolean): Unit = synchronized {
requireTableExists(db, table)
val existingParts = catalog(db).tables(table).partitions
if (!ignoreIfNotExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,14 +918,16 @@ class SessionCatalog(
specs: Seq[TablePartitionSpec],
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = {
retainData: Boolean,
supportBatch: Boolean): Unit = {
val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
requireNonEmptyValueInPartitionSpec(specs)
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData,
supportBatch)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,35 @@ case class OuterReference(e: NamedExpression)
override def newInstance(): NamedExpression = OuterReference(e.newInstance())
}

/**
* A place holder used to hold the name of the partition attributes specified when running commands
* involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
*/
case class PartitioningAttribute(
name: String,
override val exprId: ExprId = NamedExpression.newExprId)
extends Attribute with Unevaluable {
// We need a dataType to be used during analysis for resolving the expressions (see
// checkInputDataTypes). The String type is used because all the literals in PARTITION operations
// are parsed as strings and eventually casted later.
override def dataType: DataType = StringType
override def nullable: Boolean = false

override def qualifier: Seq[String] = throw new UnsupportedOperationException
override def withNullability(newNullability: Boolean): Attribute =
throw new UnsupportedOperationException
override def newInstance(): Attribute = throw new UnsupportedOperationException
override def withQualifier(newQualifier: Seq[String]): Attribute =
throw new UnsupportedOperationException
override def withName(newName: String): Attribute = throw new UnsupportedOperationException
override def withMetadata(newMetadata: Metadata): Attribute =
throw new UnsupportedOperationException

override lazy val canonicalized: Expression = this.copy(exprId = ExprId(0))

override def withExprId(newExprId: ExprId): Attribute = throw new UnsupportedOperationException
}

object VirtualColumn {
// The attribute name used by Hive, which has different result than Spark, deprecated.
val hiveGroupingIdName: String = "grouping__id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,29 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create a partition specification map with filters.
*/
override def visitDropPartitionSpec(
ctx: DropPartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
ctx.dropPartitionVal().asScala.map { pFilter =>
if (pFilter.constant() == null || pFilter.comparisonOperator() == null) {
throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
}
// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
// running the command.
val partition = PartitioningAttribute(pFilter.identifier().getText)
val value = Literal(visitStringConstant(pFilter.constant()))
val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
val comparison = buildComparison(partition, value, operator)
if (comparison.isInstanceOf[EqualNullSafe]) {
throw new ParseException(
"'<=>' operator is not supported in ALTER TABLE ... DROP PARTITION.", ctx)
}
comparison
}
}

/**
* Convert a constant of any type into a string. This is typically used in DDL commands, and its
* main purpose is to prevent slight differences due to back to back conversions i.e.:
Expand Down Expand Up @@ -1321,6 +1344,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val left = expression(ctx.left)
val right = expression(ctx.right)
val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
buildComparison(left, right, operator)
}

/**
* Creates a comparison expression. The following comparison operators are supported:
* - Equal: '=' or '=='
* - Null-safe Equal: '<=>'
* - Not Equal: '<>' or '!='
* - Less than: '<'
* - Less then or Equal: '<='
* - Greater than: '>'
* - Greater then or Equal: '>='
*/
private def buildComparison(
left: Expression,
right: Expression,
operator: TerminalNode): Expression = {
operator.getSymbol.getType match {
case SqlBaseParser.EQ =>
EqualTo(left, right)
Expand Down Expand Up @@ -3153,7 +3193,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
AlterTableDropPartitionStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.dropPartitionSpec.asScala.map(visitDropPartitionSpec),
ifExists = ctx.EXISTS != null,
purge = ctx.PURGE != null,
retainData = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ case class AlterTableRenamePartitionStatement(
*/
case class AlterTableDropPartitionStatement(
tableName: Seq[String],
specs: Seq[TablePartitionSpec],
specs: Seq[Seq[Expression]],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean) extends ParsedStatement
Expand Down
Loading