diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 332164a7be3e..da069d7acb36 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,12 +21,11 @@ import java.io._
 import java.nio.channels.Channels
 import java.nio.file.Files
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
 
@@ -35,8 +34,9 @@ import org.apache.spark.util.Utils
 * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
 * The offsets of the data blocks in the data file are stored in a separate index file.
 *
- * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
- * as the filename postfix for data file, and ".index" as the filename postfix for index file.
+ * We use the name of the shuffle data's shuffleBlockId with the reduce ID set to the task
+ * attempt number, and add ".data" as the filename postfix for the data file and ".index" as the
+ * filename postfix for the index file.
 *
 */
 // Note: Changes to the format in this file should be kept in sync with
@@ -51,12 +51,23 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
+  // No-op reduce ID used in interactions with disk store.
+  // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+  // shuffle outputs for several reduces are glommed into a single file.
+  // SPARK-29257: the no-op reduce ID used to be a constant 0, which produced a fixed index/data
+  // file name. When a node has more than one disk and the disk that the fixed file name hashes
+  // to (hash code % number of disks) is broken, every attempt of the task inevitably hits the
+  // broken disk, leading to pointless task failures or even the failure of the whole job.
+  // Using the task attempt number as the no-op reduce ID produces a different file name per
+  // attempt, so a retry may land on another, healthy disk.
+  private def noopReduceId: Int = Option(TaskContext.get()).map(_.attemptNumber()).getOrElse(0)
+
   def getDataFile(shuffleId: Int, mapId: Long): File = {
-    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+    blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, noopReduceId))
   }
 
   private def getIndexFile(shuffleId: Int, mapId: Long): File = {
-    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+    blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, noopReduceId))
   }
 
   /**
@@ -225,10 +236,3 @@ private[spark] class IndexShuffleBlockResolver(
 
   override def stop(): Unit = {}
 }
-
-private[spark] object IndexShuffleBlockResolver {
-  // No-op reduce ID used in interactions with disk store.
-  // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
-  // shuffle outputs for several reduces are glommed into a single file.
-  val NOOP_REDUCE_ID = 0
-}
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 27bb06b4e063..f22169f931cf 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}
+import java.io._
 
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -26,7 +26,7 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
@@ -36,6 +36,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
 
   @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _
   @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
 
   private var tempDir: File = _
   private val conf: SparkConf = new SparkConf(loadDefaults = false)
@@ -48,6 +49,8 @@
     when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
     when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
+
+    TaskContext.setTaskContext(taskContext)
   }
 
   override def afterEach(): Unit = {
@@ -155,4 +158,65 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
       indexIn2.close()
     }
   }
+
+  test("getDataFile should return different files for different task attempts") {
+    val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+    val shuffleId = 1
+    val mapId = 2
+    when(taskContext.attemptNumber()).thenReturn(0, Seq(1, 2, 3): _*)
+    assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("0.data"))
+    assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("1.data"))
+    assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("2.data"))
+    assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("3.data"))
+  }
+
+  test("different task attempts should be able to choose different local dirs") {
+    val localDirSuffixes = 1 to 4
+    val dirs = localDirSuffixes.map(x => tempDir + "/test_local" + x).mkString(",")
+    val confClone = conf.clone.set("spark.local.dir", dirs)
+    val resolver = new IndexShuffleBlockResolver(confClone, blockManager)
+    val dbm = new DiskBlockManager(confClone, true)
+    when(blockManager.diskBlockManager).thenReturn(dbm)
+    when(taskContext.attemptNumber()).thenReturn(0, Seq(1, 2, 3): _*)
+    val dataFiles = localDirSuffixes.map(_ => resolver.getDataFile(1, 2))
+    val usedLocalDirSuffixes =
+      dataFiles.map(_.getAbsolutePath.split("test_local")(1).substring(0, 1).toInt)
+    assert(usedLocalDirSuffixes.diff(localDirSuffixes).isEmpty)
+  }
+
+  test("a new task attempt should be able to succeed in another available local dir") {
+    val localDirSuffixes = 1 to 2
+    val dirs = localDirSuffixes.map { x => tempDir + "/test_local" + x }.mkString(",")
+    val confClone = conf.clone.set("spark.local.dir", dirs)
+    val resolver = new IndexShuffleBlockResolver(confClone, blockManager)
+    val dbm = new DiskBlockManager(confClone, true)
+    when(blockManager.diskBlockManager).thenReturn(dbm, dbm)
+    val shuffleId = 1
+    val mapId = 2
+    val lengths = Array[Long](10, 0, 20)
+    val dataTmp = File.createTempFile("shuffle", null, tempDir)
+    val out = new FileOutputStream(dataTmp)
+    Utils.tryWithSafeFinally {
+      out.write(new Array[Byte](30))
+    } {
+      out.close()
+    }
+    val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
+    val localDirIdx = Utils.nonNegativeHash(idxName) % localDirSuffixes.length
+
+    val badDisk = dbm.localDirs(localDirIdx)
+    badDisk.setWritable(false) // simulate a broken disk
+
+    // attempt 0: writing the index file fails on the broken disk
+    // attempt 1: index and data land in the other, healthy local dir and are then verified
+    when(taskContext.attemptNumber()).thenReturn(0, Seq(1, 1, 1): _*)
+    val e =
+      intercept[IOException](resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp))
+    assert(e.getMessage.contains(badDisk.getAbsolutePath))
+
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)
+
+    val dataFile = resolver.getDataFile(shuffleId, mapId)
+    assert(dataFile.exists())
+  }
 }