FileStreamSinkLog.scala
@@ -80,7 +80,7 @@ object SinkFileStatus {
* (drops the deleted files).
*/
class FileStreamSinkLog(sparkSession: SparkSession, path: String)
- extends HDFSMetadataLog[Seq[SinkFileStatus]](sparkSession, path) {
+ extends HDFSMetadataLog[Array[SinkFileStatus]](sparkSession, path) {

import FileStreamSinkLog._

@@ -123,11 +123,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
}
}

- override def serialize(logData: Seq[SinkFileStatus]): Array[Byte] = {
+ override def serialize(logData: Array[SinkFileStatus]): Array[Byte] = {
(VERSION +: logData.map(write(_))).mkString("\n").getBytes(UTF_8)
}

- override def deserialize(bytes: Array[Byte]): Seq[SinkFileStatus] = {
+ override def deserialize(bytes: Array[Byte]): Array[SinkFileStatus] = {
val lines = new String(bytes, UTF_8).split("\n")
if (lines.length == 0) {
throw new IllegalStateException("Incomplete log file")
@@ -136,10 +136,10 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
if (version != VERSION) {
throw new IllegalStateException(s"Unknown log version: ${version}")
}
- lines.toSeq.slice(1, lines.length).map(read[SinkFileStatus](_))
+ lines.slice(1, lines.length).map(read[SinkFileStatus](_))
}

- override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
+ override def add(batchId: Long, logs: Array[SinkFileStatus]): Boolean = {
if (isCompactionBatch(batchId, compactInterval)) {
compact(batchId, logs)
} else {
@@ -186,7 +186,7 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
- if (super.add(batchId, compactLogs(allLogs))) {
+ if (super.add(batchId, compactLogs(allLogs).toArray)) {
if (isDeletingExpiredLog) {
deleteExpiredLog(batchId)
}
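Editorial aside, not part of the patch: the sink log writes one UTF-8 text file per batch (a version header followed by one JSON-encoded SinkFileStatus per line), and every compaction batch folds the still-valid history into a single "<batchId>.compact" file. The sketch below illustrates the batch arithmetic this relies on, assuming the (batchId + 1) % compactInterval == 0 rule used by FileStreamSinkLog.isCompactionBatch; the object and method names here are hypothetical, and the "compact" test further down exercises the same behaviour with an interval of 3.

// Standalone sketch of the compaction arithmetic assumed above (hypothetical names).
object CompactionMathSketch {
  // A batch is a compaction batch when it closes an interval window.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Batches folded into a compaction batch: the previous compact file (which already
  // carries the full history) plus the delta files written since then.
  def validBatchesBeforeCompaction(compactionBatchId: Long, compactInterval: Int): Seq[Long] =
    math.max(0L, compactionBatchId - compactInterval) until compactionBatchId

  def main(args: Array[String]): Unit = {
    val interval = 3
    println((0L to 8L).filter(isCompactionBatch(_, interval)))        // 2, 5, 8
    println(validBatchesBeforeCompaction(5L, interval).mkString(","))  // 2,3,4
  }
}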
FileStreamSource.scala
@@ -49,7 +49,7 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contain glob patterns
}

- private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)
+ private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)

private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

@@ -98,7 +98,7 @@ class FileStreamSource(

if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles)
+ metadataLog.add(maxBatchId, batchFiles.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

HDFSMetadataLog.scala
@@ -49,6 +49,10 @@ import org.apache.spark.util.UninterruptibleThread
class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {

// Avoid serializing generic sequences, see SPARK-17372
require(implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]],
  "Should not create a log with type Seq, use Arrays instead - see SPARK-17372")

@tdas (Contributor, Author) commented on Sep 7, 2016:
This is just a best-effort attempt to keep future Spark developers from accidentally using new HDFSMetadataLog[Seq[X]].

import HDFSMetadataLog._

val metadataPath = new Path(path)
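Editorial aside on the require check above: the guard compares only the erased runtime class recorded in the ClassTag, so it rejects HDFSMetadataLog[Seq[X]] literally but not subtypes such as List[X], which is exactly why the review comment calls it a best-effort check. A small self-contained sketch with hypothetical names, not patch code:

import scala.reflect.ClassTag

// Hypothetical sketch: mirrors the guard's comparison to show what it does and does not catch.
object SeqGuardSketch {
  def allowed[T: ClassTag]: Boolean =
    implicitly[ClassTag[T]].runtimeClass != classOf[Seq[_]]

  def main(args: Array[String]): Unit = {
    println(allowed[Seq[String]])    // false: the require would fail fast, as intended
    println(allowed[Array[String]])  // true:  arrays pass
    println(allowed[List[String]])   // true:  a List-typed log still slips through,
                                     //        hence "best effort"
  }
}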
StreamExecution.scala
@@ -407,6 +407,9 @@ class StreamExecution(
awaitBatchLock.lock()
try {
awaitBatchLockCondition.await(100, TimeUnit.MILLISECONDS)
if (streamDeathCause != null) {
throw streamDeathCause
}
} finally {
awaitBatchLock.unlock()
}
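Editorial aside: the added check means a thread blocked in this wait loop fails fast once the stream thread has died, instead of spinning on the 100 ms condition wait until its own timeout expires. A generic sketch of the pattern, with hypothetical names rather than StreamExecution's actual fields:

import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// Hypothetical illustration: wait for progress, but rethrow the worker's failure immediately.
class ProgressWaiter {
  private val lock = new ReentrantLock()
  private val progressed = lock.newCondition()
  @volatile private var committedOffset: Long = -1L
  @volatile private var deathCause: Throwable = null

  // Worker thread: report progress or a fatal error, waking any waiters.
  def advance(to: Long): Unit = withLock { committedOffset = to; progressed.signalAll() }
  def fail(t: Throwable): Unit = withLock { deathCause = t; progressed.signalAll() }

  // Caller thread: block until the target offset is reached, but surface a death cause
  // as soon as it is observed rather than waiting out the caller's own timeout.
  def awaitOffset(target: Long): Unit = {
    while (committedOffset < target) {
      withLock {
        progressed.await(100, TimeUnit.MILLISECONDS)
        if (deathCause != null) throw deathCause  // the spirit of the lines added above
      }
    }
  }

  private def withLock[A](body: => A): A = {
    lock.lock()
    try body finally lock.unlock()
  }
}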
FileStreamSinkLogSuite.scala
@@ -98,7 +98,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {

test("serialize") {
withFileStreamSinkLog { sinkLog =>
- val logs = Seq(
+ val logs = Array(
SinkFileStatus(
path = "/a/b/x",
size = 100L,
@@ -132,7 +132,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
// scalastyle:on
assert(expected === new String(sinkLog.serialize(logs), UTF_8))

- assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Nil), UTF_8))
+ assert(FileStreamSinkLog.VERSION === new String(sinkLog.serialize(Array()), UTF_8))
}
}

@@ -196,7 +196,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
for (batchId <- 0 to 10) {
sinkLog.add(
batchId,
- Seq(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+ Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
val expectedFiles = (0 to batchId).map {
id => newFakeSinkFileStatus("/a/b/" + id, FileStreamSinkLog.ADD_ACTION)
}
@@ -230,17 +230,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}.toSet
}

- sinkLog.add(0, Seq(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
- sinkLog.add(1, Seq(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0", "1") === listBatchFiles())
- sinkLog.add(2, Seq(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(2, Array(newFakeSinkFileStatus("/a/b/2", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact") === listBatchFiles())
- sinkLog.add(3, Seq(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(3, Array(newFakeSinkFileStatus("/a/b/3", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3") === listBatchFiles())
- sinkLog.add(4, Seq(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(4, Array(newFakeSinkFileStatus("/a/b/4", FileStreamSinkLog.ADD_ACTION)))
assert(Set("2.compact", "3", "4") === listBatchFiles())
- sinkLog.add(5, Seq(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
+ sinkLog.add(5, Array(newFakeSinkFileStatus("/a/b/5", FileStreamSinkLog.ADD_ACTION)))
assert(Set("5.compact") === listBatchFiles())
}
}
FileStreamSourceSuite.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.streaming

import java.io.File
import java.util.UUID

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -142,6 +144,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

import testImplicits._

override val streamingTimeout = 20.seconds

/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
format: String,
@@ -761,6 +765,42 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
}

test("SPARK-17372 - write file names to WAL as Array[String]") {
// Note: If this test takes longer than the timeout, then it's likely that this is actually
// running a Spark job with 10000 tasks. This test tries to avoid that by
// 1. Setting the threshold for parallel file listing to very high
// 2. Using a query that should use constant folding to eliminate reading of the files

val numFiles = 10000

// This is to avoid running a Spark job to list the files in parallel
// by the ListingFileCatalog.
spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)

withTempDirs { case (root, tmp) =>
val src = new File(root, "a=1")
src.mkdirs()

(1 to numFiles).map { _.toString }.foreach { i =>
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)
stringToFile(finalFile, i)
}
assert(src.listFiles().size === numFiles)

val files = spark.readStream.text(root.getCanonicalPath).as[String]

// Note this query will use constant folding to eliminate the file scan.
// This is to avoid actually running a Spark job with 10000 tasks
val df = files.filter("1 == 0").groupBy().count()

testStream(df, InternalOutputModes.Complete)(
AddTextFileData("0", src, tmp),
CheckAnswer(0)
)
}
}
}
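Editorial note on the motivation, hedged since it is my reading of SPARK-17372 rather than anything stated in this diff: the metadata log's default path Java-serializes its payload, and a lazily built Seq such as a Stream is a linked chain of cons cells, so ObjectOutputStream walks it recursively and can overflow the stack once the file list reaches tens of thousands of entries, whereas an Array of the same names is written as one flat object. A standalone sketch of the structural difference (small sizes so it runs quickly; no crash is asserted):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Standalone illustration, not Spark code. Sizes are kept small; the point is the shape of
// the serialized object graph, not the byte counts themselves.
object WalPayloadSketch {
  private def javaSerializedSize(payload: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(payload)
    out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val fileNames = (1 to 1000).map(i => s"/data/a=1/part-$i.txt")

    // Flat payload: what the .toArray conversions in this patch hand to the log.
    println(s"Array payload:  ${javaSerializedSize(fileNames.toArray)} bytes")

    // Recursive payload: each cons cell references the next, so serialization recurses once
    // per element; for very long lazily built sequences this is the kind of structure that
    // can blow the stack, which (as I read it) is what the Seq -> Array change avoids.
    println(s"Stream payload: ${javaSerializedSize(fileNames.toStream.force)} bytes")
  }
}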

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {