diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 214f22bc5b603..73092993ca531 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -787,6 +787,18 @@ abstract class RDD[T: ClassTag](
     sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
+  /**
+   * Process all of the elements in this RDD, then return the first `num` elements.
+   */
+  def processAllAndTake(num: Int): Array[T] = {
+    val buf = new ArrayBuffer[T]
+    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
+    for (partition <- results; data <- partition if buf.size < num) {
+      buf += data
+    }
+    buf.toArray
+  }
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index dbf1ebbaf653a..86e0de504de11 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -618,6 +618,20 @@ abstract class DStream[T: ClassTag] (
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
 
+  /**
+   * Process all of the elements in each RDD in this DStream, then print the first `num` elements.
+   */
+  def processAllAndPrintFirst(num: Int) {
+    def foreachFunc = (rdd: RDD[T], time: Time) => {
+      val firstNum = rdd.processAllAndTake(num)
+      println("-------------------------------------------")
+      println("Time: " + time)
+      println("-------------------------------------------")
+      firstNum.foreach(println)
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+
   /**
    * Return a new DStream in which each RDD contains all the elements in seen in a
    * sliding window of time over this DStream. The new DStream generates RDDs with
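
Usage sketch (not part of the patch): the following illustrates how the two new methods might be exercised. The StreamingContext setup, the socket source on localhost:9999, the 1-second batch interval, and the object name are illustrative assumptions; only `processAllAndTake` and `processAllAndPrintFirst` come from the diff above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ProcessAllExample {
  def main(args: Array[String]): Unit = {
    // Local two-thread master and 1s batch interval are assumptions for the sketch.
    val conf = new SparkConf().setAppName("ProcessAllExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // On a plain RDD: runs the job over every partition (materializing all
    // elements) and then returns at most `num` of them to the driver.
    val firstFive = ssc.sparkContext.parallelize(1 to 100, 4).processAllAndTake(5)
    println(firstFive.mkString(", "))

    // On a DStream: unlike print(), which uses RDD.take and may compute only
    // as many partitions as needed, this forces each batch's RDD to be fully
    // evaluated before printing its first `num` elements.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.processAllAndPrintFirst(10)

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The difference from `DStream.print()` is the evaluation guarantee: `print()` stops computing once it has enough elements, while `processAllAndPrintFirst` materializes every partition first, which matters when upstream transformations have side effects that must run for every element. Note the cost: `runJob` with `iter.toArray` ships every partition's full contents to the driver before trimming to `num`, so this is only suitable for RDDs that fit in driver memory.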