diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index b9afe71d243e..4d80f2366a5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTa import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty +import org.apache.spark.sql.connector.catalog.TableChange.{ColumnChange, DeleteColumn, RenameColumn} import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -149,26 +150,47 @@ class V2SessionCatalog(catalog: SessionCatalog) throw QueryCompilationErrors.noSuchTableError(ident) } - val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes) - val schema = CatalogV2Util.applySchemaChanges( - catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE") - val comment = properties.get(TableCatalog.PROP_COMMENT) - val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner) - val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI) - val storage = if (location.isDefined) { - catalogTable.storage.copy(locationUri = location) - } else { - catalogTable.storage + + val (columnChanges, otherChanges) = changes.toSeq.partition(change => + change.isInstanceOf[ColumnChange] && + // Not supported changes in alterTableDataSchema + !change.isInstanceOf[RenameColumn] && !change.isInstanceOf[DeleteColumn] + ) + + if(columnChanges.size > 0) { + val schema = CatalogV2Util.applySchemaChanges( + catalogTable.schema, columnChanges, catalogTable.provider, "ALTER TABLE") + + try { + catalog.alterTableDataSchema(ident.asTableIdentifier, schema) + } catch { + case _: NoSuchTableException => + throw QueryCompilationErrors.noSuchTableError(ident) + } } - try { - catalog.alterTable( - catalogTable.copy( - properties = properties, schema = schema, owner = owner, comment = comment, - storage = storage)) - } catch { - case _: NoSuchTableException => - throw QueryCompilationErrors.noSuchTableError(ident) + if(otherChanges.size > 0) { + val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, otherChanges) + val schema = CatalogV2Util.applySchemaChanges( + catalogTable.schema, otherChanges, catalogTable.provider, "ALTER TABLE") + val comment = properties.get(TableCatalog.PROP_COMMENT) + val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner) + val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI) + val storage = if (location.isDefined) { + catalogTable.storage.copy(locationUri = location) + } else { + catalogTable.storage + } + + try { + catalog.alterTable( + catalogTable.copy( + properties = properties, schema = schema, owner = owner, comment = comment, + storage = storage)) + } catch { + case _: NoSuchTableException => + throw QueryCompilationErrors.noSuchTableError(ident) + } } loadTable(ident) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 3dfe60638772..01aff011fa93 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -24,9 +24,10 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.apache.spark.sql.AnalysisException +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.quoteIdentifier @@ -35,7 +36,9 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap -abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter { +abstract class V2SessionCatalogBaseSuite extends SparkFunSuite{ + + protected def spark: SparkSession val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] val schema: StructType = new StructType() @@ -55,7 +58,8 @@ abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeA } } -class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { +abstract class V2SessionCatalogTableBaseSuite extends V2SessionCatalogBaseSuite + with BeforeAndAfterAll with BeforeAndAfter { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -64,7 +68,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { val catalog = newCatalog() catalog.createNamespace(Array("db"), emptyProps) catalog.createNamespace(Array("db2"), - Map(SupportsNamespaces.PROP_LOCATION -> "file:///db2.db").asJava) + Map(SupportsNamespaces.PROP_LOCATION -> "file:///tmp/db2.db").asJava) catalog.createNamespace(Array("ns"), emptyProps) catalog.createNamespace(Array("ns2"), emptyProps) } @@ -815,7 +819,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { } } -class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { +class V2SessionCatalogTableSuite extends V2SessionCatalogTableBaseSuite with SharedSparkSession { +} + +class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite + with SharedSparkSession { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ @@ -895,7 +903,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { test("createNamespace: basic behavior") { val catalog = newCatalog() - val sessionCatalog = sqlContext.sessionState.catalog + val sessionCatalog = spark.sqlContext.sessionState.catalog val expectedPath = new Path(spark.sessionState.conf.warehousePath, sessionCatalog.getDefaultDBPath(testNs(0)).toString).toString diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalV2SessionCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalV2SessionCatalogSuite.scala new file mode 100644 index 000000000000..339106e5c434 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalV2SessionCatalogSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.scalactic.source.Position +import org.scalatest.Tag + +import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalogTableBaseSuite +import org.apache.spark.sql.hive.test.TestHiveSingleton + + + class HiveExternalV2SessionCatalogTableSuite extends V2SessionCatalogTableBaseSuite + with TestHiveSingleton { + + def excluded: Seq[String] = Seq( + // Not supported in Hive catalog + "alterTable: add nested column", + "createTable: location", + "alterTable: location", + + // Not supported in V2SessionCatalog + "alterTable: rename top-level column", + "alterTable: rename nested column", + "alterTable: rename struct column", + "alterTable: multiple changes", + "alterTable: delete top-level column", + "alterTable: delete nested column" + ) + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { + if (excluded.contains(testName)) () + else super.test(testName, testTags: _*)(testFun) + } +}