diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 1284681fe80b..94bc4ab82825 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -242,7 +242,7 @@ partitionSpec
     ;
 
 partitionVal
-    : identifier (EQ constant)?
+    : identifier (comparisonOperator constant)?
     ;
 
 describeFuncName
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 51326ca25e9c..91c4372fb177 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -234,6 +234,7 @@ object CatalogTypes {
    * Specifications of a table partition. Mapping column name to column value.
    */
   type TablePartitionSpec = Map[String, String]
+  type TablePartitionRangeSpec = Map[String, (String, String)]
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 12a70b7769ef..706eebbd77df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -187,12 +187,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
   }
 
   /**
-   * Create a partition specification map.
+   * Create a partition specification map: name -> value.
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
     val parts = ctx.partitionVal.asScala.map { pVal =>
       val name = pVal.identifier.getText.toLowerCase
+      val operator = Option(pVal.comparisonOperator).map(_.getText)
+      if (operator.isDefined && !operator.get.equals("=")) {
+        throw new ParseException("Only '=' partition specification is allowed", ctx)
+      }
       val value = Option(pVal.constant).map(visitStringConstant)
       name -> value
     }
@@ -201,6 +205,25 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     parts.toMap
   }
 
+  /**
+   * Create a partition range specification map: name -> (operator, value).
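+   *
+   * For example, `PARTITION (quarter <= '2')` is parsed into
+   * `Map("quarter" -> ("<=", "2"))`.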
+   */
+  def visitPartitionRangeSpec(
+      ctx: PartitionSpecContext): Map[String, (String, String)] = withOrigin(ctx) {
+    val parts = ctx.partitionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText.toLowerCase
+      val operator = Option(pVal.comparisonOperator).map(_.getText)
+      if (operator.isDefined) {
+        name -> (operator.get, visitStringConstant(pVal.constant))
+      } else {
+        throw new ParseException("Invalid partition specification", ctx)
+      }
+    }
+    // Check for duplicate partition columns in one spec.
+    checkDuplicateKeys(parts, ctx)
+    parts.toMap
+  }
+
   /**
    * Create a partition specification map without optional values.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 7f1e23e665eb..001fa9dc2780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -821,7 +821,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.partitionSpec.asScala.map(visitPartitionRangeSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 01ac89868d10..e0e95d48bd25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTablePartition, CatalogTableType, SessionCatalog}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.{TablePartitionRangeSpec, TablePartitionSpec}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
@@ -402,11 +403,33 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    specs: Seq[TablePartitionRangeSpec],
     ifExists: Boolean,
     purge: Boolean)
   extends RunnableCommand {
 
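+  // Returns true if `partition` satisfies every (operator, value) predicate in
+  // `rangeSpec`. Partition values are stored as strings in the catalog, so
+  // `compareTo` yields a lexicographic ordering (e.g. "10" < "2").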
+  private def isInRange(
+      table: CatalogTable,
+      rangeSpec: TablePartitionRangeSpec,
+      partition: CatalogTablePartition): Boolean = rangeSpec.forall {
+    case (key, (operator, value)) =>
+      if (!partition.spec.contains(key)) {
+        throw new AnalysisException(
+          s"Partition spec is invalid. The spec (${rangeSpec.keys.mkString(", ")}) must be " +
+            s"contained within the partition spec (${table.partitionColumnNames.mkString(", ")}) " +
+            s"defined in table '${table.identifier}'")
+      }
+      val result = partition.spec(key).compareTo(value)
+      operator match {
+        case "=" => result == 0
+        case "<" => result < 0
+        case "<=" => result <= 0
+        case ">" => result > 0
+        case ">=" => result >= 0
+        case _ => throw new UnsupportedOperationException(s"Unsupported operator: $operator")
+      }
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
@@ -415,7 +438,13 @@ case class AlterTableDropPartitionCommand(
       throw new AnalysisException(
         "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge)
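+    // Resolve the range specs on the driver: list every partition of the table,
+    // keep those that match at least one spec, and drop the matches by their
+    // concrete partition specs.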
Row("country=US/quarter=4") :: Nil) + + sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')") + sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')") + checkAnswer(sql("SHOW PARTITIONS sales"), + Row("country=KR/quarter=3") :: + Row("country=US/quarter=4") :: Nil) + + } + } + + test("PARTITION range is not allowed in ADD PARTITION") { + withTable("sales") { + sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)") + + val m = intercept[ParseException] { + sql("ALTER TABLE sales ADD PARTITION (country='US', quarter<'1')") + }.getMessage() + assert(m.contains("Only '=' partition specification is allowed")) + } + } + test("SPARK-17354: Partitioning by dates/timestamps works with Parquet vectorized reader") { withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { sql(