Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ import java.io.File

import scala.util.Random

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}
import org.apache.spark.util.Utils

class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
Expand All @@ -38,36 +38,42 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
var blockManager: BlockManager = null
var dir: File = null

override def beforeEach(): Unit = {
dir = Utils.createTempDir()
}

override def afterEach(): Unit = {
Utils.deleteRecursively(dir)
}

override def beforeAll(): Unit = {
sparkContext = new SparkContext(conf)
blockManager = sparkContext.env.blockManager
dir = Files.createTempDir()
}

override def afterAll(): Unit = {
// Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
sparkContext.stop()
dir.delete()
System.clearProperty("spark.driver.port")
}

ignore("Read data available in block manager and write ahead log") {
test("Read data available in block manager and write ahead log") {
testRDD(5, 5)
}

ignore("Read data available only in block manager, not in write ahead log") {
test("Read data available only in block manager, not in write ahead log") {
testRDD(5, 0)
}

ignore("Read data available only in write ahead log, not in block manager") {
test("Read data available only in write ahead log, not in block manager") {
testRDD(0, 5)
}

ignore("Read data available only in write ahead log, and test storing in block manager") {
test("Read data available only in write ahead log, and test storing in block manager") {
testRDD(0, 5, testStoreInBM = true)
}

ignore("Read data with partially available in block manager, and rest in write ahead log") {
test("Read data with partially available in block manager, and rest in write ahead log") {
testRDD(3, 2)
}

Expand Down Expand Up @@ -137,7 +143,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
blockIds: Seq[BlockId]
): Seq[WriteAheadLogFileSegment] = {
require(blockData.size === blockIds.size)
val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf)
val writer = new WriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
val segments = blockData.zip(blockIds).map { case (data, id) =>
writer.write(blockManager.dataSerialize(id, data.iterator))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.{implicitConversions, postfixOps}
import scala.util.Random

import WriteAheadLogSuite._
import com.google.common.io.Files
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils
Expand All @@ -42,17 +39,17 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter {
var manager: WriteAheadLogManager = null

before {
tempDir = Files.createTempDir()
tempDir = Utils.createTempDir()
testDir = tempDir.toString
testFile = new File(tempDir, Random.nextString(10)).toString
testFile = new File(tempDir, "testFile").toString
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of an unrelated change, but I wanted to remove this Random.nextString() call since it seemed confusing and didn't seem to serve any obvious purpose, since the tempDir is re-created before each test anyways.

if (manager != null) {
manager.stop()
manager = null
}
}

after {
FileUtils.deleteQuietly(tempDir)
Utils.deleteRecursively(tempDir)
}

test("WriteAheadLogWriter - writing data") {
Expand Down