[SPARK-24252][SQL] Add v2 catalog plugin system #23915
Conversation
@mccheah, @cloud-fan, could you review? Matt, I've updated this to use
```java
 * @throws SparkException If the plugin class cannot be found or instantiated
 */
public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
  String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
```
Should this configuration allow for loading multiple catalogs with the same name but with different contexts? For example, say I want to load a different catalog plugin for functions vs. tables, but I want them to be named the same.
My intuition is that we shouldn't allow that as it makes the behavior quite ambiguous.
I think it is reasonable to go with what is here: a name has just one implementation class. That class can implement multiple catalog interfaces, which do not conflict.
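For illustration only, a minimal sketch of the "one name, one class, several interfaces" idea; `TableSupport` and `FunctionSupport` are hypothetical placeholder interfaces (this PR only adds `CatalogPlugin`), and the `initialize`/`name` signatures are the ones shown elsewhere in this diff:

```java
import org.apache.spark.sql.catalog.v2.CaseInsensitiveStringMap;
import org.apache.spark.sql.catalog.v2.CatalogPlugin;

// Hypothetical capability interfaces; only CatalogPlugin is added by this PR.
interface TableSupport { /* table-related methods would live here */ }
interface FunctionSupport { /* function-related methods would live here */ }

// One catalog name maps to one implementation class, but that class can
// expose several non-conflicting catalog capabilities at once.
public class UnifiedCatalog implements CatalogPlugin, TableSupport, FunctionSupport {
  private String name;

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.name = name;
  }

  @Override
  public String name() {
    return name;
  }
}
```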
```scala
@@ -620,6 +622,12 @@ class SparkSession private(
   */
  @transient lazy val catalog: Catalog = new CatalogImpl(self)

  @transient private lazy val catalogs = new mutable.HashMap[String, CatalogPlugin]()
```
Should there be some best-effort lifecycle for stopping or closing these catalog plugins? I'm wondering, for example, if we want to close connections to databases when the SparkSession shuts down. Maybe add a `stop()` API and call it from `SparkSession#stop`?
We can check for `Closeable` and clean them up. Do you think this is needed?
Using an explicit `stop` API is more consistent with other parts of Spark, for example the SparkSession itself and the scheduler components.
Is there a use case for stopping catalogs? HiveExternalCatalog doesn't have a stop method.
Possibly - a job-server type thing that would like to "update" or "refresh" a catalog, perhaps. I don't know if it's super important for a start.
I would say let's build it when we have a use case for it and avoid unnecessary additions.
I was thinking JDBC would be a significant one, enough so that it's worth putting in the API up front and showing that we support it. We'd like to be able to shut down connection pools when the catalog is discarded.
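A rough sketch of the best-effort cleanup idea being discussed, not anything in this PR; it assumes the session keeps a name-to-plugin map and that catalogs opt in by implementing `Closeable`:

```java
import java.io.Closeable;
import java.util.Map;

import org.apache.spark.sql.catalog.v2.CatalogPlugin;

final class CatalogCleanup {
  // Close every catalog that opted in by implementing Closeable, e.g. to release
  // JDBC connection pools; failures are swallowed so one bad catalog cannot
  // block session shutdown.
  static void closeAll(Map<String, CatalogPlugin> catalogs) {
    for (CatalogPlugin plugin : catalogs.values()) {
      if (plugin instanceof Closeable) {
        try {
          ((Closeable) plugin).close();
        } catch (Exception e) {
          // best-effort cleanup: continue with the remaining catalogs
        }
      }
    }
  }
}
```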
Overall it looks reasonable; it's about what I would expect for a pluggable system that relies on classloading and reflective instantiation. We do this in other places too, for example in
```java
 * @param name the name used to identify and load this catalog
 * @param options a case-insensitive string map of configuration
 */
void initialize(String name, CaseInsensitiveStringMap options);
```
It's weird to ask the catalog plugin to report a `name` when `initialize` already requires a name.
Shall this be `void initialize(CaseInsensitiveStringMap options);`?
The catalog is passed the name that was used to identify it. For example, say I have a REST-based catalog endpoint that I'm configuring in two cases, like this:

```
spark.sql.catalog.prod = com.example.MyCatalogPlugin
spark.sql.catalog.prod.connuri = http://prod.catalog.example.com:80/
spark.sql.catalog.test = com.example.MyCatalogPlugin
spark.sql.catalog.test.connuri = http://test.catalog.example.com:80/
```
`MyCatalogPlugin` is instantiated and configured twice, and both times it is passed the name it is configured with: `prod` and `test`.

Adding a getter for `name` just makes it easy to identify the catalog without Spark keeping track of name -> catalog instance everywhere.
In this case, how would the `MyCatalogPlugin` report its name? `prod` or `test`?
^ Depends on which catalog instance: one would say `prod`, the other would say `test`.
IIRC we want to follow Presto and support config files as well. Is it still in the roadmap?
Test build #102846 has finished for PR 23915 at commit
Could config files be implementation-specific? For example, I'd want my Data Source implementation to be given a YML file but another data source implementation wants a properties file. Deserialization of the file doesn't make sense to be done on the Spark side, but in the internal implementation of the data source. Thus I'd imagine one of the properties passed to the data source implementation would be the path to the configuration to deserialize, and we leave it up to the implementation to decide how to handle that path.

So I don't think we need to make configuration files an explicit part of the API here.
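A hedged sketch of that idea: Spark only forwards a path string as an ordinary catalog option (the `config-file` key is made up here), and the plugin parses the file itself in whatever format it prefers:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.spark.sql.catalog.v2.CaseInsensitiveStringMap;
import org.apache.spark.sql.catalog.v2.CatalogPlugin;

// Spark never inspects the file; it only forwards the path as an ordinary option.
public class FileConfiguredCatalog implements CatalogPlugin {
  private String name;
  private final Properties settings = new Properties();

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    this.name = name;
    String path = options.get("config-file"); // hypothetical option naming the config file
    if (path != null) {
      try (InputStream in = Files.newInputStream(Paths.get(path))) {
        settings.load(in); // this plugin uses .properties; another could read YAML instead
      } catch (IOException e) {
        throw new IllegalArgumentException("Cannot read catalog config file: " + path, e);
      }
    }
  }

  @Override
  public String name() {
    return name;
  }
}
```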
I hadn't considered this before. Can we add it in a follow-up if it is something that we want to do?
@mccheah, @cloud-fan, I've updated the PR and replied to your comments. Please have a look. Thank you!
Test build #102891 has finished for PR 23915 at commit
```java
} catch (ClassNotFoundException e) {
  throw new SparkException(String.format(
      "Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName));
```
Why not pass `e`, for consistency?
I don't think it is useful in this case. The stack trace would be from `Class.forName` into the Java runtime, and all of the information from the message, like the class name, is included in this one. The stack above this call is also included in the thrown exception.
Nit: then we should only wrap the `Class.forName` call in the try-catch. If anything else in the block throws `ClassNotFoundException`, it will not be obvious where it was thrown from. And while `ClassNotFoundException` can't be thrown by any other code currently, future contributors adding code in this block could get their exceptions swallowed up.

I do think in general it's best practice to pass along the exception. It prevents us from losing any state, even if that state is noise 99.9% of the time.
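A sketch of the narrower wrapping being suggested here, not the code as merged: only the reflective lookup sits inside the try block, and the original exception is kept as the cause:

```java
import org.apache.spark.SparkException;

final class PluginClassLoading {
  // Keep the try block tight around Class.forName and preserve the original
  // exception as the cause, so unrelated ClassNotFoundExceptions are not swallowed.
  static Class<?> loadPluginClass(String name, String pluginClassName) throws SparkException {
    try {
      return Class.forName(pluginClassName);
    } catch (ClassNotFoundException e) {
      throw new SparkException(String.format(
          "Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName), e);
    }
  }
}
```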
Force-pushed from 17e6de1 to 6edb880.
Test build #102918 has finished for PR 23915 at commit
Retest this please.
Test build #103071 has finished for PR 23915 at commit
@cloud-fan, I've updated the docs as you requested. There should be no need to wait for tests because the change was to docs only.
Test build #103102 has finished for PR 23915 at commit
The change itself LGTM (except one comment), but I need a little more information to justify the design:
Interestingly the EDIT:
retest this please
```scala
  @transient private lazy val catalogs = new mutable.HashMap[String, CatalogPlugin]()

  private[sql] def catalog(name: String): CatalogPlugin = synchronized {
    catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
```
Looks like we don't support unloading or reloading a catalog plugin?
do you have any use case of unloading/reloading?
A possible case: loading a catalog plugin with a different config. Currently it has to be loaded under a different catalog name. For example, for a REST-based catalog endpoint that requires some authentication, we might want to reconnect the catalog with different authentication.
I would be fine adding this feature if it is needed, but we don't currently support reconfiguration in SessionCatalog so I think it makes sense to get this in and consider adding it later.
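If reloading were added later, one hypothetical shape (not part of this PR) would be evicting the cached instance so the next lookup re-runs `Catalogs.load` against the current conf:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.spark.sql.catalog.v2.CatalogPlugin;

// Hypothetical registry: dropping a cached instance forces the next lookup to
// re-run the loader (e.g. Catalogs.load) with whatever conf is current.
final class CatalogRegistry {
  private final Map<String, CatalogPlugin> catalogs = new HashMap<>();

  synchronized CatalogPlugin getOrLoad(String name, Function<String, CatalogPlugin> loader) {
    return catalogs.computeIfAbsent(name, loader);
  }

  synchronized void invalidate(String name) {
    catalogs.remove(name); // next getOrLoad(name, ...) re-initializes the plugin
  }
}
```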
Test build #103118 has finished for PR 23915 at commit
retest this please
Test build #103128 has finished for PR 23915 at commit
Force-pushed from f118a3d to 7c64a26.
Test build #103162 has finished for PR 23915 at commit
Test build #103165 has finished for PR 23915 at commit
thanks, merging to master!
Closes apache#23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What's the difference between the existing
It's just a Java-friendly version of
## What changes were proposed in this pull request?

This adds a v2 API for adding new catalog plugins to Spark.

* Catalog implementations extend `CatalogPlugin` and are loaded via reflection, similar to data sources.
* `Catalogs` loads and initializes catalogs using configuration from a `SQLConf`.
* `CaseInsensitiveStringMap` is used to pass configuration to `CatalogPlugin` via `initialize`.

Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The name property must specify a class that implements `CatalogPlugin`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization along with the catalog name.

This replaces #21306, which will be implemented in multiple parts: the catalog plugin system (this commit) and specific catalog APIs, like `TableCatalog`.

## How was this patch tested?

Added test suites for `CaseInsensitiveStringMap` and for catalog loading.
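For a concrete starting point, a small usage sketch of the configuration scheme described above; `com.example.MyCatalogPlugin` and its `connuri` option are illustrative names from the review discussion, not shipped classes:

```java
import org.apache.spark.sql.SparkSession;

public class CatalogConfigExample {
  public static void main(String[] args) {
    // Register a catalog named "prod": the base key names the CatalogPlugin class,
    // and keys under the same namespace become its initialization options.
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .config("spark.sql.catalog.prod", "com.example.MyCatalogPlugin")
        .config("spark.sql.catalog.prod.connuri", "http://prod.catalog.example.com:80/")
        .getOrCreate();

    // The "prod" catalog is loaded lazily the first time the session resolves it,
    // and its initialize method receives the name "prod" plus the connuri option.
    spark.stop();
  }
}
```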