WriteAheadLogUtils.scala

```diff
@@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   }
 
   def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
-    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false)
+    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
   }
 
   /**
```
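With this change, driver-side WAL batching defaults to on, so applications that want the old behavior must opt out explicitly. A minimal sketch, assuming `DRIVER_WAL_BATCHING_CONF_KEY` resolves to the same key the Java test below sets:

```scala
import org.apache.spark.SparkConf

// Opting out of driver WAL batching after this change (key name taken from
// the Java test in this PR; batching never applies to receiver WALs).
val conf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
```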
JavaWriteAheadLogSuite.java

```diff
@@ -108,6 +108,7 @@ public void close() {
   public void testCustomWAL() {
     SparkConf conf = new SparkConf();
     conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
+    conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
     WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
 
     String data1 = "data1";
```
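The explicit opt-out is needed here because of the wrapping done by `createLogForDriver`: with batching enabled (now the default), the returned log is a `BatchedWriteAheadLog` wrapping the configured class, so a test asserting on the concrete custom class would see the wrapper instead. A sketch of the configuration interplay (the class name below is hypothetical):

```scala
import org.apache.spark.SparkConf

// "com.example.MyWAL" is a hypothetical custom WriteAheadLog implementation.
val conf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.class", "com.example.MyWAL")
  // Without this line, createLogForDriver would return a BatchedWriteAheadLog
  // wrapping com.example.MyWAL rather than an instance of the class itself.
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
```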
ReceivedBlockTrackerSuite.scala

```diff
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
       : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
       file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
-    }.map { byteBuffer =>
-      Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
+    }.flatMap { byteBuffer =>
+      val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
+        Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
+      } else {
+        Array(byteBuffer)
+      }
+      validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
     }.toList
   }
```
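The updated helper has to handle two on-disk layouts: unbatched records, where each WAL entry is one serialized `ReceivedBlockTrackerLogEvent`, and batched records, where one entry is a serialized `Array[Array[Byte]]` whose elements are themselves serialized events. A self-contained sketch of the batched round trip, using plain Java serialization as a stand-in for Spark's `Utils.serialize`/`Utils.deserialize`:

```scala
import java.io._
import java.nio.ByteBuffer

// Stand-ins for Spark's Utils.serialize / Utils.deserialize.
def serialize(o: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(o); oos.close()
  bos.toByteArray
}
def deserialize[T](bytes: Array[Byte]): T = {
  val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
  ois.readObject().asInstanceOf[T]
}

val events = Seq("event-1", "event-2", "event-3") // stand-ins for log events

// A batched WAL record: one serialized Array[Array[Byte]], where each inner
// array is itself a serialized event.
val batch: Array[Byte] = serialize(events.map(serialize).toArray)

// Recovery mirrors the test helper: unwrap the batch, then decode each event.
val decoded = deserialize[Array[Array[Byte]]](batch)
  .map(ByteBuffer.wrap)
  .map(b => deserialize[String](b.array()))
assert(decoded.toSeq == events)
```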
WriteAheadLogSuite.scala

```diff
@@ -20,7 +20,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
+import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests(
     }
     assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
   }
+
+  test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
+    // write some data
+    val writtenData = (1 to 10).map { i =>
+      val data = generateRandomData()
+      val file = testDir + s"/log-$i-$i"
+      writeDataManually(data, file, allowBatching)
+      data
+    }.flatten
+
+    val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
+    // create the iterator but don't materialize it
+    val readData = wal.readAll().asScala.map(byteBufferToString)
+    wal.close()
+    if (closeFileAfterWrite) {
+      // the thread pool is shut down by the wal.close call above, so we shouldn't
+      // be able to materialize the iterator with parallel recovery
+      intercept[RejectedExecutionException](readData.toArray)
+    } else {
+      assert(readData.toSeq === writtenData)
+    }
+  }
 }
 
 class FileBasedWriteAheadLogSuite
```
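The new test leans on standard `java.util.concurrent` semantics: an executor that has been shut down rejects further task submissions. Since `wal.close()` stops the pool used for parallel recovery, materializing the lazy `readAll` iterator afterwards surfaces a `RejectedExecutionException`. A minimal standalone sketch of that JDK behavior:

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

// Once shut down, an ExecutorService rejects new submissions; this is what
// the lazily materialized recovery iterator runs into after wal.close().
val pool = Executors.newFixedThreadPool(2)
pool.shutdown()
try {
  pool.submit(new Runnable { def run(): Unit = () })
} catch {
  case _: RejectedExecutionException => println("submission rejected after shutdown")
}
```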
WriteAheadLogUtilsSuite.scala

```diff
@@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
   test("log selection and creation") {
 
     val emptyConf = new SparkConf()  // no log configuration
-    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
 
     // Verify setting driver WAL class
     val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[MockWriteAheadLog0](driverWALConf)
+    assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf)
 
     // Verify setting receiver WAL class
     val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
 
     // Verify setting receiver WAL class with 1-arg constructor
@@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
     assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
   }
+
+  test("batching is enabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
+    // batching is not valid for receiver WALs
+    assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
+  }
+
+  test("closeFileAfterWrite is disabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))
+  }
 }
 
 object WriteAheadLogUtilsSuite {
```
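For completeness, here is a sketch of overriding both defaults that these tests pin down. The `closeFileAfterWrite` key name is assumed from Spark's streaming WAL configuration, and `WriteAheadLogUtils` is `private[streaming]` (see the first hunk), so the assertions only compile from inside that package, as the suite itself does:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.util.WriteAheadLogUtils

// Assumed config keys; compiles only within org.apache.spark.streaming.
val conf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
  .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")

assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
assert(WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
```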