
Commit 9a7596e

Kimahriman authored and tgravescs committed
[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
### What changes were proposed in this pull request?

Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself.

To support this change functioning in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically:

- Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders.
- Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissions, so there's no security issue opening up these files.

Both of these changes are done after creating a file so that umasks don't affect the resulting permissions.

### Why are the changes needed?

External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
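As a usage sketch (not part of the commit): the new behavior is opt-in, and a minimal configuration for a long running job with dynamic allocation might look like the following, where `spark.shuffle.service.removeShuffle` is the flag introduced by this change and the other keys are pre-existing Spark properties.

import org.apache.spark.SparkConf

// Minimal sketch; assumes an external shuffle service is deployed on the cluster nodes.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")        // use the external shuffle service
  .set("spark.dynamicAllocation.enabled", "true")      // executors may be released mid-job
  .set("spark.shuffle.service.removeShuffle", "true")  // new: clean up shuffle blocks of released executors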
1 parent 8ef0159 commit 9a7596e

File tree

20 files changed, +372 -43 lines changed


common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java

Lines changed: 2 additions & 2 deletions
@@ -299,15 +299,15 @@ public void onSuccess(ByteBuffer response) {
         BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
         numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
       } catch (Throwable t) {
-        logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+        logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
           " via external shuffle service from executor: " + execId, t);
         numRemovedBlocksFuture.complete(0);
       }
     }

     @Override
     public void onFailure(Throwable e) {
-      logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+      logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
         " via external shuffle service from executor: " + execId, e);
       numRemovedBlocksFuture.complete(0);
     }

core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java

Lines changed: 1 addition & 2 deletions
@@ -36,7 +36,6 @@
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
-import org.apache.spark.util.Utils;

 /**
  * Implementation of {@link ShuffleMapOutputWriter} that replicates the functionality of shuffle
@@ -87,7 +86,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
     }
     lastPartitionId = reducePartitionId;
     if (outputTempFile == null) {
-      outputTempFile = Utils.tempFileWith(outputFile);
+      outputTempFile = blockResolver.createTempFile(outputFile);
     }
     if (outputFileChannel != null) {
       currChannelPosition = outputFileChannel.position();

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 3 additions & 1 deletion
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
         listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
         logDebug("Cleaned shuffle " + shuffleId)
       } else {

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 2 deletions
@@ -343,12 +343,14 @@ object SparkEnv extends Logging {
         isLocal,
         conf,
         listenerBus,
-        if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+        if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
           externalShuffleClient
         } else {
           None
         }, blockManagerInfo,
-        mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)),
+        mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+        shuffleManager,
+        isDriver)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 10 additions & 0 deletions
@@ -686,6 +686,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)

+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk until the " +
+        "application ends.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED =
     ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
       .doc("Whether to use the ExternalShuffleService for fetching disk persisted RDD blocks. " +

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 15 additions & 3 deletions
@@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver(
     shuffleFiles.map(_.length()).sum
   }

+  /** Create a temporary file that will be renamed to the final resulting file */
+  def createTempFile(file: File): File = {
+    blockManager.diskBlockManager.createTempFileWith(file)
+  }
+
   /**
    * Get the shuffle data file.
    *
@@ -234,7 +239,7 @@
       throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
         s"${blockId.getClass().getSimpleName()}")
     }
-    val fileTmp = Utils.tempFileWith(file)
+    val fileTmp = createTempFile(file)
     val channel = Channels.newChannel(
       serializerManager.wrapStream(blockId,
         new FileOutputStream(fileTmp)))
@@ -335,15 +340,15 @@
       checksums: Array[Long],
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
-    val indexTmp = Utils.tempFileWith(indexFile)
+    val indexTmp = createTempFile(indexFile)

     val checksumEnabled = checksums.nonEmpty
     val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
       assert(lengths.length == checksums.length,
         "The size of partition lengths and checksums should be equal")
       val checksumFile =
         getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-      (Some(checksumFile), Some(Utils.tempFileWith(checksumFile)))
+      (Some(checksumFile), Some(createTempFile(checksumFile)))
     } else {
       (None, None)
     }
@@ -597,6 +602,13 @@
     }
   }

+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq(
+      ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
+      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    )
+  }
+
   override def stop(): Unit = {}
 }
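For reference, a small hedged example of what these two block ids look like once stringified, which is the form the driver sends to the shuffle service; the shuffleId and mapId values are made up, and the reduce id is the resolver's NOOP_REDUCE_ID (0).

import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// Hypothetical shuffleId = 3, mapId = 7.
val ids = Seq(ShuffleIndexBlockId(3, 7L, 0), ShuffleDataBlockId(3, 7L, 0))
ids.map(_.name) // expected: Seq("shuffle_3_7_0.index", "shuffle_3_7_0.data"), matching the on-disk file names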

core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala

Lines changed: 8 additions & 0 deletions
@@ -41,6 +41,14 @@
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer

+  /**
+   * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files
+   * from the external shuffle service after the associated executor has been removed.
+   */
+  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq.empty
+  }
+
   /**
    * Retrieve the data for the specified merged shuffle block as multiple chunks.
    */

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 67 additions & 22 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

@@ -52,6 +53,7 @@
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
     mapOutputTracker: MapOutputTrackerMaster,
+    shuffleManager: ShuffleManager,
     isDriver: Boolean)
   extends IsolatedRpcEndpoint with Logging {

@@ -104,9 +106,11 @@
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)

   logInfo("BlockManagerMasterEndpoint up")
-  // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
-  // && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
-  private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
+
+  private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+  private val externalShuffleServiceRddFetchEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
   private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)

   private lazy val driverEndpoint =
@@ -294,33 +298,74 @@
       }
     }.toSeq

-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }

     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }

   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
-        }
-      }.toSeq
-    )
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.withMapStatuses { mapStatuses =>
+          mapStatuses.foreach { mapStatus =>
+            // Check if the executor has been deallocated
+            if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+              val blocksToDel =
+                shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
+              if (blocksToDel.nonEmpty) {
+                val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+                  new mutable.HashSet[BlockId])
+                blocks ++= blocksToDel
+              }
+            }
+          }
+        }
+      }
+    }
+
+    val removeShuffleFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Boolean] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+              TimeUnit.SECONDS) == blockIds.size
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    Future.sequence(removeShuffleFromExecutorsFutures ++
+      removeShuffleFromShuffleServicesFutures)
   }

   /**

core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala

Lines changed: 59 additions & 2 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.util.UUID

 import scala.collection.mutable.HashMap
@@ -77,6 +78,15 @@

   private val shutdownHook = addShutdownHook()

+  // If either of these features are enabled, we must change permissions on block manager
+  // directories and files to accomodate the shuffle service deleting files in a secure environment.
+  // Parent directories are assumed to be restrictive to prevent unauthorized users from accessing
+  // or modifying world readable files.
+  private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  )
+
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
@@ -94,7 +104,16 @@
     } else {
       val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
       if (!newDir.exists()) {
-        Files.createDirectory(newDir.toPath)
+        val path = newDir.toPath
+        Files.createDirectory(path)
+        if (permissionChangingRequired) {
+          // SPARK-37618: Create dir as group writable so files within can be deleted by the
+          // shuffle service in a secure setup. This will remove the setgid bit so files created
+          // within won't be created with the parent folder group.
+          val currentPerms = Files.getPosixFilePermissions(path)
+          currentPerms.add(PosixFilePermission.GROUP_WRITE)
+          Files.setPosixFilePermissions(path, currentPerms)
+        }
       }
       subDirs(dirId)(subDirId) = newDir
       newDir
@@ -166,6 +185,37 @@
     }
   }

+  /**
+   * SPARK-37618: Makes sure that the file is created as world readable. This is to get
+   * around the fact that making the block manager sub dirs group writable removes
+   * the setgid bit in secure Yarn environments, which prevents the shuffle service
+   * from being able to read shuffle files. The outer directories will still not be
+   * world executable, so this doesn't allow access to these files except for the
+   * running user and shuffle service.
+   */
+  def createWorldReadableFile(file: File): Unit = {
+    val path = file.toPath
+    Files.createFile(path)
+    val currentPerms = Files.getPosixFilePermissions(path)
+    currentPerms.add(PosixFilePermission.OTHERS_READ)
+    Files.setPosixFilePermissions(path, currentPerms)
+  }
+
+  /**
+   * Creates a temporary version of the given file with world readable permissions (if required).
+   * Used to create block files that will be renamed to the final version of the file.
+   */
+  def createTempFileWith(file: File): File = {
+    val tmpFile = Utils.tempFileWith(file)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    tmpFile
+  }
+
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
     var blockId = new TempLocalBlockId(UUID.randomUUID())
@@ -181,7 +231,14 @@
     while (getFile(blockId).exists()) {
       blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
-    (blockId, getFile(blockId))
+    val tmpFile = getFile(blockId)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
   }

   /**
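To make the permission handling above concrete, here is a standalone, hedged illustration using plain JDK APIs (POSIX filesystems only) of the two tweaks DiskBlockManager now performs: adding group write to a sub directory and world read to a file created inside it. The directory and file names are placeholders, not the real block manager layout.

import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission

// Stand-in for a block manager sub directory: add GROUP_WRITE after creation,
// mirroring the createDirectory change above.
val dir = Files.createTempDirectory("blockmgr-demo")
val dirPerms = Files.getPosixFilePermissions(dir)
dirPerms.add(PosixFilePermission.GROUP_WRITE)
Files.setPosixFilePermissions(dir, dirPerms)

// Stand-in for a shuffle file: add OTHERS_READ, mirroring createWorldReadableFile.
val file = Files.createFile(dir.resolve("shuffle_0_0_0.data"))
val filePerms = Files.getPosixFilePermissions(file)
filePerms.add(PosixFilePermission.OTHERS_READ)
Files.setPosixFilePermissions(file, filePerms)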

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

Lines changed: 10 additions & 0 deletions
@@ -50,6 +50,9 @@
   private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()

+  private val shuffleServiceFetchRddEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId)

   /**
@@ -71,6 +74,13 @@
     logDebug(s"Attempting to put block $blockId")
     val startTimeNs = System.nanoTime()
     val file = diskManager.getFile(blockId)
+
+    // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must make
+    // the file world readable, as it will not be owned by the group running the shuffle service
+    // in a secure environment. This is due to changing directory permissions to allow deletion,
+    if (shuffleServiceFetchRddEnabled) {
+      diskManager.createWorldReadableFile(file)
+    }
     val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
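A hedged usage note: the world-readable handling in DiskStore only applies when serving disk-persisted RDD blocks through the shuffle service is turned on via the pre-existing flag; the key shown below is the one assumed to back Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED.

import org.apache.spark.SparkConf

// Sketch: enable RDD fetch through the external shuffle service, so DiskStore marks
// persisted RDD block files world readable on creation.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.fetch.rdd.enabled", "true")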
