12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -787,6 +787,18 @@ abstract class RDD[T: ClassTag](
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

/**
* Process all of the elements in this RDD and return the first `num` of them.
*/
def processAllAndTake(num: Int): Array[T] = {
  val buf = new ArrayBuffer[T]
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  for (partition <- results; data <- partition if buf.size < num) {
    buf += data
  }
  buf.toArray
}

/**
* Return an array that contains all of the elements in this RDD.
*/
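
For reference, a minimal usage sketch of the new RDD method; the app name, local master, and example data are illustrative assumptions, not part of this change. Unlike take(num), which scans partitions incrementally and can stop early, processAllAndTake(num) evaluates every partition before keeping the first num results.

import org.apache.spark.{SparkConf, SparkContext}

object ProcessAllAndTakeDemo {
  def main(args: Array[String]): Unit = {
    // Local two-thread context, purely for illustration.
    val conf = new SparkConf().setAppName("processAllAndTake-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 1000 integers spread across 10 partitions.
    val rdd = sc.parallelize(1 to 1000, 10)

    // Runs the job over every partition, then keeps the first 5 elements
    // in partition order (here: 1, 2, 3, 4, 5).
    val firstFive: Array[Int] = rdd.processAllAndTake(5)
    firstFive.foreach(println)

    sc.stop()
  }
}
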
@@ -618,6 +618,20 @@ abstract class DStream[T: ClassTag] (
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

/**
* Print the first `num` elements of each RDD generated in this DStream.
*/
def processAllAndPrintFirst(num: Int) {
  def foreachFunc = (rdd: RDD[T], time: Time) => {
    val firstNum = rdd.processAllAndTake(num)
    println("-------------------------------------------")
    println("Time: " + time)
    println("-------------------------------------------")
    firstNum.foreach(println)
  }
  new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}

/**
* Return a new DStream in which each RDD contains all the elements seen in a
* sliding window of time over this DStream. The new DStream generates RDDs with
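
And a hedged usage sketch of the new DStream operation; the socket source on localhost:9999 and the one-second batch interval are assumptions for illustration only, not part of this change.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ProcessAllAndPrintFirstDemo {
  def main(args: Array[String]): Unit = {
    // Local streaming context with a 1-second batch interval, for illustration.
    val conf = new SparkConf().setAppName("processAllAndPrintFirst-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Text lines arriving on a TCP socket (e.g. fed by `nc -lk 9999`).
    val lines = ssc.socketTextStream("localhost", 9999)

    // For each batch, process the whole batch RDD and print up to 10 elements,
    // preceded by the batch time header.
    lines.processAllAndPrintFirst(10)

    ssc.start()
    ssc.awaitTermination()
  }
}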