core/src/main/scala/org/apache/spark/Accumulators.scala (5 additions, 2 deletions)

@@ -104,8 +104,11 @@ class Accumulable[R, T] (
   * Set the accumulator's value; only allowed on master.
   */
  def value_= (newValue: R) {
-    if (!deserialized) value_ = newValue
-    else throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    if (!deserialized) {
+      value_ = newValue
+    } else {
+      throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    }
  }

  /**
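The guard above is what driver-side code relies on: once an accumulator has been shipped to a task, assignment throws and tasks may only add. A minimal usage sketch against the Spark 1.x API of this era (the local master and app name are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // implicit AccumulatorParam[Int] in Spark 1.x

object AccumulatorGuardDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("acc-demo"))
    val acc = sc.accumulator(0)

    acc.value = 10 // allowed: on the driver, deserialized is false

    sc.parallelize(1 to 100).foreach { i =>
      acc += i // allowed: tasks may only add
      // acc.value = i  // would throw UnsupportedOperationException: Can't assign accumulator value in task
    }

    println(acc.value) // the driver reads the merged result
    sc.stop()
  }
}
```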
@@ -66,8 +66,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
      if (k.startsWith("spark")) {
        defaultProperties(k) = v
        if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-      }
-      else {
+      } else {
        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
      }
    }
@@ -237,8 +237,7 @@ private[spark] class Master(
        if (waitingDrivers.contains(d)) {
          waitingDrivers -= d
          self ! DriverStateChanged(driverId, DriverState.KILLED, None)
-        }
-        else {
+        } else {
          // We just notify the worker to kill the driver here. The final bookkeeping occurs
          // on the return path when the worker submits a state change back to the master
          // to notify it that the driver was successfully killed.
@@ -91,9 +91,11 @@ private[spark] class DriverRunner(
    }

    val state =
-      if (killed) { DriverState.KILLED }
-      else if (finalException.isDefined) { DriverState.ERROR }
-      else {
+      if (killed) {
+        DriverState.KILLED
+      } else if (finalException.isDefined) {
+        DriverState.ERROR
+      } else {
        finalExitCode match {
          case Some(0) => DriverState.FINISHED
          case _ => DriverState.FAILED
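The restyled expression is a four-way precedence: an explicit kill wins over an exception, which wins over the exit code. A self-contained sketch of that decision with the Spark types stubbed out (the object and its case objects are illustrative stand-ins, not Spark's org.apache.spark.deploy.DriverState):

```scala
object DriverStateDemo {
  // Stub for illustration; the real enum lives in org.apache.spark.deploy.DriverState.
  sealed trait DriverState
  case object KILLED extends DriverState
  case object ERROR extends DriverState
  case object FINISHED extends DriverState
  case object FAILED extends DriverState

  def finalState(killed: Boolean,
                 finalException: Option[Exception],
                 finalExitCode: Option[Int]): DriverState =
    if (killed) {
      KILLED
    } else if (finalException.isDefined) {
      ERROR
    } else {
      finalExitCode match {
        case Some(0) => FINISHED
        case _ => FAILED
      }
    }

  def main(args: Array[String]): Unit = {
    assert(finalState(killed = true, None, Some(0)) == KILLED)   // kill wins over a clean exit
    assert(finalState(killed = false, None, Some(0)) == FINISHED)
    assert(finalState(killed = false, None, Some(1)) == FAILED)  // non-zero or missing exit code
  }
}
```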
@@ -89,8 +89,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
          </button>
        </a>
-      }
-      else {
+      } else {
        <button type="button" class="btn btn-default" disabled="disabled">
          Previous 0 B
        </button>
@@ -104,8 +103,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
          </button>
        </a>
-      }
-      else {
+      } else {
        <button type="button" class="btn btn-default" disabled="disabled">
          Next 0 B
        </button>
@@ -137,9 +135,13 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
      val logLength = file.length()
      val getOffset = offset.getOrElse(logLength - defaultBytes)
      val startByte =
-        if (getOffset < 0) 0L
-        else if (getOffset > logLength) logLength
-        else getOffset
+        if (getOffset < 0) {
+          0L
+        } else if (getOffset > logLength) {
+          logLength
+        } else {
+          getOffset
+        }
      val logPageLength = math.min(byteLength, maxBytes)
      val endByte = math.min(startByte + logPageLength, logLength)
      (startByte, endByte)
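The third hunk is a clamp of the requested offset into [0, logLength] followed by a window of at most maxBytes. A standalone sketch with two worked values; the parameter names mirror the diff, and the byte figures are arbitrary:

```scala
object LogPageRangeDemo {
  /** Clamp the requested offset and window it, in the style of the hunk above. */
  def getByteRange(offset: Option[Long], byteLength: Int, logLength: Long,
                   defaultBytes: Long, maxBytes: Long): (Long, Long) = {
    val getOffset = offset.getOrElse(logLength - defaultBytes)
    val startByte =
      if (getOffset < 0) {
        0L
      } else if (getOffset > logLength) {
        logLength
      } else {
        getOffset
      }
    val logPageLength = math.min(byteLength, maxBytes)
    val endByte = math.min(startByte + logPageLength, logLength)
    (startByte, endByte)
  }

  def main(args: Array[String]): Unit = {
    // 10 KB log, no explicit offset, 1 KB default page: show the last kilobyte.
    println(getByteRange(None, 1024, 10240, 1024, 1024 * 1024))         // (9216,10240)
    // An offset past the end clamps to logLength, yielding an empty range.
    println(getByteRange(Some(99999L), 1024, 10240, 1024, 1024 * 1024)) // (10240,10240)
  }
}
```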
@@ -281,7 +281,9 @@ private[spark] class BlockManager(
      val onDiskSize = status.diskSize
      master.updateBlockInfo(
        blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
-    } else true
+    } else {
+      true
+    }
  }

  /**
@@ -676,7 +678,7 @@ private[spark] class BlockManager(
        tachyonStore.putValues(blockId, iterator, level, false)
      case ArrayBufferValues(array) =>
        tachyonStore.putValues(blockId, array, level, false)
-      case ByteBufferValues(bytes) => 
+      case ByteBufferValues(bytes) =>
        bytes.rewind()
        tachyonStore.putBytes(blockId, bytes, level)
    }
@@ -695,7 +697,7 @@ private[spark] class BlockManager(
        diskStore.putValues(blockId, iterator, level, askForBytes)
      case ArrayBufferValues(array) =>
        diskStore.putValues(blockId, array, level, askForBytes)
-      case ByteBufferValues(bytes) => 
+      case ByteBufferValues(bytes) =>
        bytes.rewind()
        diskStore.putBytes(blockId, bytes, level)
    }
@@ -43,8 +43,11 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
  }

  override def +=(elem: A): this.type = {
-    if (size < maxSize) underlying.offer(elem)
-    else maybeReplaceLowest(elem)
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
    this
  }

@@ -59,7 +62,8 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
    if (head != null && ord.gt(a, head)) {
      underlying.poll()
      underlying.offer(a)
-    } else false
+    } else {
+      false
+    }
  }
}
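For reference, BoundedPriorityQueue is Spark's top-N helper: offer while under capacity, otherwise evict the current minimum only when the newcomer beats it (maybeReplaceLowest). A minimal standalone sketch of the same idea on a bare java.util.PriorityQueue:

```scala
import java.util.{PriorityQueue => JPriorityQueue}
import scala.collection.mutable.ArrayBuffer

object TopNDemo {
  /** Keep the n largest elements seen, in the style of BoundedPriorityQueue above. */
  def topN(xs: Seq[Int], n: Int): Seq[Int] = {
    val underlying = new JPriorityQueue[Int](n) // min-heap: peek() is the smallest retained element
    xs.foreach { x =>
      if (underlying.size < n) {
        underlying.offer(x)
      } else if (underlying.peek() < x) { // maybeReplaceLowest: evict only if x beats the minimum
        underlying.poll()
        underlying.offer(x)
      }
    }
    val out = ArrayBuffer[Int]()
    while (!underlying.isEmpty) out += underlying.poll()
    out.toSeq // ascending: smallest retained element first
  }

  def main(args: Array[String]): Unit = {
    println(topN(Seq(5, 1, 9, 3, 7, 2), 3)) // 5, 7, 9
  }
}
```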

core/src/main/scala/org/apache/spark/util/FileLogger.scala (3 additions, 1 deletion)

@@ -110,7 +110,9 @@ private[spark] class FileLogger(
   * @param withTime Whether to prepend message with a timestamp
   */
  def log(msg: String, withTime: Boolean = false) {
-    val writeInfo = if (!withTime) msg else {
+    val writeInfo = if (!withTime) {
+      msg
+    } else {
      val date = new Date(System.currentTimeMillis())
      dateFormat.get.format(date) + ": " + msg
    }
core/src/main/scala/org/apache/spark/util/Utils.scala (1 addition, 2 deletions)

@@ -811,8 +811,7 @@ private[spark] object Utils extends Logging {
          } else {
            el.getMethodName
          }
-        }
-        else {
+        } else {
          firstUserLine = el.getLineNumber
          firstUserFile = el.getFileName
          firstUserClass = el.getClassName
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala (2 additions, 2 deletions)

@@ -381,8 +381,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
      val prng42 = new Random(42)
      val prng43 = new Random(43)
      Array(1, 2, 3, 4, 5, 6).filter{i =>
-        if (i < 4) 0 == prng42.nextInt(3)
-        else 0 == prng43.nextInt(3)}
+        if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3)
+      }
    }
    assert(sample.size === checkSample.size)
    for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
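The test's precomputed checkSample works only because java.util.Random is deterministic for a fixed seed, which is worth seeing in isolation:

```scala
import java.util.Random

object SeededPrngDemo {
  def main(args: Array[String]): Unit = {
    // Two generators with the same seed produce identical streams,
    // which is what lets the test precompute its expected sample.
    val a = new Random(42)
    val b = new Random(42)
    assert((1 to 5).map(_ => a.nextInt(3)) == (1 to 5).map(_ => b.nextInt(3)))
  }
}
```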
@@ -49,8 +49,7 @@ object LogQuery {
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)

    val dataSet =
-      if (args.length == 2) sc.textFile(args(1))
-      else sc.parallelize(exampleApacheLogs)
+      if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs)
    // scalastyle:off
    val apacheLogRegex =
      """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
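As a side note, the regex in the context lines captures the usual Apache combined-log fields (IP, identd, user, timestamp, request, status, bytes, referer, user agent). A quick standalone check against a synthetic log line (the sample line is invented for illustration):

```scala
object ApacheLogRegexDemo {
  val apacheLogRegex =
    """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r

  def main(args: Array[String]): Unit = {
    // Made-up Apache combined-log line, for illustration only.
    val line = "10.10.10.10 - frank [10/Oct/2014:13:55:36 -0700] " +
      "\"GET /index.html HTTP/1.1\" 200 2326 \"http://example.com/start\" \"Mozilla/5.0\""
    line match {
      case apacheLogRegex(ip, _, user, dateTime, request, status, bytes, referer, ua) =>
        println(s"ip=$ip user=$user status=$status request=$request")
      case _ =>
        println("no match")
    }
  }
}
```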
@@ -69,8 +69,11 @@ object PageViewStream {
      val normalCount = statuses.filter(_ == 200).size
      val errorCount = statuses.size - normalCount
      val errorRatio = errorCount.toFloat / statuses.size
-      if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
-      else {"%s: %s".format(zip, errorRatio)}
+      if (errorRatio > 0.05) {
+        "%s: **%s**".format(zip, errorRatio)
+      } else {
+        "%s: %s".format(zip, errorRatio)
+      }
    }

    // Return the number unique users in last 15 seconds
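The branch above just bolds zip codes whose error ratio exceeds 5%; for instance 3 errors out of 40 requests gives 0.075 > 0.05. Extracted as a sketch (the function name and sample values are made up):

```scala
object ErrorRatioDemo {
  def format(zip: String, errorCount: Int, total: Int): String = {
    val errorRatio = errorCount.toFloat / total
    if (errorRatio > 0.05) {
      "%s: **%s**".format(zip, errorRatio)
    } else {
      "%s: %s".format(zip, errorRatio)
    }
  }

  def main(args: Array[String]): Unit = {
    println(format("94110", errorCount = 3, total = 40)) // 94110: **0.075**
    println(format("94110", errorCount = 1, total = 40)) // 94110: 0.025
  }
}
```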
@@ -165,8 +165,11 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
      // not have any edges in the specified direction.
      assert(edges.count === 50)
      edges.collect.foreach {
-        case (vid, edges) => if (vid > 0 && vid < 49) assert(edges.size == 2)
-                             else assert(edges.size == 1)
+        case (vid, edges) => if (vid > 0 && vid < 49) {
+          assert(edges.size == 2)
+        } else {
+          assert(edges.size == 1)
+        }
      }
      edges.collect.foreach {
        case (vid, edges) =>
repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala (8 additions, 5 deletions)

@@ -47,9 +47,13 @@ trait SparkExprTyper extends Logging {
      var isIncomplete = false
      reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
        val trees = codeParser.stmts(line)
-        if (reporter.hasErrors) Some(Nil)
-        else if (isIncomplete) None
-        else Some(trees)
+        if (reporter.hasErrors) {
+          Some(Nil)
+        } else if (isIncomplete) {
+          None
+        } else {
+          Some(trees)
+        }
      }
    }
    // def parsesAsExpr(line: String) = {
@@ -70,8 +74,7 @@ trait SparkExprTyper extends Logging {
        val sym0 = symbolOfTerm(name)
        // drop NullaryMethodType
        val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
-        if (sym.info.typeSymbol eq UnitClass) NoSymbol
-        else sym
+        if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
      case _ => NoSymbol
    }
  }
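The parse result here is a deliberately three-valued Option[List[...]]: Some(Nil) means the line had errors, None means it is syntactically incomplete (the REPL should keep reading input), and Some(trees) is a successful parse. Since the encoding is easy to misread, a stubbed sketch:

```scala
object ParseResultDemo {
  /** Tri-state result in the style of SparkExprTyper.parse above:
    *  - Some(Nil)   => the line had errors
    *  - None        => the line is incomplete, keep reading
    *  - Some(trees) => parsed successfully
    */
  def parse(hasErrors: Boolean, isIncomplete: Boolean, trees: List[String]): Option[List[String]] =
    if (hasErrors) {
      Some(Nil)
    } else if (isIncomplete) {
      None
    } else {
      Some(trees)
    }

  def main(args: Array[String]): Unit = {
    assert(parse(hasErrors = true, isIncomplete = false, List("t")) == Some(Nil))
    assert(parse(hasErrors = false, isIncomplete = true, List("t")).isEmpty)
    assert(parse(hasErrors = false, isIncomplete = false, List("t")) == Some(List("t")))
  }
}
```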
@@ -203,8 +203,9 @@ case class InsertIntoParquetTable(
    val stageId = sc.newRddId()

    val taskIdOffset =
-      if (overwrite) 1
-      else {
+      if (overwrite) {
+        1
+      } else {
        FileSystemHelper
          .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
      }
@@ -158,8 +158,11 @@ private[parquet] class CatalystGroupConverter(
    a => a.dataType match {
      case ctype: NativeType =>
        // note: for some reason matching for StringType fails so use this ugly if instead
-        if (ctype == StringType) new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
-        else new CatalystPrimitiveConverter(this, schema.indexOf(a))
+        if (ctype == StringType) {
+          new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
+        } else {
+          new CatalystPrimitiveConverter(this, schema.indexOf(a))
+        }
      case _ => throw new RuntimeException(
        s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter")
    }
@@ -240,8 +240,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
      if (hasLocationPreferences) {
        val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
        ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-      }
-      else {
+      } else {
        ssc.sc.makeRDD(receivers, receivers.size)
      }
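Both branches build the same RDD of receivers; the difference is whether each element carries preferred hosts, via SparkContext.makeRDD's overload taking Seq[(T, Seq[String])]. A sketch with strings standing in for Receiver[_] (the hostnames are invented, and local mode ignores the preferences anyway):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocationPreferenceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("loc-prefs"))

    // Strings stand in for Receiver[_]; "host1" is a made-up hostname.
    val receivers = Seq("receiver-0", "receiver-1")

    // With preferences: one (value, preferredHosts) pair per element; the explicit
    // type parameter picks the location-preference overload, as in the diff above.
    val withPrefs = sc.makeRDD[String](receivers.map(r => (r, Seq("host1"))))
    // Without preferences: just spread the elements over receivers.size partitions.
    val withoutPrefs = sc.makeRDD(receivers, receivers.size)

    println(withPrefs.partitions.length)    // 2
    println(withoutPrefs.partitions.length) // 2
    sc.stop()
  }
}
```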
@@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
      .orElse(Option(System.getenv("LOCAL_DIRS")))

    localDirs match {
      case None => throw new Exception("Yarn Local dirs can't be empty")
      case Some(l) => l
    }
-  } 
+  }
  }

@@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
          logInfo("Allocating %d containers to make up for (potentially) lost containers".
            format(missingExecutorCount))
          yarnAllocator.allocateContainers(missingExecutorCount)
-        }
-        else sendProgress()
+        } else {
+          sendProgress()
+        }
        Thread.sleep(sleepTime)
      }
    }
@@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
      return
    }
    isFinished = true
-    
+
    logInfo("finishApplicationMaster with " + status)
    if (registered) {
      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
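The first hunk's context also shows the idiomatic null-safe environment fallback: Option(System.getenv(k)) lifts a possibly-null lookup into an Option, and orElse expresses the YARN_LOCAL_DIRS to LOCAL_DIRS fallback. As a tiny standalone sketch:

```scala
object EnvFallbackDemo {
  def main(args: Array[String]): Unit = {
    // System.getenv returns null for unset variables; Option(...) turns that
    // into None so the fallback chain composes cleanly, as in the hunk above.
    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
      .orElse(Option(System.getenv("LOCAL_DIRS")))

    localDirs match {
      case None => println("Yarn Local dirs can't be empty")
      case Some(l) => println(s"using: $l")
    }
  }
}
```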
@@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
          logInfo("Allocating " + missingExecutorCount +
            " containers to make up for (potentially ?) lost containers")
          yarnAllocator.allocateContainers(missingExecutorCount)
-        }
-        else sendProgress()
+        } else {
+          sendProgress()
+        }
        Thread.sleep(sleepTime)
      }
    }