@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
2727import scala .collection .mutable
2828
2929import com .google .common .io .Files
30- import org .apache .hadoop .fs .Path
30+ import org .apache .commons .io .IOUtils
31+ import org .apache .hadoop .fs .{FileSystem , Path }
3132import org .apache .hadoop .io .{LongWritable , Text }
3233import org .apache .hadoop .mapreduce .lib .input .TextInputFormat
3334import org .scalatest .BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
130131 }
131132
132133 test(" binary records stream" ) {
133- var testDir : File = null
134- try {
134+ withTempDir { testDir =>
135135 val batchDuration = Seconds (2 )
136- testDir = Utils .createTempDir()
137136 // Create a file that exists before the StreamingContext is created:
138137 val existingFile = new File (testDir, " 0" )
139138 Files .write(" 0\n " , existingFile, StandardCharsets .UTF_8 )
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
176175 assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
177176 }
178177 }
179- } finally {
180- if (testDir != null ) Utils .deleteRecursively(testDir)
181178 }
182179 }
183180
@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
190187 }
191188
192189 test(" file input stream - wildcard" ) {
193- var testDir : File = null
194- try {
190+ withTempDir { testDir =>
195191 val batchDuration = Seconds (2 )
196- testDir = Utils .createTempDir()
197192 val testSubDir1 = Utils .createDirectory(testDir.toString, " tmp1" )
198193 val testSubDir2 = Utils .createDirectory(testDir.toString, " tmp2" )
199194
@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
221216 // not enough to trigger a batch
222217 clock.advance(batchDuration.milliseconds / 2 )
223218
224- def createFileAndAdvenceTime (data : Int , dir : File ): Unit = {
219+ def createFileAndAdvanceTime (data : Int , dir : File ): Unit = {
225220 val file = new File (testSubDir1, data.toString)
226221 Files .write(data + " \n " , file, StandardCharsets .UTF_8 )
227222 assert(file.setLastModified(clock.getTimeMillis()))
228223 assert(file.lastModified === clock.getTimeMillis())
229- logInfo(" Created file " + file)
224+ logInfo(s " Created file $ file" )
230225 // Advance the clock after creating the file to avoid a race when
231226 // setting its modification time
232227 clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,90 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
236231 }
237232 // Over time, create files in the temp directory 1
238233 val input1 = Seq (1 , 2 , 3 , 4 , 5 )
239- input1.foreach(i => createFileAndAdvenceTime (i, testSubDir1))
234+ input1.foreach(i => createFileAndAdvanceTime (i, testSubDir1))
240235
241236 // Over time, create files in the temp directory 1
242237 val input2 = Seq (6 , 7 , 8 , 9 , 10 )
243- input2.foreach(i => createFileAndAdvenceTime (i, testSubDir2))
238+ input2.foreach(i => createFileAndAdvanceTime (i, testSubDir2))
244239
245240 // Verify that all the files have been read
246241 val expectedOutput = (input1 ++ input2).map(_.toString).toSet
247242 assert(outputQueue.asScala.flatten.toSet === expectedOutput)
248243 }
249- } finally {
250- if (testDir != null ) Utils .deleteRecursively(testDir)
244+ }
245+ }
246+
247+ /**
248+ * Tests that renamed directories are included in new batches -but that only files created
249+ * within the batch window are included.
250+ * Uses the Hadoop APIs to verify consistent behavior with the operations used internally.
251+ */
252+ test(" renamed directories are scanned" ) {
253+ withTempDir { testDir =>
254+ val batchDuration = Seconds (2 )
255+ val durationMs = batchDuration.milliseconds
256+ val testPath = new Path (testDir.toURI)
257+ val streamDir = new Path (testPath, " streaming" )
258+ val streamGlobPath = new Path (streamDir, " sub*" )
259+ val generatedDir = new Path (testPath, " generated" )
260+ val generatedSubDir = new Path (generatedDir, " subdir" )
261+ val renamedSubDir = new Path (streamDir, " subdir" )
262+
263+ withStreamingContext(new StreamingContext (conf, batchDuration)) { ssc =>
264+ val sparkContext = ssc.sparkContext
265+ val hc = sparkContext.hadoopConfiguration
266+ val fs = FileSystem .get(testPath.toUri, hc)
267+
268+ fs.delete(testPath, true )
269+ fs.mkdirs(testPath)
270+ fs.mkdirs(streamDir)
271+ fs.mkdirs(generatedSubDir)
272+
273+ def write (path : Path , text : String ): Unit = {
274+ val out = fs.create(path, true )
275+ IOUtils .write(text, out)
276+ out.close()
277+ }
278+
279+ val clock = ssc.scheduler.clock.asInstanceOf [ManualClock ]
280+ val existingFile = new Path (generatedSubDir, " existing" )
281+ write(existingFile, " existing\n " )
282+ val status = fs.getFileStatus(existingFile)
283+ clock.setTime(status.getModificationTime + durationMs)
284+ val batchCounter = new BatchCounter (ssc)
285+ val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
286+ val outputQueue = new ConcurrentLinkedQueue [Seq [String ]]
287+ val outputStream = new TestOutputStream (fileStream, outputQueue)
288+ outputStream.register()
289+
290+ ssc.start()
291+ clock.advance(durationMs)
292+ eventually(eventuallyTimeout) {
293+ assert(1 === batchCounter.getNumCompletedBatches)
294+ }
295+ // create and rename the file
296+ // put a file into the generated directory
297+ val textPath = new Path (generatedSubDir, " renamed.txt" )
298+ write(textPath, " renamed\n " )
299+ val now = clock.getTimeMillis()
300+ val modTime = now + durationMs / 2
301+ fs.setTimes(textPath, modTime, modTime)
302+ val textFilestatus = fs.getFileStatus(existingFile)
303+ assert(textFilestatus.getModificationTime < now + durationMs)
304+
305+ // rename the directory under the path being scanned
306+ fs.rename(generatedSubDir, renamedSubDir)
307+
308+ // move forward one window
309+ clock.advance(durationMs)
310+ // await the next scan completing
311+ eventually(eventuallyTimeout) {
312+ assert(2 === batchCounter.getNumCompletedBatches)
313+ }
314+ // verify that the "renamed" file is found, but not the "existing" one which is out of
315+ // the window
316+ assert(Set (" renamed" ) === outputQueue.asScala.flatten.toSet)
317+ }
251318 }
252319 }
253320
@@ -416,10 +483,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
416483 }
417484
418485 def testFileStream (newFilesOnly : Boolean ) {
419- var testDir : File = null
420- try {
486+ withTempDir { testDir =>
421487 val batchDuration = Seconds (2 )
422- testDir = Utils .createTempDir()
423488 // Create a file that exists before the StreamingContext is created:
424489 val existingFile = new File (testDir, " 0" )
425490 Files .write(" 0\n " , existingFile, StandardCharsets .UTF_8 )
@@ -466,8 +531,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
466531 }
467532 assert(outputQueue.asScala.flatten.toSet === expectedOutput)
468533 }
469- } finally {
470- if (testDir != null ) Utils .deleteRecursively(testDir)
471534 }
472535 }
473536}
0 commit comments