[SPARK-44078][CONNECT][CORE] Add support for classloader/resource isolation #41625
Conversation
```scala
val addedFiles: Map[String, Long],
val addedJars: Map[String, Long],
val addedArchives: Map[String, Long],
val artifacts: JobArtifactSet,
```
qq, we don't need this change, right? I suspect this is just a part of refactoring? If we want to change this, we should also change SparkContext.scala to match to have one higher level artifact.
qq, we don't need this change, right? I suspect this is just a part of refactoring?
It's not "needed" per se, but we use `JobArtifactSet` here because we also want to pass in the `uuid` and `replClassDirUri`, which are used here in the Executor.
If we want to change this, we should also change SparkContext.scala to match to have one higher level artifact.
To clarify, do you mean replacing these with the JobArtifactSet?
I've tried to minimize behaviour/user-visible changes in this PR (it should be a no-op in terms of existing behaviour) since this is a prerequisite for multi-user Spark Connect sessions, but it can later be expanded for general multi-user support (i.e. outside of Spark Connect) in follow-up PRs.
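For reference, a rough sketch of the shape being discussed; the fields follow the quoted diff and this reply (`uuid`, `replClassDirUri` plus the jar/file/archive maps), but treat the exact signature as an assumption rather than the final class:

```scala
// Sketch only: mirrors the fields discussed above, not necessarily the final definition.
case class JobArtifactSetSketch(
    uuid: Option[String],            // identifies the owning (Spark Connect) session
    replClassDirUri: Option[String], // where REPL-generated classfiles are served from
    jars: Map[String, Long],         // path -> timestamp when added
    files: Map[String, Long],
    archives: Map[String, Long])
```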
I like this change, and based on this PR's mechanism, updating UDFs might also be possible in the future?
Yes, we aim to use this mechanism for multi-user Spark Connect sessions (and thus, multi-user UDFs). This can also later be extended for non-Spark Connect use cases.
```scala
def withActive[T](f: => T): T = JobArtifactSet.withActive(this)(f)
```
```scala
override def hashCode(): Int = {
  Seq(uuid, replClassDirUri, jars.toSeq, files.toSeq, archives.toSeq).hashCode()
}
```
Objects.hash(...) is a bit simpler.
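For illustration, a self-contained sketch of the suggestion; the field names mirror the quoted snippet, and `java.util.Objects.hash` replaces the intermediate `Seq(...).hashCode()`. Treat it as a sketch, not the actual change made in the PR:

```scala
import java.util.Objects

// Illustrative stand-in for the class above; field names follow the quoted snippet.
case class ArtifactSetExample(
    uuid: Option[String],
    replClassDirUri: Option[String],
    jars: Map[String, Long],
    files: Map[String, Long],
    archives: Map[String, Long]) {

  // Same intent as Seq(...).hashCode(), written with java.util.Objects.hash.
  override def hashCode(): Int =
    Objects.hash(uuid, replClassDirUri, jars.toSeq, files.toSeq, archives.toSeq)
}
```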
```scala
/**
 * Empty artifact set for use in tests.
 */
private[spark] def apply(): JobArtifactSet = {
```
Just create a single empty one?
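A sketch of what the reviewer is suggesting: share one immutable empty instance instead of constructing a new one on every call. The names here are illustrative, not the actual `JobArtifactSet` companion:

```scala
// Sketch of the suggestion: one shared, immutable empty instance instead of
// allocating a new empty set on every call. Names are illustrative.
case class EmptyableArtifactSet(jars: Map[String, Long], files: Map[String, Long])

object EmptyableArtifactSet {
  // A single empty instance that callers (e.g. tests) can reuse.
  val empty: EmptyableArtifactSet = EmptyableArtifactSet(Map.empty, Map.empty)

  // apply() simply returns the shared instance rather than allocating a new one.
  def apply(): EmptyableArtifactSet = empty
}
```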
hvanhovell left a comment
LGTM
Merging to master.
### What changes were proposed in this pull request?
This PR follows up on #41625 to utilize the classloader/resource isolation in Spark to support multi-user Spark Connect sessions which are isolated from each other (currently, classfiles and jars) and thus enables multi-user REPLs and UDFs.

- Instead of a single instance of `SparkArtifactManager` handling all the artifact movement, each instance is now responsible for a single `sessionHolder` (i.e. a Spark Connect session) which it requires in its constructor.
- Previously, all artifacts were stored under a common directory `sparkConnectArtifactDirectory` which was initialised in `SparkContext`. Moving forward, all artifacts are instead separated based on the underlying `SparkSession` (using its `sessionUUID`) they belong to, in the format `ROOT_ARTIFACT_DIR/<sessionUUID>/jars/...`.
- The `SparkConnectArtifactManager` also builds a `JobArtifactSet` [here](https://github.com/apache/spark/pull/41701/files#diff-f833145e80f2b42f54f446a0f173e60e3f5ad657a6ad1f2135bc5c20bcddc90cR157-R168) which is eventually propagated to the executors, where the classloader isolation mechanism uses the `uuid` parameter.
- Currently, classfiles and jars are isolated but files and archives aren't.

### Why are the changes needed?
To enable support for multi-user sessions coexisting on a single Spark cluster. For example, multi-user Scala REPLs/UDFs will be supported with this PR.

### Does this PR introduce _any_ user-facing change?
Yes, multiple Spark Connect REPLs may use a single Spark cluster at once and execute their own UDFs without interfering with each other.

### How was this patch tested?
New unit tests in `ArtifactManagerSuite` + existing tests.

Closes #41701 from vicennial/SPARK-44146.

Authored-by: vicennial <venkata.gudesa@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
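To make the per-session layout described above concrete, here is a small sketch of composing `ROOT_ARTIFACT_DIR/<sessionUUID>/jars/...` paths. The root location and helper names are assumptions for illustration; the actual layout is managed by `SparkConnectArtifactManager`:

```scala
import java.nio.file.{Files, Path, Paths}
import java.util.UUID

// Illustrative only: the real layout is managed by SparkConnectArtifactManager.
object SessionArtifactLayout {
  // Assumed root; in Spark this is derived from the configured artifact directory.
  val rootArtifactDir: Path = Paths.get("/tmp/spark-connect-artifacts")

  /** Returns ROOT_ARTIFACT_DIR/<sessionUUID>/jars for a given session. */
  def jarDirFor(sessionUUID: String): Path =
    rootArtifactDir.resolve(sessionUUID).resolve("jars")

  def main(args: Array[String]): Unit = {
    val session = UUID.randomUUID().toString
    val dir = jarDirFor(session)
    Files.createDirectories(dir)
    println(s"Session $session stores jars under $dir")
  }
}
```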
```scala
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  JobArtifactSet.getActiveOrDefault(sc),
```
This will be the SparkContext. So, if multiple clients add artifacts via SparkConnect.addArtifacts before running a job, the TaskDescription will have artifacts from those clients, meaning that the cached IsolatedSessionState on the executor side will hold state with mixed artifacts.
the TaskDescription will have artifacts from those clients.
This wouldn't be the case because JobArtifactSet#getActiveOrDefault (def) first checks for the "active" set before defaulting back to copying from SparkContext.
For the flow of how the active set is propagated, here is an example for an execute operation in Spark Connect:
- The execution block is wrapped with `SessionHolder#withSession` here. `SessionHolder#withContextClassLoader` is then called here, which in turn calls `JobArtifactSet#withActive` here and sets the active set to `SessionHolder#connectJobArtifactSet`.
- The actual `JobArtifactSet` that is used is built up here in `SparkConnectArtifactManager#jobArtifactSet`.

Since each client has their own `JobArtifactSet` made active when executing an operation, the `TaskDescription` would have artifacts specific to that client and, subsequently, so would `IsolatedSessionState`.
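For readers following along, here is a minimal sketch of the thread-local "active set" pattern being described. The method names mirror the discussion (`withActive`, `getActiveOrDefault`), but the implementation is an illustration under those assumptions, not the actual Spark code:

```scala
// Illustrative thread-local "active artifact set" pattern.
final case class ArtifactSetSketch(uuid: Option[String], jars: Map[String, Long])

object ArtifactSetSketch {
  private val active = new ThreadLocal[Option[ArtifactSetSketch]] {
    override def initialValue(): Option[ArtifactSetSketch] = None
  }

  /** Runs `f` with `s` as the active set on the current thread, then restores the previous one. */
  def withActive[T](s: ArtifactSetSketch)(f: => T): T = {
    val previous = active.get()
    active.set(Some(s))
    try f finally active.set(previous)
  }

  /** Prefers the thread-local active set; falls back to a default (e.g. built from SparkContext). */
  def getActiveOrDefault(default: => ArtifactSetSketch): ArtifactSetSketch =
    active.get().getOrElse(default)
}
```

With this pattern, a job submitted while a client's set is active picks up that client's artifacts, and only falls back to the SparkContext-wide set when nothing is active.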
```scala
// The default isolation group
val defaultSessionState = newSessionState("default", None)

val isolatedSessionCache = CacheBuilder.newBuilder()
```
There is another problem here. If you cache the Spark session state and, say, it's evicted, then it will create a new session state with empty file lists. In this case, the Executor will try to download the files again and overwrite all of them. This behaviour is disallowed by default in SparkContext.
`spark.files.overwrite` is `false` by default... so the tasks will actually fail.
### Previous behaviour
Previously, we kept `JobArtifactSet` and leveraged a thread local for each client.

1. The execution block is wrapped with `SessionHolder.withSession` [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala#L53).
2. `SessionHolder.withContextClassLoader` is then called [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala#L130), which in turn calls `JobArtifactSet.withActive` [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala#L118) and sets the active set to `SessionHolder.connectJobArtifactSet`.
3. The actual `JobArtifactSet` that is used is built up [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala#L157) in `SparkConnectArtifactManager.jobArtifactSet`.

Since each client has their own `JobArtifactSet` made `active` when executing an operation, the `TaskDescription` would have artifacts specific to that client and, subsequently, so would `IsolatedSessionState` in the Executor. Therefore, we were able to keep the Spark Connect specific logic in the Spark Connect module.

### Problem
For the most part this worked well; however, the problem is that we don't call `SparkContext.addFile` or `SparkContext.addJar`, but instead pass the artifacts directly to the scheduler (to `TaskDescription`). This is fine in general but exposes several problems by not directly calling `SparkContext.addFile`:

- `SparkContext.postEnvironmentUpdate` is not invoked at `SparkContext.addFile`, which matters for, for example, recording the events for the History Server.
- Specifically for archives, `Utils.unpack(source, dest)` is not invoked at `SparkContext.addFile` in order to untar properly in the Driver.

Therefore, we would have to duplicate that logic on the Spark Connect server side, which is not ideal. In addition, we already added the isolation logic into the Executor, and the Driver and Executor are the symmetric pair (not Spark Connect Server <> Executor). Therefore, it matters for code readability and the expectations of their roles.

### Solution in this PR
This PR proposes to support session-based files and archives in Spark Connect. It leverages the groundwork from #41701 and #41625 (for jars in the Spark Connect Scala client). The changed logic is as follows:

- Keep the session UUID, and Spark Connect Server specific information such as the REPL class path, within a thread local.
- Add the session ID when we add files or archives. `SparkContext` keeps them with a map `Map(session -> Map(file and timestamp))` in order to reuse the existing logic to address the problem mentioned above.

After that, on the executor side:

- Executors create an additional directory, named by the session UUID, on top of the default directory (that is, the current working directory, see `SparkFiles.getRootDirectory`).
- When we execute Python workers, it sets the current working directory to the one created above.
- End users access these files via the current working directory, e.g., `./blahblah.txt` in their Python UDF. Therefore, this is compatible both with and without Spark Connect.

Note that:

- This creates Python workers per individual session because we set the session UUID as an environment variable, and we create new Python workers if environment variables are different, see also `SparkEnv.createPythonWorker`.
- It already kills the daemon and Python workers if they are not used for a while.

### TODOs and limitations
The Executor also maintains the file list, but with a cache so it can evict the state. However, it has a problem. It works as follows:

- A new `IsolatedSessionState` is created.
- A task is executed once, and `IsolatedSessionState` holds the file list.
- Later, `IsolatedSessionState` is evicted at https://github.com/apache/spark/pull/41625/files#diff-d7a989c491f3cb77cca02c701496a9e2a3443f70af73b0d1ab0899239f3a789dR187
- The Executor will create a new `IsolatedSessionState` with empty file lists.
- The Executor will attempt to redownload and overwrite the files (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1058-L1064).
- `spark.files.overwrite` is `false` by default, so the task will suddenly fail at this point.

Possible solutions are:

- For 1., we should maintain a cache with a TTL, and remove them.
- For 2., we should have a dedicated directory (which this PR does) and remove the directory when the cache is evicted, so the overwrite does not happen.

### Why are the changes needed?
In order to allow session-based artifact control and multi-tenancy.

### Does this PR introduce _any_ user-facing change?
Yes, this PR now allows multiple sessions to have their own space. For example, session A and session B can add a file with the same name. Previously this was not possible.

### How was this patch tested?
Unit tests were added.

Closes #41495 from HyukjinKwon/session-base-exec-dir.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
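A rough sketch of the session-scoped directory idea described above (default root directory plus a `<sessionUUID>` subdirectory). The helper and the root lookup are assumptions for illustration; the real logic lives in the Executor and `SparkFiles`:

```scala
import java.nio.file.{Files, Path, Paths}

// Illustrative only: shows the "root dir + session UUID subdirectory" idea.
object SessionScopedFiles {
  // Stands in for SparkFiles.getRootDirectory; assumed for the sketch.
  def rootDirectory: Path = Paths.get(sys.props.getOrElse("spark.files.root", "."))

  /** Directory a session's files land in: <root>/<sessionUUID>, or the root when no session. */
  def sessionDirectory(sessionUUID: Option[String]): Path = {
    val dir = sessionUUID.map(u => rootDirectory.resolve(u)).getOrElse(rootDirectory)
    Files.createDirectories(dir)
    dir
  }
}
```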
…isolated session cache is evicted

### What changes were proposed in this pull request?
This PR proposes to remove session-based directories when the isolated session is evicted from the cache.

### Why are the changes needed?
SPARK-44078 added the cache for isolated sessions, and SPARK-44348 added the session-based directory for isolation. When the isolated session cache is evicted, we should remove the session-based directory so it doesn't fail when the same session is used, see also #41625 (comment).

### Does this PR introduce _any_ user-facing change?
No to end users since the feature has not been released yet.

### How was this patch tested?
I manually tested as described in #41292. In particular, I reduced the TTL to a few minutes and tested as below at the last step:

```python
spark.range(10).select(plug_one("id")).show()
spark.range(10).select(plug_one("id")).show()

# Wait a few minutes
spark.range(10).select(plug_one("id")).show()
```

I verified that the same session can be added back to the cache and creates the directory with the same name, by reading the executor's stderr in the Spark UI.

Closes #42289 from HyukjinKwon/SPARK-44631.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
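As an illustration of the eviction cleanup described in the commit message above, here is a sketch of a Guava cache whose removal listener deletes the session-scoped directory when an entry is evicted. The state class, TTL, and cleanup call are assumptions for the sketch, not the actual Executor code:

```scala
import java.io.File
import java.util.concurrent.TimeUnit

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.commons.io.FileUtils

// Illustrative state: the real IsolatedSessionState also tracks classloaders and file lists.
final case class SessionStateSketch(sessionUUID: String, sessionDir: File)

object IsolatedSessionCacheSketch {
  private val removalListener = new RemovalListener[String, SessionStateSketch] {
    override def onRemoval(n: RemovalNotification[String, SessionStateSketch]): Unit = {
      // Delete the session-scoped directory so a re-created state can re-download
      // its files without tripping the spark.files.overwrite check.
      FileUtils.deleteQuietly(n.getValue.sessionDir)
    }
  }

  val cache: Cache[String, SessionStateSketch] = CacheBuilder.newBuilder()
    .expireAfterAccess(30, TimeUnit.MINUTES) // assumed TTL for illustration
    .removalListener(removalListener)
    .build[String, SessionStateSketch]()
}
```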
(cherry picked from commit 35d4765)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
This PR adds a `JobArtifactSet` which holds the jars/files/archives relevant to a particular Spark Job. Using this "set", we are able to support specifying visible/available resources for a job based on, for example, the SparkSession that the job belongs to.

With resource specification support, we are further able to extend this to support classloader/resource isolation on the executors. The executors use the `uuid` from the `JobArtifactSet` to either create or obtain from a cache the `IsolatedSessionState`, which holds the "state" (i.e. classloaders, files, jars, archives etc.) for that particular `uuid`.

Currently, the code defaults to copying over resources from the `SparkContext` (the current/default behaviour) to avoid any behaviour changes. A follow-up PR would use this mechanism in Spark Connect to isolate resources among Spark Connect sessions.

Why are the changes needed?
A current limitation of Scala UDFs is that a Spark cluster would only be able to support a single REPL at a time due to the fact that classloaders of different Spark Sessions (and therefore, Spark Connect sessions) aren't isolated from each other. Without isolation, REPL-generated class files and user-added JARs may conflict if there are multiple users of the cluster.
Thus, we need a mechanism to support isolated sessions (i.e isolated resources/classloader) so that each REPL user does not conflict with other users on the same cluster.
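To make the `uuid`-keyed lookup described above concrete, here is a minimal sketch of an executor-side get-or-create keyed by the artifact set's `uuid`, falling back to a shared "default" group. Names and the cache shape are assumptions, not the actual `IsolatedSessionState` code:

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable

// Illustrative state holder; the real IsolatedSessionState also tracks classloaders, etc.
final case class SessionStateExample(uuid: String, jars: mutable.Map[String, Long])

object ExecutorIsolationSketch {
  private val states = new ConcurrentHashMap[String, SessionStateExample]()

  /** Gets or creates the per-uuid state; jobs without a uuid share the "default" group. */
  def stateFor(maybeUuid: Option[String]): SessionStateExample = {
    val key = maybeUuid.getOrElse("default")
    states.computeIfAbsent(key, k => SessionStateExample(k, mutable.Map.empty[String, Long]))
  }
}
```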
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests + new suite `JobArtifactSetSuite`.