Commit f24bb43

[SPARK-40459][K8S] recoverDiskStore should not stop by existing recomputed files
### What changes were proposed in this pull request?

This PR aims to ignore `FileExistsException` during `recoverDiskStore` processing.

### Why are the changes needed?

Although `recoverDiskStore` is already wrapped by `tryLogNonFatalError`, a single file recovery exception should not block the whole `recoverDiskStore`.

https://github.com/apache/spark/blob/5938e84e72b81663ccacf0b36c2f8271455de292/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala#L45-L47

```
org.apache.commons.io.FileExistsException: ...
    at org.apache.commons.io.FileUtils.requireAbsent(FileUtils.java:2587)
    at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2305)
    at org.apache.commons.io.FileUtils.moveFile(FileUtils.java:2283)
    at org.apache.spark.storage.DiskStore.moveFileToBlock(DiskStore.scala:150)
    at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.saveToDiskStore(BlockManager.scala:487)
    at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:407)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
    at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:380)
    at org.apache.spark.storage.BlockManager$TempFileBasedBlockStoreUpdater.save(BlockManager.scala:490)
    at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.$anonfun$recoverDiskStore$14(KubernetesLocalDiskShuffleExecutorComponents.scala:95)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents$.recoverDiskStore(KubernetesLocalDiskShuffleExecutorComponents.scala:91)
```

### Does this PR introduce _any_ user-facing change?

No, this will improve the recovery rate.

### How was this patch tested?

Pass the CIs.

Closes #37903 from dongjoon-hyun/SPARK-40459.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 5938e84 commit f24bb43

1 file changed, +4 -0 lines changed


resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,8 @@ import java.util.Optional

 import scala.reflect.ClassTag

+import org.apache.commons.io.FileExistsException
+
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
@@ -95,6 +97,8 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
           bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
         } catch {
           case _: UnrecognizedBlockId =>
+          case _: FileExistsException =>
+            // This may happen due to recompute, but we continue to recover next files
         }
       }
     }
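
The per-file handling in this diff can be illustrated in isolation. The following is a minimal sketch, not the Spark code: `RecoverSketch` and `recoverAll` are hypothetical names, and it uses the JDK's `java.nio.file.Files.move` with `FileAlreadyExistsException` as a stand-in for commons-io's `FileUtils.moveFile` and `FileExistsException`. It assumes the default file system, where `Files.move` without `REPLACE_EXISTING` raises that exception when the target already exists.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}

object RecoverSketch {
  // Hypothetical helper: tries to move every source file into targetDir and
  // returns the names of the files that were actually moved.
  def recoverAll(sources: Seq[Path], targetDir: Path): Seq[String] = {
    sources.flatMap { src =>
      try {
        // Without REPLACE_EXISTING, an existing target makes the move fail.
        Files.move(src, targetDir.resolve(src.getFileName))
        Some(src.getFileName.toString)
      } catch {
        case _: FileAlreadyExistsException =>
          // The target was already recreated (for example by a recompute),
          // so skip this file and continue with the remaining ones.
          None
      }
    }
  }
}
```

Because the `try` sits inside the loop body, one pre-existing target only skips that file; an exception caught outside the loop would instead end recovery for every remaining file.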
