26 changes: 14 additions & 12 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -94,21 +94,23 @@ private[spark] case class SSLOptions(
* are supported by the current Java security provider for this protocol.
*/
private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
Set()
Set.empty
} else {
var context: SSLContext = null
try {
context = SSLContext.getInstance(protocol.orNull)
/* The set of supported algorithms does not depend upon the keys, trust, or
if (protocol.isEmpty) {
logDebug("No SSL protocol specified")
context = SSLContext.getDefault
} else {
try {
context = SSLContext.getInstance(protocol.get)
/* The set of supported algorithms does not depend upon the keys, trust, or
rng, although they will influence which algorithms are eventually used. */
context.init(null, null, null)
} catch {
case npe: NullPointerException =>
logDebug("No SSL protocol specified")
context = SSLContext.getDefault
case nsa: NoSuchAlgorithmException =>
logDebug(s"No support for requested SSL protocol ${protocol.get}")
context = SSLContext.getDefault
context.init(null, null, null)
} catch {
case nsa: NoSuchAlgorithmException =>
logDebug(s"No support for requested SSL protocol ${protocol.get}")
context = SSLContext.getDefault
}
}

val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet
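A note on the SSLOptions change above: the old code leaned on `SSLContext.getInstance(protocol.orNull)` throwing a NullPointerException to detect the missing-protocol case; the new code branches on the `Option` explicitly and keeps the try/catch only for the genuinely exceptional `NoSuchAlgorithmException`. A minimal, self-contained sketch of that pattern (outside Spark's `Logging` trait, so the debug messages become comments):

```scala
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext

object SslContextForProtocol {
  // Resolve an SSLContext from an optional protocol name, falling back to the
  // JVM default instead of relying on a NullPointerException for the None case.
  def resolve(protocol: Option[String]): SSLContext = protocol match {
    case None =>
      SSLContext.getDefault                 // no protocol requested
    case Some(p) =>
      try {
        val ctx = SSLContext.getInstance(p)
        ctx.init(null, null, null)          // keys/trust/rng do not affect supported suites
        ctx
      } catch {
        case _: NoSuchAlgorithmException =>
          SSLContext.getDefault             // unknown protocol name, fall back
      }
  }

  def main(args: Array[String]): Unit = {
    println(resolve(None).getProtocol)                   // "Default"
    println(resolve(Some("TLSv1.2")).getProtocol)        // "TLSv1.2"
    println(resolve(Some("NotARealProtocol")).getProtocol) // falls back to "Default"
  }
}
```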
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -420,7 +420,7 @@ object SparkEnv extends Logging {
if (!conf.contains("spark.scheduler.mode")) {
Seq(("spark.scheduler.mode", schedulingMode))
} else {
Seq[(String, String)]()
Seq.empty[(String, String)]
}
val sparkProperties = (conf.getAll ++ schedulerMode).sorted

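Most of the remaining hunks in this PR follow the same shape as the SparkEnv change above: `Seq()`, `Set()` and friends become `Seq.empty` / `Set.empty`, which avoids the varargs `apply` call and states the intent directly. A tiny standalone sketch (the config key comes from the hunk, the rest is illustrative):

```scala
object EmptySeqExample {
  // Seq.empty returns the shared empty collection and reads as "deliberately
  // empty", whereas Seq() routes through the varargs apply method first.
  def schedulerModeOverride(alreadySet: Boolean, mode: String): Seq[(String, String)] =
    if (!alreadySet) Seq(("spark.scheduler.mode", mode)) else Seq.empty[(String, String)]

  def main(args: Array[String]): Unit = {
    println(schedulerModeOverride(alreadySet = false, mode = "FAIR")) // List((spark.scheduler.mode,FAIR))
    println(schedulerModeOverride(alreadySet = true, mode = "FAIR"))  // List()
  }
}
```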
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -58,8 +58,8 @@ private[spark] object TestUtils {
def createJarWithClasses(
classNames: Seq[String],
toStringValue: String = "",
classNamesWithBase: Seq[(String, String)] = Seq(),
classpathUrls: Seq[URL] = Seq()): URL = {
classNamesWithBase: Seq[(String, String)] = Seq.empty,
classpathUrls: Seq[URL] = Seq.empty): URL = {
val tempDir = Utils.createTempDir()
val files1 = for (name <- classNames) yield {
createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
@@ -137,7 +137,7 @@ private[spark] object TestUtils {
val options = if (classpathUrls.nonEmpty) {
Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
} else {
Seq()
Seq.empty
}
compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call()

@@ -160,7 +160,7 @@ private[spark] object TestUtils {
destDir: File,
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = {
classpathUrls: Seq[URL] = Seq.empty): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
@@ -974,6 +974,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
}
}
}
super.finalize()
}
}
// scalastyle:on no.finalize
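On the `super.finalize()` line added above: an override of `finalize()` that never delegates to the superclass silently drops whatever cleanup the parent's finalizer performs. The diff simply appends the call at the end of the method; a more defensive variant (sketched below with illustrative names, not Spark's) wraps the subclass cleanup in try/finally so the superclass call runs even if the cleanup throws:

```scala
// scalastyle:off no.finalize
class TempFileHolder(@transient var path: String) extends Serializable {
  override def finalize(): Unit = {
    try {
      // Subclass-specific cleanup: delete the backing temp file if it is still around.
      if (path != null) {
        val f = new java.io.File(path)
        if (f.exists()) f.delete()
      }
    } finally {
      super.finalize() // always give the parent class its turn to clean up
    }
  }
}
// scalastyle:on no.finalize
```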
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -128,8 +128,7 @@ private[spark] object SerDe {
}

def readBoolean(in: DataInputStream): Boolean = {
val intVal = in.readInt()
if (intVal == 0) false else true
in.readInt() != 0
}

def readDate(in: DataInputStream): Date = {
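The `readBoolean` rewrite above is one instance of a simplification that also appears later in AllJobsPage and StageTableBase: `if (cond) true else false` (or `if (x == 0) false else true`) is just the boolean expression itself. A small self-contained sketch, assuming the same int-encoded boolean wire format SerDe uses:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object BooleanSimplification {
  // The expression form: any nonzero int decodes to true.
  def readBoolean(in: DataInputStream): Boolean = in.readInt() != 0

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(0)
    out.writeInt(1)
    out.flush()

    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    println(readBoolean(in)) // false
    println(readBoolean(in)) // true
  }
}
```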
@@ -337,7 +337,7 @@ class SparkHadoopUtil extends Logging {
if (credentials != null) {
credentials.getAllTokens.asScala.map(tokenToString)
} else {
Seq()
Seq.empty
}
}

@@ -317,7 +317,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newLastScanTime = getNewLastScanTime()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
.getOrElse(Seq[FileStatus]())
.getOrElse(Seq.empty[FileStatus])
// scan for modified applications, replay and merge them
val logInfos: Seq[FileStatus] = statusList
.filter { entry =>
@@ -55,7 +55,7 @@ class MasterWebUI(
}

def addProxyTargets(id: String, target: String): Unit = {
var endTarget = target.stripSuffix("/")
val endTarget = target.stripSuffix("/")
val handler = createProxyHandler("/proxy/" + id, endTarget)
attachHandler(handler)
proxyHandlers(id) = handler
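The MasterWebUI hunk above, along with similar ones in DefaultPartitionCoalescer, Pool, TaskSetManager and BlockManager further down, turns `var` into `val` for locals that are never reassigned. Only the binding becomes immutable; a `val` that points at a mutable collection can still be mutated in place, which is exactly what the `sortedTaskSetQueue` buffers rely on. A short illustrative sketch:

```scala
import scala.collection.mutable.ArrayBuffer

object ValVsVar {
  def main(args: Array[String]): Unit = {
    // val: the binding cannot be reassigned, which documents intent and lets
    // the compiler reject accidental reassignment.
    val endTarget = "http://host:8080/".stripSuffix("/")
    // endTarget = "other"   // would not compile

    // A val holding a mutable buffer can still be mutated through the reference.
    val queue = new ArrayBuffer[String]()
    queue += "task-set-1"
    queue += "task-set-2"

    println(endTarget) // http://host:8080
    println(queue)     // ArrayBuffer(task-set-1, task-set-2)
  }
}
```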
@@ -44,7 +44,7 @@ object CommandUtils extends Logging {
memory: Int,
sparkHome: String,
substituteArguments: String => String,
classPaths: Seq[String] = Seq[String](),
classPaths: Seq[String] = Seq.empty,
env: Map[String, String] = sys.env): ProcessBuilder = {
val localCommand = buildLocalCommand(
command, securityMgr, substituteArguments, classPaths, env)
@@ -73,7 +73,7 @@ object CommandUtils extends Logging {
command: Command,
securityMgr: SecurityManager,
substituteArguments: String => String,
classPath: Seq[String] = Seq[String](),
classPath: Seq[String] = Seq.empty,
env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName
val libraryPathEntries = command.libraryPathEntries
@@ -96,7 +96,7 @@ object CommandUtils extends Logging {
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq[String](), // library path already captured in environment variable
Seq.empty, // library path already captured in environment variable
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
}
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -17,6 +17,8 @@

package org.apache

import java.util.Properties

/**
* Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
* Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
@@ -40,9 +42,6 @@ package org.apache
* Developer API</span> are intended for advanced users want to extend Spark through lower
* level interfaces. These are subject to changes or removal in minor releases.
*/

import java.util.Properties

package object spark {

private object SparkBuildInfo {
@@ -57,6 +56,9 @@ package object spark {

val resourceStream = Thread.currentThread().getContextClassLoader.
getResourceAsStream("spark-version-info.properties")
if (resourceStream == null) {
throw new SparkException("Could not find spark-version-info.properties")
}

try {
val unknownProp = "<unknown>"
@@ -71,8 +73,6 @@ package object spark {
props.getProperty("date", unknownProp)
)
} catch {
case npe: NullPointerException =>
throw new SparkException("Error while locating file spark-version-info.properties", npe)
case e: Exception =>
throw new SparkException("Error loading properties from spark-version-info.properties", e)
} finally {
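For the package.scala hunk above: `getResourceAsStream` returns null when the resource is missing, and the old code only discovered that when `Properties.load` threw a NullPointerException inside the try block. The new code checks for null up front and fails with a clear message. A hedged, self-contained sketch of the same pattern (using IllegalStateException and a made-up resource name rather than Spark's SparkException and spark-version-info.properties):

```scala
import java.util.Properties

object VersionInfo {
  // Load build properties from the classpath, failing with a descriptive error
  // if the resource is absent rather than letting a NullPointerException escape
  // from Properties.load.
  def load(resource: String): Properties = {
    val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(resource)
    if (stream == null) {
      throw new IllegalStateException(s"Could not find $resource")
    }
    try {
      val props = new Properties()
      props.load(stream)
      props
    } finally {
      stream.close()
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      println(load("no-such-file.properties").getProperty("version", "<unknown>"))
    } catch {
      case e: IllegalStateException => println(e.getMessage) // Could not find no-such-file.properties
    }
  }
}
```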
@@ -269,7 +269,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
tries = 0
// if we don't have enough partition groups, create duplicates
while (numCreated < targetLen) {
var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
tries += 1
val pgroup = new PartitionGroup(Some(nxt_replica))
groupArr += pgroup
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -97,7 +97,7 @@ private[spark] class Pool(
}

override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
@@ -60,7 +60,7 @@ private[spark] class DirectTaskResult[T](

val numUpdates = in.readInt
if (numUpdates == 0) {
accumUpdates = Seq()
accumUpdates = Seq.empty
} else {
val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) {
@@ -891,7 +891,7 @@ private[spark] class TaskSetManager(
override def removeSchedulable(schedulable: Schedulable) {}

override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
sortedTaskSetQueue += this
sortedTaskSetQueue
}
@@ -948,7 +948,7 @@ private[spark] class TaskSetManager(

if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
val time = clock.getTimeMillis()
var medianDuration = successfulTaskDurations.median
val medianDuration = successfulTaskDurations.median
val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
// TODO: Threshold should also look at standard deviation of task durations and have a lower
// bound based on that.
@@ -23,7 +23,6 @@ import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
import scala.concurrent.duration.Duration

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.internal.Logging
@@ -427,11 +426,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* be called in the yarn-client mode when AM re-registers after a failure.
* */
protected def reset(): Unit = {
val executors = synchronized {
val executors: Set[String] = synchronized {
requestedTotalExecutors = 0
numPendingExecutors = 0
executorsPendingToRemove.clear()
Set() ++ executorDataMap.keys
executorDataMap.keys.toSet
}

// Remove all the lingering executors that should be removed but not yet. The reason might be
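In the `reset()` hunk above, `executorDataMap.keys.toSet` replaces `Set() ++ executorDataMap.keys`; both build an immutable copy of the keys, but `toSet` says so directly, and the added `: Set[String]` annotation documents what escapes the `synchronized` block. A small sketch of snapshotting mutable state under a lock (the map here is a stand-in for `executorDataMap`, not Spark's type):

```scala
import scala.collection.mutable

object SnapshotUnderLock {
  private val lock = new Object
  private val executorDataMap = mutable.HashMap("exec-1" -> 4, "exec-2" -> 8)

  // Copy the keys into an immutable Set while holding the lock, so later
  // iteration happens on a stable snapshot rather than the live mutable map.
  def executorsSnapshot(): Set[String] = lock.synchronized {
    executorDataMap.keys.toSet
  }

  def main(args: Array[String]): Unit = {
    val executors = executorsSnapshot()
    executorDataMap += ("exec-3" -> 2) // later mutation does not affect the snapshot
    println(executors)                 // Set(exec-1, exec-2) (order may vary)
  }
}
```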
@@ -1275,11 +1275,11 @@ private[spark] class BlockManager(
val numPeersToReplicateTo = level.replication - 1
val startTime = System.nanoTime

var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
val peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
var numFailures = 0

val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
val initialPeers = getPeers(false).filterNot(existingReplicas.contains)

var peersForReplication = blockReplicationPolicy.prioritize(
blockManagerId,
@@ -241,7 +241,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}.getOrElse(jobIdTitle)
val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
// New jobs should be shown above old jobs by default.
if (jobSortColumn == jobIdTitle) true else false
jobSortColumn == jobIdTitle
)
val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
@@ -115,7 +115,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
if (sc.isDefined && isFairScheduler) {
<h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
} else {
Seq[Node]()
Seq.empty[Node]
}
}
if (shouldShowActiveStages) {
@@ -41,7 +41,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match {
case Some(s) => s.values.toSeq
case None => Seq[StageInfo]()
case None => Seq.empty[StageInfo]
}
val shouldShowActiveStages = activeStages.nonEmpty
val activeStagesTable =
@@ -565,7 +565,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)

val maybeAccumulableTable: Seq[Node] =
if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq.empty

val aggMetrics =
<span class="collapse-aggregated-metrics collapse-table"
@@ -60,7 +60,7 @@ private[ui] class StageTableBase(
}.getOrElse("Stage Id")
val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse(
// New stages should be shown above old jobs by default.
if (stageSortColumn == "Stage Id") true else false
stageSortColumn == "Stage Id"
)
val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)
@@ -51,7 +51,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
.getOrElse {
// Rather than crashing, render an "RDD Not Found" page
return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent)
return UIUtils.headerSparkPage("RDD Not Found", Seq.empty[Node], parent)
}

// Worker table
26 changes: 14 additions & 12 deletions core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -81,7 +81,7 @@ private[spark] object ClosureCleaner extends Logging {
val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) {
val cr = getClassReader(stack.pop())
val set = Set[Class[_]]()
val set = Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
seen += cls
@@ -180,16 +180,18 @@ private[spark] object ClosureCleaner extends Logging {
val declaredFields = func.getClass.getDeclaredFields
val declaredMethods = func.getClass.getDeclaredMethods

logDebug(" + declared fields: " + declaredFields.size)
declaredFields.foreach { f => logDebug(" " + f) }
logDebug(" + declared methods: " + declaredMethods.size)
declaredMethods.foreach { m => logDebug(" " + m) }
logDebug(" + inner classes: " + innerClasses.size)
innerClasses.foreach { c => logDebug(" " + c.getName) }
logDebug(" + outer classes: " + outerClasses.size)
outerClasses.foreach { c => logDebug(" " + c.getName) }
logDebug(" + outer objects: " + outerObjects.size)
outerObjects.foreach { o => logDebug(" " + o) }
if (log.isDebugEnabled) {
logDebug(" + declared fields: " + declaredFields.size)
declaredFields.foreach { f => logDebug(" " + f) }
logDebug(" + declared methods: " + declaredMethods.size)
declaredMethods.foreach { m => logDebug(" " + m) }
logDebug(" + inner classes: " + innerClasses.size)
innerClasses.foreach { c => logDebug(" " + c.getName) }
logDebug(" + outer classes: " + outerClasses.size)
outerClasses.foreach { c => logDebug(" " + c.getName) }
logDebug(" + outer objects: " + outerObjects.size)
outerObjects.foreach { o => logDebug(" " + o) }
}

// Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
@@ -201,7 +203,7 @@
// Initialize accessed fields with the outer classes first
// This step is needed to associate the fields to the correct classes later
for (cls <- outerClasses) {
accessedFields(cls) = Set[String]()
accessedFields(cls) = Set.empty[String]
}
// Populate accessed fields by visiting all fields and methods accessed by this and
// all of its inner closures. If transitive cleaning is enabled, this may recursively
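The ClosureCleaner hunk above wraps a block of `logDebug` calls in `if (log.isDebugEnabled)`. Each call already checks the level internally, but the guard also skips the surrounding work (the string concatenation and the per-collection `foreach` loops) when debug logging is off. A self-contained sketch of the pattern, using java.util.logging so it runs without Spark's `Logging` trait (which is backed by SLF4J):

```scala
import java.util.logging.{Level, Logger}

object GuardedDebugLogging {
  private val log = Logger.getLogger(getClass.getName)

  def dumpClosureInfo(declaredFields: Seq[String], outerClasses: Seq[String]): Unit = {
    // Guarding the whole block avoids building the strings and walking the
    // collections at all when the debug level is disabled.
    if (log.isLoggable(Level.FINE)) {
      log.fine(" + declared fields: " + declaredFields.size)
      declaredFields.foreach(f => log.fine("     " + f))
      log.fine(" + outer classes: " + outerClasses.size)
      outerClasses.foreach(c => log.fine("     " + c))
    }
  }

  def main(args: Array[String]): Unit = {
    dumpClosureInfo(Seq("field1", "field2"), Seq("OuterA"))
    println("done (enable FINE logging to see the debug output)")
  }
}
```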
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -696,7 +696,7 @@ private[spark] object JsonProtocol {
val accumulatedValues = {
Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
case None => Seq.empty[AccumulableInfo]
}
}

@@ -726,7 +726,7 @@ private[spark] object JsonProtocol {
val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean])
val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
case None => Seq.empty[AccumulableInfo]
}

val taskInfo =