DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale

import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -2114,54 +2114,6 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsed2, expected2)
}

// ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
// ALTER VIEW table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
test("alter table: drop partition") {
val sql1_table =
"""
|ALTER TABLE table_name DROP IF EXISTS PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql2_table =
"""
|ALTER TABLE table_name DROP PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql1_view = sql1_table.replace("TABLE", "VIEW")
val sql2_view = sql2_table.replace("TABLE", "VIEW")

val parsed1_table = parsePlan(sql1_table)
val parsed2_table = parsePlan(sql2_table)
val parsed1_purge = parsePlan(sql1_table + " PURGE")

assertUnsupported(sql1_view)
assertUnsupported(sql2_view)

val expected1_table = AlterTableDropPartition(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP PARTITION ..."),
Seq(
UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")),
UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"))),
ifExists = true,
purge = false)
val expected2_table = expected1_table.copy(ifExists = false)
val expected1_purge = expected1_table.copy(purge = true)

comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
comparePlans(parsed1_purge, expected1_purge)

val sql3_table = "ALTER TABLE a.b.c DROP IF EXISTS PARTITION (ds='2017-06-10')"
val expected3_table = AlterTableDropPartition(
UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... DROP PARTITION ..."),
Seq(UnresolvedPartitionSpec(Map("ds" -> "2017-06-10"))),
ifExists = true,
purge = false)

val parsed3_table = parsePlan(sql3_table)
comparePlans(parsed3_table, expected3_table)
}

test("show current namespace") {
comparePlans(
parsePlan("SHOW CURRENT NAMESPACE"),
AlterTablePartitionV2SQLSuite.scala
@@ -18,18 +18,8 @@
package org.apache.spark.sql.connector

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
import org.apache.spark.sql.internal.SQLConf

class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {

import CatalogV2Implicits._
import DataSourceV2Implicits._


test("ALTER TABLE RECOVER PARTITIONS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
@@ -51,106 +41,4 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
assert(e.message.contains("ALTER TABLE RENAME PARTITION is only supported with v1 tables"))
}
}

test("ALTER TABLE DROP PARTITION") {
val t = "testpart.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)")

val partTable =
catalog("testpart").asTableCatalog.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
}
}

test("ALTER TABLE DROP PARTITIONS") {
val t = "testpart.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
spark.sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
" PARTITION (id=2) LOCATION 'loc1'")
spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1), PARTITION (id=2)")

val partTable =
catalog("testpart").asTableCatalog.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(2))))
assert(
partTable.asPartitionable.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty)
}
}

test("ALTER TABLE DROP PARTITIONS: partition not exists") {
val t = "testpart.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")

assertThrows[NoSuchPartitionsException](
spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1), PARTITION (id=2)"))

val partTable =
catalog("testpart").asTableCatalog.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
assert(partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))

spark.sql(s"ALTER TABLE $t DROP IF EXISTS PARTITION (id=1), PARTITION (id=2)")
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(1))))
assert(!partTable.asPartitionable.partitionExists(InternalRow.fromSeq(Seq(2))))
assert(
partTable.asPartitionable.listPartitionIdentifiers(Array.empty, InternalRow.empty).isEmpty)
}
}

test("case sensitivity in resolving partition specs") {
val t = "testpart.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
val errMsg = intercept[AnalysisException] {
spark.sql(s"ALTER TABLE $t DROP PARTITION (ID=1)")
}.getMessage
assert(errMsg.contains(s"ID is not a valid partition column in table $t"))
}

val partTable = catalog("testpart").asTableCatalog
.loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
.asPartitionable
assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
spark.sql(s"ALTER TABLE $t DROP PARTITION (Id=1)")
assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
}
}
}

test("SPARK-33650: drop partition into a table which doesn't support partition management") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING _")
val errMsg = intercept[AnalysisException] {
spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
}.getMessage
assert(errMsg.contains(s"Table $t can not alter partitions"))
}
}

test("SPARK-33676: not fully specified partition spec") {
val t = "testpart.ns1.ns2.tbl"
withTable(t) {
sql(s"""
|CREATE TABLE $t (id bigint, part0 int, part1 string)
|USING foo
|PARTITIONED BY (part0, part1)""".stripMargin)
val errMsg = intercept[AnalysisException] {
sql(s"ALTER TABLE $t DROP PARTITION (part0 = 1)")
}.getMessage
assert(errMsg.contains("Partition spec is invalid. " +
"The spec (part0) must match the partition spec (part0, part1)"))
}
}
}
AlterTableDropPartitionParserSuite.scala (new file)
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.AlterTableDropPartition
import org.apache.spark.sql.test.SharedSparkSession

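// Parser tests for:
//   ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]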
class AlterTableDropPartitionParserSuite extends AnalysisTest with SharedSparkSession {
test("drop partition") {
val sql = """
|ALTER TABLE table_name DROP PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val expected = AlterTableDropPartition(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP PARTITION ..."),
Seq(
UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")),
UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"))),
ifExists = false,
purge = false)

comparePlans(parsePlan(sql), expected)
}

test("drop partition if exists") {
val sql = """
|ALTER TABLE table_name DROP IF EXISTS
|PARTITION (dt='2008-08-08', country='us'),
|PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val expected = AlterTableDropPartition(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP PARTITION ..."),
Seq(
UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")),
UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"))),
ifExists = true,
purge = false)
comparePlans(parsePlan(sql), expected)
}

test("drop partition in a table with multi-part identifier") {
val sql = "ALTER TABLE a.b.c DROP IF EXISTS PARTITION (ds='2017-06-10')"
val expected = AlterTableDropPartition(
UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... DROP PARTITION ..."),
Seq(UnresolvedPartitionSpec(Map("ds" -> "2017-06-10"))),
ifExists = true,
purge = false)

comparePlans(parsePlan(sql), expected)
}

test("drop partition with PURGE") {
val sql = "ALTER TABLE table_name DROP PARTITION (p=1) PURGE"
val expected = AlterTableDropPartition(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP PARTITION ..."),
Seq(UnresolvedPartitionSpec(Map("p" -> "1"))),
ifExists = false,
purge = true)

comparePlans(parsePlan(sql), expected)
}

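// Dropping a partition from a view is rejected at parse time, not during analysis.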
test("drop partition from view") {
val sql = "ALTER VIEW table_name DROP PARTITION (p=1)"
val errMsg = intercept[ParseException] {
parsePlan(sql)
}.getMessage
assert(errMsg.contains("Operation not allowed"))
}
}
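
The grammar also allows IF EXISTS and PURGE together: the removed DDLParserSuite test exercised exactly that combination via parsePlan(sql1_table + " PURGE"), while the new suite checks the two clauses separately. A minimal sketch of such a case, reusing the helpers and imports of the suite above (the table name t and partition column p are illustrative only, not part of the patch):

  test("drop partition with IF EXISTS and PURGE") {
    // Hypothetical extra case: both clauses set their respective flags.
    val sql = "ALTER TABLE t DROP IF EXISTS PARTITION (p=1) PURGE"
    val expected = AlterTableDropPartition(
      UnresolvedTable(Seq("t"), "ALTER TABLE ... DROP PARTITION ..."),
      Seq(UnresolvedPartitionSpec(Map("p" -> "1"))),
      ifExists = true,
      purge = true)
    comparePlans(parsePlan(sql), expected)
  }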