181 changes: 0 additions & 181 deletions core/src/main/scala/org/apache/spark/HttpServer.scala

This file was deleted.
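For context, the deleted HttpServer was a small wrapper around an embedded Jetty server that the driver used to serve files (such as REPL-generated class files) out of a local directory, which executors then fetched over HTTP. The sketch below only illustrates that general Jetty pattern and is not the deleted class itself; the port choice, directory path, and handler setup are assumptions.

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.ResourceHandler

// Minimal sketch: serve the contents of a local directory over HTTP with embedded Jetty.
// The directory path is a placeholder; 0 means "bind an ephemeral port".
object DirectoryServerSketch {
  def main(args: Array[String]): Unit = {
    val server = new Server(0)
    val handler = new ResourceHandler()
    handler.setResourceBase("/tmp/repl-classes")  // directory whose files are served
    server.setHandler(handler)
    server.start()
    // ... clients fetch files over HTTP while the server is running ...
    server.stop()
  }
}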

@@ -57,7 +57,6 @@ class ExecutorClassLoaderSuite
  var tempDir2: File = _
  var url1: String = _
  var urls2: Array[URL] = _
  var classServer: HttpServer = _

  override def beforeAll() {
    super.beforeAll()
@@ -74,9 +73,6 @@ class ExecutorClassLoaderSuite

  override def afterAll() {
    try {
      if (classServer != null) {
        classServer.stop()
      }
      Utils.deleteRecursively(tempDir1)
      Utils.deleteRecursively(tempDir2)
      SparkEnv.set(null)
@@ -137,55 +133,6 @@ class ExecutorClassLoaderSuite
    assert(fileReader.readLine().contains("resource"), "File doesn't contain 'resource'")
  }

  test("failing to fetch classes from HTTP server should not leak resources (SPARK-6209)") {
    // This is a regression test for SPARK-6209, a bug where each failed attempt to load a class
    // from the driver's class server would leak a HTTP connection, causing the class server's
    // thread / connection pool to be exhausted.
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    classServer = new HttpServer(conf, tempDir1, securityManager)
    classServer.start()
    // ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock this
    val mockEnv = mock[SparkEnv]
    when(mockEnv.securityManager).thenReturn(securityManager)
    SparkEnv.set(mockEnv)
    // Create an ExecutorClassLoader that's configured to load classes from the HTTP server
    val parentLoader = new URLClassLoader(Array.empty, null)
    val classLoader = new ExecutorClassLoader(conf, null, classServer.uri, parentLoader, false)
    classLoader.httpUrlConnectionTimeoutMillis = 500
    // Check that this class loader can actually load classes that exist
    val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
    val fakeClassVersion = fakeClass.toString
    assert(fakeClassVersion === "1")
    // Try to perform a full GC now, since GC during the test might mask resource leaks
    System.gc()
    // When the original bug occurs, the test thread becomes blocked in a classloading call
    // and does not respond to interrupts. Therefore, use a custom ScalaTest interruptor to
    // shut down the HTTP server when the test times out
    val interruptor: Interruptor = new Interruptor {
      override def apply(thread: Thread): Unit = {
        classServer.stop()
        classServer = null
        thread.interrupt()
      }
    }
    def tryAndFailToLoadABunchOfClasses(): Unit = {
      // The number of trials here should be much larger than Jetty's thread / connection limit
      // in order to expose thread or connection leaks
      for (i <- 1 to 1000) {
        if (Thread.currentThread().isInterrupted) {
          throw new InterruptedException()
        }
        // Incorporate the iteration number into the class name in order to avoid any response
        // caching that might be added in the future
        intercept[ClassNotFoundException] {
          classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance()
        }
      }
    }
    failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
  }

  test("fetch classes using Spark's RpcEnv") {
    val env = mock[SparkEnv]
    val rpcEnv = mock[RpcEnv]
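The removed test above relies on ScalaTest's failAfter combined with a custom Interruptor, so that a test thread stuck in a blocking classloading call can still be shut down when the time limit is reached. For readers unfamiliar with that API, here is a minimal, standalone sketch of the same pattern, assuming the ScalaTest 2.x Timeouts/Interruptor API that the removed test uses; the suite name and the blocking loop are made up for illustration.

import org.scalatest.FunSuite
import org.scalatest.concurrent.{Interruptor, Timeouts}
import org.scalatest.exceptions.TestFailedException
import org.scalatest.time.SpanSugar._

class TimeoutPatternSuite extends FunSuite with Timeouts {

  test("a blocked body is aborted through a custom interruptor") {
    // The interruptor runs on ScalaTest's timer thread when the timeout fires. It can
    // release external resources first (the removed test stopped its HTTP class server
    // here) and then interrupt the blocked test thread.
    val interruptor: Interruptor = new Interruptor {
      override def apply(testThread: Thread): Unit = {
        // e.g. classServer.stop() in the removed test
        testThread.interrupt()
      }
    }

    def blockUntilInterrupted(): Unit = {
      while (!Thread.currentThread().isInterrupted) {
        Thread.sleep(10)  // throws InterruptedException once the interruptor fires
      }
    }

    // failAfter aborts the body via the interruptor once the time limit is exceeded and
    // reports the timeout as a TestFailedException, which this sketch intercepts so the
    // illustration itself passes.
    intercept[TestFailedException] {
      failAfter(1 second)(blockUntilInterrupted())(interruptor)
    }
  }
}

The shape is the same as in the removed test: release whatever the blocked call is waiting on, then interrupt the thread, so that the timeout can actually take effect.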