diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 728e7f0afad5..7a6a2f3e577d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -20,15 +20,15 @@ import java.io.File
 
 import scala.util.Random
 
-import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
+import org.apache.spark.util.Utils
 
-class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
+class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName(this.getClass.getSimpleName)
@@ -38,36 +38,42 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
   var blockManager: BlockManager = null
   var dir: File = null
 
+  override def beforeEach(): Unit = {
+    dir = Utils.createTempDir()
+  }
+
+  override def afterEach(): Unit = {
+    Utils.deleteRecursively(dir)
+  }
+
   override def beforeAll(): Unit = {
     sparkContext = new SparkContext(conf)
     blockManager = sparkContext.env.blockManager
-    dir = Files.createTempDir()
   }
 
   override def afterAll(): Unit = {
     // Copied from LocalSparkContext; simpler than introducing test dependencies to core tests.
     sparkContext.stop()
-    dir.delete()
     System.clearProperty("spark.driver.port")
   }
 
-  ignore("Read data available in block manager and write ahead log") {
+  test("Read data available in block manager and write ahead log") {
     testRDD(5, 5)
   }
 
-  ignore("Read data available only in block manager, not in write ahead log") {
+  test("Read data available only in block manager, not in write ahead log") {
     testRDD(5, 0)
   }
 
-  ignore("Read data available only in write ahead log, not in block manager") {
+  test("Read data available only in write ahead log, not in block manager") {
     testRDD(0, 5)
   }
 
-  ignore("Read data available only in write ahead log, and test storing in block manager") {
+  test("Read data available only in write ahead log, and test storing in block manager") {
     testRDD(0, 5, testStoreInBM = true)
   }
 
-  ignore("Read data with partially available in block manager, and rest in write ahead log") {
+  test("Read data with partially available in block manager, and rest in write ahead log") {
     testRDD(3, 2)
   }
 
@@ -137,7 +143,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
       blockIds: Seq[BlockId]
     ): Seq[WriteAheadLogFileSegment] = {
     require(blockData.size === blockIds.size)
-    val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf)
+    val writer = new WriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
       writer.write(blockManager.dataSerialize(id, data.iterator))
     }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 1956a4f1db90..8f69bcb64279 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -22,11 +22,8 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.{implicitConversions, postfixOps}
-import scala.util.Random
 
 import WriteAheadLogSuite._
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.util.Utils
@@ -42,9 +39,9 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
   var manager: WriteAheadLogManager = null
 
   before {
-    tempDir = Files.createTempDir()
+    tempDir = Utils.createTempDir()
     testDir = tempDir.toString
-    testFile = new File(tempDir, Random.nextString(10)).toString
+    testFile = new File(tempDir, "testFile").toString
     if (manager != null) {
       manager.stop()
       manager = null
@@ -52,7 +49,7 @@
     }
   }
   after {
-    FileUtils.deleteQuietly(tempDir)
+    Utils.deleteRecursively(tempDir)
   }
 
   test("WriteAheadLogWriter - writing data") {
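Both suites converge on the same fixture pattern: temp directories come from Spark's `Utils.createTempDir()` and are torn down with `Utils.deleteRecursively`, with setup/teardown moved to per-test hooks so every test starts from a clean directory. The apparent motivation is twofold: a bare `dir.delete()` in `afterAll` silently fails on a non-empty directory, and `Random.nextString(10)` can emit arbitrary Unicode characters that are not legal in file names on every filesystem, hence the fixed names like `"logFile"` and `"testFile"`. Below is a minimal, self-contained sketch of the pattern; the suite name and test body are hypothetical, while `Utils.createTempDir`/`Utils.deleteRecursively` are the Spark helpers used in the diff:

```scala
import java.io.File

import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.util.Utils

// Hypothetical suite illustrating the per-test temp-directory fixture
// adopted by the diff above.
class TempDirFixtureSuite extends FunSuite with BeforeAndAfterEach {

  var dir: File = null

  override def beforeEach(): Unit = {
    // Fresh directory for every test, so one test's files can never leak
    // into (or break the cleanup of) the next.
    dir = Utils.createTempDir()
  }

  override def afterEach(): Unit = {
    // Recursive delete works even when the directory is non-empty,
    // unlike java.io.File#delete(), which only removes empty directories.
    Utils.deleteRecursively(dir)
  }

  test("files are created under this test's own directory") {
    // Fixed, filesystem-safe file name instead of Random.nextString(10).
    val logFile = new File(dir, "logFile")
    assert(logFile.getParentFile === dir)
  }
}
```

Moving the directory lifecycle from `beforeAll`/`afterAll` to `beforeEach`/`afterEach` also means a test that fails midway cannot poison later tests with leftover log files, which is presumably why the five previously `ignore`d (flaky) tests can be re-enabled as `test`s.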