Commit 91c8e58

ALTER TABLE CHANGE COLUMN should do multi-catalog resolution.
1 parent bb47870 commit 91c8e58

8 files changed (+103, -64 lines)
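In short: the CHANGE COLUMN form of ALTER TABLE now parses its table name as a multi-part identifier and is turned into a catalyst AlterTableChangeColumnStatement, which ResolveSessionCatalog later converts to the existing v1 command for session-catalog tables. A rough sketch of the user-visible effect (the catalog name testcat and the table are assumptions borrowed from the DataSourceV2SQLSuite test added below):

// Sketch only; assumes a SparkSession `spark` with a v2 catalog registered
// under the name "testcat", as configured in Spark's DataSourceV2SQLSuite.
spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo")

// Before this commit a three-part name would not even parse for CHANGE COLUMN
// (the grammar rule used tableIdentifier); now it parses and goes through
// catalog resolution, where it fails for v2 tables with:
//   "ALTER TABLE CHANGE COLUMN is only supported with v1 tables"
spark.sql("ALTER TABLE testcat.ns1.ns2.tbl CHANGE COLUMN id id_new INT")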

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
@@ -151,7 +151,7 @@ statement
     | ALTER TABLE multipartIdentifier
         (ALTER | CHANGE) COLUMN? qualifiedName
         (TYPE dataType)? (COMMENT comment=STRING)? colPosition?   #alterTableColumn
-    | ALTER TABLE tableIdentifier partitionSpec?
+    | ALTER TABLE multipartIdentifier partitionSpec?
         CHANGE COLUMN?
         colName=errorCapturingIdentifier colType colPosition?     #changeColumn
     | ALTER TABLE tableIdentifier (partitionSpec)?

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 27 additions & 0 deletions
@@ -2951,4 +2951,31 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       ctx: RecoverPartitionsContext): LogicalPlan = withOrigin(ctx) {
     AlterTableRecoverPartitionsStatement(visitMultipartIdentifier(ctx.multipartIdentifier))
   }
+
+  /**
+   * Create a [[AlterTableChangeColumnStatement]] command.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE multi_part_name [PARTITION partition_spec]
+   *   CHANGE [COLUMN] column_old_name column_new_name column_dataType [COMMENT column_comment]
+   *   [FIRST | AFTER column_name];
+   * }}}
+   */
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.partitionSpec != null) {
+      operationNotAllowed("ALTER TABLE table PARTITION partition_spec CHANGE COLUMN", ctx)
+    }
+
+    if (ctx.colPosition != null) {
+      operationNotAllowed(
+        "ALTER TABLE table [PARTITION partition_spec] CHANGE COLUMN ... FIRST | AFTER otherCol",
+        ctx)
+    }
+
+    AlterTableChangeColumnStatement(
+      visitMultipartIdentifier(ctx.multipartIdentifier),
+      ctx.colName.getText,
+      visitColType(ctx.colType))
+  }
 }
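Not part of the diff, but as a quick illustration of what the relocated visitor now produces for a multi-part table name (the expected statement shape is spelled out in the catalyst DDLParserSuite change below; CatalystSqlParser is the catalyst parser that uses this AstBuilder):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Parses to AlterTableChangeColumnStatement(Seq("a", "b", "c"), "col_name",
//   StructField("col_name", IntegerType).withComment("new_comment"))
val plan = CatalystSqlParser.parsePlan(
  "ALTER TABLE a.b.c CHANGE COLUMN col_name col_name INT COMMENT 'new_comment'")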

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 10 additions & 2 deletions
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 /**
  * A logical plan node that contains exactly what was parsed from SQL.
@@ -136,7 +136,7 @@ case class AlterTableAddColumnsStatement(
     columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement
 
 /**
- * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
+ * ALTER TABLE ... ALTER COLUMN command, as parsed from SQL.
  */
 case class AlterTableAlterColumnStatement(
     tableName: Seq[String],
@@ -188,6 +188,14 @@ case class AlterTableSetLocationStatement(
 case class AlterTableRecoverPartitionsStatement(
     tableName: Seq[String]) extends ParsedStatement
 
+/**
+ * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL.
+ */
+case class AlterTableChangeColumnStatement(
+    tableName: Seq[String],
+    columnName: String,
+    newColumn: StructField) extends ParsedStatement
+
 /**
  * ALTER VIEW ... SET TBLPROPERTIES command, as parsed from SQL.
  */

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 38 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class DDLParserSuite extends AnalysisTest {
@@ -601,11 +601,47 @@ class DDLParserSuite extends AnalysisTest {
       Some("new comment")))
   }
 
-  test("alter table: change column position (not supported)") {
+  test("alter table: alter column position (not supported)") {
     assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name COMMENT 'doc' FIRST")
     assertUnsupported("ALTER TABLE table_name CHANGE COLUMN name TYPE INT AFTER other_col")
   }
 
+  test("alter table: change column name/type/comment") {
+    val sql1 = "ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT"
+    val sql2 = "ALTER TABLE table_name CHANGE COLUMN col_name col_name INT COMMENT 'new_comment'"
+    val sql3 = "ALTER TABLE a.b.c CHANGE COLUMN col_name col_name INT COMMENT 'new_comment'"
+
+    val parsed1 = parsePlan(sql1)
+    val parsed2 = parsePlan(sql2)
+    val parsed3 = parsePlan(sql3)
+
+    val expected1 = AlterTableChangeColumnStatement(
+      Seq("table_name"),
+      "col_old_name",
+      StructField("col_new_name", IntegerType))
+    val expected2 = AlterTableChangeColumnStatement(
+      Seq("table_name"),
+      "col_name",
+      StructField("col_name", IntegerType).withComment("new_comment"))
+    val expected3 = AlterTableChangeColumnStatement(
+      Seq("a", "b", "c"),
+      "col_name",
+      StructField("col_name", IntegerType).withComment("new_comment"))
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+  }
+
+  test("alter table: change column position (not supported)") {
+    assertUnsupported("ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT FIRST")
+    assertUnsupported(
+      "ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT AFTER other_col")
+  }
+
+  test("alter table: change column in partition spec") {
+    assertUnsupported("ALTER TABLE table_name PARTITION (a='1', a='2') CHANGE COLUMN a new_a INT")
+  }
+
   test("alter table: drop column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 8 additions & 1 deletion
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropDatabaseCommand, DropTableCommand, LoadDataCommand, ShowColumnsCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CacheTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropDatabaseCommand, DropTableCommand, LoadDataCommand, ShowColumnsCommand, ShowCreateTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand, UncacheTableCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
@@ -369,6 +369,13 @@ class ResolveSessionCatalog(
       AlterTableRecoverPartitionsCommand(
         v1TableName.asTableIdentifier,
         "ALTER TABLE RECOVER PARTITIONS")
+
+    case AlterTableChangeColumnStatement(tableName, columnName, newColumn) =>
+      val v1TableName = parseV1Table(tableName, "ALTER TABLE CHANGE COLUMN")
+      AlterTableChangeColumnCommand(
+        v1TableName.asTableIdentifier,
+        columnName,
+        newColumn)
   }
 
   private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
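For names that resolve to the session catalog, the new case simply rebuilds the pre-existing v1 command, so behavior for v1 tables is unchanged. A hedged illustration of the resulting command (the table name db.tbl and column are hypothetical):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableChangeColumnCommand
import org.apache.spark.sql.types.{IntegerType, StructField}

// Roughly what ResolveSessionCatalog produces for a session-catalog table,
// per the case added above.
val resolved = AlterTableChangeColumnCommand(
  TableIdentifier("tbl", Some("db")),
  "col_name",
  StructField("col_name", IntegerType).withComment("new_comment"))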

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 27 deletions
@@ -502,33 +502,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       retainData = false)
   }
 
-  /**
-   * Create a [[AlterTableChangeColumnCommand]] command.
-   *
-   * For example:
-   * {{{
-   *   ALTER TABLE table [PARTITION partition_spec]
-   *   CHANGE [COLUMN] column_old_name column_new_name column_dataType [COMMENT column_comment]
-   *   [FIRST | AFTER column_name];
-   * }}}
-   */
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec != null) {
-      operationNotAllowed("ALTER TABLE table PARTITION partition_spec CHANGE COLUMN", ctx)
-    }
-
-    if (ctx.colPosition != null) {
-      operationNotAllowed(
-        "ALTER TABLE table [PARTITION partition_spec] CHANGE COLUMN ... FIRST | AFTER otherCol",
-        ctx)
-    }
-
-    AlterTableChangeColumnCommand(
-      tableName = visitTableIdentifier(ctx.tableIdentifier),
-      columnName = ctx.colName.getText,
-      newColumn = visitColType(ctx.colType))
-  }
-
   /**
    * Convert a nested constants list into a sequence of string sequences.
    */

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 19 additions & 3 deletions
@@ -1374,16 +1374,32 @@ class DataSourceV2SQLSuite
     }
   }
 
-  test("ALTER TABLE RECOVER PARTITIONS") {
+  test("ALTER TABLE RECOVER PARTITIONS") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
       val e = intercept[AnalysisException] {
-        val partition = sql(s"ALTER TABLE $t RECOVER PARTITIONS")
+        sql(s"ALTER TABLE $t RECOVER PARTITIONS")
       }
       assert(e.message.contains("ALTER TABLE RECOVER PARTITIONS is only supported with v1 tables"))
     }
-  }
+  }
+
+  test("ALTER TABLE CHANGE COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      val e1 = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t CHANGE COLUMN id id_new INT")
+      }
+      assert(e1.message.contains("ALTER TABLE CHANGE COLUMN is only supported with v1 tables"))
+
+      val e2 = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t CHANGE COLUMN id id_new INT COMMENT 'new_comment'")
+      }
+      assert(e2.message.contains("ALTER TABLE CHANGE COLUMN is only supported with v1 tables"))
+    }
+  }
 
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala

Lines changed: 0 additions & 28 deletions
@@ -635,34 +635,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
       "SET FILEFORMAT PARQUET")
   }
 
-  test("alter table: change column name/type/comment") {
-    val sql1 = "ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT"
-    val sql2 = "ALTER TABLE table_name CHANGE COLUMN col_name col_name INT COMMENT 'new_comment'"
-    val parsed1 = parser.parsePlan(sql1)
-    val parsed2 = parser.parsePlan(sql2)
-    val tableIdent = TableIdentifier("table_name", None)
-    val expected1 = AlterTableChangeColumnCommand(
-      tableIdent,
-      "col_old_name",
-      StructField("col_new_name", IntegerType))
-    val expected2 = AlterTableChangeColumnCommand(
-      tableIdent,
-      "col_name",
-      StructField("col_name", IntegerType).withComment("new_comment"))
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-  }
-
-  test("alter table: change column position (not supported)") {
-    assertUnsupported("ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT FIRST")
-    assertUnsupported(
-      "ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT AFTER other_col")
-  }
-
-  test("alter table: change column in partition spec") {
-    assertUnsupported("ALTER TABLE table_name PARTITION (a='1', a='2') CHANGE COLUMN a new_a INT")
-  }
-
   test("alter table: touch (not supported)") {
     assertUnsupported("ALTER TABLE table_name TOUCH")
     assertUnsupported("ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')")
