Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,16 @@ private[state] class HDFSBackedStateStoreProvider(
private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
synchronized {
val finalDeltaFile = deltaFile(newVersion)
if (!fs.rename(tempDeltaFile, finalDeltaFile)) {

// scalastyle:off
// Renaming a file atop an existing one fails on HDFS
// (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html).
// Hence we should either skip the rename step or delete the target file. Because deleting the
// target file will break speculation, skipping the rename step is the only choice. It's still
// semantically correct because Structured Streaming requires rerunning a batch should
// generate the same output. (SPARK-19677)
// scalastyle:on
if (!fs.exists(finalDeltaFile) && !fs.rename(tempDeltaFile, finalDeltaFile)) {
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
}
loadedMaps.put(newVersion, map)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,6 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
assert(store1.commit() === 2)
assert(rowsToSet(store1.iterator()) === Set("a" -> 1, "b" -> 1))
assert(getDataFromFiles(provider) === Set("a" -> 1, "b" -> 1))

// Overwrite the version with other data
val store2 = provider.getStore(1)
put(store2, "c", 1)
assert(store2.commit() === 2)
assert(rowsToSet(store2.iterator()) === Set("a" -> 1, "c" -> 1))
assert(getDataFromFiles(provider) === Set("a" -> 1, "c" -> 1))
}

test("snapshotting") {
Expand Down Expand Up @@ -292,6 +285,15 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
assert(getDataFromFiles(provider, 19) === Set("a" -> 19))
}

// Regression test for SPARK-19677: committing the same state store version twice writes
// the same delta file path; on HDFS, renaming onto an existing file returns false, which
// previously surfaced as a spurious IOException. The second commit must now succeed.
test("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
val conf = new Configuration()
// Route "fake:///" paths through a FileSystem that mimics HDFS rename semantics
// (rename onto an existing destination returns false instead of overwriting).
conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
// NOTE(review): "fs.default.name" is the deprecated alias of "fs.defaultFS" — confirm
// whether the suite intentionally targets the old key before modernizing.
conf.set("fs.default.name", "fake:///")

val provider = newStoreProvider(hadoopConf = conf)
// Commit version 0 twice: the second commit targets an already-existing delta file
// and must complete without throwing.
provider.getStore(0).commit()
provider.getStore(0).commit()
}

test("corrupted file handling") {
val provider = newStoreProvider(minDeltasForSnapshot = 5)
Expand Down Expand Up @@ -681,6 +683,21 @@ private[state] object StateStoreSuite {
}
}

/**
 * Fake FileSystem that simulates HDFS rename semantics, i.e. renaming a file atop an existing
 * one returns `false` rather than overwriting the destination.
 * See hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html
 */
class RenameLikeHDFSFileSystem extends RawLocalFileSystem {
  // Succeed only when the destination does not already exist; otherwise report failure,
  // mirroring HDFS. Expressed as a single boolean expression instead of `return`s, which
  // are non-idiomatic in Scala (nonlocal-return pitfalls).
  override def rename(src: Path, dst: Path): Boolean =
    !exists(dst) && super.rename(src, dst)
}

/**
* Fake FileSystem to test that the StateStore throws an exception while committing the
* delta file, when `fs.rename` returns `false`.
Expand Down