[SPARK-43292][CORE][CONNECT] Move ExecutorClassLoader to core module and simplify `Executor#addReplClassLoaderIfNeeded`

### What changes were proposed in this pull request?
This PR moves `ExecutorClassLoader` from the `repl` module to the `core` module and places it in the `executor` package, so that `ArtifactManagerSuite` can be tested with Maven.

In addition, this PR removes the reflection calls in `Executor#addReplClassLoaderIfNeeded`, since `ExecutorClassLoader` and `Executor` are in the same module after this change.
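
The difference between reflective and direct construction can be sketched roughly as follows. This is an illustration only, not code from this PR: `java.net.URLClassLoader` stands in for `ExecutorClassLoader`, and `LoaderSketch`, `viaReflection`, and `direct` are hypothetical names rather than Spark APIs.

```scala
import java.net.{URL, URLClassLoader}

// Illustration only: URLClassLoader stands in for ExecutorClassLoader, and
// LoaderSketch / viaReflection / direct are hypothetical names, not Spark APIs.
object LoaderSketch {

  // Reflective construction: the class is looked up by name at runtime, so a
  // missing class only surfaces as a ClassNotFoundException when this runs.
  def viaReflection(className: String, parent: ClassLoader): Option[ClassLoader] = {
    try {
      val klass = Class.forName(className).asInstanceOf[Class[_ <: ClassLoader]]
      val constructor = klass.getConstructor(classOf[Array[URL]], classOf[ClassLoader])
      Some(constructor.newInstance(Array.empty[URL], parent))
    } catch {
      case _: ClassNotFoundException => None
    }
  }

  // Direct construction: possible once the class is on the compile classpath;
  // a missing class or a wrong argument list becomes a compile-time error.
  def direct(parent: ClassLoader): ClassLoader =
    new URLClassLoader(Array.empty[URL], parent)

  def main(args: Array[String]): Unit = {
    val parent = ClassLoader.getSystemClassLoader
    println(viaReflection("java.net.URLClassLoader", parent).isDefined)
    println(viaReflection("org.example.MissingLoader", parent).isDefined)
    println(direct(parent).getParent eq parent)
  }
}
```

In the sketch, the reflective path has to handle a missing class at runtime, which is the situation the Maven run hit before this change. The direct path leaves that check to the compiler.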

### Why are the changes needed?
1. `ExecutorClassLoader` is only used by `Executor`, so the `core` module is a more suitable place for it.
2. It allows `ArtifactManagerSuite` to be tested with Maven.

### Does this PR introduce _any_ user-facing change?
No, this change only affects Maven tests.

### How was this patch tested?
- Pass GitHub Actions
- Manual test

Run the following commands:

```
build/mvn clean install -DskipTests -Phive
build/mvn test -pl connector/connect/server
```

**Before**

`ArtifactManagerSuite` test failed due to:

```
23/04/26 16:36:57.140 ScalaTest-main-running-DiscoverySuite ERROR Executor: Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!
```

**After**

All tests passed.

```
Run completed in 10 seconds, 494 milliseconds.
Total number of tests run: 560
Suites: completed 11, aborted 0
Tests: succeeded 560, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #40956 from LuciferYang/SPARK-43292.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
LuciferYang authored and hvanhovell committed May 8, 2023
1 parent e7a466e commit 1486835
Showing 4 changed files with 6 additions and 18 deletions.
@@ -151,7 +151,7 @@ private void processStreamRequest(final StreamRequest req) {
         streamManager.streamSent(req.streamId);
       });
     } else {
-      // org.apache.spark.repl.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated
+      // org.apache.spark.executor.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated
       // when the following error message is changed.
       respond(new StreamFailure(req.streamId, String.format(
         "Stream '%s' was not found.", req.streamId)));
14 changes: 1 addition & 13 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -992,19 +992,7 @@ private[spark] class Executor(
     val classUri = conf.get("spark.repl.class.uri", null)
     if (classUri != null) {
       logInfo("Using REPL class URI: " + classUri)
-      try {
-        val _userClassPathFirst: java.lang.Boolean = userClassPathFirst
-        val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader")
-          .asInstanceOf[Class[_ <: ClassLoader]]
-        val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv],
-          classOf[String], classOf[ClassLoader], classOf[Boolean])
-        constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst)
-      } catch {
-        case _: ClassNotFoundException =>
-          logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
-          System.exit(1)
-          null
-      }
+      new ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst)
     } else {
       parent
     }
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.repl
+package org.apache.spark.executor

 import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream}
 import java.net.{URI, URL, URLEncoder}
@@ -60,7 +60,7 @@ class ExecutorClassLoader(
   val parentLoader = new ParentClassLoader(parent)

   // Allows HTTP connect and read timeouts to be controlled for testing / debugging purposes
-  private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
+  private[executor] var httpUrlConnectionTimeoutMillis: Int = -1

   private val fetchFn: (String) => InputStream = uri.getScheme() match {
     case "spark" => getClassFileInputStreamFromSparkRPC
@@ -269,5 +269,5 @@ extends ClassVisitor(ASM9, cv) {
  * throw a special one that's neither [[LinkageError]] nor [[ClassNotFoundException]] to make JVM
  * retry to load this class later.
  */
-private[repl] class RemoteClassLoaderError(className: String, cause: Throwable)
+private[executor] class RemoteClassLoaderError(className: String, cause: Throwable)
   extends Error(className, cause)
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.repl
+package org.apache.spark.executor

 import java.io.{File, IOException}
 import java.lang.reflect.InvocationTargetException
