sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala

@@ -193,7 +193,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    * Returns all files except the deleted ones.
    */
   def allFiles(): Array[T] = {
-    var latestId = getLatest().map(_._1).getOrElse(-1L)
+    var latestId = getLatestBatchId().getOrElse(-1L)
     // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
     // is calling this method. This loop will retry the reading to deal with the
     // race condition.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

@@ -142,7 +142,7 @@ class FileStreamSink(
   }
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
+    if (batchId <= fileLog.getLatestBatchId().getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
       val committer = FileCommitProtocol.instantiate(

[Review comment — Member]
Nice catch!

    val latestBatchId = getLatest().map(_._1).getOrElse(-1L)

Can these two places also be optimized in this way?

[Reply — Contributor Author]
Yeah, I think so. Nice finding. Thanks!

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala

@@ -96,7 +96,7 @@ class FileStreamSourceLog(
     val searchKeys = removedBatches.map(_._1)
     val retrievedBatches = if (searchKeys.nonEmpty) {
       logWarning(s"Get batches from removed files, this is unexpected in the current code path!!!")
-      val latestBatchId = getLatest().map(_._1).getOrElse(-1L)
+      val latestBatchId = getLatestBatchId().getOrElse(-1L)
       if (latestBatchId < 0) {
         Map.empty[Long, Option[Array[FileEntry]]]
       } else {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala

@@ -182,17 +182,26 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     }
   }
 
-  override def getLatest(): Option[(Long, T)] = {
-    val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  /**
+   * Return the latest batch Id without reading the file. This method only checks for the
+   * existence of files to avoid the cost of reading and deserializing the log file.
+   */
+  def getLatestBatchId(): Option[Long] = {
+    fileManager.list(metadataPath, batchFilesFilter)
       .map(f => pathToBatchId(f.getPath))
       .sorted(Ordering.Long.reverse)
-    for (batchId <- batchIds) {
-      val batch = get(batchId)
-      if (batch.isDefined) {
-        return Some((batchId, batch.get))
-      }
-    }
-    None
-  }
+      .headOption
+  }
+
+  override def getLatest(): Option[(Long, T)] = {
+    getLatestBatchId().map { batchId =>
+      val content = get(batchId).getOrElse {
+        // If we find the latest batch file, we must read that file, rather than falling back
+        // to old batches.
+        throw new IllegalStateException(s"failed to read log file for batch $batchId")
+      }
+      (batchId, content)
+    }
+  }
 
   /**

[Review comment — Member]
Nit: maybe we should not introduce a new behavior change here?

[Reply — Contributor Author]
This is part of the change in #25965, which should be dealt with. It shouldn't return the content for a batch ID less than the latest batch ID; it should fail instead.

[Reply — Contributor Author]
That said, you might have interesting proposals among my old PRs: https://github.com/apache/spark/pulls/HeartSaVioR

[Reply — Member]
Thanks for the reference, will take a look later.

[Reply — Member]
Thanks for referencing #25965, LGTM to this change. I personally think the comment in https://github.com/apache/spark/pull/25965/files#diff-aaeb546880508bb771df502318c40a99R183 is clearer. Either way is fine though.

[Reply — Contributor Author]
Just pulled the comment here. Either this PR or #25965 will have to resolve a merge conflict, but I wanted to make sure the code comment is clear either way.
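
For context, a minimal caller-side sketch of the two APIs after this change. This is hypothetical illustration code, not part of the diff; it assumes an existing SparkSession `spark` and a metadata directory string `metadataDir`, and glosses over the fact that HDFSMetadataLog is internal to Spark SQL.

    // Hypothetical caller, for illustration only.
    val log = new HDFSMetadataLog[String](spark, metadataDir)

    // Listing-only: the ID is parsed from the log file names, so no file is
    // opened, read, or deserialized.
    val latestId: Long = log.getLatestBatchId().getOrElse(-1L)

    // Opens and deserializes the latest log file. After this change it throws
    // IllegalStateException when that file exists but cannot be read, instead
    // of silently falling back to an older batch.
    val latest: Option[(Long, String)] = log.getLatest()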

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala

@@ -18,7 +18,15 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.lang.{Long => JLong}
+import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.internal.SQLConf

@@ -240,6 +248,44 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     ))
   }
 
+  test("getLatestBatchId") {
+    withCountOpenLocalFileSystemAsLocalFileSystem {
+      val scheme = CountOpenLocalFileSystem.scheme
+      withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+        withTempDir { dir =>
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            s"$scheme:///${dir.getCanonicalPath}")
+          for (batchId <- 0L to 2L) {
+            sinkLog.add(
+              batchId,
+              Array(newFakeSinkFileStatus("/a/b/" + batchId, FileStreamSinkLog.ADD_ACTION)))
+          }
+
+          def getCountForOpenOnMetadataFile(batchId: Long): Long = {
+            val path = sinkLog.batchIdToPath(batchId).toUri.getPath
+            CountOpenLocalFileSystem.pathToNumOpenCalled.getOrDefault(path, 0L)
+          }
+
+          CountOpenLocalFileSystem.resetCount()
+
+          assert(sinkLog.getLatestBatchId() === Some(2L))
+          // getLatestBatchId doesn't open the latest metadata log file
+          (0L to 2L).foreach { batchId =>
+            assert(getCountForOpenOnMetadataFile(batchId) === 0L)
+          }
+
+          assert(sinkLog.getLatest().map(_._1).getOrElse(-1L) === 2L)
+          (0L to 1L).foreach { batchId =>
+            assert(getCountForOpenOnMetadataFile(batchId) === 0L)
+          }
+          // getLatest opens the latest metadata log file, which explains the need for
+          // having getLatestBatchId.
+          assert(getCountForOpenOnMetadataFile(2L) === 1L)
+        }
+      }
+    }
+  }
+
   /**
    * Create a fake SinkFileStatus using path and action. Most tests don't care about the other
    * fields in SinkFileStatus.

[Review comment on test("getLatestBatchId") — Contributor Author]
I didn't add an E2E test, to keep the test code simple, but if we prefer E2E I'll try to add a new test to FileStreamSinkSuite.

@@ -267,4 +313,41 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     val log = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, input.toString)
     log.allFiles()
   }
+
+  private def withCountOpenLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountOpenLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountOpenLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
 }
+
+class CountOpenLocalFileSystem extends RawLocalFileSystem {
+  import CountOpenLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def open(f: Path, bufferSize: Int): FSDataInputStream = {
+    val path = f.toUri.getPath
+    pathToNumOpenCalled.compute(path, (_, v) => {
+      if (v == null) 1L else v + 1
+    })
+    super.open(f, bufferSize)
+  }
+}
+
+object CountOpenLocalFileSystem {
+  val scheme = s"FileStreamSinkLogSuite${math.abs(Random.nextInt)}fs"
+  val pathToNumOpenCalled = new ConcurrentHashMap[String, JLong]
+
+  def resetCount(): Unit = pathToNumOpenCalled.clear()
+}

[Review comment on withCountOpenLocalFileSystemAsLocalFileSystem — Contributor Author]
The FileSystem-related code I add here is very similar to what I added in #27620. When either one gets merged, I'll rebase and deduplicate it.