@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* An API to extend the Spark built-in session catalog. Implementations can get the built-in
* session catalog from {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions
* with some custom logic, and call the built-in session catalog at the end. For example, they
* can implement {@code createTable} to do something custom before calling {@code createTable}
* of the built-in session catalog.
*/
@Experimental
public interface CatalogExtension extends TableCatalog {

  /**
   * This will be called only once by Spark to pass in the Spark built-in session catalog, after
   * {@link #initialize(String, CaseInsensitiveStringMap)} is called.
   */
  void setDelegateCatalog(TableCatalog delegate);
}
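
A quick illustrative sketch of the contract above (not code from this PR; the wire helper is hypothetical and stands in for what Spark does when loading the configured session catalog, see loadV2SessionCatalog further down): initialize() is called first, then the built-in session catalog is passed in exactly once.

import java.util.Collections

import org.apache.spark.sql.catalog.v2.{CatalogExtension, TableCatalog}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper mirroring the order of calls Spark makes.
def wire(extension: CatalogExtension, builtIn: TableCatalog): Unit = {
  extension.initialize("session", new CaseInsensitiveStringMap(Collections.emptyMap[String, String]()))
  extension.setDelegateCatalog(builtIn)
}
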
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import java.util.Map;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalog.v2.expressions.Transform;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* A simple implementation of {@link CatalogExtension}, which implements all the catalog functions
* by calling the built-in session catalog directly. This is created for convenience, so that users
* only need to override the methods where they want to apply custom logic. For example, they can
* override {@code createTable} to do something custom before calling {@code super.createTable}.
*/
@Experimental
public abstract class DelegatingCatalogExtension implements CatalogExtension {

  private TableCatalog delegate;

  public final void setDelegateCatalog(TableCatalog delegate) {
    this.delegate = delegate;
  }

  @Override
  public String name() {
    return delegate.name();
  }

  @Override
  public final void initialize(String name, CaseInsensitiveStringMap options) {}

  @Override
  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
    return delegate.listTables(namespace);
  }

  @Override
  public Table loadTable(Identifier ident) throws NoSuchTableException {
    return delegate.loadTable(ident);
  }

  @Override
  public void invalidateTable(Identifier ident) {
    delegate.invalidateTable(ident);
  }

  @Override
  public boolean tableExists(Identifier ident) {
    return delegate.tableExists(ident);
  }

  @Override
  public Table createTable(
      Identifier ident,
      StructType schema,
      Transform[] partitions,
      Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
    return delegate.createTable(ident, schema, partitions, properties);
  }

  @Override
  public Table alterTable(
      Identifier ident,
      TableChange... changes) throws NoSuchTableException {
    return delegate.alterTable(ident, changes);
  }

  @Override
  public boolean dropTable(Identifier ident) {
    return delegate.dropTable(ident);
  }

  @Override
  public void renameTable(
      Identifier oldIdent,
      Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
    delegate.renameTable(oldIdent, newIdent);
  }
}
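
To illustrate the pattern described in the class Javadoc above, here is a minimal Scala sketch (hypothetical, not part of this PR) of a subclass that runs custom logic and then delegates to super.createTable:

import java.util.{Map => JMap}

import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

// Hypothetical extension: do something custom first, then let the built-in session
// catalog (the delegate) perform the actual table creation.
class AuditingSessionCatalog extends DelegatingCatalogExtension {
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: JMap[String, String]): Table = {
    println(s"creating table $ident")
    super.createTable(ident, schema, partitions, properties)
  }
}
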
@@ -27,12 +27,16 @@ import org.apache.spark.sql.internal.SQLConf
* A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
* the caller to look up a catalog by name.
*/
class CatalogManager(conf: SQLConf) extends Logging {
class CatalogManager(conf: SQLConf, defaultSessionCatalog: TableCatalog) extends Logging {
Review comment (Contributor Author):

@brkyvz I agree with you that the logic was a bit confusing, so I refined it a little:

  1. When V2_SESSION_CATALOG is not set, return the default session catalog, V2SessionCatalog.
  2. When V2_SESSION_CATALOG is set, try to instantiate it, and return the default session catalog if instantiation fails.

private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

def catalog(name: String): CatalogPlugin = synchronized {
catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
if (name.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) {
v2SessionCatalog
} else {
catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
}
}

def defaultCatalog: Option[CatalogPlugin] = {
@@ -47,16 +51,30 @@ class CatalogManager(conf: SQLConf) extends Logging {
}
}

def v2SessionCatalog: Option[CatalogPlugin] = {
try {
Some(catalog(CatalogManager.SESSION_CATALOG_NAME))
} catch {
case NonFatal(e) =>
logError("Cannot load v2 session catalog", e)
None
private def loadV2SessionCatalog(): CatalogPlugin = {
Catalogs.load(CatalogManager.SESSION_CATALOG_NAME, conf) match {
case extension: CatalogExtension =>
extension.setDelegateCatalog(defaultSessionCatalog)
extension
case other => other
}
}

// If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2
// session catalog. Otherwise, return the default session catalog.
def v2SessionCatalog: CatalogPlugin = {
conf.getConf(SQLConf.V2_SESSION_CATALOG).map { customV2SessionCatalog =>
try {
catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog())
} catch {
case NonFatal(_) =>
logError(
"Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
defaultSessionCatalog
}
}.getOrElse(defaultSessionCatalog)
}

private def getDefaultNamespace(c: CatalogPlugin) = c match {
case c: SupportsNamespaces => c.defaultNamespace()
case _ => Array.empty[String]
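
A test-style sketch of the fallback described in the review comment above, mirroring CatalogManagerSuite further down in this diff: when spark.sql.catalog.session is not set, v2SessionCatalog returns the default catalog passed to the constructor.

import org.apache.spark.sql.catalog.v2.CatalogManager
import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
// V2_SESSION_CATALOG is not configured, so the default session catalog is used.
assert(catalogManager.v2SessionCatalog == FakeV2SessionCatalog)
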
@@ -45,7 +45,7 @@ trait LookupCatalog extends Logging {
* This happens when the source implementation extends the v2 TableProvider API and is not listed
* in the fallback configuration, spark.sql.sources.write.useV1SourceList
*/
def sessionCatalog: Option[CatalogPlugin] = catalogManager.v2SessionCatalog
def sessionCatalog: CatalogPlugin = catalogManager.v2SessionCatalog

/**
* Extract catalog plugin and remaining identifier names.
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import java.util
import java.util.Locale

import scala.collection.mutable
@@ -25,7 +26,7 @@ import scala.util.Random

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2._
import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -45,6 +46,7 @@ import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssig
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.internal.V1Table
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
@@ -60,6 +62,24 @@ object SimpleAnalyzer extends Analyzer(
},
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))

object FakeV2SessionCatalog extends TableCatalog {
  private def fail() = throw new UnsupportedOperationException
  override def listTables(namespace: Array[String]): Array[Identifier] = fail()
  override def loadTable(ident: Identifier): Table = {
    throw new NoSuchTableException(ident.toString)
  }
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = fail()
  override def alterTable(ident: Identifier, changes: TableChange*): Table = fail()
  override def dropTable(ident: Identifier): Boolean = fail()
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = fail()
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = fail()
  override def name(): String = fail()
}

/**
* Provides a way to keep state during the analysis, this enables us to decouple the concerns
* of analysis environment from the catalog.
@@ -101,15 +121,21 @@ object AnalysisContext {
*/
class Analyzer(
catalog: SessionCatalog,
v2SessionCatalog: TableCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {

// Only for tests.
Review comment (Contributor):

I don't like this too much. Would we need to change too much if we removed this constructor? It's bad to inject test-related stuff into production code.

Reply (Contributor Author):

There are more than 10 test suites using this constructor. BTW, this is an existing constructor; this PR just adds a comment.

def this(catalog: SessionCatalog, conf: SQLConf) = {
this(catalog, conf, conf.optimizerMaxIterations)
this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations)
}

def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = {
this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations)
}

override val catalogManager: CatalogManager = new CatalogManager(conf)
override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog)

def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
AnalysisHelper.markInAnalyzer {
@@ -954,7 +980,7 @@ class Analyzer(
case scala.Right(tableOpt) =>
tableOpt.map { table =>
AlterTable(
sessionCatalog.get.asTableCatalog, // table being resolved means this exists
sessionCatalog.asTableCatalog,
Identifier.of(tableName.init.toArray, tableName.last),
DataSourceV2Relation.create(table),
changes
@@ -2837,7 +2863,7 @@ class Analyzer(
case CatalogObjectIdentifier(Some(v2Catalog), ident) =>
scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident)))
case CatalogObjectIdentifier(None, ident) =>
catalogManager.v2SessionCatalog.flatMap(loadTable(_, ident)) match {
loadTable(catalogManager.v2SessionCatalog, ident) match {
case Some(_: V1Table) => scala.Right(None)
case other => scala.Right(other)
}
@@ -1956,9 +1956,12 @@ object SQLConf {
.createOptional

val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
.doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
.doc("A catalog implementation that will be used in place of the Spark built-in session " +
"catalog for v2 operations. The implementation may extend `CatalogExtension` to be " +
"passed the Spark built-in session catalog, so that it may delegate calls to the " +
"built-in session catalog.")
.stringConf
.createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
.createOptional

val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
.doc("When true, the upcast will be loose and allows string to atomic types.")
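
The config is now optional and names a catalog implementation rather than defaulting to V2SessionCatalog. A minimal usage sketch, assuming a hypothetical implementation com.example.AuditingSessionCatalog is on the classpath (when the option is unset, Spark keeps using the built-in session catalog):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.session", "com.example.AuditingSessionCatalog")
  .getOrCreate()
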
@@ -21,14 +21,15 @@ import java.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalog.v2.{CatalogManager, NamespaceChange, SupportsNamespaces}
import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class CatalogManagerSuite extends SparkFunSuite {

test("CatalogManager should reflect the changes of default catalog") {
val conf = new SQLConf
val catalogManager = new CatalogManager(conf)
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
assert(catalogManager.currentCatalog.isEmpty)
assert(catalogManager.currentNamespace.sameElements(Array("default")))

@@ -42,7 +43,7 @@ class CatalogManagerSuite extends SparkFunSuite {

test("CatalogManager should keep the current catalog once set") {
val conf = new SQLConf
val catalogManager = new CatalogManager(conf)
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
assert(catalogManager.currentCatalog.isEmpty)
conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
catalogManager.setCurrentCatalog("dummy")
@@ -57,7 +58,7 @@

test("current namespace should be updated when switching current catalog") {
val conf = new SQLConf
val catalogManager = new CatalogManager(conf)
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
catalogManager.setCurrentNamespace(Array("abc"))
assert(catalogManager.currentNamespace.sameElements(Array("abc")))

@@ -354,15 +354,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

val session = df.sparkSession
val canUseV2 = lookupV2Provider().isDefined
val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog
val sessionCatalog = session.sessionState.analyzer.sessionCatalog

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
insertInto(catalog, ident)

case CatalogObjectIdentifier(None, ident)
if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
insertInto(sessionCatalogOpt.get, ident)
case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
insertInto(sessionCatalog, ident)

case AsTableIdentifier(tableIdentifier) =>
insertInto(tableIdentifier)
@@ -488,17 +487,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

val session = df.sparkSession
val canUseV2 = lookupV2Provider().isDefined
val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog
val sessionCatalog = session.sessionState.analyzer.sessionCatalog

session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)

case CatalogObjectIdentifier(None, ident)
if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
// We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
// for now.
saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1)
saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1)

case AsTableIdentifier(tableIdentifier) =>
saveAsTable(tableIdentifier)
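
With a session catalog configured as in the sketch above, a saveAsTable call on a single-part identifier goes through the v2 session catalog (which may delegate to the built-in one), assuming the source implements the v2 TableProvider API and is not on the spark.sql.sources.write.useV1SourceList fallback list. Illustrative only:

import spark.implicits._

// Resolved via the CatalogObjectIdentifier(None, ident) branch above.
Seq((1, "a"), (2, "b")).toDF("id", "name")
  .write
  .saveAsTable("demo_table")
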