@@ -17,18 +17,20 @@

package org.apache.spark.sql.execution.datasources.v2

import java.net.URI
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty}
import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.sources.v2.Table
@@ -39,11 +41,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
*/
class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
import V2SessionCatalog._

def this() = {
this(SparkSession.active.sessionState)
}

override val defaultNamespace: Array[String] = Array("default")

private lazy val catalog: SessionCatalog = sessionState.catalog

private var _name: String = _
@@ -87,7 +94,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName)
val tableProperties = properties.asScala
val location = Option(properties.get("location"))
val location = Option(properties.get(LOCATION_TABLE_PROP))
val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
.copy(locationUri = location.map(CatalogUtils.stringToURI))
val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
@@ -102,7 +109,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap,
tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions,
comment = Option(properties.get("comment")))
comment = Option(properties.get(COMMENT_TABLE_PROP)))

try {
catalog.createTable(tableDesc, ignoreIfExists = false)
@@ -177,10 +184,97 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
}
}

override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
case Array(db) =>
catalog.databaseExists(db)
case _ =>
false
}

override def listNamespaces(): Array[Array[String]] = {
catalog.listDatabases().map(Array(_)).toArray
}

override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
namespace match {
case Array() =>
listNamespaces()
case Array(db) if catalog.databaseExists(db) =>
Array()
case _ =>
throw new NoSuchNamespaceException(namespace)
}
}

override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
namespace match {
case Array(db) =>
catalog.getDatabaseMetadata(db).toMetadata

case _ =>
throw new NoSuchNamespaceException(namespace)
}
}

override def createNamespace(
namespace: Array[String],
metadata: util.Map[String, String]): Unit = namespace match {
case Array(db) if !catalog.databaseExists(db) =>
catalog.createDatabase(
toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
ignoreIfExists = false)

case Array(_) =>
throw new NamespaceAlreadyExistsException(namespace)

case _ =>
throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
}

override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
namespace match {
case Array(db) =>
// validate that this catalog's reserved properties are not removed
changes.foreach {
case remove: RemoveProperty if RESERVED_PROPERTIES.contains(remove.property) =>
throw new UnsupportedOperationException(
s"Cannot remove reserved property: ${remove.property}")
case _ =>
}

val metadata = catalog.getDatabaseMetadata(db).toMetadata
catalog.alterDatabase(
toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))

case _ =>
throw new NoSuchNamespaceException(namespace)
}
}

override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
case Array(db) if catalog.databaseExists(db) =>
if (catalog.listTables(db).nonEmpty) {
throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
}
catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false)
true

case Array(_) =>
// exists returned false
[Review thread on the comment above]
Contributor: not exists
Contributor Author (@rdblue, Sep 2, 2019): Correct. This is the case where the database does not exist; we know that because the existence check in the previous case returned false. The comment clarifies this Array case, since a bare Array of one item always matches, so the context carried over from the guarded case above needs to be noted.
Contributor: ah, I see.
(A standalone sketch of this fall-through follows the method below.)
false

case _ =>
throw new NoSuchNamespaceException(namespace)
}
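
As referenced in the review thread above, a minimal standalone sketch of the guard fall-through (not part of this patch, with hypothetical names): the guarded Array(db) case matches only while its guard holds, so a bare Array(_) case beneath it is reached exactly when the same shape matched but the guard failed.

// Hypothetical stand-in for dropNamespace; `existing` plays the role of catalog.databaseExists.
def dropSketch(namespace: Array[String], existing: Set[String]): Boolean = namespace match {
  case Array(db) if existing.contains(db) => true  // guard held: the single-part namespace exists
  case Array(_) => false                           // same shape, reached only because the guard above failed
  case _ => throw new IllegalArgumentException("only single-part namespaces are supported")
}
// dropSketch(Array("db1"), Set("db1"))  => true
// dropSketch(Array("db2"), Set("db1"))  => false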

override def toString: String = s"V2SessionCatalog($name)"
}

private[sql] object V2SessionCatalog {
val COMMENT_TABLE_PROP: String = "comment"
val LOCATION_TABLE_PROP: String = "location"
val RESERVED_PROPERTIES: Set[String] = Set(COMMENT_TABLE_PROP, LOCATION_TABLE_PROP)

/**
* Convert v2 Transforms to v1 partition columns and an optional bucket spec.
*/
@@ -202,4 +296,32 @@

(identityCols, bucketSpec)
}

private def toCatalogDatabase(
db: String,
metadata: util.Map[String, String],
defaultLocation: Option[URI] = None): CatalogDatabase = {
CatalogDatabase(
name = db,
description = metadata.getOrDefault(COMMENT_TABLE_PROP, ""),
locationUri = Option(metadata.get(LOCATION_TABLE_PROP))
.map(CatalogUtils.stringToURI)
.orElse(defaultLocation)
.getOrElse(throw new IllegalArgumentException("Missing database location")),
properties = metadata.asScala.toMap -- Seq("comment", "location"))
}

private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) {
def toMetadata: util.Map[String, String] = {
val metadata = mutable.HashMap[String, String]()

catalogDatabase.properties.foreach {
case (key, value) => metadata.put(key, value)
}
metadata.put(LOCATION_TABLE_PROP, catalogDatabase.locationUri.toString)
metadata.put(COMMENT_TABLE_PROP, catalogDatabase.description)

metadata.asJava
}
}
}
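
For context, a rough usage sketch of the namespace surface this change adds. It is not part of the patch: it assumes a SparkSession is already active (the no-arg constructor picks up SparkSession.active), and the NamespaceChange.setProperty factory is assumed to mirror the one on TableChange.

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalog.v2.NamespaceChange
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog

val v2Catalog = new V2SessionCatalog()                   // uses the active session's SessionState
v2Catalog.createNamespace(Array("reports"), Map("comment" -> "reporting tables").asJava)
v2Catalog.namespaceExists(Array("reports"))              // true
v2Catalog.listNamespaces()                               // e.g. Array(Array("default"), Array("reports"))
v2Catalog.loadNamespaceMetadata(Array("reports"))        // carries "comment" and "location" entries
v2Catalog.alterNamespace(Array("reports"), NamespaceChange.setProperty("owner", "etl"))  // assumed factory
v2Catalog.dropNamespace(Array("reports"))                // true; throws if the namespace still holds tables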