From 14868355b862bdfbfa4ca16e7b367eba1f9cd277 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 8 May 2023 11:09:20 -0400 Subject: [PATCH] [SPARK-43292][CORE][CONNECT] Move `ExecutorClassLoader` to `core` module and simplify `Executor#addReplClassLoaderIfNeeded` ### What changes were proposed in this pull request? This pr move `ExecutorClassLoader` from `repl` module to `core` module an put it into `executor` package, then `ArtifactManagerSuite` can test using maven. On the other hand, this pr removed reflection calls in the `Executor#addReplClassLoaderIfNeeded` due to `ExecutorClassLoader` and `Executor` are in the same module after this pr. ### Why are the changes needed? 1. `ExecutorClassLoader` only be used by `Executor`, it is more suitable for placing in the `core` module 2. Make `ArtifactManagerSuite` can test using maven. ### Does this PR introduce _any_ user-facing change? No, just for maven test ### How was this patch tested? - Pass GitHub Actions - Manual test Run the following commands ``` build/mvn clean install -DskipTests -Phive build/mvn test -pl connector/connect/server ``` **Before** `ArtifactManagerSuite` test failed due to: ``` 23/04/26 16:36:57.140 ScalaTest-main-running-DiscoverySuite ERROR Executor: Could not find org.apache.spark.repl.ExecutorClassLoader on classpath! ``` **After** All tests passed. ``` Run completed in 10 seconds, 494 milliseconds. Total number of tests run: 560 Suites: completed 11, aborted 0 Tests: succeeded 560, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` Closes #40956 from LuciferYang/SPARK-43292. Lead-authored-by: yangjie01 Co-authored-by: YangJie Signed-off-by: Herman van Hovell --- .../network/server/TransportRequestHandler.java | 2 +- .../scala/org/apache/spark/executor/Executor.scala | 14 +------------- .../spark/executor}/ExecutorClassLoader.scala | 6 +++--- .../spark/executor}/ExecutorClassLoaderSuite.scala | 2 +- 4 files changed, 6 insertions(+), 18 deletions(-) rename {repl/src/main/scala/org/apache/spark/repl => core/src/main/scala/org/apache/spark/executor}/ExecutorClassLoader.scala (98%) rename {repl/src/test/scala/org/apache/spark/repl => core/src/test/scala/org/apache/spark/executor}/ExecutorClassLoaderSuite.scala (99%) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index bc99248ee2cee..81012c3ea61bf 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -151,7 +151,7 @@ private void processStreamRequest(final StreamRequest req) { streamManager.streamSent(req.streamId); }); } else { - // org.apache.spark.repl.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated + // org.apache.spark.executor.ExecutorClassLoader.STREAM_NOT_FOUND_REGEX should also be updated // when the following error message is changed. respond(new StreamFailure(req.streamId, String.format( "Stream '%s' was not found.", req.streamId))); diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5d623b22abdc0..ed3e8626d6d69 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -992,19 +992,7 @@ private[spark] class Executor( val classUri = conf.get("spark.repl.class.uri", null) if (classUri != null) { logInfo("Using REPL class URI: " + classUri) - try { - val _userClassPathFirst: java.lang.Boolean = userClassPathFirst - val klass = Utils.classForName("org.apache.spark.repl.ExecutorClassLoader") - .asInstanceOf[Class[_ <: ClassLoader]] - val constructor = klass.getConstructor(classOf[SparkConf], classOf[SparkEnv], - classOf[String], classOf[ClassLoader], classOf[Boolean]) - constructor.newInstance(conf, env, classUri, parent, _userClassPathFirst) - } catch { - case _: ClassNotFoundException => - logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!") - System.exit(1) - null - } + new ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) } else { parent } diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala similarity index 98% rename from repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala rename to core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala index 7d4758ec0a153..b9f4486b66fa6 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.repl +package org.apache.spark.executor import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream} import java.net.{URI, URL, URLEncoder} @@ -60,7 +60,7 @@ class ExecutorClassLoader( val parentLoader = new ParentClassLoader(parent) // Allows HTTP connect and read timeouts to be controlled for testing / debugging purposes - private[repl] var httpUrlConnectionTimeoutMillis: Int = -1 + private[executor] var httpUrlConnectionTimeoutMillis: Int = -1 private val fetchFn: (String) => InputStream = uri.getScheme() match { case "spark" => getClassFileInputStreamFromSparkRPC @@ -269,5 +269,5 @@ extends ClassVisitor(ASM9, cv) { * throw a special one that's neither [[LinkageError]] nor [[ClassNotFoundException]] to make JVM * retry to load this class later. */ -private[repl] class RemoteClassLoaderError(className: String, cause: Throwable) +private[executor] class RemoteClassLoaderError(className: String, cause: Throwable) extends Error(className, cause) diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorClassLoaderSuite.scala similarity index 99% rename from repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala rename to core/src/test/scala/org/apache/spark/executor/ExecutorClassLoaderSuite.scala index 23ea3fee2505b..8e93da7adf0b1 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorClassLoaderSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.repl +package org.apache.spark.executor import java.io.{File, IOException} import java.lang.reflect.InvocationTargetException