Skip to content

Commit 165d482

Browse files
committed
fix structured streaming restart bug
1 parent 988f6d7 commit 165d482

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
2727
import com.google.common.io.ByteStreams
2828
import org.apache.hadoop.conf.Configuration
2929
import org.apache.hadoop.fs.{FileStatus, Path}
30+
import org.apache.hadoop.hdfs.protocol.HdfsConstants
3031

3132
import org.apache.spark.{SparkConf, SparkEnv}
3233
import org.apache.spark.internal.Logging
@@ -274,8 +275,19 @@ private[state] class HDFSBackedStateStoreProvider(
274275
private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
275276
synchronized {
276277
val finalDeltaFile = deltaFile(newVersion)
277-
if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
278-
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
278+
/** When a job restarts, the delta file may already have been generated earlier on account of the offsets WAL. */
279+
if (fs.getScheme == HdfsConstants.HDFS_URI_SCHEME) {
280+
if (!fs.exists(finalDeltaFile)) {
281+
if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
282+
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
283+
}
284+
} else {
285+
fs.delete(tempDeltaFile, true)
286+
}
287+
} else {
288+
if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
289+
throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
290+
}
279291
}
280292
loadedMaps.put(newVersion, map)
281293
finalDeltaFile

0 commit comments

Comments
 (0)