
Commit ee4989e

zhenlineo authored and HyukjinKwon committed
[SPARK-43744][CONNECT][FOLLOW-UP] Throw error from the constructor
### What changes were proposed in this pull request?

Made the stub constructor throw ClassNotFoundException if called. Also a tiny improvement: the executor no longer recreates class loaders when stubbing is not enabled.

### Why are the changes needed?

Enhancement to #42069. Should be merged to 3.5.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #42222 from zhenlineo/error-from-constuctor.

Authored-by: Zhen Li <zhenlineo@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 5df1d79)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 93c2973 commit ee4989e
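In effect, a stubbed class still loads, but refuses to be constructed. A minimal sketch of the observable behavior, assuming `stubLoader` stands for a class loader that stubs `my.name.HelloWorld` (the example class name used in the test suite below):

    import java.lang.reflect.InvocationTargetException

    // Sketch only: stubLoader is assumed to stub my.name.HelloWorld
    // (see the RecordedStubClassLoader setup in the suite below).
    val cls = Class.forName("my.name.HelloWorld", false, stubLoader) // loading succeeds
    try {
      cls.getDeclaredConstructor().newInstance() // the generated constructor throws
    } catch {
      // Reflection wraps the constructor's exception; the cause is the
      // ClassNotFoundException emitted by the stub constructor.
      case e: InvocationTargetException =>
        assert(e.getCause.isInstanceOf[ClassNotFoundException])
    }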

5 files changed (+50, -16 lines)

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala

Lines changed: 3 additions & 3 deletions
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}
 
 import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CONNECT_SCALA_UDF_STUB_CLASSES
+import org.apache.spark.internal.config.CONNECT_SCALA_UDF_STUB_PREFIXES
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.artifact.util.ArtifactUtils
 import org.apache.spark.sql.connect.config.Connect.CONNECT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL
@@ -162,9 +162,9 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
    */
  def classloader: ClassLoader = {
    val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
-    val loader = if (SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
+    val loader = if (SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES).nonEmpty) {
      val stubClassLoader =
-        StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+        StubClassLoader(null, SparkEnv.get.conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES))
      new ChildFirstURLClassLoader(
        urls.toArray,
        stubClassLoader,
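The rename from stubClasses to stubPrefixes matches the semantics suggested by the config doc below: entries are binary-name prefixes covering single classes or whole packages. A sketch of that matching rule (assumed, not the actual StubClassLoader code; shouldStub is a hypothetical helper):

    // Assumed prefix semantics: a class is stubbed when its binary name
    // starts with one of the configured prefixes.
    def shouldStub(binaryName: String, prefixes: Seq[String]): Boolean =
      prefixes.exists(binaryName.startsWith)

    shouldStub("my.name.HelloWorld", Seq("my.name")) // true: whole package is stubbed
    shouldStub("my.other.Thing", Seq("my.name"))     // false: normal ClassNotFoundException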

connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/StubClassLoaderSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -55,6 +55,21 @@ class StubClassLoaderSuite extends SparkFunSuite {
     }
   }
 
+  test("call stub class default constructor") {
+    val cl = new RecordedStubClassLoader(getClass().getClassLoader(), _ => true)
+    // scalastyle:off classforname
+    val cls = Class.forName("my.name.HelloWorld", false, cl)
+    // scalastyle:on classforname
+    assert(cl.lastStubbed === "my.name.HelloWorld")
+    val error = intercept[java.lang.reflect.InvocationTargetException] {
+      cls.getDeclaredConstructor().newInstance()
+    }
+    assert(
+      error.getCause != null && error.getCause.getMessage.contains(
+        "Fail to initiate the class my.name.HelloWorld because it is stubbed"),
+      error)
+  }
+
   test("stub missing class") {
     val sysClassLoader = getClass.getClassLoader()
     val stubClassLoader = new RecordedStubClassLoader(null, _ => true)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 14 additions & 9 deletions
@@ -174,7 +174,8 @@ private[spark] class Executor(
     val currentFiles = new HashMap[String, Long]
     val currentJars = new HashMap[String, Long]
     val currentArchives = new HashMap[String, Long]
-    val urlClassLoader = createClassLoader(currentJars, !isDefaultState(jobArtifactState.uuid))
+    val urlClassLoader =
+      createClassLoader(currentJars, isStubbingEnabledForState(jobArtifactState.uuid))
     val replClassLoader = addReplClassLoaderIfNeeded(
       urlClassLoader, jobArtifactState.replClassDirUri, jobArtifactState.uuid)
     new IsolatedSessionState(
@@ -186,6 +187,11 @@ private[spark] class Executor(
     )
   }
 
+  private def isStubbingEnabledForState(name: String) = {
+    !isDefaultState(name) &&
+      conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES).nonEmpty
+  }
+
   private def isDefaultState(name: String) = name == "default"
 
   // Classloader isolation
@@ -1031,8 +1037,8 @@ private[spark] class Executor(
       urls.mkString("'", ",", "'")
     )
 
-    if (useStub && conf.get(CONNECT_SCALA_UDF_STUB_CLASSES).nonEmpty) {
-      createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_CLASSES))
+    if (useStub) {
+      createClassLoaderWithStub(urls, conf.get(CONNECT_SCALA_UDF_STUB_PREFIXES))
     } else {
       createClassLoader(urls)
     }
@@ -1093,7 +1099,7 @@ private[spark] class Executor(
       state: IsolatedSessionState,
       testStartLatch: Option[CountDownLatch] = None,
       testEndLatch: Option[CountDownLatch] = None): Unit = {
-    var updated = false;
+    var renewClassLoader = false;
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     updateDependenciesLock.lockInterruptibly()
     try {
@@ -1149,15 +1155,14 @@ private[spark] class Executor(
           if (!state.urlClassLoader.getURLs().contains(url)) {
             logInfo(s"Adding $url to class loader ${state.sessionUUID}")
             state.urlClassLoader.addURL(url)
-            if (!isDefaultState(state.sessionUUID)) {
-              updated = true
+            if (isStubbingEnabledForState(state.sessionUUID)) {
+              renewClassLoader = true
             }
           }
         }
      }
-      if (updated) {
-        // When a new url is added for non-default class loader, recreate the class loader
-        // to ensure all classes are updated.
+      if (renewClassLoader) {
+        // Recreate the class loader to ensure all classes are updated.
         state.urlClassLoader = createClassLoader(state.urlClassLoader.getURLs, useStub = true)
         state.replClassLoader =
           addReplClassLoaderIfNeeded(state.urlClassLoader, state.replClassDirUri, state.sessionUUID)
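The net effect in the executor: adding a JAR to an existing URL class loader is enough when no stubbing is configured, so the loader is kept as-is; only when stub prefixes are set for a non-default (isolated) session does a new URL force the loaders to be rebuilt, presumably so that classes previously resolved to throwing stubs can be re-resolved against the newly added artifact.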

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 2 deletions
@@ -2541,8 +2541,8 @@ package object config {
      .booleanConf
      .createWithDefault(false)
 
-  private[spark] val CONNECT_SCALA_UDF_STUB_CLASSES =
-    ConfigBuilder("spark.connect.scalaUdf.stubClasses")
+  private[spark] val CONNECT_SCALA_UDF_STUB_PREFIXES =
+    ConfigBuilder("spark.connect.scalaUdf.stubPrefixes")
       .internal()
       .doc("""
         |Comma-separated list of binary names of classes/packages that should be stubbed during
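For illustration only (the config is marked internal, and the prefixes here are hypothetical), setting the renamed option might look like:

    import org.apache.spark.SparkConf

    // Hypothetical prefixes: classes under them are stubbed during UDF
    // deserialization instead of failing when their JARs are missing.
    val conf = new SparkConf()
      .set("spark.connect.scalaUdf.stubPrefixes", "com.example.app,org.example.lib")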

core/src/main/scala/org/apache/spark/util/StubClassLoader.scala

Lines changed: 16 additions & 2 deletions
@@ -70,8 +70,22 @@ object StubClassLoader {
         "()V",
         false)
 
-      ctorWriter.visitInsn(Opcodes.RETURN)
-      ctorWriter.visitMaxs(1, 1)
+      val internalException: String = "java/lang/ClassNotFoundException"
+      ctorWriter.visitTypeInsn(Opcodes.NEW, internalException)
+      ctorWriter.visitInsn(Opcodes.DUP)
+      ctorWriter.visitLdcInsn(
+        s"Fail to initiate the class $binaryName because it is stubbed. " +
+          "Please install the artifact of the missing class by calling session.addArtifact.")
+      // Invoke throwable constructor
+      ctorWriter.visitMethodInsn(
+        Opcodes.INVOKESPECIAL,
+        internalException,
+        "<init>",
+        "(Ljava/lang/String;)V",
+        false)
+
+      ctorWriter.visitInsn(Opcodes.ATHROW)
+      ctorWriter.visitMaxs(3, 3)
       ctorWriter.visitEnd()
       classWriter.visitEnd()
       classWriter.toByteArray
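At the source level, the bytecode emitted above is roughly equivalent to this sketch (the real stub is generated with ASM, not compiled from Scala; HelloWorld is the example name from the test suite):

    // Rough source-level equivalent of a generated stub (sketch only):
    // the class loads and links, but any instantiation throws immediately.
    class HelloWorld {
      throw new ClassNotFoundException(
        "Fail to initiate the class my.name.HelloWorld because it is stubbed. " +
          "Please install the artifact of the missing class by calling session.addArtifact.")
    }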
