12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{ Partition, SparkContext }

private[spark] class BinaryFileRDD[T](
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
35 changes: 0 additions & 35 deletions core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala

This file was deleted.

34 changes: 0 additions & 34 deletions core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala

This file was deleted.

35 changes: 0 additions & 35 deletions core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala

This file was deleted.

31 changes: 0 additions & 31 deletions core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala

This file was deleted.

32 changes: 0 additions & 32 deletions core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala

This file was deleted.

33 changes: 0 additions & 33 deletions core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala

This file was deleted.
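
For reviewers skimming the deletions above: every removed file held a single-purpose RDD subclass built on the same pattern, roughly the sketch below (a reconstruction for illustration, not the exact deleted source). Each one only delegated partitions to its parent and wrapped the parent iterator in compute, which MapPartitionsRDD already does generically.

```scala
// Rough reconstruction of the shared pattern in the deleted classes
// (illustrative, not the exact removed source). MappedRDD, FilteredRDD, etc.
// all forwarded getPartitions to the parent and transformed its iterator.
package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}

private[spark] class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    firstParent[T].iterator(split, context).map(f)
}
```

The rewrites in PairRDDFunctions.scala and RDD.scala below express the same thing by handing MapPartitionsRDD an iterator transform, so these one-off subclasses no longer pull their weight.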

10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -660,7 +660,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}

/**
@@ -669,7 +671,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.flatMap { case (k, v) =>
cleanF(v).map(x => (k, x))
},
preservesPartitioning = true)
}

/**
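
Worth noting on the two rewrites above: both pass preservesPartitioning = true, matching the behavior of the old MappedValuesRDD/FlatMappedValuesRDD, since only values are touched and the keys (and hence the partitioner) stay valid. A small illustrative check, with names invented for the example:

```scala
// Illustrative only: mapValues should keep the parent's partitioner,
// because it never changes keys.
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "mapValues-partitioning-check")
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .partitionBy(new HashPartitioner(2))
val mapped = pairs.mapValues(_ * 10)
assert(mapped.partitioner == pairs.partitioner) // partitioner preserved
sc.stop()
```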
28 changes: 20 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.util.{Properties, Random}
import java.util.Random

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
@@ -36,13 +36,12 @@ import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
SamplingUtils}
@@ -270,19 +269,30 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
def map[U: ClassTag](f: T => U): RDD[U] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
def filter(f: T => Boolean): RDD[T] = {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}

/**
* Return a new RDD containing the distinct elements in this RDD.
@@ -503,7 +513,9 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = new GlommedRDD(this)
def glom(): RDD[Array[T]] = {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
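
Same story for glom: the semantics are unchanged (one array per input partition), it just no longer needs its own GlommedRDD class. A quick illustrative check:

```scala
// Illustrative only: glom still yields exactly one array per partition.
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "glom-check")
val rdd = sc.parallelize(1 to 6, numSlices = 3)
val glommed = rdd.glom().collect()
assert(glommed.length == rdd.partitions.length) // one array per partition
assert(glommed.flatten.sorted.toSeq == (1 to 6).toSeq)
sc.stop()
```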

/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of