diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 6dcebe29537d..6f8cf4721521 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -17,18 +17,20 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.net.URI import java.util import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange} -import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} +import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty} +import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.sources.v2.Table @@ -39,11 +41,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[TableCatalog]] that translates calls to the v1 SessionCatalog. */ -class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { +class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + import V2SessionCatalog._ + def this() = { this(SparkSession.active.sessionState) } + override val defaultNamespace: Array[String] = Array("default") + private lazy val catalog: SessionCatalog = sessionState.catalog private var _name: String = _ @@ -87,7 +94,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName) val tableProperties = properties.asScala - val location = Option(properties.get("location")) + val location = Option(properties.get(LOCATION_TABLE_PROP)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) .copy(locationUri = location.map(CatalogUtils.stringToURI)) val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED @@ -102,7 +109,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions, - comment = Option(properties.get("comment"))) + comment = Option(properties.get(COMMENT_TABLE_PROP))) try { catalog.createTable(tableDesc, ignoreIfExists = false) @@ -177,10 +184,97 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { } } + override def namespaceExists(namespace: Array[String]): Boolean = namespace match { + case Array(db) => + catalog.databaseExists(db) + case _ => + false + } + + override def listNamespaces(): Array[Array[String]] = { + catalog.listDatabases().map(Array(_)).toArray + } + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + namespace match { + case Array() => + listNamespaces() + case Array(db) if catalog.databaseExists(db) => + Array() + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = { + namespace match { + case Array(db) => + catalog.getDatabaseMetadata(db).toMetadata + + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def createNamespace( + namespace: Array[String], + metadata: util.Map[String, String]): Unit = namespace match { + case Array(db) if !catalog.databaseExists(db) => + catalog.createDatabase( + toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))), + ignoreIfExists = false) + + case Array(_) => + throw new NamespaceAlreadyExistsException(namespace) + + case _ => + throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}") + } + + override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = { + namespace match { + case Array(db) => + // validate that this catalog's reserved properties are not removed + changes.foreach { + case remove: RemoveProperty if RESERVED_PROPERTIES.contains(remove.property) => + throw new UnsupportedOperationException( + s"Cannot remove reserved property: ${remove.property}") + case _ => + } + + val metadata = catalog.getDatabaseMetadata(db).toMetadata + catalog.alterDatabase( + toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes))) + + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def dropNamespace(namespace: Array[String]): Boolean = namespace match { + case Array(db) if catalog.databaseExists(db) => + if (catalog.listTables(db).nonEmpty) { + throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty") + } + catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false) + true + + case Array(_) => + // exists returned false + false + + case _ => + throw new NoSuchNamespaceException(namespace) + } + override def toString: String = s"V2SessionCatalog($name)" } private[sql] object V2SessionCatalog { + val COMMENT_TABLE_PROP: String = "comment" + val LOCATION_TABLE_PROP: String = "location" + val RESERVED_PROPERTIES: Set[String] = Set(COMMENT_TABLE_PROP, LOCATION_TABLE_PROP) + /** * Convert v2 Transforms to v1 partition columns and an optional bucket spec. */ @@ -202,4 +296,32 @@ private[sql] object V2SessionCatalog { (identityCols, bucketSpec) } + + private def toCatalogDatabase( + db: String, + metadata: util.Map[String, String], + defaultLocation: Option[URI] = None): CatalogDatabase = { + CatalogDatabase( + name = db, + description = metadata.getOrDefault(COMMENT_TABLE_PROP, ""), + locationUri = Option(metadata.get(LOCATION_TABLE_PROP)) + .map(CatalogUtils.stringToURI) + .orElse(defaultLocation) + .getOrElse(throw new IllegalArgumentException("Missing database location")), + properties = metadata.asScala.toMap -- Seq("comment", "location")) + } + + private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) { + def toMetadata: util.Map[String, String] = { + val metadata = mutable.HashMap[String, String]() + + catalogDatabase.properties.foreach { + case (key, value) => metadata.put(key, value) + } + metadata.put(LOCATION_TABLE_PROP, catalogDatabase.locationUri.toString) + metadata.put(COMMENT_TABLE_PROP, catalogDatabase.description) + + metadata.asJava + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 64460d033805..275bc339b3b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -22,41 +22,56 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, TableChange} -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, NamespaceChange, TableChange} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap -class V2SessionCatalogSuite - extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ +class V2SessionCatalogBaseSuite extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { - private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] - private val schema: StructType = new StructType() + val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] + val schema: StructType = new StructType() .add("id", IntegerType) .add("data", StringType) + val testNs: Array[String] = Array("db") + val defaultNs: Array[String] = Array("default") + val testIdent: Identifier = Identifier.of(testNs, "test_table") + + def newCatalog(): V2SessionCatalog = { + val newCatalog = new V2SessionCatalog(spark.sessionState) + newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + newCatalog + } +} + +class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + override protected def beforeAll(): Unit = { super.beforeAll() - spark.sql("""CREATE DATABASE IF NOT EXISTS db""") - spark.sql("""CREATE DATABASE IF NOT EXISTS db2""") - spark.sql("""CREATE DATABASE IF NOT EXISTS ns""") - spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""") + // TODO: when there is a public API for v2 catalogs, use that instead + val catalog = newCatalog() + catalog.createNamespace(Array("db"), emptyProps) + catalog.createNamespace(Array("db2"), emptyProps) + catalog.createNamespace(Array("ns"), emptyProps) + catalog.createNamespace(Array("ns2"), emptyProps) } override protected def afterAll(): Unit = { - spark.sql("""DROP TABLE IF EXISTS db.test_table""") - spark.sql("""DROP DATABASE IF EXISTS db""") - spark.sql("""DROP DATABASE IF EXISTS db2""") - spark.sql("""DROP DATABASE IF EXISTS ns""") - spark.sql("""DROP DATABASE IF EXISTS ns2""") + val catalog = newCatalog() + catalog.dropNamespace(Array("db")) + catalog.dropNamespace(Array("db2")) + catalog.dropNamespace(Array("ns")) + catalog.dropNamespace(Array("ns2")) super.afterAll() } @@ -65,14 +80,6 @@ class V2SessionCatalogSuite newCatalog().dropTable(testIdentNew) } - private def newCatalog(): TableCatalog = { - val newCatalog = new V2SessionCatalog(spark.sessionState) - newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) - newCatalog - } - - private val testNs = Array("db") - private val testIdent = Identifier.of(testNs, "test_table") private val testIdentNew = Identifier.of(testNs, "test_table_new") test("Catalogs can load the catalog") { @@ -753,3 +760,293 @@ class V2SessionCatalogSuite assert(exc.message.contains("RENAME TABLE source and destination databases do not match")) } } + +class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + + def checkMetadata( + expected: scala.collection.Map[String, String], + actual: scala.collection.Map[String, String]): Unit = { + // remove location and comment that are automatically added by HMS unless they are expected + val toRemove = V2SessionCatalog.RESERVED_PROPERTIES.filter(expected.contains) + assert(expected -- toRemove === actual) + } + + test("listNamespaces: basic behavior") { + val catalog = newCatalog() + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + assert(catalog.listNamespaces() === Array(testNs, defaultNs)) + assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs)) + assert(catalog.listNamespaces(testNs) === Array()) + + catalog.dropNamespace(testNs) + } + + test("listNamespaces: fail if missing namespace") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val exc = intercept[NoSuchNamespaceException] { + assert(catalog.listNamespaces(testNs) === Array()) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === false) + } + + test("loadNamespaceMetadata: fail missing namespace") { + val catalog = newCatalog() + + val exc = intercept[NoSuchNamespaceException] { + catalog.loadNamespaceMetadata(testNs) + } + + assert(exc.getMessage.contains(testNs.quoted)) + } + + test("loadNamespaceMetadata: non-empty metadata") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(metadata.asScala, Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("loadNamespaceMetadata: empty metadata") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + catalog.createNamespace(testNs, emptyProps) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(metadata.asScala, emptyProps.asScala) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: basic behavior") { + val catalog = newCatalog() + val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + assert(catalog.namespaceExists(testNs) === true) + val metadata = catalog.loadNamespaceMetadata(testNs).asScala + checkMetadata(metadata, Map("property" -> "value")) + assert(expectedPath === metadata("location")) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: initialize location") { + val catalog = newCatalog() + val expectedPath = "file:/tmp/db.db" + + catalog.createNamespace(testNs, Map("location" -> expectedPath).asJava) + + assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + assert(catalog.namespaceExists(testNs) === true) + val metadata = catalog.loadNamespaceMetadata(testNs).asScala + checkMetadata(metadata, Map.empty) + assert(expectedPath === metadata("location")) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: fail if namespace already exists") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + val exc = intercept[NamespaceAlreadyExistsException] { + catalog.createNamespace(testNs, Map("property" -> "value2").asJava) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: fail nested namespace") { + val catalog = newCatalog() + + // ensure the parent exists + catalog.createNamespace(Array("db"), emptyProps) + + val exc = intercept[IllegalArgumentException] { + catalog.createNamespace(Array("db", "nested"), emptyProps) + } + + assert(exc.getMessage.contains("Invalid namespace name: db.nested")) + + catalog.dropNamespace(Array("db")) + } + + test("createTable: fail if namespace does not exist") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val exc = intercept[NoSuchNamespaceException] { + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === false) + } + + test("dropNamespace: drop missing namespace") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val ret = catalog.dropNamespace(testNs) + + assert(ret === false) + } + + test("dropNamespace: drop empty namespace") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, emptyProps) + + assert(catalog.namespaceExists(testNs) === true) + + val ret = catalog.dropNamespace(testNs) + + assert(ret === true) + assert(catalog.namespaceExists(testNs) === false) + } + + test("dropNamespace: fail if not empty") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + val exc = intercept[IllegalStateException] { + catalog.dropNamespace(testNs) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) + + catalog.dropTable(testIdent) + catalog.dropNamespace(testNs) + } + + test("alterNamespace: basic behavior") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2")) + checkMetadata( + catalog.loadNamespaceMetadata(testNs).asScala, + Map("property" -> "value", "property2" -> "value2")) + + catalog.alterNamespace(testNs, + NamespaceChange.removeProperty("property2"), + NamespaceChange.setProperty("property3", "value3")) + checkMetadata( + catalog.loadNamespaceMetadata(testNs).asScala, + Map("property" -> "value", "property3" -> "value3")) + + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3")) + checkMetadata( + catalog.loadNamespaceMetadata(testNs).asScala, + Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: update namespace location") { + val catalog = newCatalog() + val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString + val newPath = "file:/tmp/db.db" + + catalog.createNamespace(testNs, emptyProps) + + assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath)) + + assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: update namespace comment") { + val catalog = newCatalog() + val newComment = "test db" + + catalog.createNamespace(testNs, emptyProps) + + assert(spark.catalog.getDatabase(testNs(0)).description.isEmpty) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("comment", newComment)) + + assert(newComment === spark.catalog.getDatabase(testNs(0)).description) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: fail if namespace doesn't exist") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val exc = intercept[NoSuchNamespaceException] { + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value")) + } + + assert(exc.getMessage.contains(testNs.quoted)) + } + + test("alterNamespace: fail to remove location") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, emptyProps) + + val exc = intercept[UnsupportedOperationException] { + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("location")) + } + + assert(exc.getMessage.contains("Cannot remove reserved property: location")) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: fail to remove comment") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("comment" -> "test db").asJava) + + val exc = intercept[UnsupportedOperationException] { + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("comment")) + } + + assert(exc.getMessage.contains("Cannot remove reserved property: comment")) + + catalog.dropNamespace(testNs) + } +}