Skip to content

Commit 31adf15

Browse files
committed
Fix after review
1 parent 79f716d commit 31adf15

File tree

10 files changed

+51
-60
lines changed

10 files changed

+51
-60
lines changed

connector/connect/client/jvm/pom.xml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -221,13 +221,6 @@
221221
<groupId>org.apache.maven.plugins</groupId>
222222
<artifactId>maven-jar-plugin</artifactId>
223223
<executions>
224-
<execution>
225-
<id>default-jar</id>
226-
<phase>compile</phase>
227-
<goals>
228-
<goal>jar</goal>
229-
</goals>
230-
</execution>
231224
<execution>
232225
<id>prepare-test-jar</id>
233226
<phase>test-compile</phase>

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UDFClassLoadingE2ESuite.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class UDFClassLoadingE2ESuite extends RemoteSparkSession {
4646
.range(10)
4747
.filter(n => {
4848
// Try to use spark result
49-
new SparkResult[Int](null, null, null)
49+
new SparkResult[Int](null, null, null, "")
5050
n > 5
5151
})
5252
.collectAsList()
@@ -58,17 +58,20 @@ class UDFClassLoadingE2ESuite extends RemoteSparkSession {
5858
addClientTestArtifactInServerClasspath(session)
5959
val ds = session.range(10).filter(n => n % 2 == 0)
6060

61-
// load SparkResult as a stubbed class
61+
// Load SparkResult as a stubbed class
6262
val rows = ds.collectAsList()
6363
assert(rows == Arrays.asList[Long](0, 2, 4, 6, 8))
6464

6565
// Upload SparkResult and then SparkResult can be used in the udf
6666
addClientTestArtifactInServerClasspath(session, testJar = false)
67-
val rows2 = session.range(10).filter(n => {
68-
// Try to use spark result
69-
new SparkResult[Int](null, null, null)
70-
n > 5
71-
}).collectAsList()
67+
val rows2 = session
68+
.range(10)
69+
.filter(n => {
70+
// Try to use spark result
71+
new SparkResult[Int](null, null, null, "")
72+
n > 5
73+
})
74+
.collectAsList()
7275
assert(rows2 == Arrays.asList[Long](6, 7, 8, 9))
7376
}
7477

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/IntegrationTestUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ object IntegrationTestUtils {
2929

3030
// System properties used for testing and debugging
3131
private val DEBUG_SC_JVM_CLIENT = "spark.debug.sc.jvm.client"
32-
// Enable this flag to print all client debug log + server logs to the console
33-
private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "true").toBoolean
32+
// Enable this flag to print all server logs to the console
33+
private[connect] val isDebug = System.getProperty(DEBUG_SC_JVM_CLIENT, "false").toBoolean
3434

3535
private[sql] lazy val scalaVersion = {
3636
versionNumberString.split('.') match {

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/RemoteSparkSession.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,9 @@ object SparkConnectServerUtils {
9696
// To find InMemoryTableCatalog for V2 writer tests
9797
val catalystTestJar =
9898
tryFindJar("sql/catalyst", "spark-catalyst", "spark-catalyst", test = true)
99-
.map(clientTestJar => Seq("--jars", clientTestJar.getCanonicalPath))
99+
.map(jar => Seq("--jars", jar.getCanonicalPath))
100100
.getOrElse(Seq.empty)
101101

102-
val jarsConfigs = Seq("--jars", catalystTestJar.mkString(","))
103-
104102
// Use InMemoryTableCatalog for V2 writer tests
105103
val writerV2Configs = Seq(
106104
"--conf",
@@ -127,7 +125,7 @@ object SparkConnectServerUtils {
127125
Seq("--conf", s"spark.sql.catalogImplementation=$catalogImplementation")
128126
}
129127

130-
jarsConfigs ++ writerV2Configs ++ hiveTestConfigs
128+
catalystTestJar ++ writerV2Configs ++ hiveTestConfigs
131129
}
132130

133131
def start(): Unit = {
@@ -198,9 +196,10 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
198196
debug(error)
199197
throw error
200198
}
201-
}
202199

203-
addClientTestArtifactInServerClasspath(spark)
200+
// Add client test jar into the spark session classpath
201+
addClientTestArtifactInServerClasspath(spark)
202+
}
204203
}
205204

206205
// For UDF maven E2E tests, the server needs the client test code to find the UDFs defined in
@@ -214,8 +213,8 @@ trait RemoteSparkSession extends ConnectFunSuite with BeforeAndAfterAll {
214213
// So we skip building or finding this jar for SBT.
215214
"sbt-tests-do-not-need-this-jar",
216215
"spark-connect-client-jvm",
217-
test = testJar
218-
).foreach(clientTestJar => session.addArtifact(clientTestJar.getCanonicalPath))
216+
test = testJar).foreach(clientTestJar =>
217+
session.addArtifact(clientTestJar.getCanonicalPath))
219218
}
220219

