@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2

import scala.collection.mutable

import org.apache.spark.sql.internal.SQLConf

/**
* A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allows
* the caller to look up a catalog by name.
*/
class CatalogManager(conf: SQLConf, val v2SessionCatalog: TableCatalog) {

/**
* Tracks all the registered catalogs.
*/
private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

/**
* Looks up a catalog by name.
*/
def getCatalog(name: String): CatalogPlugin = synchronized {
catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
}

/**
* Returns the default catalog specified by config.
*/
def getDefaultCatalog(): Option[CatalogPlugin] = {
conf.defaultV2Catalog.map(getCatalog)
}
}
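
For orientation: the manager above keeps one lazily loaded plugin per catalog name, and getDefaultCatalog resolves whatever default v2 catalog the configuration names. A minimal usage sketch, where resolveCatalog is a hypothetical helper rather than part of this patch:

// Sketch only: resolve an optional catalog name through a CatalogManager.
def resolveCatalog(manager: CatalogManager, name: Option[String]): Option[CatalogPlugin] = {
  name match {
    case Some(n) => Some(manager.getCatalog(n)) // loaded via Catalogs.load on first use, then cached
    case None => manager.getDefaultCatalog()    // None unless a default v2 catalog is configured
  }
}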
@@ -17,8 +17,6 @@

package org.apache.spark.sql.catalog.v2

import scala.util.control.NonFatal

import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -29,10 +27,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@Experimental
trait LookupCatalog extends Logging {

import LookupCatalog._

protected def defaultCatalogName: Option[String] = None
protected def lookupCatalog(name: String): CatalogPlugin
protected val catalogManager: CatalogManager

/**
* Returns the default catalog. When set, this catalog is used for all identifiers that do not
@@ -42,31 +37,15 @@ trait LookupCatalog extends Logging {
* If this is None and a table's provider (source) is a v2 provider, the v2 session catalog will
* be used.
*/
def defaultCatalog: Option[CatalogPlugin] = {
try {
defaultCatalogName.map(lookupCatalog)
} catch {
case NonFatal(e) =>
logError(s"Cannot load default v2 catalog: ${defaultCatalogName.get}", e)
None
}
}
def defaultCatalog: Option[CatalogPlugin] = catalogManager.getDefaultCatalog()

/**
* This catalog is a v2 catalog that delegates to the v1 session catalog. It is used when the
* session catalog is responsible for an identifier, but the source requires the v2 catalog API.
* This happens when the source implementation extends the v2 TableProvider API and is not listed
* in the fallback configuration, spark.sql.sources.write.useV1SourceList
*/
def sessionCatalog: Option[CatalogPlugin] = {
try {
Some(lookupCatalog(SESSION_CATALOG_NAME))
} catch {
case NonFatal(e) =>
logError("Cannot load v2 session catalog", e)
None
}
}
def sessionCatalog: TableCatalog = catalogManager.v2SessionCatalog

/**
* Extract catalog plugin and remaining identifier names.
@@ -79,7 +58,7 @@ trait LookupCatalog extends Logging {
Some((None, parts))
case Seq(catalogName, tail @ _*) =>
try {
Some((Some(lookupCatalog(catalogName)), tail))
Some((Some(catalogManager.getCatalog(catalogName)), tail))
} catch {
case _: CatalogNotFoundException =>
Some((None, parts))
@@ -137,7 +116,3 @@ trait LookupCatalog extends Logging {
}
}
}

object LookupCatalog {
val SESSION_CATALOG_NAME: String = "session"
}
@@ -17,14 +17,16 @@

package org.apache.spark.sql.catalyst.analysis

import java.util
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -39,7 +41,9 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
@@ -55,6 +59,27 @@ object SimpleAnalyzer extends Analyzer(
},
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))

// For testing only
class NoopV2SessionCatalog extends TableCatalog {
override def listTables(namespace: Array[String]): Array[Identifier] = Array.empty
override def loadTable(ident: Identifier): Table = {
throw new UnsupportedOperationException
}
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
throw new UnsupportedOperationException
}
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
throw new UnsupportedOperationException
}
override def dropTable(ident: Identifier): Boolean = false
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
override def name(): String = "noop"
}

/**
* Provides a way to keep state during the analysis; this enables us to decouple the concerns
* of analysis environment from the catalog.
@@ -96,18 +121,15 @@ object AnalysisContext {
*/
class Analyzer(
catalog: SessionCatalog,
conf: SQLConf,
maxIterations: Int)
val catalogManager: CatalogManager,
conf: SQLConf)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {

def this(catalog: SessionCatalog, conf: SQLConf) = {
this(catalog, conf, conf.optimizerMaxIterations)
this(catalog, new CatalogManager(conf, new NoopV2SessionCatalog), conf)
}

override protected def defaultCatalogName: Option[String] = conf.defaultV2Catalog

override protected def lookupCatalog(name: String): CatalogPlugin =
throw new CatalogNotFoundException("No catalog lookup function")
private val maxIterations: Int = conf.optimizerMaxIterations

def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
AnalysisHelper.markInAnalyzer {
@@ -1833,11 +1833,6 @@ object SQLConf {
.stringConf
.createOptional

val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
.doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
.stringConf
.createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")

val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
.doc("When true, the upcast will be loose and allows string to atomic types.")
.booleanConf
@@ -16,11 +16,14 @@
*/
package org.apache.spark.sql.catalyst.catalog.v2

import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.scalatest.Inside
import org.scalatest.Matchers._

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog}
import org.apache.spark.sql.catalog.v2.{CatalogManager, CatalogNotFoundException, CatalogPlugin, Identifier, LookupCatalog}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -35,8 +38,15 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {

private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap

override def lookupCatalog(name: String): CatalogPlugin =
catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
override val catalogManager: CatalogManager = {
val manager = mock(classOf[CatalogManager])
when(manager.getCatalog(any())).thenAnswer((invocation: InvocationOnMock) => {
val name = invocation.getArgument[String](0)
catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
})
when(manager.getDefaultCatalog()).thenReturn(None)
manager
}

test("catalog object identifier") {
Seq(
@@ -120,10 +130,15 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit

private val catalogs = Seq("prod", "test").map(x => x -> new TestCatalogPlugin(x)).toMap

override def defaultCatalogName: Option[String] = Some("prod")

override def lookupCatalog(name: String): CatalogPlugin =
catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
override val catalogManager: CatalogManager = {
val manager = mock(classOf[CatalogManager])
when(manager.getCatalog(any())).thenAnswer((invocation: InvocationOnMock) => {
val name = invocation.getArgument[String](0)
catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
})
when(manager.getDefaultCatalog()).thenReturn(catalogs.get("prod"))
manager
}

test("catalog object identifier") {
Seq(
@@ -607,12 +607,6 @@ class SparkSession private(
*/
@transient lazy val catalog: Catalog = new CatalogImpl(self)

@transient private lazy val catalogs = new mutable.HashMap[String, CatalogPlugin]()

private[sql] def catalog(name: String): CatalogPlugin = synchronized {
catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
}

/**
* Returns the specified table/view as a `DataFrame`.
*
@@ -22,7 +22,7 @@ import java.util.Locale
import scala.collection.mutable

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.{Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CastSupport
@@ -44,9 +44,6 @@ case class DataSourceResolution(
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
import lookup._

lazy val v2SessionCatalog: CatalogPlugin = lookup.sessionCatalog
.getOrElse(throw new AnalysisException("No v2 session catalog implementation is available"))

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTableStatement(
AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
@@ -68,7 +65,7 @@ case class DataSourceResolution(
case _ =>
// the identifier had no catalog and no default catalog is set, but the source is v2.
// use the v2 session catalog, which delegates to the global v1 session catalog
convertCreateTable(v2SessionCatalog.asTableCatalog, identifier, create)
convertCreateTable(lookup.sessionCatalog, identifier, create)
}

case CreateTableAsSelectStatement(
@@ -91,7 +88,7 @@
case _ =>
// the identifier had no catalog and no default catalog is set, but the source is v2.
// use the v2 session catalog, which delegates to the global v1 session catalog
convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create)
convertCTAS(lookup.sessionCatalog, identifier, create)
}

case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>
@@ -18,34 +18,26 @@
package org.apache.spark.sql.execution.datasources.v2

import java.util
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
*/
class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
def this() = {
this(SparkSession.active.sessionState)
}

private lazy val catalog: SessionCatalog = sessionState.catalog

class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) extends TableCatalog {
private var _name: String = _

override def name: String = _name
@@ -85,7 +77,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
properties: util.Map[String, String]): Table = {

val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName)
val provider = properties.getOrDefault("provider", conf.defaultDataSourceName)
val tableProperties = properties.asScala
val location = Option(properties.get("location"))
val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
@@ -100,7 +92,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog {
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap,
tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions,
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get("comment")))

try {
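
Taken together, the changes read as follows: the v2 session catalog is no longer loaded through the removed spark.sql.catalog.session entry but is constructed directly from the v1 SessionCatalog and SQLConf, handed to a CatalogManager, and the CatalogManager in turn becomes a constructor argument of the Analyzer. A rough wiring sketch, with illustrative variable names and a SessionState assumed to be in scope (none of these lines appear in the diff):

// Sketch only: wire the v1 pieces into the new v2 catalog plumbing.
val v2SessionCatalog = new V2SessionCatalog(sessionState.catalog, sessionState.conf)
val catalogManager = new CatalogManager(sessionState.conf, v2SessionCatalog)
val analyzer = new Analyzer(sessionState.catalog, catalogManager, sessionState.conf)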