[SPARK-49436][Connect][SQL] Common interface for SQLContext #48958

Closed
wants to merge 18 commits into master from api-sqlcontext

Conversation

Contributor

@xupefei xupefei commented Nov 25, 2024

What changes were proposed in this pull request?

This PR adds an abstraction for SQLContext in the spark-api package. Both sides (Classic and Connect) maintain their own implementation.
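
A minimal, hypothetical sketch of the pattern (the type names below are stand-ins for illustration, not the actual classes touched by this PR): the shared layer declares the surface once, and Classic and Connect each provide a subclass that wraps their own kind of session.

```scala
// Hypothetical stand-in types to keep the sketch self-contained.
trait SessionLike { def version: String }

// Declared once in the shared (spark-api) layer.
abstract class AbstractSQLContext {
  def sparkSession: SessionLike
  // Most members can be implemented here by delegating to the session.
  def version: String = sparkSession.version
}

// Classic side: wraps the JVM-local session.
final class ClassicSQLContext(val sparkSession: SessionLike) extends AbstractSQLContext

// Connect side: wraps the client session that talks to a remote server.
final class ConnectSQLContext(val sparkSession: SessionLike) extends AbstractSQLContext
```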

Why are the changes needed?

To unify the API interface and make SQLContext available to Spark Connect.

Does this PR introduce any user-facing change?

Yes. Connect users are now able to call sparkSession.sqlContext and use the APIs it provides.
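
For example (a usage sketch; the connection URL is illustrative), a Connect client can now do what a Classic application already could:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative endpoint; point this at a real Spark Connect server.
val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

val sqlContext = spark.sqlContext       // now available on Connect sessions too
sqlContext.sql("SELECT 1 AS id").show() // delegates to the underlying session
sqlContext.tables().show()              // list tables, as in Classic
```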

How was this patch tested?

No new tests are needed. All new methods are mirrored from SparkSession, except tables(), which is covered by the existing tests in ListTablesSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

@@ -105,7 +105,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid

     // Create state store in a task and get the RocksDBConf from the instantiated RocksDB instance
     val rocksDBConfInTask: RocksDBConf = testRDD.mapPartitionsWithStateStore[RocksDBConf](
-      spark.sqlContext, testStateInfo, testSchema, testSchema,
+      spark, testStateInfo, testSchema, testSchema,
Contributor

Perhaps a follow-up should be to weed out all existing uses of SQLContext in the code base. There are a couple of user-facing APIs that we can't change, but most of it should go.

Contributor Author

For now I reverted this change. Let's do it in another PR.

@xupefei xupefei changed the title [SPARK-49436][Connect][SQL][WIP] Common interface for SQLContext [SPARK-49436][Connect][SQL] Common interface for SQLContext Nov 27, 2024
@xupefei xupefei requested a review from hvanhovell November 27, 2024 16:38
Contributor

@vicennial vicennial left a comment

Overall approach LGTM

> WIP: to be done in this PR after we agree on the interface implementation.

Do you mean the interface impl in the changes at this moment or are there other decisions?

/**
* An interface to register custom QueryExecutionListener that listen for execution metrics.
*/
def listenerManager: ExecutionListenerManager
Contributor

I think this can be implemented. The same applies for udf, streams, experimental, ... you will have to override them, but that should be ok.

Contributor

Feel free to push back.

Contributor Author

@xupefei xupefei Dec 3, 2024

Agreed, and I consider them low priority. Do we have tickets for them?

@hvanhovell
Contributor

A couple of small comments.

@xupefei
Contributor Author

xupefei commented Dec 3, 2024

> WIP: to be done in this PR after we agree on the interface implementation.
>
> Do you mean the interface impl in the changes at this moment or are there other decisions?

I was considering testing APIs that have a new implementation, such as tables(), but now that is no longer needed - all APIs now use the existing implementation in SparkSession.

def listenerManager: ExecutionListenerManager = sparkSession.listenerManager

/** @inheritdoc */
def setConf(props: Properties): Unit = sparkSession.conf.synchronized {
Contributor

What are you trying to accomplish with this lock?

Contributor Author

I copied this behaviour from

/** Set Spark SQL configuration properties. */
def setConf(props: Properties): Unit = settings.synchronized {
  props.asScala.foreach { case (k, v) => setConfString(k, v) }
}

I believe it's to avoid having two setConf calls with multiple KVs running at the same time. The underlying Map won't have any issue without the lock.
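
As an illustration of that point (a standalone sketch, not Spark code): with the lock, two concurrent bulk updates cannot interleave their individual puts, while each single put is already safe on its own.

```scala
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object SetConfLockSketch {
  private val settings = new ConcurrentHashMap[String, String]()

  // Mirrors the shape of the quoted setConf: take the lock, then apply each pair.
  def setConf(props: Properties): Unit = settings.synchronized {
    props.asScala.foreach { case (k, v) => settings.put(k, v) }
  }

  def main(args: Array[String]): Unit = {
    val a = new Properties(); a.setProperty("x", "A"); a.setProperty("y", "A")
    val b = new Properties(); b.setProperty("x", "B"); b.setProperty("y", "B")

    val t1 = new Thread(() => setConf(a))
    val t2 = new Thread(() => setConf(b))
    t1.start(); t2.start(); t1.join(); t2.join()

    // With the lock, x and y always end up from the same batch (whichever ran
    // last). Without it, the two batches could interleave and leave a mix,
    // even though each individual put is thread-safe.
    println(settings.asScala)
  }
}
```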

@@ -99,6 +99,28 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
}

test("get tables from a database") {
Contributor

This was not tested????

Contributor Author

Yeah at least I didn't see any...

Contributor

@hvanhovell hvanhovell left a comment

One small question, otherwise LGTM.

@HyukjinKwon
Member

Merged to master.

@xupefei xupefei deleted the api-sqlcontext branch December 23, 2024 11:21