@@ -18,7 +18,6 @@
package org.apache.spark.streaming

import org.apache.spark.Logging
import org.apache.spark.streaming.util.MasterFailureTest
import org.apache.spark.util.Utils

import java.io.File
@@ -15,20 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.streaming.util
package org.apache.spark.streaming

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
import StreamingContext._
import org.apache.spark.streaming.StreamingContext._

import scala.util.Random
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import java.io.{File, ObjectInputStream, IOException}
import java.io.{File, IOException}
import java.nio.charset.Charset
import java.util.UUID

@@ -91,7 +89,7 @@ object MasterFailureTest extends Logging {
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
// Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
val expectedOutput = (1L to numBatches).map(i => (1L to i).sum).map(j => ("a", j))

val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Long], state: Option[Long]) => {
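Note (outside the diff): the new .sum form yields the same running totals as the old reduce(_ + _); a minimal standalone sketch, assuming numBatches = 3:

    val numBatches = 3
    // Cumulative counts of "a" per batch: ("a",1), ("a",3), ("a",6), ...
    val viaReduce = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
    val viaSum    = (1L to numBatches).map(i => (1L to i).sum).map(j => ("a", j))
    assert(viaReduce == viaSum)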
@@ -218,7 +216,7 @@

while(!isLastOutputGenerated && !isTimedOut) {
// Get the output buffer
val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
val outputBuffer = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
def output = outputBuffer.flatMap(x => x)

// Start the thread to kill the streaming after some time
@@ -239,7 +237,7 @@
while (!killed && !isLastOutputGenerated && !isTimedOut) {
Thread.sleep(100)
timeRan = System.currentTimeMillis() - startTime
isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
isLastOutputGenerated = (output.nonEmpty && output.last == lastExpectedOutput)
isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
}
} catch {
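Note (outside the diff): output.nonEmpty is the idiomatic equivalent of !output.isEmpty; a trivial standalone check, assuming an ArrayBuffer standing in for the test's output buffer:

    import scala.collection.mutable.ArrayBuffer

    val buf = ArrayBuffer(Seq(("a", 1L)))  // hypothetical stand-in for the output buffer
    assert(buf.nonEmpty == !buf.isEmpty)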
@@ -313,31 +311,6 @@
}
}

/**
* This is an output stream just for testing. All the output is collected into an
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
private[streaming]
class TestOutputStream[T: ClassTag](
parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
) extends ForEachDStream[T](
parent,
(rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}
) {

// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
ois.defaultReadObject()
output.clear()
}
}


/**
* Thread to kill streaming context after a random period of time.
*/