4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -149,9 +149,7 @@ case class ExceptionFailure(
this(e, accumUpdates, preserveCause = true)
}

def exception: Option[Throwable] = exceptionWrapper.flatMap {
(w: ThrowableSerializationWrapper) => Option(w.exception)
}
def exception: Option[Throwable] = exceptionWrapper.flatMap(w => Option(w.exception))

override def toErrorString: String =
if (fullStackTrace == null) {
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -105,7 +105,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = {
new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue()))
new JavaDoubleRDD(rdd.map(f.call(_).doubleValue()))
}

/**
@@ -131,7 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
new JavaDoubleRDD(rdd.flatMap(fn).map(_.doubleValue()))
}

/**
@@ -173,7 +173,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
new JavaDoubleRDD(rdd.mapPartitions(fn).map(_.doubleValue()))
}

/**
@@ -196,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
.map(_.doubleValue()))
}

/**
@@ -215,7 +215,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
rdd.foreachPartition((x => f.call(x.asJava)))
rdd.foreachPartition(x => f.call(x.asJava))
}

/**
@@ -35,9 +35,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val state = master.askWithRetry[MasterStateResponse](RequestMasterState)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
val app = state.activeApps.find(_.id == appId)
.getOrElse(state.completedApps.find(_.id == appId).orNull)
if (app == null) {
val msg = <div class="row-fluid">No running application with ID {appId}</div>
return UIUtils.basicSparkPage(msg, "Not Found")
@@ -68,7 +68,10 @@ private[deploy] class DriverRunner(

private var clock: Clock = new SystemClock()
private var sleeper = new Sleeper {
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile { _ =>
Thread.sleep(1000)
!killed
}
}

/** Starts a thread to run and manage the driver. */
@@ -116,7 +119,7 @@ private[deploy] class DriverRunner(
/** Terminate this driver (or prevent it from ever starting if not yet started) */
private[worker] def kill() {
synchronized {
process.foreach(p => p.destroy())
process.foreach(_.destroy())
killed = true
}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -190,11 +190,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:

// initializes/resets to start iterating from the beginning
def resetIterator(): Iterator[(String, Partition)] = {
val iterators = (0 to 2).map( x =>
prev.partitions.iterator.flatMap(p => {
val iterators = (0 to 2).map { x =>
prev.partitions.iterator.flatMap { p =>
if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
} )
)
}
}
iterators.reduceLeft((x, y) => x ++ y)
}

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -65,11 +65,11 @@ class JdbcRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
// bounds are inclusive, hence the + 1 here and - 1 on end
val length = BigInt(1) + upperBound - lowerBound
(0 until numPartitions).map(i => {
(0 until numPartitions).map { i =>
val start = lowerBound + ((i * length) / numPartitions)
val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
new JdbcPartition(i, start.toLong, end.toLong)
}).toArray
}.toArray
}

override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
@@ -129,15 +129,15 @@ private object ParallelCollectionRDD {
}
seq match {
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}).toSeq.asInstanceOf[Seq[Seq[T]]]
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
@@ -150,10 +150,9 @@ private object ParallelCollectionRDD {
slices
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map({
case (start, end) =>
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}).toSeq
}.toSeq
}
}
}
@@ -59,10 +59,10 @@ object StreamingTestExample {

val conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample")
val ssc = new StreamingContext(conf, batchDuration)
ssc.checkpoint({
ssc.checkpoint {
val dir = Utils.createTempDir()
dir.toString
})
}

// $example on$
val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {
@@ -115,8 +115,8 @@ object RecoverableNetworkWordCount {
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { (rdd, time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
@@ -158,9 +158,7 @@ object RecoverableNetworkWordCount {
}
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createContext(ip, port, outputPath, checkpointDirectory)
})
() => createContext(ip, port, outputPath, checkpointDirectory))
ssc.start()
ssc.awaitTermination()
}
@@ -59,7 +59,7 @@ object SqlNetworkWordCount {
val words = lines.flatMap(_.split(" "))

// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
words.foreachRDD { (rdd, time) =>
// Get the singleton instance of SQLContext
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
@@ -56,7 +56,9 @@ private[python] class Word2VecModelWrapper(model: Word2VecModel) {
}

def getVectors: JMap[String, JList[Float]] = {
model.getVectors.map({case (k, v) => (k, v.toList.asJava)}).asJava
model.getVectors.map { case (k, v) =>
(k, v.toList.asJava)
}.asJava
}

def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
@@ -189,8 +189,7 @@ class BinaryClassificationMetrics @Since("1.3.0") (
Iterator(agg)
}.collect()
val partitionwiseCumulativeCounts =
agg.scanLeft(new BinaryLabelCounter())(
(agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c)
agg.scanLeft(new BinaryLabelCounter())((agg, c) => agg.clone() += c)
val totalCount = partitionwiseCumulativeCounts.last
logInfo(s"Total counts: $totalCount")
val cumulativeCounts = binnedCounts.mapPartitionsWithIndex(
@@ -169,8 +169,8 @@ class Analyzer(
private def assignAliases(exprs: Seq[NamedExpression]) = {
exprs.zipWithIndex.map {
case (expr, i) =>
expr transformUp {
case u @ UnresolvedAlias(child, optionalAliasName) => child match {
expr.transformUp { case u @ UnresolvedAlias(child, optionalAliasName) =>
child match {
case ne: NamedExpression => ne
case e if !e.resolved => u
case g: Generator => MultiAlias(g, Nil)
@@ -212,7 +212,7 @@ class Analyzer(
* represented as the bit masks.
*/
def bitmasks(r: Rollup): Seq[Int] = {
Seq.tabulate(r.groupByExprs.length + 1)(idx => {(1 << idx) - 1})
Seq.tabulate(r.groupByExprs.length + 1)(idx => (1 << idx) - 1)
}

/*
@@ -168,9 +168,7 @@ object FromUnsafeProjection {
* Returns an UnsafeProjection for given Array of DataTypes.
*/
def apply(fields: Seq[DataType]): Projection = {
create(fields.zipWithIndex.map(x => {
new BoundReference(x._2, x._1, true)
}))
create(fields.zipWithIndex.map(x => new BoundReference(x._2, x._1, true)))
}

/**
@@ -300,11 +300,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
assert(children.nonEmpty)
val (deterministic, nondeterministic) = partitionByDeterministic(condition)
val newFirstChild = Filter(deterministic, children.head)
val newOtherChildren = children.tail.map {
child => {
val rewrites = buildRewrites(children.head, child)
Filter(pushToRight(deterministic, rewrites), child)
}
val newOtherChildren = children.tail.map { child =>
val rewrites = buildRewrites(children.head, child)
Filter(pushToRight(deterministic, rewrites), child)
}
Filter(nondeterministic, Union(newFirstChild +: newOtherChildren))

@@ -346,7 +344,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
case a @ Project(_, e @ Expand(_, _, grandChild)) if (e.outputSet -- a.references).nonEmpty =>
val newOutput = e.output.filter(a.references.contains(_))
val newProjects = e.projections.map { proj =>
proj.zip(e.output).filter { case (e, a) =>
proj.zip(e.output).filter { case (_, a) =>
newOutput.contains(a)
}.unzip._1
}
@@ -60,9 +60,7 @@ case class ShuffledHashJoin(
val context = TaskContext.get()
val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
// This relation is usually used until the end of task.
context.addTaskCompletionListener((t: TaskContext) =>
relation.close()
)
context.addTaskCompletionListener(_ => relation.close())
relation
}

@@ -431,7 +431,7 @@ private[sql] object StatFunctions extends Logging {
s"exceed 1e4. Currently $columnSize")
val table = counts.groupBy(_.get(0)).map { case (col1Item, rows) =>
val countsRow = new GenericMutableRow(columnSize + 1)
rows.foreach { (row: Row) =>
rows.foreach { row =>
Contributor: here it is definitely unclear what the input type is

Member: Yeah, but there are a thousand places in the code where a local val's type isn't obvious (e.g. what's rows above?). The question is readability, and I do agree this isn't obviously better on that dimension. I wouldn't have touched it for its own sake. I scanned this as a line where we were already changing the braces.

Contributor: So you are agreeing with me that it is not obvious, and that making the type more explicit is good here, yet you are arguing we should change it to make it less obvious?

Contributor: BTW in Spark SQL there are internal row types and external row types, so it is actually really not obvious what the row type is here.

Member: I agree the type isn't obvious, but that can't be the standard, or else we'd write types almost everywhere in Scala. While I don't see why this is a special case (you do identify a decent reason right above, though), I also would not have changed it just for its own sake. I personally would accept the change if the line were being modified, but it wasn't. Hence on a second look I'd be neutral, and wouldn't oppose undoing this, even if I think it's really six of one, half a dozen of the other.

Contributor:
"I agree the type isn't obvious, but that can't be the standard"
That should be the standard.

Member: This seems like a tiny point but is actually a decent opportunity to talk about how we collectively should write the most readable code. As I say, either you mean all the types in this method are obvious to you (they aren't to me!), or you mean you'd actually write just about every type in this method, and I dare say no Scala programmer would. Putting aside cases where the type is essential (methods, or where the inferred reference type must be overridden), this is why I'd say the standard is readability, not strictly whether the type is obvious. I bet we mean the same thing. It's going to be a judgment call that deserves relatively wide latitude, but that does argue against changing a call in the first place unless the change seems materially better, and I agree in retrospect that that standard was not met on this line. While I might undo this one, though not some of the others you're pointing out, if you feel strongly about reverting a few of these kinds of changes and that lets us close this out, that LGTM.
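
For reference, a minimal self-contained Scala sketch of the two closure styles under discussion. The ClosureTypeExample object, the local Row case class, its field names, and the sample data are made up purely for illustration; this is not the actual StatFunctions code, where the elements are Spark SQL Rows.

object ClosureTypeExample {
  // Stand-in for the rows being iterated; in the real code these are Spark SQL Rows.
  case class Row(col1: String, col2: String, freq: Long)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row("a", "x", 3L), Row("a", "y", 1L))

    // Style 1: explicit parameter type, as in the original line.
    // The reader sees the element type right at the call site.
    rows.foreach { (row: Row) =>
      println(s"${row.col1} / ${row.col2}: ${row.freq}")
    }

    // Style 2: inferred parameter type, as in the changed line.
    // More concise, but the element type must be read off the type of `rows`.
    rows.foreach { row =>
      println(s"${row.col1} / ${row.col2}: ${row.freq}")
    }
  }
}

Both compile to the same thing; the disagreement above is only about which form reads better when the element type is not obvious from nearby context.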

// row.get(0) is column 1
// row.get(1) is column 2
// row.get(2) is the frequency
@@ -82,7 +82,7 @@ private[hive] case class HiveSimpleUDF(

// TODO: Finish input output types.
override def eval(input: InternalRow): Any = {
val inputs = wrap(children.map(c => c.eval(input)), arguments, cached, inputDataTypes)
val inputs = wrap(children.map(_.eval(input)), arguments, cached, inputDataTypes)
val ret = FunctionRegistry.invoke(
method,
function,
@@ -152,10 +152,8 @@ private[hive] case class HiveGenericUDF(
var i = 0
while (i < children.length) {
val idx = i
deferredObjects(i).asInstanceOf[DeferredObjectAdapter].set(
() => {
children(idx).eval(input)
})
deferredObjects(i).asInstanceOf[DeferredObjectAdapter]
.set(() => children(idx).eval(input))
i += 1
}
unwrap(function.evaluate(deferredObjects), returnInspector)
@@ -593,7 +593,7 @@ abstract class DStream[T: ClassTag] (
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] = ssc.withScope {
this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
this.map((null, _)).reduceByKey(reduceFunc, 1).map(_._2)
}

/**
@@ -615,7 +615,7 @@ abstract class DStream[T: ClassTag] (
*/
def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
this.map((_, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}

/**
@@ -624,7 +624,7 @@ abstract class DStream[T: ClassTag] (
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)
}

/**
@@ -663,7 +663,7 @@ abstract class DStream[T: ClassTag] (
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
transform((r: RDD[T], t: Time) => cleanedF(r))
transform((r: RDD[T], _: Time) => cleanedF(r))
}

/**
@@ -806,7 +806,7 @@ abstract class DStream[T: ClassTag] (
windowDuration: Duration,
slideDuration: Duration
): DStream[T] = ssc.withScope {
this.map(x => (1, x))
this.map((1, _))
.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
.map(_._2)
}
@@ -845,7 +845,7 @@ abstract class DStream[T: ClassTag] (
numPartitions: Int = ssc.sc.defaultParallelism)
(implicit ord: Ordering[T] = null)
: DStream[(T, Long)] = ssc.withScope {
this.map(x => (x, 1L)).reduceByKeyAndWindow(
this.map((_, 1L)).reduceByKeyAndWindow(
(x: Long, y: Long) => x + y,
(x: Long, y: Long) => x - y,
windowDuration,
@@ -895,9 +895,9 @@ abstract class DStream[T: ClassTag] (
logInfo(s"Slicing from $fromTime to $toTime" +
s" (aligned to $alignedFromTime and $alignedToTime)")

alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
alignedFromTime.to(alignedToTime, slideDuration).flatMap { time =>
if (time >= zeroTime) getOrCompute(time) else None
})
}
}

/**