@@ -242,7 +242,7 @@ partitionSpec
     ;

 partitionVal
-    : identifier (EQ constant)?
+    : identifier (comparisonOperator constant)?
     ;

 describeFuncName
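Not part of the patch, just a reading aid: with comparisonOperator in place of EQ, a partitionVal can now carry any comparison operator rather than only '='. Statements like the following (drawn from the tests at the end of this diff) become parseable; as the AstBuilder change below enforces, only DROP PARTITION accepts the non-'=' forms:

  sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")    // strict bound
  sql("ALTER TABLE sales DROP PARTITION (quarter <= 2)")      // inclusive bound
  sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")  // exact match, as before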
@@ -234,6 +234,7 @@ object CatalogTypes {
    * Specifications of a table partition. Mapping column name to column value.
    */
   type TablePartitionSpec = Map[String, String]
+  type TablePartitionRangeSpec = Map[String, (String, String)]
 }

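A minimal sketch (illustrative values only, not in the patch) of how the two aliases differ: a plain spec maps each partition column to an exact value, while a range spec pairs each value with the comparison operator that applies to it.

  import org.apache.spark.sql.catalyst.catalog.CatalogTypes.{TablePartitionRangeSpec, TablePartitionSpec}

  val exact: TablePartitionSpec = Map("country" -> "US", "quarter" -> "1")
  val range: TablePartitionRangeSpec = Map("country" -> ("<", "KR"), "quarter" -> ("<=", "2"))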
@@ -187,12 +187,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   }

   /**
-   * Create a partition specification map.
+   * Create a partition specification map: name -> value.
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
     val parts = ctx.partitionVal.asScala.map { pVal =>
       val name = pVal.identifier.getText.toLowerCase
+      val operator = Option(pVal.comparisonOperator).map(_.getText)
+      if (operator.isDefined && !operator.get.equals("=")) {
+        throw new ParseException("Only '=' partition specification is allowed", ctx)
+      }
       val value = Option(pVal.constant).map(visitStringConstant)
       name -> value
     }
@@ -201,6 +205,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     parts.toMap
   }

+  /**
+   * Create a partition range specification map: name -> (operator, value).
+   */
+  def visitPartitionRangeSpec(
+      ctx: PartitionSpecContext): Map[String, (String, String)] = withOrigin(ctx) {
+    val parts = ctx.partitionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText.toLowerCase
+      val operator = Option(pVal.comparisonOperator).map(_.getText)
+      if (operator.isDefined) {
+        name -> (operator.get, visitStringConstant(pVal.constant))
+      } else {
+        throw new ParseException("Invalid partition specification", ctx)
+      }
+    }
+    // Check for duplicate partition columns in one spec.
+    checkDuplicateKeys(parts, ctx)
+    parts.toMap
+  }
+
   /**
    * Create a partition specification map without optional values.
    */
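For example (hypothetical input, tracing the logic above), visitPartitionRangeSpec turns the spec (country='US', quarter<'2') into a map that keeps each operator alongside its value, while visitPartitionSpec now rejects the same input with the new ParseException:

  Map("country" -> ("=", "US"), "quarter" -> ("<", "2"))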
@@ -821,7 +821,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.partitionSpec.asScala.map(visitPartitionRangeSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
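Wired this way, a parsed DROP PARTITION statement carries range specs in the command node. A sketch of the expected shape, assuming the usual parser.parsePlan entry point (the DDLCommandSuite change below asserts the same structure for the '=' case):

  parser.parsePlan("ALTER TABLE sales DROP PARTITION (country < 'KR')") ==
    AlterTableDropPartitionCommand(
      TableIdentifier("sales"),
      Seq(Map("country" -> ("<", "KR"))),
      ifExists = false,
      purge = false)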
@@ -28,8 +28,9 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.{TablePartitionRangeSpec, TablePartitionSpec}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
@@ -402,11 +403,33 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    specs: Seq[TablePartitionRangeSpec],
     ifExists: Boolean,
     purge: Boolean)
   extends RunnableCommand {

+  private def isInRange(
+      table: CatalogTable,
+      rangeSpec: TablePartitionRangeSpec,
+      partition: CatalogTablePartition): Boolean = rangeSpec.forall {
+    case (key, (operator, value)) =>
+      if (!partition.spec.contains(key)) {
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec (${rangeSpec.keys.mkString(", ")}) must be " +
+          s"contained within the partition spec (${table.partitionColumnNames.mkString(", ")}) " +
+          s"defined in table '${table.identifier}'")
+      }
+      val result = partition.spec(key).compareTo(value)
+      operator match {
+        case "=" => result == 0
+        case "<" => result < 0
+        case "<=" => result <= 0
+        case ">" => result > 0
+        case ">=" => result >= 0
+        case _ => throw new UnsupportedOperationException(s"Unsupported operator: $operator")
+      }
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -415,7 +438,13 @@ case class AlterTableDropPartitionCommand(
       throw new AnalysisException(
         "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
+    val partitions = catalog.listPartitions(table.identifier)
+    val targets = partitions.filter(p => specs.exists(isInRange(table, _, p))).map(_.spec)
+    if (targets.nonEmpty) {
+      catalog.dropPartitions(table.identifier, targets, ignoreIfNotExists = ifExists, purge = purge)
+    } else if (!ifExists) {
+      throw new AnalysisException(s"No partition matches the given specs: ${specs.mkString(", ")}")
+    }
     Seq.empty[Row]
   }

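One caveat worth noting (my observation, not stated in the patch): isInRange compares partition values with String.compareTo, so the ordering is lexicographic, not numeric. That is safe for the single-digit quarters exercised in the tests below, but multi-digit values can misorder unless they are zero-padded:

  "2".compareTo("4") < 0     // true:  "2" sorts before "4", as expected
  "2".compareTo("10") < 0    // false: lexicographically "2" > "10"
  "02".compareTo("10") < 0   // true:  zero-padding restores numeric order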
@@ -610,8 +610,8 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
+        Map("dt" -> ("=", "2008-08-08"), "country" -> ("=", "us")),
+        Map("dt" -> ("=", "2009-09-09"), "country" -> ("=", "uk"))),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
@@ -1862,6 +1862,54 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }

+  test("PARTITION range in DROP PARTITION") {
+    withTable("sales") {
+      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+      for (country <- Seq("US", "CA", "KR")) {
+        for (quarter <- 1 to 4) {
+          sql(s"ALTER TABLE sales ADD PARTITION (country='$country', quarter='$quarter')")
+        }
+      }
+
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=US/quarter=1") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2)")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
+      sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=KR/quarter=3") ::
+        Row("country=US/quarter=4") :: Nil)
+
+    }
+  }
+
+  test("PARTITION range is not allowed in ADD PARTITION") {
+    withTable("sales") {
+      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
+
+      val m = intercept[ParseException] {
+        sql("ALTER TABLE sales ADD PARTITION (country='US', quarter<'1')")
+      }.getMessage()
+      assert(m.contains("Only '=' partition specification is allowed"))
+    }
+  }
+
   test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") {
     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
       sql(