Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

package org.apache.spark.deploy.worker

import org.scalatest.Matchers
import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.rpc.{RpcAddress, RpcEnv}

class WorkerSuite extends SparkFunSuite with Matchers {
class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {

import org.apache.spark.deploy.DeployTestUtils._

Expand All @@ -34,6 +34,25 @@ class WorkerSuite extends SparkFunSuite with Matchers {
}
def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)

private var _worker: Worker = _

private def makeWorker(conf: SparkConf): Worker = {
assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
_worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, securityMgr)
_worker
}

after {
if (_worker != null) {
_worker.rpcEnv.shutdown()
_worker.rpcEnv.awaitTermination()
_worker = null
}
}

test("test isUseLocalNodeSSLConfig") {
Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) shouldBe true
Expand Down Expand Up @@ -65,9 +84,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedExecutors (small number of executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
Expand All @@ -91,9 +108,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedExecutors (more executors)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedExecutors", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
Expand Down Expand Up @@ -126,9 +141,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedDrivers (small number of drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 5) {
val driverId = s"driverId-$i"
Expand All @@ -152,9 +165,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
test("test clearing of finishedDrivers (more drivers)") {
val conf = new SparkConf()
conf.set("spark.worker.ui.retainedDrivers", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
"Worker", "/tmp", conf, new SecurityManager(conf))
val worker = makeWorker(conf)
// initialize workers
for (i <- 0 until 50) {
val driverId = s"driverId-$i"
Expand Down