Changes from all commits
@@ -240,7 +240,7 @@ private[spark] class SparkSubmit extends Logging {
   }

   // Set the deploy mode; default is client mode
-  var deployMode: Int = args.deployMode match {
+  val deployMode: Int = args.deployMode match {
     case "client" | null => CLIENT
     case "cluster" => CLUSTER
     case _ =>
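The pattern throughout this PR is the same: a binding that is assigned exactly once can be a `val`, even when its initializer is a multi-branch expression. A minimal standalone sketch of that idiom (the `CLIENT`/`CLUSTER` constants and `resolve` helper here are illustrative stand-ins, not Spark's code):

object DeployModeExample {
  val CLIENT = 1
  val CLUSTER = 2

  def resolve(deployMode: String): Int = {
    // A match is an expression, so its result can initialize a val directly;
    // no mutable placeholder plus later assignment is needed.
    val mode: Int = deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case other => throw new IllegalArgumentException(s"Unknown deploy mode: $other")
    }
    mode
  }

  def main(args: Array[String]): Unit = {
    println(resolve("cluster")) // prints 2
  }
}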
@@ -125,7 +125,7 @@ private[spark] class TaskSetManager(
   val weight = 1
   val minShare = 0
   var priority = taskSet.priority
-  var stageId = taskSet.stageId
+  val stageId = taskSet.stageId
   val name = "TaskSet_" + taskSet.id
   var parent: Pool = null
   private var totalResultSize = 0L
@@ -430,7 +430,7 @@ private class LiveStage extends LiveEntity {

   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
   @volatile var cleaning = false
-  var savedTasks = new AtomicInteger(0)
+  val savedTasks = new AtomicInteger(0)

   def executorSummary(executorId: String): LiveExecutorStageSummary = {
     executorSummaries.getOrElseUpdate(executorId,
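Note that `val` here does not freeze the counter: it fixes the reference, while the `AtomicInteger` stays mutable and thread-safe through its own methods. A small sketch of the distinction (standalone example, not Spark code):

import java.util.concurrent.atomic.AtomicInteger

object AtomicValExample {
  // val forbids rebinding the name; the object's state remains mutable.
  val savedTasks = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    savedTasks.incrementAndGet()
    savedTasks.addAndGet(4)
    println(savedTasks.get()) // prints 5
    // savedTasks = new AtomicInteger(0)  // would not compile: reassignment to val
  }
}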
@@ -37,13 +37,13 @@ private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
    * Stores all the numbers less than the current median in a smallerHalf,
    * i.e median is the maximum, at the root.
    */
-  private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
+  private[this] val smallerHalf = PriorityQueue.empty[Double](ord)

   /**
    * Stores all the numbers greater than the current median in a largerHalf,
    * i.e median is the minimum, at the root.
    */
-  private[this] var largerHalf = PriorityQueue.empty[Double](ord.reverse)
+  private[this] val largerHalf = PriorityQueue.empty[Double](ord.reverse)

   def isEmpty(): Boolean = {
     smallerHalf.isEmpty && largerHalf.isEmpty
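The same point applies to mutable collections: a `scala.collection.mutable.PriorityQueue` held in a `val` can still be enqueued into and dequeued from; only rebinding the name is forbidden. A minimal sketch (illustrative only, mirroring but not reproducing MedianHeap):

import scala.collection.mutable.PriorityQueue

object MedianHeapValExample {
  // The queue objects are mutable; val only forbids reassigning the fields.
  private val smallerHalf = PriorityQueue.empty[Double](Ordering[Double])          // max-heap
  private val largerHalf  = PriorityQueue.empty[Double](Ordering[Double].reverse)  // min-heap

  def main(args: Array[String]): Unit = {
    smallerHalf.enqueue(1.0, 3.0, 2.0)
    largerHalf.enqueue(5.0, 4.0)
    println(smallerHalf.head) // 3.0: maximum at the root
    println(largerHalf.head)  // 4.0: minimum at the root
  }
}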
@@ -27,7 +27,7 @@ trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { self: Suite =>

   def sc: SparkContext = _sc

-  var conf = new SparkConf(false)
+  val conf = new SparkConf(false)

   /**
    * Initialize the [[SparkContext]]. Generally, this is just called from beforeAll; however, in
core/src/test/scala/org/apache/spark/SparkConfSuite.scala (2 changes: 1 addition & 1 deletion)
@@ -461,7 +461,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
     val conf = new SparkConf()
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(TASK_FPGA_ID.amountConf, "0")
-    var taskResourceRequirement =
+    val taskResourceRequirement =
       parseResourceRequirements(conf, SPARK_TASK_PREFIX)
         .map(req => (req.resourceName, req.amount)).toMap
core/src/test/scala/org/apache/spark/SparkContextSuite.scala (6 changes: 3 additions & 3 deletions)
@@ -939,7 +939,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
       .setAppName("test-cluster")
     conf.set(TASK_GPU_ID.amountConf, "1")

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()

@@ -954,7 +954,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "1")

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()

@@ -970,7 +970,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()
@@ -55,7 +55,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar {
     var getAppUICount = 0L
     var attachCount = 0L
     var detachCount = 0L
-    var updateProbeCount = 0L
+    val updateProbeCount = 0L

     override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
       logDebug(s"getAppUI($appId, $attemptId)")
@@ -216,7 +216,7 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite {
     Utils.tryWithResource(new ZipInputStream(
         new ByteArrayInputStream(underlyingStream.toByteArray))) { is =>

-      var entry = is.getNextEntry
+      val entry = is.getNextEntry
       assert(entry != null)
       val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8)
       val expected = Files.toString(new File(logPath.toString), StandardCharsets.UTF_8)
@@ -58,7 +58,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extends RpcEndpoint {
   val id = seq.toString
   override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
     conf, new SecurityManager(conf))
-  var apps = new mutable.HashMap[String, String]()
+  val apps = new mutable.HashMap[String, String]()
   val driverIdToAppId = new mutable.HashMap[String, String]()
   def newDriver(driverId: String): RpcEndpointRef = {
     val name = s"driver_${drivers.size}"
@@ -61,7 +61,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val testResourceArgs: JObject = ("" -> "")
     val ja = JArray(List(testResourceArgs))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)
-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

@@ -146,7 +146,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)

-    var error = intercept[IllegalArgumentException] {
+    val error = intercept[IllegalArgumentException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

@@ -160,7 +160,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val ja = Extraction.decompose(Seq(fpga))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

@@ -199,7 +199,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)

-    var error = intercept[IllegalArgumentException] {
+    val error = intercept[IllegalArgumentException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()
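Many of the test changes above share one shape: ScalaTest's `intercept` is an expression that returns the caught exception, so the message can be bound with `val` in a single step. A minimal sketch of that idiom, assuming plain ScalaTest (Spark's suites extend `SparkFunSuite`; the suite name and `require` call below are illustrative):

import org.scalatest.funsuite.AnyFunSuite

class InterceptValExample extends AnyFunSuite {
  test("intercept returns the thrown exception as an expression") {
    // intercept[T] { ... } evaluates to the caught exception, so the result
    // of the whole chained expression can initialize a val directly.
    val error = intercept[IllegalArgumentException] {
      require(false, "boom")
    }.getMessage
    assert(error.contains("boom"))
  }
}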
@@ -110,7 +110,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val gpuExecReq =
       new ExecutorResourceRequests().resource("gpu", 2, "someScript")
     val immrprof = rprof.require(gpuExecReq).build
-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       rpmanager.isSupported(immrprof)
     }.getMessage()
@@ -178,7 +178,7 @@ class ResourceUtilsSuite extends SparkFunSuite
   test("list resource ids") {
     val conf = new SparkConf
     conf.set(DRIVER_GPU_ID.amountConf, "2")
-    var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+    val resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
     assert(resources.size === 1, "should only have GPU for resource")
     assert(resources(0).resourceName == GPU, "name should be gpu")
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
       Thread.sleep(5000)
       iter
     }
-    var taskStarted = new AtomicBoolean(false)
-    var taskEnded = new AtomicBoolean(false)
+    val taskStarted = new AtomicBoolean(false)
+    val taskEnded = new AtomicBoolean(false)
     val listener = new SparkListener() {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
         taskStarted.set(true)

@@ -239,11 +239,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
     var execResources = backend.getExecutorAvailableResources("1")
     assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))

-    var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+    val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
     assert(exec3ResourceProfileId === rp.id)

     val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
-    var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
+    val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, mutable.Map.empty[String, Long],
       mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
       new Properties(), taskResources, bytebuffer)))
@@ -3336,7 +3336,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with TimeLimits {
     val ereqs3 = new ExecutorResourceRequests().cores(3)
     val treqs3 = new TaskResourceRequests().cpus(2)
     val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
-    var mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
+    val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))

     assert(mergedRp.getTaskCpus.get == 2)
     assert(mergedRp.getExecutorCores.get == 4)
@@ -1007,7 +1007,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
     val tsm = stageToMockTaskSetManager(0)

     // submit an offer with one executor
-    var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+    val taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
       WorkerOffer("executor0", "host0", 1)
     )).flatten

@@ -1763,7 +1763,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach

     taskScheduler.submitTasks(taskSet)
     // Launch tasks on executor that satisfies resource requirements.
-    var taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+    val taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
     assert(3 === taskDescriptions.length)
     assert(!failedTaskSet)
     assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses)
@@ -54,14 +54,14 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {

   test("shuffle encryption key length should be 128 by default") {
     val conf = createConf()
-    var key = CryptoStreamUtils.createKey(conf)
+    val key = CryptoStreamUtils.createKey(conf)
     val actual = key.length * (java.lang.Byte.SIZE)
     assert(actual === 128)
   }

   test("create 256-bit key") {
     val conf = createConf(IO_ENCRYPTION_KEY_SIZE_BITS.key -> "256")
-    var key = CryptoStreamUtils.createKey(conf)
+    val key = CryptoStreamUtils.createKey(conf)
     val actual = key.length * (java.lang.Byte.SIZE)
     assert(actual === 256)
   }
@@ -35,8 +35,8 @@ class ElementTrackingStoreSuite extends SparkFunSuite with Eventually {
     val tracking = new ElementTrackingStore(store, new SparkConf()
       .set(ASYNC_TRACKING_ENABLED, true))

-    var done = new AtomicBoolean(false)
-    var type1 = new AtomicInteger(0)
+    val done = new AtomicBoolean(false)
+    val type1 = new AtomicInteger(0)
     var queued0: WriteQueueResult = null
     var queued1: WriteQueueResult = null
     var queued2: WriteQueueResult = null
@@ -37,7 +37,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
       flatMap( x => Stream.range(0, expand_size))
-    var persisted = data.persist(StorageLevel.DISK_ONLY)
+    val persisted = data.persist(StorageLevel.DISK_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
   }

@@ -48,7 +48,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
       flatMap(x => Stream.range(0, expand_size))
-    var persisted = data.persist(StorageLevel.MEMORY_ONLY)
+    val persisted = data.persist(StorageLevel.MEMORY_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
   }
@@ -39,7 +39,7 @@ class MemoryStoreSuite
   with BeforeAndAfterEach
   with ResetSystemProperties {

-  var conf: SparkConf = new SparkConf(false)
+  val conf: SparkConf = new SparkConf(false)
     .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)

   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
@@ -147,7 +147,7 @@ object TestObject {
 }

 class TestClass extends Serializable {
-  var x = 5
+  val x = 5

   def getX: Int = x

@@ -179,7 +179,7 @@ class TestClassWithoutFieldAccess {

   def run(): Int = {
     var nonSer2 = new NonSerializable
-    var x = 5
+    val x = 5
     withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       nums.map(_ + x).reduce(_ + _)

@@ -218,10 +218,10 @@ object TestObjectWithNesting {
   var answer = 0
   withSpark(new SparkContext("local", "test")) { sc =>
     val nums = sc.parallelize(Array(1, 2, 3, 4))
-    var y = 1
+    val y = 1
     for (i <- 1 to 4) {
       var nonSer2 = new NonSerializable
-      var x = i
+      val x = i
       answer += nums.map(_ + x + y).reduce(_ + _)
     }
     answer

@@ -239,7 +239,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
     val nums = sc.parallelize(Array(1, 2, 3, 4))
     for (i <- 1 to 4) {
       var nonSer2 = new NonSerializable
-      var x = i
+      val x = i
       answer += nums.map(_ + x + getY).reduce(_ + _)
     }
     answer

@@ -339,7 +339,7 @@ private object TestUserClosuresActuallyCleaned {

 class TestCreateNullValue {

-  var x = 5
+  val x = 5

   def getX: Int = x
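The closure-cleaner fixtures above change locals that Spark closures capture. Whether the binding is `var` or `val` does not change what gets serialized with the closure, so the never-reassigned ones can safely be `val`s. A standalone sketch of the capture pattern, assuming Spark on the classpath (the object name and app name below are hypothetical, not from the suite):

import org.apache.spark.{SparkConf, SparkContext}

object CapturedValExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("captured-val"))
    try {
      val nums = sc.parallelize(Seq(1, 2, 3, 4))
      val x = 5 // never reassigned, so val; the closure below captures its value
      println(nums.map(_ + x).reduce(_ + _)) // prints 30
    } finally {
      sc.stop()
    }
  }
}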
@@ -323,7 +323,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
     RandomSampler.defaultMaxGapSamplingFraction should be (0.4)

     var d: Double = 0.0
-    var sampler = new BernoulliSampler[Int](0.1)
+    val sampler = new BernoulliSampler[Int](0.1)
     sampler.setSeed(rngSeed.nextLong)

     // Array iterator (indexable type)

@@ -547,7 +547,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
     RandomSampler.defaultMaxGapSamplingFraction should be (0.4)

     var d: Double = 0.0
-    var sampler = new PoissonSampler[Int](0.1)
+    val sampler = new PoissonSampler[Int](0.1)
     sampler.setSeed(rngSeed.nextLong)

     // Array iterator (indexable type)
@@ -101,7 +101,7 @@ object LocalKMeans {
         }
       }

-      var newPoints = pointStats.map {mapping =>
+      val newPoints = pointStats.map { mapping =>
         (mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}

       tempDist = 0.0
@@ -236,7 +236,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {

     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
-    var sentCount = sent.values.sum
+    val sentCount = sent.values.sum

     val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams,
       Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts)
@@ -67,7 +67,7 @@ private[mllib] object EigenValueDecomposition {
     // "LM" : compute the NEV largest (in magnitude) eigenvalues
     val which = "LM"

-    var iparam = new Array[Int](11)
+    val iparam = new Array[Int](11)
     // use exact shift in each iteration
     iparam(0) = 1
     // maximum number of Arnoldi update iterations, or the actual number of iterations on output
@@ -224,7 +224,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
       (3, 4)
     )).toDF("src", "dst").repartition(1)

-    var assignments2 = new PowerIterationClustering()
+    val assignments2 = new PowerIterationClustering()
       .setInitMode("random")
       .setK(2)
       .assignClusters(data2)

@@ -238,7 +238,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
     assert(Set(predictions2(0).size, predictions2(1).size) !== Set(2, 3))


-    var assignments3 = new PowerIterationClustering()
+    val assignments3 = new PowerIterationClustering()
       .setInitMode("degree")
       .setK(2)
       .assignClusters(data2)
@@ -219,7 +219,7 @@ private[joins] class UnsafeHashedRelation(
   var resultRow = new UnsafeRow(numFields)

   // re-used in getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
-  var valueRowWithKeyIndex = new ValueRowWithKeyIndex
+  val valueRowWithKeyIndex = new ValueRowWithKeyIndex

   override def get(key: InternalRow): Iterator[InternalRow] = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]