
Commit a00261d

Merge remote-tracking branch 'upstream/master' into nested-union

2 parents: 190b326 + f88c641
124 files changed: +2280 / -1244 lines


.github/PULL_REQUEST_TEMPLATE

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+## What changes were proposed in this pull request?
+
+(Please fill in changes proposed in this fix)
+
+
+## How was the this patch tested?
+
+(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
+
+
+(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
+

R/pkg/R/pairRDD.R

Lines changed: 5 additions & 5 deletions
@@ -305,11 +305,11 @@ setMethod("groupByKey",
 #' Merge values by key
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function.
+#' and merges the values for each key using an associative and commutative reduce function.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #' list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @param numPartitions Number of partitions to create.
 #' @return An RDD where each element is list(K, V') where V' is the merged
 #' value
@@ -347,12 +347,12 @@ setMethod("reduceByKey",
 #' Merge values by key locally
 #'
 #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
-#' and merges the values for each key using an associative reduce function, but return the
-#' results immediately to the driver as an R list.
+#' and merges the values for each key using an associative and commutative reduce function, but
+#' return the results immediately to the driver as an R list.
 #'
 #' @param x The RDD to reduce by key. Should be an RDD where each element is
 #' list(K, V) or c(K, V).
-#' @param combineFunc The associative reduce function to use.
+#' @param combineFunc The associative and commutative reduce function to use.
 #' @return A list of elements of type list(K, V') where V' is the merged value for each key
 #' @seealso reduceByKey
 #' @examples

build/mvn

Lines changed: 1 addition & 1 deletion
@@ -69,7 +69,7 @@ install_app() {
 
 # Install maven under the build/ folder
 install_mvn() {
-  local MVN_VERSION="3.3.3"
+  local MVN_VERSION="3.3.9"
 
   install_app \
     "http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \

core/src/main/scala/org/apache/spark/Accumulator.scala

Lines changed: 3 additions & 3 deletions
@@ -29,9 +29,9 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
  * as the types of elements being merged, i.e. variables that are only "added" to through an
- * associative operation and can therefore be efficiently supported in parallel. They can be used
- * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric
- * value types, and programmers can add support for new types.
+ * associative and commutative operation and can therefore be efficiently supported in parallel.
+ * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
+ * accumulators of numeric value types, and programmers can add support for new types.
  *
  * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
  * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
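
The scaladoc above describes how an accumulator is created on the driver and added to from tasks. A minimal sketch of that usage, assuming `sc` is an existing SparkContext and using the pre-2.0 `SparkContext#accumulator` API:

    // Sketch only: `sc` is an assumed SparkContext. Int addition is
    // associative and commutative, as the revised doc requires.
    val failures = sc.accumulator(0, "failures")
    sc.parallelize(Seq("ok", "fail", "ok", "fail")).foreach { record =>
      if (record == "fail") failures += 1   // tasks only "add" to the accumulator
    }
    println(failures.value)                 // read back on the driver: 2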

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 16 additions & 16 deletions
@@ -278,17 +278,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
     fromRDD(rdd.reduceByKey(partitioner, func))
 
   /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function, but return
+   * the result immediately to the master as a Map. This will also perform the merging locally on
+   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] =
     mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
@@ -381,9 +381,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(rdd.foldByKey(zeroValue)(func))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
   def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairRDD[K, V] =
     fromRDD(rdd.reduceByKey(func, numPartitions))
@@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(rdd.partitionBy(partitioner))
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+   */
   def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
     fromRDD(rdd.join(other, partitioner))
 
@@ -520,9 +520,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
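
The corrected `join` doc above describes the (k, (v1, v2)) output shape and the use of the supplied Partitioner. A minimal sketch against the underlying Scala pair-RDD API that `JavaPairRDD` wraps, assuming `sc` is an existing SparkContext:

    import org.apache.spark.HashPartitioner

    // Sketch only: `sc` is an assumed SparkContext.
    val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val right = sc.parallelize(Seq(("a", "x"), ("a", "y")))

    // Inner join: each key match yields one (k, (v1, v2)) pair, and the
    // output RDD is partitioned by the given Partitioner.
    val joined = left.join(right, new HashPartitioner(2))
    joined.collect().foreach(println)   // (a,(1,x)) and (a,(1,y)); "b" has no match and is dropped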

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 1 addition & 1 deletion
@@ -373,7 +373,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative and commutative function and a neutral "zero value". The function
+   * given associative function and a neutral "zero value". The function
    * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
    * allocation; however, it should not modify t2.
    *

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 1 addition & 1 deletion
@@ -252,7 +252,7 @@ private object FaultToleranceTest extends App with Logging {
     val f = Future {
       try {
         val res = sc.parallelize(0 until 10).collect()
-        assertTrue(res.toList == (0 until 10))
+        assertTrue(res.toList == (0 until 10).toList)
         true
       } catch {
         case e: Exception =>

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 29 additions & 26 deletions
@@ -364,6 +364,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
 
 }
 
+/**
+ * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
+ * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
+ * which might have been garbage-collected. See SPARK-13407 for more details.
+ *
+ * Instances of this class should be considered read-only and users should not call `inc*()` or
+ * `set*()` methods. While we could override the setter methods to throw
+ * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
+ * out-of-date when new metrics are added.
+ */
+private[spark] class ListenerTaskMetrics(
+    initialAccums: Seq[Accumulator[_]],
+    accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {
+
+  override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
+
+  override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+    throw new UnsupportedOperationException("This TaskMetrics is read-only")
+  }
+}
+
 private[spark] object TaskMetrics extends Logging {
 
   def empty: TaskMetrics = new TaskMetrics
@@ -397,33 +418,15 @@ private[spark] object TaskMetrics extends Logging {
     // Initial accumulators are passed into the TaskMetrics constructor first because these
     // are required to be uniquely named. The rest of the accumulators from this task are
     // registered later because they need not satisfy this requirement.
-    val (initialAccumInfos, otherAccumInfos) = accumUpdates
-      .filter { info => info.update.isDefined }
-      .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
-    val initialAccums = initialAccumInfos.map { info =>
-      val accum = InternalAccumulator.create(info.name.get)
-      accum.setValueAny(info.update.get)
-      accum
-    }
-    // We don't know the types of the rest of the accumulators, so we try to find the same ones
-    // that were previously registered here on the driver and make copies of them. It is important
-    // that we copy the accumulators here since they are used across many tasks and we want to
-    // maintain a snapshot of their local task values when we post them to listeners downstream.
-    val otherAccums = otherAccumInfos.flatMap { info =>
-      val id = info.id
-      val acc = Accumulators.get(id).map { a =>
-        val newAcc = a.copy()
-        newAcc.setValueAny(info.update.get)
-        newAcc
+    val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
+    val initialAccums = definedAccumUpdates
+      .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
+      .map { info =>
+        val accum = InternalAccumulator.create(info.name.get)
+        accum.setValueAny(info.update.get)
+        accum
       }
-      if (acc.isEmpty) {
-        logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.")
-      }
-      acc
-    }
-    val metrics = new TaskMetrics(initialAccums)
-    otherAccums.foreach(metrics.registerAccumulator)
-    metrics
+    new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
   }
 
 }
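
`ListenerTaskMetrics` is internal to Spark, but the shape of the change is a general pattern: a read-only view that reports a fixed snapshot and overrides a single registration hook rather than every setter. A standalone sketch of that pattern; the class and member names below are hypothetical and only mirror the idea:

    // Hypothetical sketch of the read-only-subclass pattern; not Spark's actual types.
    class Metrics(initial: Seq[Long]) {
      private var extras: Seq[Long] = Seq.empty
      def register(v: Long): Unit = { extras = extras :+ v }   // normal, mutable path
      def updates(): Seq[Long] = initial ++ extras
    }

    // Listener-facing view: returns a fixed snapshot and rejects further registration,
    // so readers never depend on state that may have been garbage-collected.
    class ListenerMetrics(initial: Seq[Long], snapshot: Seq[Long]) extends Metrics(initial) {
      override def updates(): Seq[Long] = snapshot
      override def register(v: Long): Unit =
        throw new UnsupportedOperationException("read-only")
    }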

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 12 additions & 12 deletions
@@ -300,37 +300,37 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
     combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
    */
   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
     reduceByKey(new HashPartitioner(numPartitions), func)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
    * parallelism level.
    */
   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
     reduceByKey(defaultPartitioner(self), func)
   }
 
   /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   * Merge the values for each key using an associative and commutative reduce function, but return
+   * the results immediately to the master as a Map. This will also perform the merging locally on
+   * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
    */
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
     val cleanedF = self.sparkContext.clean(func)
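
The reworded docs make explicit that the reduce function must be both associative and commutative, because values are combined map-side and merged in no guaranteed order. A minimal sketch, assuming `sc` is an existing SparkContext:

    // Sketch only: `sc` is an assumed SparkContext.
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

    // Addition is associative and commutative, so map-side combining is safe and
    // the result is deterministic regardless of how the pairs are partitioned.
    val summed = pairs.reduceByKey(_ + _)           // RDD[(String, Int)]: (a,4), (b,6)
    val local  = pairs.reduceByKeyLocally(_ + _)    // Map[String, Int] returned to the driver

    // By contrast, a function such as (_ - _) is neither associative nor commutative,
    // so using it here could give partitioning-dependent results.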

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 1 addition & 1 deletion
@@ -973,7 +973,7 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative and commutative function and a neutral "zero value". The function
+   * given associative function and a neutral "zero value". The function
    * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
    * allocation; however, it should not modify t2.
    *
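
The updated `fold` doc asks for an associative function and a neutral "zero value", since the zero is folded in once per partition and again when partition results are merged. A minimal sketch, assuming `sc` is an existing SparkContext and using an operation that is safely both associative and commutative:

    // Sketch only: `sc` is an assumed SparkContext.
    val nums = sc.parallelize(1 to 100, 4)

    // 0 is neutral for +, so the result is 5050 no matter how the
    // 100 numbers are split across the 4 partitions.
    val total = nums.fold(0)(_ + _)

    // The zero value participates in every per-partition fold and in the final
    // merge, so a non-neutral zero (e.g. fold(1)(_ + _)) would be counted several times.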
