From fd16d6b1fe01f3ee9564e6811dd1e889fae6cda4 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 17 Nov 2017 20:11:33 +0800 Subject: [PATCH 1/4] Support change column dataType --- .../spark/sql/execution/command/ddl.scala | 18 +++++++++++++----- .../sql-tests/inputs/change-column.sql | 2 +- .../sql-tests/results/change-column.sql.out | 7 +++---- .../spark/sql/execution/command/DDLSuite.scala | 5 +++++ 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 04bf8c6dd917..6618a42f134c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -309,7 +309,7 @@ case class AlterTableChangeColumnCommand( columnName: String, newColumn: StructField) extends RunnableCommand { - // TODO: support change column name/dataType/metadata/position. + // TODO: support change column name/metadata/position. override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) @@ -326,10 +326,18 @@ case class AlterTableChangeColumnCommand( s"'${newColumn.name}' with type '${newColumn.dataType}'") } - val newDataSchema = table.dataSchema.fields.map { field => + val changeSchema = originColumn.dataType != newColumn.dataType + val newDataSchema = table.schema.fields.map { field => if (field.name == originColumn.name) { - // Create a new column from the origin column with the new comment. - addComment(field, newColumn.getComment) + var newField = field + if (newColumn.getComment.isDefined) { + // Create a new column from the origin column with the new comment. + newField = addComment(field, newColumn.getComment) + } + if (changeSchema) { + newField = newField.copy(dataType = newColumn.dataType) + } + newField } else { field } @@ -359,7 +367,7 @@ case class AlterTableChangeColumnCommand( // name(by resolver) and dataType. private def columnEqual( field: StructField, other: StructField, resolver: Resolver): Boolean = { - resolver(field.name, other.name) && field.dataType == other.dataType + resolver(field.name, other.name) } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/change-column.sql b/sql/core/src/test/resources/sql-tests/inputs/change-column.sql index 2909024e4c9f..dd8d82a1a5f2 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/change-column.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/change-column.sql @@ -6,7 +6,7 @@ DESC test_change; ALTER TABLE test_change CHANGE a a1 INT; DESC test_change; --- Change column dataType (not supported yet) +-- Change column dataType ALTER TABLE test_change CHANGE a a STRING; DESC test_change; diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index ff1ecbcc44c2..b7f164eb86be 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -44,8 +44,7 @@ ALTER TABLE test_change CHANGE a a STRING -- !query 4 schema struct<> -- !query 4 output -org.apache.spark.sql.AnalysisException -ALTER TABLE CHANGE COLUMN is not supported for changing column 'a' with type 'IntegerType' to 'a' with type 'StringType'; + -- !query 5 @@ -53,7 +52,7 @@ DESC test_change -- !query 5 schema struct -- !query 5 output -a int +a string b string c int @@ -91,7 +90,7 @@ DESC test_change -- !query 8 schema struct -- !query 8 output -a int +a string b string c int diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index ca95aad3976e..99e8490010cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1697,6 +1697,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is col1'") assert(getMetadata("col1").getString("key") == "value") assert(getMetadata("col1").getString("comment") == "this is col1") + + // Ensure that change column type take effect + sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING") + val column = catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1") + assert(column.get.dataType == StringType) } test("drop build-in function") { From 0cf1a9b943f9ae8de7a7ae226e6eae4345f81364 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 20 Nov 2017 11:39:21 +0800 Subject: [PATCH 2/4] Address comments --- .../spark/sql/execution/command/ddl.scala | 23 +++++++------------ .../sql/execution/command/DDLSuite.scala | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 6618a42f134c..62b69304b68d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -326,18 +326,16 @@ case class AlterTableChangeColumnCommand( s"'${newColumn.name}' with type '${newColumn.dataType}'") } - val changeSchema = originColumn.dataType != newColumn.dataType - val newDataSchema = table.schema.fields.map { field => + val typeChanged = originColumn.dataType != newColumn.dataType + val newDataSchema = table.dataSchema.fields.map { field => if (field.name == originColumn.name) { - var newField = field - if (newColumn.getComment.isDefined) { - // Create a new column from the origin column with the new comment. - newField = addComment(field, newColumn.getComment) - } - if (changeSchema) { - newField = newField.copy(dataType = newColumn.dataType) + // Add the comment to a column, if comment is empty, return the original column. + val newField = newColumn.getComment.map(field.withComment(_)).getOrElse(field) + if (typeChanged) { + newField.copy(dataType = newColumn.dataType) + } else { + newField } - newField } else { field } @@ -358,11 +356,6 @@ case class AlterTableChangeColumnCommand( s"${schema.fieldNames.mkString("[`", "`, `", "`]")}")) } - // Add the comment to a column, if comment is empty, return the original column. - private def addComment(column: StructField, comment: Option[String]): StructField = { - comment.map(column.withComment(_)).getOrElse(column) - } - // Compare a [[StructField]] to another, return true if they have the same column // name(by resolver) and dataType. private def columnEqual( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 99e8490010cc..589f6379a3c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1698,7 +1698,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { assert(getMetadata("col1").getString("key") == "value") assert(getMetadata("col1").getString("comment") == "this is col1") - // Ensure that change column type take effect + // Ensure that changing column type takes effect sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING") val column = catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1") assert(column.get.dataType == StringType) From d8982b1ce8294c9234f88b9adaf649cb8dd0c6f6 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 24 Nov 2017 20:16:53 +0800 Subject: [PATCH 3/4] Add the checking logic in next commit and fix bug for changing comment of partition column --- .../org/apache/spark/sql/execution/command/ddl.scala | 12 +++++++++++- .../spark/sql/execution/command/DDLSuite.scala | 5 +++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 62b69304b68d..a9de3e31f164 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -318,7 +318,7 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) - // Throw an AnalysisException if the column name/dataType is changed. + // Throw an AnalysisException if the column name is changed. if (!columnEqual(originColumn, newColumn, resolver)) { throw new AnalysisException( "ALTER TABLE CHANGE COLUMN is not supported for changing column " + @@ -327,6 +327,16 @@ case class AlterTableChangeColumnCommand( } val typeChanged = originColumn.dataType != newColumn.dataType + val partitionColumnChanged = table.partitionColumnNames.contains(originColumn.name) + + // Throw an AnalysisException if the type of partition column is changed. + if (typeChanged && partitionColumnChanged) { + throw new AnalysisException( + "ALTER TABLE CHANGE COLUMN is not supported for changing partition column" + + s"'${originColumn.name}' with type '${originColumn.dataType}' to " + + s"'${newColumn.name}' with type '${newColumn.dataType}'") + } + val newDataSchema = table.dataSchema.fields.map { field => if (field.name == originColumn.name) { // Add the comment to a column, if comment is empty, return the original column. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 589f6379a3c4..e58142a52027 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1702,6 +1702,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 STRING") val column = catalog.getTableMetadata(tableIdent).schema.fields.find(_.name == "col1") assert(column.get.dataType == StringType) + + // Ensure that changing partition column type throw exception + intercept[AnalysisException] { + sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING") + } } test("drop build-in function") { From ef65c4de516c91fc6de1727cee4df6c106f6ef1f Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 7 Sep 2018 14:39:00 +0800 Subject: [PATCH 4/4] Add check for data compatible --- .../spark/sql/execution/command/ddl.scala | 23 ++++++------------- .../sql/execution/command/DDLSuite.scala | 13 ++++++++++- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a9de3e31f164..bfdf5b92c212 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat @@ -318,8 +318,8 @@ case class AlterTableChangeColumnCommand( // Find the origin column from dataSchema by column name. val originColumn = findColumnByName(table.dataSchema, columnName, resolver) - // Throw an AnalysisException if the column name is changed. - if (!columnEqual(originColumn, newColumn, resolver)) { + // Throw an AnalysisException if the column name is changed or type change is incompatible. + if (!columnCheck(originColumn, newColumn, resolver)) { throw new AnalysisException( "ALTER TABLE CHANGE COLUMN is not supported for changing column " + s"'${originColumn.name}' with type '${originColumn.dataType}' to " + @@ -327,20 +327,11 @@ case class AlterTableChangeColumnCommand( } val typeChanged = originColumn.dataType != newColumn.dataType - val partitionColumnChanged = table.partitionColumnNames.contains(originColumn.name) - - // Throw an AnalysisException if the type of partition column is changed. - if (typeChanged && partitionColumnChanged) { - throw new AnalysisException( - "ALTER TABLE CHANGE COLUMN is not supported for changing partition column" + - s"'${originColumn.name}' with type '${originColumn.dataType}' to " + - s"'${newColumn.name}' with type '${newColumn.dataType}'") - } val newDataSchema = table.dataSchema.fields.map { field => if (field.name == originColumn.name) { // Add the comment to a column, if comment is empty, return the original column. - val newField = newColumn.getComment.map(field.withComment(_)).getOrElse(field) + val newField = newColumn.getComment().map(field.withComment).getOrElse(field) if (typeChanged) { newField.copy(dataType = newColumn.dataType) } else { @@ -367,10 +358,10 @@ case class AlterTableChangeColumnCommand( } // Compare a [[StructField]] to another, return true if they have the same column - // name(by resolver) and dataType. - private def columnEqual( + // name(by resolver) and dataType and data type compatible. + private def columnCheck( field: StructField, other: StructField, resolver: Resolver): Boolean = { - resolver(field.name, other.name) + resolver(field.name, other.name) && Cast.canCast(field.dataType, other.dataType) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e58142a52027..245ee4f2d7fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1704,9 +1704,20 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { assert(column.get.dataType == StringType) // Ensure that changing partition column type throw exception - intercept[AnalysisException] { + var msg = intercept[AnalysisException] { sql("ALTER TABLE dbx.tab1 CHANGE COLUMN a a STRING") } + assert(msg.getMessage.startsWith( + "Can't find column `a` given table data columns")) + + withTable("t") { + sql("CREATE TABLE t(s STRUCT) USING PARQUET") + msg = intercept[AnalysisException]{ + sql("ALTER TABLE t CHANGE COLUMN s s INT") + } + assert(msg.getMessage.startsWith( + "ALTER TABLE CHANGE COLUMN is not supported for changing column ")) + } } test("drop build-in function") {