diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index d2b983c4b4d1..3cbbf37a2e69 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -17,6 +17,10 @@
 package org.apache.spark.streaming.rdd
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.util.Utils
 
 import scala.util.Random
 
@@ -33,6 +37,9 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
   val hadoopConf = new Configuration()
+  // Ensure that the file names are monotonically increasing to avoid conflicts.
+  // Using AtomicInteger instead of Int allows tests to run in parallel.
+  val testCounter = new AtomicInteger(0)
 
   var sparkContext: SparkContext = null
   var blockManager: BlockManager = null
@@ -47,7 +54,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
   override def afterAll(): Unit = {
     // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
     sparkContext.stop()
-    dir.delete()
+    Utils.deleteRecursively(dir)
     System.clearProperty("spark.driver.port")
   }
 
@@ -137,7 +144,8 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
       blockIds: Seq[BlockId]
     ): Seq[WriteAheadLogFileSegment] = {
     require(blockData.size === blockIds.size)
-    val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf)
+    val writer = new WriteAheadLogWriter(
+      new File(dir, testCounter.incrementAndGet().toString).toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
       writer.write(blockManager.dataSerialize(id, data.iterator))
     }