@@ -239,11 +239,7 @@ partitionSpecLocation
     ;
 
 partitionSpec
-    : PARTITION '(' partitionVal (',' partitionVal)* ')'
-    ;
-
-partitionVal
-    : identifier (EQ constant)?
+    : PARTITION '(' expression (',' expression)* ')'
     ;
 
 describeFuncName
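For orientation, a minimal sketch of what the relaxed partitionSpec rule accepts, written in the same sql(...) style the test suites below use (the sales table and its columns are illustrative):

    // Equality specs parse exactly as before.
    sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = '1')")
    // Each comma-separated expression may now also compare a partition column
    // against a literal with <, >, <= or >=.
    sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")

Only ALTER TABLE ... DROP PARTITION routes these expressions through the new filter path; visitPartitionSpec below still rejects anything that is not a bare identifier or an equality.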
@@ -194,10 +194,15 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
-    val parts = ctx.partitionVal.asScala.map { pVal =>
-      val name = pVal.identifier.getText
-      val value = Option(pVal.constant).map(visitStringConstant)
-      name -> value
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case UnresolvedAttribute(name :: Nil) =>
+          name -> None
+        case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          name -> Option(constant.toString)
+        case _ =>
+          throw new ParseException("Invalid partition filter specification", ctx)
+      }
     }
     // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
     // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -206,6 +211,23 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     parts.toMap
   }
 
+  /**
+   * Create a partition filter specification.
+   */
+  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
+    val parts = ctx.expression.asScala.map { pVal =>
+      expression(pVal) match {
+        case EqualNullSafe(_, _) =>
+          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
+        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
+          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
+        case _ =>
+          throw new ParseException("Invalid partition filter specification", ctx)
+      }
+    }
+    parts.reduceLeft(And)
+  }
+
   /**
    * Create a partition specification map without optional values.
    */
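For a spec like PARTITION (country < 'KR', quarter > '2'), visitPartitionFilterSpec keeps each comparison operator, swaps the unresolved attribute for an AttributeReference pinned to StringType, and folds the parts left into one conjunction. A rough sketch of the resulting tree (imports assumed from org.apache.spark.sql.catalyst.expressions and org.apache.spark.sql.types):

    // Equivalent to what the method above returns for the spec quoted above.
    val filter: Expression = And(
      LessThan(AttributeReference("country", StringType)(), Literal("KR")),
      GreaterThan(AttributeReference("quarter", StringType)(), Literal("2")))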
@@ -813,7 +813,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -418,27 +419,55 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    specs: Seq[Expression],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
+
+  private def isRangeComparison(expr: Expression): Boolean = {
+    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
+  }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
+    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    specs.foreach { expr =>
Review comment (Contributor): rant: this code here to me is evidence that we should actually analyze these commands. The analyzer should throw out invalid commands, instead of doing it all during run().

+      expr.references.foreach { attr =>
+        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
+            s"in table ${table.identifier.quotedString}.")
+        }
+      }
     }
 
-    catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+    if (specs.exists(isRangeComparison)) {
+      val partitionSet = specs.flatMap { spec =>
+        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
+        if (partitions.isEmpty && !ifExists) {
+          throw new AnalysisException(s"There is no partition for ${spec.sql}")
+        }
+        partitions
+      }.distinct
+      catalog.dropPartitions(
+        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
+    } else {
+      val normalizedSpecs = specs.map { expr =>
+        val spec = splitConjunctivePredicates(expr).map {
+          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          spec,
+          table.partitionColumnNames,
+          table.identifier.quotedString,
+          resolver)
+      }
+      catalog.dropPartitions(
+        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
+    }
     Seq.empty[Row]
   }

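For the equality-only branch above, a small sketch of the round trip back to the legacy map form (splitConjunctivePredicates comes from the PredicateHelper mix-in; the names and values mirror the DDLCommandSuite expectation below):

    val expr: Expression = And(
      EqualTo(AttributeReference("dt", StringType)(), Literal("2008-08-08")),
      EqualTo(AttributeReference("country", StringType)(), Literal("us")))
    // splitConjunctivePredicates(expr) yields the two EqualTo nodes, which the
    // pattern match above flattens into:
    //   Map("dt" -> "2008-08-08", "country" -> "us")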
@@ -215,8 +215,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         if (overwrite.enabled) {
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
+            import org.apache.spark.sql.catalyst.expressions._
+            val expressions = deletedPartitions.map { specs =>
+              specs.map { case (key, value) =>
+                EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
+              }.reduceLeft(And)
+            }.toSeq
             AlterTableDropPartitionCommand(
-              l.catalogTable.get.identifier, deletedPartitions.toSeq,
+              l.catalogTable.get.identifier, expressions,
               ifExists = true, purge = true).run(t.sparkSession)
           }
         }
@@ -21,6 +21,7 @@ import scala.reflect.{classTag, ClassTag}

 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -612,8 +613,12 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
+        And(
+          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
+          EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
+        And(
+          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
+          EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -225,6 +226,108 @@ class HiveDDLSuite
     }
   }
 
+  test("SPARK-17732: Drop partitions by filter") {
+    withTable("sales") {
+      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+      for (country <- Seq("US", "CA", "KR")) {
+        for (quarter <- 1 to 4) {
+          sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
+        }
+      }
+
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=CA/quarter=1") ::
+        Row("country=CA/quarter=2") ::
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=US/quarter=1") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
+      sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
Review comment (Member): Let's add a test for dropping multiple partition specs?
Reply (Member Author): Line 251?
Reply (Member): I mean something like PARTITION (quarter <= '2'), PARTITION (quarter >= '4').
Reply (Member Author): Oh, I missed that case. Sure! Thank you again.
Reply (Member): We should add a test case like PARTITION (quarter <= '4'), PARTITION (quarter <= '2') to see what will happen, since after the first partition spec is removed the second one may fail.
Reply (Member Author): Sure!
Reply (Member Author): To add that, we need another test case, because the remaining partitions are not enough to test it.
Reply (Member Author): I added the case by updating the existing test cases.

+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=3") :: Nil)
+
+      // According to the declarative partition spec definitions, this drops the union of target
+      // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
+      checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
+    }
+  }
+
test("SPARK-17732: Error handling for drop partitions by filter") {
withTable("sales") {
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")

val m = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
}.getMessage
assert(m.contains("unknown is not a valid partition column in table"))

val m2 = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
}.getMessage
assert(m2.contains("unknown is not a valid partition column in table"))

val m3 = intercept[AnalysisException] {
sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
}.getMessage
assert(m3.contains("'<=>' operator is not allowed in partition specification"))
Review comment (Member): One more negative case? How about unknown <=> upper('KR')?
Reply (Member Author): The current master behavior looks like the following. I added a new case with the same behavior.

    scala> sql("ALTER TABLE sales DROP PARTITION (unknown = upper('KR'))").show
    org.apache.spark.sql.catalyst.parser.ParseException:
    extraneous input '(' expecting STRING(line 1, pos 49)

+      val m4 = intercept[ParseException] {
+        sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
+      }.getMessage
+      assert(m4.contains("'<=>' operator is not allowed in partition specification"))
+
+      val m5 = intercept[ParseException] {
+        sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
+      }.getMessage
+      assert(m5.contains("Invalid partition filter specification"))
+
+      sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
+      val m6 = intercept[AnalysisException] {
+        sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
+      }.getMessage
+      // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=3") :: Nil)
+      assert(m6.contains("There is no partition for (`quarter` <= '2')"))
+    }
+  }
+
test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
withTable("sales") {
sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")

val m = intercept[ParseException] {
sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
}.getMessage()
assert(m.contains("Invalid partition filter specification"))
}
}

test("drop views") {
withTable("tab1") {
val tabName = "tab1"