diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index e067730cfdf5..1088e3f7a720 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -302,10 +302,15 @@ class V2SessionCatalog(catalog: SessionCatalog)
     val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
     try {
-      catalog.alterTable(
-        catalogTable.copy(
-          properties = finalProperties, schema = schema, owner = owner, comment = comment,
-          collation = collation, storage = storage))
+      if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) {
+        catalog.alterTable(
+          catalogTable.copy(
+            properties = finalProperties, schema = schema, owner = owner, comment = comment,
+            collation = collation, storage = storage))
+      }
+      if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
+        catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
+      }
     } catch {
       case _: NoSuchTableException =>
         throw QueryCompilationErrors.noSuchTableError(ident)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 19d8cba25308..63e54812922a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -590,22 +590,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       parameters = Map("fieldName" -> "`missing_col`", "fields" -> "`id`, `data`"))
   }
 
-  test("alterTable: rename top-level column") {
-    val catalog = newCatalog()
-
-    catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
-    val table = catalog.loadTable(testIdent)
-
-    assert(table.columns === columns)
-
-    catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
-    val updated = catalog.loadTable(testIdent)
-
-    val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
-
-    assert(updated.schema == expectedSchema)
-  }
-
   test("alterTable: rename nested column") {
     val catalog = newCatalog()
 
@@ -627,26 +611,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(updated.columns === expectedColumns)
   }
 
-  test("alterTable: rename struct column") {
-    val catalog = newCatalog()
-
-    val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
-    val tableColumns = columns :+ Column.create("point", pointStruct)
-
-    catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps)
-    val table = catalog.loadTable(testIdent)
-
-    assert(table.columns === tableColumns)
-
-    catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))
-    val updated = catalog.loadTable(testIdent)
-
-    val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
-    val expectedColumns = columns :+ Column.create("p", newPointStruct)
-
-    assert(updated.columns === expectedColumns)
-  }
-
   test("alterTable: rename missing column fails") {
     val catalog = newCatalog()
 
@@ -686,21 +650,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(updated.columns === expectedColumns)
   }
 
-  test("alterTable: delete top-level column") {
-    val catalog = newCatalog()
-
-    catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
-    val table = catalog.loadTable(testIdent)
-
-    assert(table.columns === columns)
-
-    catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
-    val updated = catalog.loadTable(testIdent)
-
-    val expectedSchema = new StructType().add("data", StringType)
-    assert(updated.schema == expectedSchema)
-  }
-
   test("alterTable: delete nested column") {
     val catalog = newCatalog()
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f33bca78eb93..13e8d3721d81 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,8 +22,12 @@ import java.net.URI
 import java.time.LocalDateTime
 import java.util.Locale
 
+import scala.jdk.CollectionConverters._
+
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
@@ -32,12 +36,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
+import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
@@ -3392,4 +3397,39 @@ class HiveDDLSuite
       )
     }
   }
+
+  test("SPARK-52272: V2SessionCatalog does not alter schema on Hive Catalog") {
+    val spyCatalog = spy(spark.sessionState.catalog.externalCatalog)
+    val v1SessionCatalog = new SessionCatalog(spyCatalog)
+    val v2SessionCatalog = new V2SessionCatalog(v1SessionCatalog)
+    withTable("t1") {
+      val identifier = Identifier.of(Array("default"), "t1")
+      val outputSchema = new StructType().add("a", IntegerType, true, "comment1")
+      v2SessionCatalog.createTable(
+        identifier,
+        new TableInfo.Builder()
+          .withProperties(Map.empty.asJava)
+          .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
+          .withPartitions(Array.empty)
+          .build()
+      )
+      v2SessionCatalog.alterTable(identifier, TableChange.setProperty("foo", "bar"))
+      val loaded = v2SessionCatalog.loadTable(identifier)
+      assert(loaded.properties().get("foo") == "bar")
+
+      verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
+      verify(spyCatalog, times(0)).alterTableDataSchema(
+        any[String], any[String], any[StructType])
+
+      v2SessionCatalog.alterTable(identifier,
+        TableChange.updateColumnComment(Array("a"), "comment2"))
+      val loaded2 = v2SessionCatalog.loadTable(identifier)
+      assert(loaded2.columns().length == 1)
+      assert(loaded2.columns.head.comment() == "comment2")
+
+      verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
+      verify(spyCatalog, times(1)).alterTableDataSchema(
+        any[String], any[String], any[StructType])
+    }
+  }
 }