diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d2df77658ccd..b066d99e8ef8 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -299,7 +299,7 @@ public void onSuccess(ByteBuffer response) {
             BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
             numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
           } catch (Throwable t) {
-            logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+            logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
               " via external shuffle service from executor: " + execId, t);
             numRemovedBlocksFuture.complete(0);
           }
@@ -307,7 +307,7 @@ public void onSuccess(ByteBuffer response) {
 
         @Override
         public void onFailure(Throwable e) {
-          logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+          logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
             " via external shuffle service from executor: " + execId, e);
           numRemovedBlocksFuture.complete(0);
         }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 6c5025d1822f..efe508d1361c 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -36,7 +36,6 @@
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
-import org.apache.spark.util.Utils;
 
 /**
  * Implementation of {@link ShuffleMapOutputWriter} that replicates the functionality of shuffle
@@ -87,7 +86,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
     }
     lastPartitionId = reducePartitionId;
     if (outputTempFile == null) {
-      outputTempFile = Utils.tempFileWith(outputFile);
+      outputTempFile = blockResolver.createTempFile(outputFile);
     }
     if (outputFileChannel != null) {
       currChannelPosition = outputFileChannel.position();
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 091b5e1600d9..a6fa28b8ae8e 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
         listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
         logDebug("Cleaned shuffle " + shuffleId)
       } else {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d07614a5e212..19467e7eca12 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -343,12 +343,14 @@ object SparkEnv extends Logging {
         isLocal,
         conf,
         listenerBus,
-        if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+        if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
          externalShuffleClient
         } else {
           None
         },
         blockManagerInfo,
-        mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)),
+        mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+        shuffleManager,
+        isDriver)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index dbec61a1fdb7..7ea7c05515bd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -654,6 +654,16 @@ package object config {
     .booleanConf
     .createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk until the " +
+        "application ends.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED =
     ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
       .doc("Whether to use the ExternalShuffleService for fetching disk persisted RDD blocks. " +
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index f1485ec99789..ba54555311e7 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver(
     shuffleFiles.map(_.length()).sum
   }
 
+  /** Create a temporary file that will be renamed to the final resulting file */
+  def createTempFile(file: File): File = {
+    blockManager.diskBlockManager.createTempFileWith(file)
+  }
+
   /**
    * Get the shuffle data file.
   *
@@ -234,7 +239,7 @@ private[spark] class IndexShuffleBlockResolver(
         throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
           s"${blockId.getClass().getSimpleName()}")
     }
-    val fileTmp = Utils.tempFileWith(file)
+    val fileTmp = createTempFile(file)
     val channel = Channels.newChannel(
       serializerManager.wrapStream(blockId, new FileOutputStream(fileTmp)))
 
@@ -335,7 +340,7 @@ private[spark] class IndexShuffleBlockResolver(
       checksums: Array[Long],
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
-    val indexTmp = Utils.tempFileWith(indexFile)
+    val indexTmp = createTempFile(indexFile)
 
     val checksumEnabled = checksums.nonEmpty
     val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
@@ -343,7 +348,7 @@ private[spark] class IndexShuffleBlockResolver(
         "The size of partition lengths and checksums should be equal")
       val checksumFile =
         getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-      (Some(checksumFile), Some(Utils.tempFileWith(checksumFile)))
+      (Some(checksumFile), Some(createTempFile(checksumFile)))
     } else {
       (None, None)
     }
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq(
+      ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
+      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    )
+  }
+
   override def stop(): Unit = {}
 }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
index 0f35f8c983d6..c8fde8d2d39d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -41,6 +41,14 @@ trait ShuffleBlockResolver {
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
 
+  /**
+   * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files
+   * from the external shuffle service after the associated executor has been removed.
+   */
+  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq.empty
+  }
+
   /**
    * Retrieve the data for the specified merged shuffle block as multiple chunks.
   */
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index b96befce2c0d..4d8ba9b3e4e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -36,6 +36,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
@@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint(
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
     mapOutputTracker: MapOutputTrackerMaster,
+    shuffleManager: ShuffleManager,
     isDriver: Boolean)
   extends IsolatedRpcEndpoint with Logging {
 
@@ -104,9 +106,11 @@ class BlockManagerMasterEndpoint(
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)
 
   logInfo("BlockManagerMasterEndpoint up")
-  // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
-  // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
-  private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
+
+  private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+  private val externalShuffleServiceRddFetchEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
   private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)
 
   private lazy val driverEndpoint =
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.withMapStatuses { mapStatuses =>
+          mapStatuses.foreach { mapStatus =>
+            // Check if the executor has been deallocated
+            if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+              val blocksToDel =
+                shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
+              if (blocksToDel.nonEmpty) {
+                val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+                  new mutable.HashSet[BlockId])
+                blocks ++= blocksToDel
+              }
+            }
+          }
         }
-      }.toSeq
-    )
+      }
+    }
+
+    val removeShuffleFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Boolean] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+              TimeUnit.SECONDS) == blockIds.size
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    Future.sequence(removeShuffleFromExecutorsFutures ++
+      removeShuffleFromShuffleServicesFutures)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index c6a22972d2a0..e29f3fc1b805 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.util.UUID
 
 import scala.collection.mutable.HashMap
@@ -77,6 +78,15 @@ private[spark] class DiskBlockManager(
 
   private val shutdownHook = addShutdownHook()
 
+  // If either of these features is enabled, we must change permissions on block manager
+  // directories and files to accommodate the shuffle service deleting files in a secure
+  // environment. Parent directories are assumed to be restrictive to prevent unauthorized
+  // users from accessing or modifying world readable files.
+  private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  )
+
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
@@ -94,7 +104,16 @@ private[spark] class DiskBlockManager(
     } else {
       val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
       if (!newDir.exists()) {
-        Files.createDirectory(newDir.toPath)
+        val path = newDir.toPath
+        Files.createDirectory(path)
+        if (permissionChangingRequired) {
+          // SPARK-37618: Create dir as group writable so files within can be deleted by the
+          // shuffle service in a secure setup. This will remove the setgid bit so files created
+          // within won't be created with the parent folder group.
+          val currentPerms = Files.getPosixFilePermissions(path)
+          currentPerms.add(PosixFilePermission.GROUP_WRITE)
+          Files.setPosixFilePermissions(path, currentPerms)
+        }
       }
       subDirs(dirId)(subDirId) = newDir
       newDir
@@ -166,6 +185,37 @@ private[spark] class DiskBlockManager(
     }
   }
 
+  /**
+   * SPARK-37618: Makes sure that the file is created as world readable. This is to get
+   * around the fact that making the block manager sub dirs group writable removes
+   * the setgid bit in secure Yarn environments, which prevents the shuffle service
+   * from being able to read shuffle files. The outer directories will still not be
+   * world executable, so this doesn't allow access to these files except for the
+   * running user and shuffle service.
+   */
+  def createWorldReadableFile(file: File): Unit = {
+    val path = file.toPath
+    Files.createFile(path)
+    val currentPerms = Files.getPosixFilePermissions(path)
+    currentPerms.add(PosixFilePermission.OTHERS_READ)
+    Files.setPosixFilePermissions(path, currentPerms)
+  }
+
+  /**
+   * Creates a temporary version of the given file with world readable permissions (if required).
+   * Used to create block files that will be renamed to the final version of the file.
+   */
+  def createTempFileWith(file: File): File = {
+    val tmpFile = Utils.tempFileWith(file)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    tmpFile
+  }
+
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
     var blockId = new TempLocalBlockId(UUID.randomUUID())
@@ -181,7 +231,14 @@ private[spark] class DiskBlockManager(
     while (getFile(blockId).exists()) {
       blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
-    (blockId, getFile(blockId))
+    val tmpFile = getFile(blockId)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index f0334c56962c..d45947db6934 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -50,6 +50,9 @@ private[spark] class DiskStore(
   private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
+  private val shuffleServiceFetchRddEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId)
 
   /**
@@ -71,6 +74,13 @@ private[spark] class DiskStore(
     logDebug(s"Attempting to put block $blockId")
     val startTimeNs = System.nanoTime()
     val file = diskManager.getFile(blockId)
+
+    // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must make
+    // the file world readable, as it will not be owned by the group running the shuffle service
+    // in a secure environment. This is due to changing directory permissions to allow deletion.
+    if (shuffleServiceFetchRddEnabled) {
+      diskManager.createWorldReadableFile(file)
+    }
     val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index cd25f32cca89..c4ee261b38fb 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -128,6 +128,10 @@ public void setUp() throws Exception {
     });
 
     when(shuffleBlockResolver.getDataFile(anyInt(), anyLong())).thenReturn(mergedOutputFile);
+    when(shuffleBlockResolver.createTempFile(any(File.class))).thenAnswer(invocationOnMock -> {
+      File file = (File) invocationOnMock.getArguments()[0];
+      return Utils.tempFileWith(file);
+    });
 
     Answer<?> renameTempAnswer = invocationOnMock -> {
       partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
@@ -159,6 +163,10 @@ public void setUp() throws Exception {
       spillFilesCreated.add(file);
       return Tuple2$.MODULE$.apply(blockId, file);
     });
+    when(diskBlockManager.createTempFileWith(any(File.class))).thenAnswer(invocationOnMock -> {
+      File file = (File) invocationOnMock.getArguments()[0];
+      return Utils.tempFileWith(file);
+    });
 
     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
     when(shuffleDep.serializer()).thenReturn(serializer);
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 48c1cc5906f3..dd3d90f3124d 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -17,6 +17,13 @@
 
 package org.apache.spark
 
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.should.Matchers._
@@ -26,9 +33,9 @@ import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
-import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalBlockStoreClient}
-import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient}
+import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * This suite creates an external shuffle server and routes all shuffle fetches through it.
@@ -101,7 +108,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
   }
 
   test("SPARK-25888: using external shuffle service fetching disk persisted blocks") {
-    val confWithRddFetchEnabled = conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    val confWithRddFetchEnabled = conf.clone
+      .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
@@ -113,13 +122,42 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       rdd.count()
 
       val blockId = RDDBlockId(rdd.id, 0)
-      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val bms = eventually(timeout(2.seconds), interval(100.milliseconds)) {
         val locations = sc.env.blockManager.master.getLocations(blockId)
         assert(locations.size === 2)
         assert(locations.map(_.port).contains(server.getPort),
           "external shuffle service port should be contained")
+        locations
       }
 
+      val dirManager = sc.env.blockManager.hostLocalDirManager
+        .getOrElse(fail("No host local dir manager"))
+
+      val promises = bms.map { case bmid =>
+        val promise = Promise[File]()
+        dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) {
+          case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+            val file = new File(ExecutorDiskUtils.getFilePath(dirs,
+              sc.env.blockManager.subDirsPerLocalDir, blockId.name))
+            promise.success(file)
+          }
+          case scala.util.Failure(error) => promise.failure(error)
+        }
+        promise.future
+      }
+      val filesToCheck = promises.map(p => ThreadUtils.awaitResult(p, Duration(2, "sec")))
+
+      filesToCheck.foreach(f => {
+        val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+        assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+        // On most operating systems the default umask will make this test pass
+        // even if the permission isn't changed. To properly test this, run the
+        // test with a umask of 0027
+        val perms = Files.getPosixFilePermissions(f.toPath)
+        assert(perms.contains(PosixFilePermission.OTHERS_READ))
+      })
+
       sc.killExecutors(sc.getExecutorIds())
 
       eventually(timeout(2.seconds), interval(100.milliseconds)) {
@@ -138,4 +176,83 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       rpcHandler.applicationRemoved(sc.conf.getAppId, true)
     }
   }
+
+  test("SPARK-37618: external shuffle service removes shuffle blocks from deallocated executors") {
+    for (enabled <- Seq(true, false)) {
+      // Use local disk reading to get location of shuffle files on disk
+      val confWithLocalDiskReading = conf.clone
+        .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+        .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled)
+      sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading)
+      sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+      sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
+      try {
+        val rdd = sc.parallelize(0 until 100, 2)
+          .map { i => (i, 1) }
+          .repartition(1)
+
+        rdd.count()
+
+        val mapOutputs = sc.env.mapOutputTracker.getMapSizesByExecutorId(0, 0).toSeq
+
+        val dirManager = sc.env.blockManager.hostLocalDirManager
+          .getOrElse(fail("No host local dir manager"))
+
+        val promises = mapOutputs.map { case (bmid, blocks) =>
+          val promise = Promise[Seq[File]]()
+          dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) {
+            case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+              val files = blocks.flatMap { case (blockId, _, _) =>
+                val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+                Seq(
+                  ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
+                    shuffleBlockId.reduceId).name,
+                  ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
+                    shuffleBlockId.reduceId).name
+                ).map { blockId =>
+                  new File(ExecutorDiskUtils.getFilePath(dirs,
+                    sc.env.blockManager.subDirsPerLocalDir, blockId))
+                }
+              }
+              promise.success(files)
+            }
+            case scala.util.Failure(error) => promise.failure(error)
+          }
+          promise.future
+        }
+        val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec")))
+        assert(filesToCheck.length == 4)
+        assert(filesToCheck.forall(_.exists()))
+
+        if (enabled) {
+          filesToCheck.foreach(f => {
+            val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+            assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+            // On most operating systems the default umask will make this test pass
+            // even if the permission isn't changed. To properly test this, run the
+            // test with a umask of 0027
+            val perms = Files.getPosixFilePermissions(f.toPath)
+            assert(perms.contains(PosixFilePermission.OTHERS_READ))
+          })
+        }
+
+        sc.killExecutors(sc.getExecutorIds())
+        eventually(timeout(2.seconds), interval(100.milliseconds)) {
+          assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty)
+        }
+
+        sc.cleaner.foreach(_.doCleanupShuffle(0, true))
+
+        if (enabled) {
+          assert(filesToCheck.forall(!_.exists()))
+        } else {
+          assert(filesToCheck.forall(_.exists()))
+        }
+      } finally {
+        rpcHandler.applicationRemoved(sc.conf.getAppId, true)
+        sc.stop()
+      }
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 38ed702d0e4c..83bd3b0a9977 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -111,6 +111,12 @@ class BypassMergeSortShuffleWriterSuite
         blockId = args(0).asInstanceOf[BlockId])
     }
 
+    when(blockResolver.createTempFile(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
+
     when(diskBlockManager.createTempShuffleBlock())
       .thenAnswer { _ =>
         val blockId = new TempShuffleBlockId(UUID.randomUUID)
@@ -266,6 +272,11 @@ class BypassMergeSortShuffleWriterSuite
       temporaryFilesCreated += file
       (blockId, file)
     }
+    when(diskBlockManager.createTempFileWith(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
 
     val numPartition = shuffleHandle.dependency.partitioner.numPartitions
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 21704b1c6732..de12f6840a1a 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -56,6 +56,11 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
       any[BlockId], any[Option[Array[String]]])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
     when(diskBlockManager.localDirs).thenReturn(Array(tempDir))
+    when(diskBlockManager.createTempFileWith(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
 
     conf.set("spark.app.id", appId)
   }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 35d9b4ab1f76..6c9ec8b71a42 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -74,6 +74,11 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA
       .set("spark.app.id", "example.spark.app")
       .set("spark.shuffle.unsafe.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
+    when(blockResolver.createTempFile(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
     when(blockResolver.writeMetadataFileAndCommit(
       anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File])))
       .thenAnswer { invocationOnMock =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index fc7b7a440697..14e1ee5b09d5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -102,7 +102,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)
     allStores.clear()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0f99ea819f67..45e05b2cc2da 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -188,7 +188,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     liveListenerBus = spy(new LiveListenerBus(conf))
     master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        liveListenerBus, None, blockManagerInfo, mapOutputTracker, isDriver = true)),
+        liveListenerBus, None, blockManagerInfo, mapOutputTracker, shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true))
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index b36eeb767e2e..58fe40f9adeb 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
 import java.nio.file.{Files, Paths}
-import java.nio.file.attribute.PosixFilePermissions
+import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
 import java.util.HashMap
 
 import com.fasterxml.jackson.core.`type`.TypeReference
@@ -141,6 +141,30 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(attemptId.equals("1"))
   }
 
+  test("SPARK-37618: Sub dirs are group writable when removing from shuffle service enabled") {
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.removeShuffle", "false")
+    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false)
+    val blockId = new TestBlockId("test")
+    val newFile = diskBlockManager.getFile(blockId)
+    val parentDir = newFile.getParentFile()
+    assert(parentDir.exists && parentDir.isDirectory)
+    val permission = Files.getPosixFilePermissions(parentDir.toPath)
+    assert(!permission.contains(PosixFilePermission.GROUP_WRITE))
+
+    assert(parentDir.delete())
+
+    conf.set("spark.shuffle.service.removeShuffle", "true")
+    val diskBlockManager2 = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false)
+    val newFile2 = diskBlockManager2.getFile(blockId)
+    val parentDir2 = newFile2.getParentFile()
+    assert(parentDir2.exists && parentDir2.isDirectory)
+    val permission2 = Files.getPosixFilePermissions(parentDir2.toPath)
+    assert(permission2.contains(PosixFilePermission.GROUP_WRITE))
+  }
+
   def writeToFile(file: File, numBytes: Int): Unit = {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
diff --git a/docs/configuration.md b/docs/configuration.md
index ae3f422f34b3..494d4fc78737 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -966,6 +966,17 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
+  <td><code>spark.shuffle.service.removeShuffle</code></td>
+  <td>false</td>
+  <td>
+    Whether to use the ExternalShuffleService for deleting shuffle blocks for
+    deallocated executors when the shuffle is no longer needed. Without this enabled,
+    shuffle data on executors that are deallocated will remain on disk until the
+    application ends.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
   <td><code>spark.shuffle.maxChunksBeingTransferred</code></td>
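
The behavior added by this patch is driven entirely by configuration. A rough usage sketch follows (not part of the patch itself; the application name and the dynamic-allocation setting are illustrative placeholders, while the two spark.shuffle.service.* keys and their defaults come from the diff above):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("shuffle-cleanup-demo") // placeholder name
    // The external shuffle service must be running on every node for the flag below to matter.
    .set("spark.shuffle.service.enabled", "true")
    // Added by this patch (default false): let the shuffle service delete the index/data files of
    // shuffles whose executors were deallocated, once the ContextCleaner removes the shuffle.
    .set("spark.shuffle.service.removeShuffle", "true")
    // Dynamic allocation is the typical reason executors are deallocated mid-application.
    .set("spark.dynamicAllocation.enabled", "true")
  val sc = new SparkContext(conf)

With spark.shuffle.service.removeShuffle left at its default of false, behavior is unchanged: shuffle files written by deallocated executors stay on disk until the application ends.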