221220
override def afterAll(): Unit = {

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,10 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
164164
val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
165165
val stubClassLoader =
166166
StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
167-
new ChildFirstURLClassLoader(urls.toArray, stubClassLoader, Utils.getContextOrSparkClassLoader)
167+
new ChildFirstURLClassLoader(
168+
urls.toArray,
169+
stubClassLoader,
170+
Utils.getContextOrSparkClassLoader)
168171
}
169172

170173
/**

connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/StubClassLoaderSuite.scala

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,8 @@ class StubClassLoaderSuite extends SparkFunSuite {
6161
val stubClassLoader = new RecordedStubClassLoader(null, _ => true)
6262

6363
// Install artifact without class A.
64-
val sessionClassLoader = new ChildFirstURLClassLoader(
65-
Array(udfNoAJar),
66-
stubClassLoader,
67-
sysClassLoader
68-
)
64+
val sessionClassLoader =
65+
new ChildFirstURLClassLoader(Array(udfNoAJar), stubClassLoader, sysClassLoader)
6966
// Load udf with A used in the same class.
7067
deserializeUdf(sessionClassLoader)
7168
// Class A should be stubbed.
@@ -77,25 +74,22 @@ class StubClassLoaderSuite extends SparkFunSuite {
7774
val sysClassLoader = getClass.getClassLoader()
7875
val stubClassLoader = new RecordedStubClassLoader(null, _ => true)
7976

80-
val cl1 = new ChildFirstURLClassLoader(
81-
Array.empty,
82-
stubClassLoader,
83-
sysClassLoader)
77+
val cl1 = new ChildFirstURLClassLoader(Array.empty, stubClassLoader, sysClassLoader)
8478

8579
// Failed to load dummy udf
86-
intercept[Exception]{
80+
intercept[Exception] {
8781
deserializeUdf(cl1)
8882
}
8983
// Successfully stubbed the missing class.
90-
assert(stubClassLoader.lastStubbed ===
91-
"org.apache.spark.sql.connect.artifact.StubClassDummyUdf")
84+
assert(
85+
stubClassLoader.lastStubbed ===
86+
"org.apache.spark.sql.connect.artifact.StubClassDummyUdf")
9287

9388
// Creating a new class loader will unpack the udf correctly.
9489
val cl2 = new ChildFirstURLClassLoader(
9590
Array(udfNoAJar),
9691
stubClassLoader, // even with the same stub class loader.
97-
sysClassLoader
98-
)
92+
sysClassLoader)
9993
// Should be able to load after the artifact is added
10094
deserializeUdf(cl2)
10195
}
@@ -105,37 +99,32 @@ class StubClassLoaderSuite extends SparkFunSuite {
10599
val sysClassLoader = getClass.getClassLoader()
106100
val stubClassLoader = new RecordedStubClassLoader(null, _ => true)
107101

108-
val sessionClassLoader = new ChildFirstURLClassLoader(
109-
Array.empty,
110-
stubClassLoader,
111-
sysClassLoader)
102+
val sessionClassLoader =
103+
new ChildFirstURLClassLoader(Array.empty, stubClassLoader, sysClassLoader)
112104

113105
// Failed to load dummy udf
114-
val exception = intercept[Exception]{
106+
val exception = intercept[Exception] {
115107
deserializeUdf(sessionClassLoader)
116108
}
117109
// Successfully stubbed the missing class.
118-
assert(stubClassLoader.lastStubbed ===
119-
"org.apache.spark.sql.connect.artifact.StubClassDummyUdf")
110+
assert(
111+
stubClassLoader.lastStubbed ===
112+
"org.apache.spark.sql.connect.artifact.StubClassDummyUdf")
120113
// But failed to find the method on the stub class.
121114
val cause = exception.getCause
122115
assert(cause.isInstanceOf[NoSuchMethodException])
123116
assert(
124117
cause.getMessage.contains("org.apache.spark.sql.connect.artifact.StubClassDummyUdf"),
125-
cause.getMessage
126-
)
118+
cause.getMessage)
127119
}
128120

129121
private def deserializeUdf(sessionClassLoader: ClassLoader): UdfPacket = {
130-
Utils.deserialize[UdfPacket](
131-
udfByteArray,
132-
sessionClassLoader
133-
)
122+
Utils.deserialize[UdfPacket](udfByteArray, sessionClassLoader)
134123
}
135124
}
136125

137126
class RecordedStubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
138-
extends StubClassLoader(parent, shouldStub) {
127+
extends StubClassLoader(parent, shouldStub) {
139128
var lastStubbed: String = _
140129

141130
override def findClass(name: String): Class[_] = {

core/src/main/java/org/apache/spark/util/ChildFirstURLClassLoader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public ChildFirstURLClassLoader(URL[] urls, ClassLoader parent) {
4242

4343
/**
4444
* Specify the grandparent if there is a need to load in the order of
45-
* `grandparent -> urls (child) -> parent`.
45+
* `grandparent -&gt; urls (child) -&gt; parent`.
4646
*/
4747
public ChildFirstURLClassLoader(URL[] urls, ClassLoader parent, ClassLoader grandparent) {
4848
super(urls, grandparent);

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,8 @@ private[spark] class Executor(
554554
taskDescription.artifacts.jars,
555555
taskDescription.artifacts.archives,
556556
isolatedSession)
557-
// Always reset the thread class loader
557+
// Always reset the thread class loader to ensure if any updates, all threads (not only
558+
// the thread that updated the dependencies) can update to the new class loader.
558559
Thread.currentThread.setContextClassLoader(isolatedSession.replClassLoader)
559560
task = ser.deserialize[Task[Any]](
560561
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
@@ -1076,7 +1077,7 @@ private[spark] class Executor(
10761077
} else {
10771078
parent
10781079
}
1079-
logInfo(s"Created or updated repl class loader for $sessionUUID.")
1080+
logInfo(s"Created or updated repl class loader $classLoader for $sessionUUID.")
10801081
classLoader
10811082
}
10821083

@@ -1155,7 +1156,8 @@ private[spark] class Executor(
11551156
}
11561157
}
11571158
if (updated) {
1158-
// TODO: only update the class loader if the stub class should be unloaded.
1159+
// When a new url is added for non-default class loader, recreate the class loader
1160+
// to ensure all classes are updated.
11591161
state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)
11601162
state.replClassLoader =
11611163
addReplClassLoaderIfNeeded(state.urlClassLoader, state.replClassDirUri, state.sessionUUID)

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2552,10 +2552,10 @@ package object config {
25522552
ConfigBuilder("spark.connect.scalaUdf.stubClasses")
25532553
.internal()
25542554
.doc("""
2555-
|Comma separated list of binary names of classes/packages that should be stub during the
2556-
|Scala UDF serdeser and execution if not found on the server classpath.
2555+
|Comma-separated list of binary names of classes/packages that should be stubbed during
2556+
|the Scala UDF serde and execution if not found on the server classpath.
25572557
|An empty list effectively disables stubbing for all missing classes.
2558-
|By default the server stubs classes from the Scala client package.
2558+
|By default, the server stubs classes from the Scala client package.
25592559
|""".stripMargin)
25602560
.version("3.5.0")
25612561
.stringConf

core/src/main/scala/org/apache/spark/util/StubClassLoader.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ class StubClassLoader(parent: ClassLoader, shouldStub: String => Boolean)
3333
throw new ClassNotFoundException(name)
3434
}
3535
val bytes = StubClassLoader.generateStub(name)
36+
// scalastyle:off println
37+
println("###stub class: " + name)
3638
defineClass(name, bytes, 0, bytes.length)
3739
}
3840
}

0 commit comments

Comments
 (0)