core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -21,12 +21,11 @@ import java.io._
import java.nio.channels.Channels
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.io.NioBufferedFileInputStream
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils

@@ -35,8 +34,9 @@ import org.apache.spark.util.Utils
* Data of shuffle blocks from the same map task are stored in a single consolidated data file.
* The offsets of the data blocks in the data file are stored in a separate index file.
*
* We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
* as the filename postfix for data file, and ".index" as the filename postfix for index file.
* We use the name of the shuffle data's shuffleBlockId with the reduce ID set to the task attempt
* number, and add ".data" as the filename postfix for the data file and ".index" as the filename
* postfix for the index file.
*
*/
// Note: Changes to the format in this file should be kept in sync with
@@ -51,12 +51,23 @@ private[spark] class IndexShuffleBlockResolver(

private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")

// No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
// SPARK-29257: the noop reduce id used to be a constant 0, which produced a fixed index/data
// file name for every attempt of a map task. When a node has more than one disk, and one of them
// is broken while the fixed file name's hash code % (number of disks) happens to point to it,
// every attempt of the task will access the broken disk, leading to pointless task failures or
// even bringing down the whole job. Here we change the noop reduce id to the task attempt number
// so that each attempt produces a different file name, which may hash to another, healthy disk.
private def noopReduceId: Int = Option(TaskContext.get()).map(_.attemptNumber()).getOrElse(0)

def getDataFile(shuffleId: Int, mapId: Long): File = {
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, noopReduceId))
}

private def getIndexFile(shuffleId: Int, mapId: Long): File = {
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, noopReduceId))
}

/**
@@ -225,10 +236,3 @@ private[spark] class IndexShuffleBlockResolver(

override def stop(): Unit = {}
}

private[spark] object IndexShuffleBlockResolver {
// No-op reduce ID used in interactions with disk store.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.
val NOOP_REDUCE_ID = 0
}
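
Why changing the file name helps: the disk block manager places a block file by hashing the block's file name across the configured local dirs, so a name that varies with the task attempt number can land on a different disk when the task is retried. Below is a minimal, self-contained sketch of that directory-selection idea; the hash function is a hypothetical stand-in for Utils.nonNegativeHash, not DiskBlockManager's exact code.

// Sketch of hash-based local-dir selection (illustrative only).
object DirSelectionSketch {
  // Hypothetical stand-in for org.apache.spark.util.Utils.nonNegativeHash.
  private def nonNegativeHash(s: String): Int = s.hashCode & Integer.MAX_VALUE

  // Pick a local dir the way the comment above describes: hash % number of disks.
  def pickLocalDir(fileName: String, localDirs: Seq[String]): String =
    localDirs(nonNegativeHash(fileName) % localDirs.size)

  def main(args: Array[String]): Unit = {
    val dirs = Seq("/disk1", "/disk2", "/disk3")
    // With the old fixed reduce id the name never changed, so every attempt hit the same dir;
    // with the attempt number in the name, a retry may resolve to a different (healthy) dir.
    Seq(0, 1, 2).foreach { attempt =>
      val name = s"shuffle_1_2_${attempt}.data"
      println(s"$name -> ${pickLocalDir(name, dirs)}")
    }
  }
}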
core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -17,7 +17,7 @@

package org.apache.spark.shuffle.sort

import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}
import java.io._

import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -26,7 +26,7 @@ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage._
import org.apache.spark.util.Utils
@@ -36,6 +36,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach

@Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _
@Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _
@Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _

private var tempDir: File = _
private val conf: SparkConf = new SparkConf(loadDefaults = false)
@@ -48,6 +49,8 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach
when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
(invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))

TaskContext.setTaskContext(taskContext)
}

override def afterEach(): Unit = {
@@ -155,4 +158,65 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach
indexIn2.close()
}
}

test("get data file should in different task attempts") {
val resolver = new IndexShuffleBlockResolver(conf, blockManager)
val shuffleId = 1
val mapId = 2
when(taskContext.attemptNumber()).thenReturn(0, Seq(1, 2, 3): _*)
assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("0.data"))
assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("1.data"))
assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("2.data"))
assert(resolver.getDataFile(shuffleId, mapId).getName.endsWith("3.data"))
}

test("different task attempts should be able to choose different local dirs") {
val localDirSuffixes = 1 to 4
val dirs = localDirSuffixes.map(x => tempDir + "/test_local" + x).mkString(",")
val confClone = conf.clone.set("spark.local.dir", dirs)
val resolver = new IndexShuffleBlockResolver(confClone, blockManager)
val dbm = new DiskBlockManager(confClone, true)
when(blockManager.diskBlockManager).thenReturn(dbm)
when(taskContext.attemptNumber()).thenReturn(0, Seq(1, 2, 3): _*)
val dataFiles = localDirSuffixes.map(_ => resolver.getDataFile(1, 2))
val usedLocalDirSuffixes =
dataFiles.map(_.getAbsolutePath.split("test_local")(1).substring(0, 1).toInt)
assert(usedLocalDirSuffixes.diff(localDirSuffixes).isEmpty)
}

test("new task attempt should be able to success in another available local dir") {
val localDirSuffixes = 1 to 2
val dirs = localDirSuffixes.map { x => tempDir + "/test_local" + x }.mkString(",")
val confClone = conf.clone.set("spark.local.dir", dirs)
val resolver = new IndexShuffleBlockResolver(confClone, blockManager)
val dbm = new DiskBlockManager(confClone, true)
when(blockManager.diskBlockManager).thenReturn(dbm, dbm)
val shuffleId = 1
val mapId = 2
val lengths = Array[Long](10, 0, 20)
val dataTmp = File.createTempFile("shuffle", null, tempDir)
val out = new FileOutputStream(dataTmp)
Utils.tryWithSafeFinally {
out.write(new Array[Byte](30))
} {
out.close()
}
val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
val localDirIdx = Utils.nonNegativeHash(idxName) % localDirSuffixes.length

val badDisk = dbm.localDirs(localDirIdx)
badDisk.setWritable(false) // simulate a broken disk

// 1. index -> fail
// 2. index -> data -> verify data
when(taskContext.attemptNumber()).thenReturn(0, Seq(1, 1, 1): _*)
val e =
intercept[IOException](resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp))
assert(e.getMessage.contains(badDisk.getAbsolutePath))

resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)

val dataFile = resolver.getDataFile(shuffleId, mapId)
assert(dataFile.exists())
}
}
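
For reference, the block names the tests above depend on follow ShuffleDataBlockId/ShuffleIndexBlockId's "shuffle_<shuffleId>_<mapId>_<reduceId>" naming, e.g. in spark-shell (values taken from the tests; only an illustration of why the read-only dir hits attempt 0 but not necessarily attempt 1):

import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

ShuffleIndexBlockId(1, 2, 0).name // "shuffle_1_2_0.index" -- attempt 0; the test makes exactly this file's dir read-only
ShuffleIndexBlockId(1, 2, 1).name // "shuffle_1_2_1.index" -- attempt 1; may hash to a healthy dir
ShuffleDataBlockId(1, 2, 1).name  // "shuffle_1_2_1.data"  -- matches the "1.data" suffix asserted in the first test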