streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable
import scala.reflect.ClassTag
@@ -74,12 +75,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {

// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock

// Data to be saved as part of the streaming checkpoints
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

// Initial ignore threshold based on which old, existing files in the directory (at the time of
// starting the streaming application) will be ignored or considered
private val initialModTimeIgnoreThreshold = if (newFilesOnly) System.currentTimeMillis() else 0L
private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.currentTime() else 0L

/*
* Make sure that the information of files selected in the last few batches are remembered.
@@ -91,8 +95,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
remember(durationToRemember)

// Map of batch-time to selected file info for the remembered batches
// This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]]
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]

// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -151,7 +156,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
*/
private def findNewFiles(currentTime: Long): Array[String] = {
try {
lastNewFileFindingTime = System.currentTimeMillis
lastNewFileFindingTime = clock.currentTime()

// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
@@ -164,7 +169,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
val timeTaken = clock.currentTime() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
@@ -267,7 +272,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]()
batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
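Context for the `clock` changes above: the scheduler's clock implementation is chosen by the `spark.streaming.clock` configuration key, so a test can substitute `ManualClock` and make `initialModTimeIgnoreThreshold`, `lastNewFileFindingTime`, and the file-listing timestamps fully deterministic. A minimal sketch of that wiring, assuming the standard configuration key (the master and app name below are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Drive the streaming scheduler from a ManualClock instead of wall-clock time.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("manual-clock-sketch")
  .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

val ssc = new StreamingContext(conf, Seconds(2))
// From here on, every clock.currentTime() call inside FileInputDStream returns
// the manual clock's value, which the test advances explicitly.
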
streaming/src/main/scala/org/apache/spark/streaming/util/Clock.scala
@@ -59,9 +59,11 @@ class SystemClock() extends Clock {
private[streaming]
class ManualClock() extends Clock {

var time = 0L
private var time = 0L

def currentTime() = time
def currentTime() = this.synchronized {
time
}

def setTime(timeToSet: Long) = {
this.synchronized {
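Why the getter needed work: the scheduler thread advances the clock while test threads read it, and `this.synchronized` on both sides provides the visibility guarantee a bare `var time` never had. Only the getter appears in this hunk; assuming the setters follow the `setTime` pattern started above and `notifyAll()` any waiters (Spark's `Clock` trait also declares a `waitTillTime`, not shown in this diff), the whole class plausibly looks like this sketch:

private[streaming]
class ManualClock() extends Clock {

  private var time = 0L

  def currentTime(): Long = this.synchronized {
    time
  }

  def setTime(timeToSet: Long): Unit = this.synchronized {
    time = timeToSet
    this.notifyAll() // wake any thread blocked in waitTillTime()
  }

  // Used by the tests below (clock.addToTime); assumed to mirror setTime.
  def addToTime(timeToAdd: Long): Unit = this.synchronized {
    time += timeToAdd
    this.notifyAll()
  }

  // Assumed signature, mirroring SystemClock: block until the clock reaches targetTime.
  def waitTillTime(targetTime: Long): Long = this.synchronized {
    while (time < targetTime) {
      this.wait(100)
    }
    time
  }
}
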
streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -565,7 +565,7 @@ class BasicOperationsSuite extends TestSuiteBase {
if (rememberDuration != null) ssc.remember(rememberDuration)
val output = runStreams[(Int, Int)](ssc, cleanupTestInput.size, numExpectedOutput)
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
assert(clock.time === Seconds(10).milliseconds)
assert(clock.currentTime() === Seconds(10).milliseconds)
assert(output.size === numExpectedOutput)
operatedStream
}
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,17 +18,18 @@
package org.apache.spark.streaming

import java.io.File
import java.nio.charset.Charset

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag

import com.google.common.base.Charsets
import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.scalatest.concurrent.Eventually._

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
@@ -47,8 +48,6 @@ class CheckpointSuite extends TestSuiteBase {

override def batchDuration = Milliseconds(500)

override def actuallyWait = true // to allow checkpoints to be written

override def beforeFunction() {
super.beforeFunction()
Utils.deleteRecursively(new File(checkpointDir))
@@ -145,7 +144,6 @@ class CheckpointSuite extends TestSuiteBase {
ssc.start()
advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
}

@@ -314,109 +312,161 @@
testCheckpointedOperation(input, operation, output, 7)
}


// This tests whether the file input stream remembers what files were seen
// before a driver failure and uses them again to process a large window
// operation. It also tests whether batches whose processing was incomplete
// due to the failure are re-processed or not.
test("recovery with file input stream") {
// Set up the streaming context and input streams
val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's.
val testDir = Utils.createTempDir()
var ssc = new StreamingContext(master, framework, Seconds(1))
ssc.checkpoint(checkpointDir)
val fileStream = ssc.textFileStream(testDir.toString)
// Making value 3 take large time to process, to ensure that the master
// shuts down in the middle of processing the 3rd batch
val mappedStream = fileStream.map(s => {
val i = s.toInt
if (i == 3) Thread.sleep(2000)
i
})

// Reducing over a large window to ensure that recovery from master failure
// requires reprocessing of all the files seen before the failure
val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
val outputBuffer = new ArrayBuffer[Seq[Int]]
var outputStream = new TestOutputStream(reducedStream, outputBuffer)
outputStream.register()
ssc.start()

// Create files and advance manual clock to process them
// var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
// wait to make sure that the file is written such that it gets shown in the file listings
Thread.sleep(1000)
val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]

/**
* Writes a file named `i` (which contains the number `i`) to the test directory and sets its
* modification time to `clock`'s current time.
*/
def writeFile(i: Int, clock: ManualClock): Unit = {
val file = new File(testDir, i.toString)
Files.write(i + "\n", file, Charsets.UTF_8)
assert(file.setLastModified(clock.currentTime()))
// Check that the file's modification date is actually the value we wrote, since rounding or
// truncation will break the test:
assert(file.lastModified() === clock.currentTime())
}
logInfo("Output = " + outputStream.output.mkString(","))
assert(outputStream.output.size > 0, "No files processed before restart")
ssc.stop()

// Verify whether files created have been recorded correctly or not
var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)

// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
/**
* Returns ids that identify which files have been recorded by the file input stream.
*/
def recordedFiles(ssc: StreamingContext): Seq[Int] = {
val fileInputDStream =
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
}

// Recover context from checkpoint file and verify whether the files that were
// recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
ssc = new StreamingContext(checkpointDir)
fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
try {
// This is a var because it's re-assigned when we restart from a checkpoint
var clock: ManualClock = null
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
ssc.checkpoint(checkpointDir)
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.textFileStream(testDir.toString)
// Make value 3 take a large time to process, to ensure that the driver
// shuts down in the middle of processing the 3rd batch
CheckpointSuite.batchThreeShouldBlockIndefinitely = true
val mappedStream = fileStream.map(s => {
val i = s.toInt
if (i == 3) {
while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
Thread.sleep(Long.MaxValue)
}
}
i
})

// Reducing over a large window to ensure that recovery from driver failure
// requires reprocessing of all the files seen before the failure
val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
val outputStream = new TestOutputStream(reducedStream, outputBuffer)
outputStream.register()
ssc.start()

// Advance half a batch so that the first file is created after the StreamingContext starts
clock.addToTime(batchDuration.milliseconds / 2)
// Create files and advance manual clock to process them
for (i <- Seq(1, 2, 3)) {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.addToTime(batchDuration.milliseconds)
if (i != 3) {
// Since we want to shut down while the 3rd batch is processing
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
}
}
clock.addToTime(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
// Wait until all files have been recorded and all batches have started
assert(recordedFiles(ssc) === Seq(1, 2, 3) && batchCounter.getNumStartedBatches === 3)
}
// Wait for a checkpoint to be written
val fs = new Path(checkpointDir).getFileSystem(ssc.sc.hadoopConfiguration)
eventually(eventuallyTimeout) {
assert(Checkpoint.getCheckpointFiles(checkpointDir, fs).size === 6)
}
ssc.stop()
// Check that we shut down while the third batch was being processed
assert(batchCounter.getNumCompletedBatches === 2)
assert(outputStream.output.flatten === Seq(1, 3))
}

// Restart stream computation
ssc.start()
for (i <- Seq(7, 8, 9)) {
Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
Thread.sleep(1000)
logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
// The original StreamingContext has now been stopped.
CheckpointSuite.batchThreeShouldBlockIndefinitely = false

// Verify whether files created while the driver was down have been recorded or not
assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)

// Verify whether new files created after recover have been recorded or not
assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)

// Append the new output to the old buffer
outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
outputBuffer ++= outputStream.output

val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
logInfo("--------------------------------")
logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output, size = " + expectedOutput.size)
expectedOutput.foreach(x => logInfo("[" + x + "]"))
logInfo("--------------------------------")

// Verify whether all the elements received are as expected
val output = outputBuffer.flatMap(x => x)
assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
output.foreach(o => // To ensure all the inputs are correctly added cumulatively
assert(expectedOutput.contains(o), "Expected value " + o + " not found")
)
// To ensure that all the inputs were received correctly
assert(expectedOutput.last === output.last)
Utils.deleteRecursively(testDir)
// Create files while the streaming driver is down
for (i <- Seq(4, 5, 6)) {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.addToTime(batchDuration.milliseconds)
}

// Recover context from checkpoint file and verify whether the files that were
// recorded before failure were saved and successfully recovered
logInfo("*********** RESTARTING ************")
withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
// So that the restarted StreamingContext's clock has gone forward in time since failure
ssc.conf.set("spark.streaming.manualClock.jump", (batchDuration * 3).milliseconds.toString)
val oldClockTime = clock.currentTime()
clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val batchCounter = new BatchCounter(ssc)
val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
// Check that we remember files that were recorded before the restart
assert(recordedFiles(ssc) === Seq(1, 2, 3))

// Restart stream computation
ssc.start()
// Verify that the clock has traveled forward to the expected time
eventually(eventuallyTimeout) {
assert(clock.currentTime() === oldClockTime)
}
// Wait for pre-failure batches to be recomputed (3 batches for the files written
// while the SSC was down, plus the incomplete 3rd batch)
val numBatchesAfterRestart = 4
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === numBatchesAfterRestart)
}
for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
writeFile(i, clock)
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.addToTime(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
}
}
clock.addToTime(batchDuration.milliseconds)
logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()

// Verify whether files created while the driver was down (4, 5, 6) and files created after
// recovery (7, 8, 9) have been recorded
assert(recordedFiles(ssc) === (1 to 9))

// Append the new output to the old buffer
outputBuffer ++= outputStream.output

// Verify whether all the elements received are as expected
val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
}
} finally {
Utils.deleteRecursively(testDir)
}
}
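One design note on the gate used in the test above: `batchThreeShouldBlockIndefinitely` lives in the serializable companion object (defined at the bottom of this file) so the `map` closure captures a stable reference instead of pulling the whole suite into serialization. Because these suites run in local mode, the task and the test thread share one JVM, so clearing the flag after `ssc.stop()` guarantees the recomputed third batch does not block again. The same pattern in isolation; the names here are hypothetical, and `@volatile` is added for cross-thread visibility where the original relies on stop/interrupt semantics:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical gate object: a running task polls the flag, the test flips it.
object Gate extends Serializable {
  @volatile var closed: Boolean = true
}

def gated(stream: DStream[Int]): DStream[Int] = stream.map { i =>
  if (i == 3) {
    // Block the third batch until the test opens the gate. Only safe in
    // local mode, where driver and executors share one JVM (and one Gate).
    while (Gate.closed) Thread.sleep(10)
  }
  i
}

// Later, from the test thread, after stopping the first context:
// Gate.closed = false
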


@@ -473,12 +523,12 @@ class CheckpointSuite extends TestSuiteBase {
*/
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
logInfo("Manual clock before advancing = " + clock.currentTime())
for (i <- 1 to numBatches.toInt) {
clock.addToTime(batchDuration.milliseconds)
Thread.sleep(batchDuration.milliseconds)
}
logInfo("Manual clock after advancing = " + clock.time)
logInfo("Manual clock after advancing = " + clock.currentTime())
Thread.sleep(batchDuration.milliseconds)

val outputStream = ssc.graph.getOutputStreams.filter { dstream =>
Expand All @@ -487,3 +537,7 @@ class CheckpointSuite extends TestSuiteBase {
outputStream.output.map(_.flatten)
}
}

private object CheckpointSuite extends Serializable {
var batchThreeShouldBlockIndefinitely: Boolean = true
}
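`eventuallyTimeout`, used throughout the restart test, comes from the shared test base rather than this diff; with ScalaTest's `Eventually` (imported at the top of CheckpointSuite) a self-contained equivalent would plausibly be the following, assuming the base class uses a ten-second patience as its polling budget:

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Span, Seconds => SpanSeconds}

// Assumed to match the test base: give a condition ten seconds to become true.
val eventuallyTimeout = timeout(Span(10, SpanSeconds))

@volatile var completedBatches = 0
// ...some listener thread increments completedBatches as batches finish...

// Re-run the assertion every 50 ms until it passes or the timeout elapses;
// ScalaTest's assert throws TestFailedException, which eventually() retries.
eventually(eventuallyTimeout, interval(Span(50, Millis))) {
  assert(completedBatches === 3)
}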