From b91a344bf859aaff6134937917f3b3e78825f1ee Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 18 Sep 2022 22:22:44 +0800 Subject: [PATCH 01/27] [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query finished --- .../network/shuffle/BlockStoreClient.java | 13 +++ .../network/shuffle/ExternalBlockHandler.java | 6 ++ .../shuffle/ExternalBlockStoreClient.java | 14 +++ .../shuffle/MergedShuffleFileManager.java | 7 ++ .../shuffle/NoOpMergedShuffleFileManager.java | 5 ++ .../shuffle/RemoteBlockPushResolver.java | 14 +++ .../protocol/BlockTransferMessage.java | 3 +- .../shuffle/protocol/RemoveShuffleMerge.java | 85 +++++++++++++++++++ .../apache/spark/scheduler/DAGScheduler.scala | 47 +++++----- .../storage/BlockManagerMasterEndpoint.scala | 10 ++- 10 files changed, 180 insertions(+), 24 deletions(-) create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 253fb7aca1d8..95fe50aaf99c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -255,4 +255,17 @@ public void getMergedBlockMeta( MergedBlocksMetaListener listener) { throw new UnsupportedOperationException(); } + + /** + * Remove the shuffle merge data in shuffle services + * + * @param host the host of the remote node. + * @param port the port of the remote node. + * @param shuffleId shuffle id. + * + * @since 3.4.0 + */ + public boolean removeShuffleMerge(String host, int port, int shuffleId) { + throw new UnsupportedOperationException(); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 4e40090b065e..68faeb6c4458 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -224,6 +224,12 @@ protected void handleMessage( } finally { responseDelayContext.stop(); } + } else if (msgObj instanceof RemoveShuffleMerge) { + RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj; + checkAuth(client, msg.appId); + logger.info("Remove shuffle merge data for application %s shuffle %d", + msg.appId, msg.shuffleId); + mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId); } else if (msgObj instanceof DiagnoseCorruption) { DiagnoseCorruption msg = (DiagnoseCorruption) msgObj; checkAuth(client, msg.appId); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index b066d99e8ef8..1b0152252bed 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -256,6 +256,20 @@ public void onFailure(Throwable e) { } } + @Override + public boolean removeShuffleMerge(String host, int port, int shuffleId) { + checkInit(); + try { + TransportClient client = clientFactory.createClient(host, port); + client.send(new RemoveShuffleMerge(appId, shuffleId).toByteBuffer()); + } catch (Exception e) { + logger.error("Exception while sending RemoveShuffleMerge request to {}:{}", + host, port, e); + return false; + } + return true; + } + @Override public MetricSet shuffleMetrics() { checkInit(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index 051684a92d0b..e66e285d0df0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -121,6 +121,13 @@ MergedBlockMeta getMergedBlockMeta( */ String[] getMergedBlockDirs(String appId); + /** + * Remove shuffle merge data files. + * + * @param appId application ID + */ + void removeShuffleMerge(String appId, int shuffleId); + /** * Optionally close any resources associated the MergedShuffleFileManager, such as the * leveldb for state persistence. diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index 876b10095938..491977dd4bd6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -84,4 +84,9 @@ public MergedBlockMeta getMergedBlockMeta( public String[] getMergedBlockDirs(String appId) { throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } + + @Override + public void removeShuffleMerge(String appId, int shuffleId) { + throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 816d1082850c..828afe3f4c59 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -396,6 +396,20 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { } } + @Override + public void removeShuffleMerge(String appId, int shuffleId) { + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); + AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId); + if (partitionsInfo != null) { + AppAttemptShuffleMergeId appAttemptShuffleMergeId = + new AppAttemptShuffleMergeId( + appId, appShuffleInfo.attemptId, shuffleId, partitionsInfo.shuffleMergeId); + submitCleanupTask(() -> + closeAndDeleteOutdatedPartitions( + appAttemptShuffleMergeId, partitionsInfo.shuffleMergePartitions)); + } + } + /** * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo. * If cleanupLocalDirs is true, the merged shuffle files will also be deleted. diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index ad959c7e2e7c..33411baa09f8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -50,7 +50,7 @@ public enum Type { FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11), PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14), FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17), - PUSH_BLOCK_RETURN_CODE(18); + PUSH_BLOCK_RETURN_CODE(18), REMOVE_SHUFFLE_MERGE(19); private final byte id; @@ -88,6 +88,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 16: return DiagnoseCorruption.decode(buf); case 17: return CorruptionCause.decode(buf); case 18: return BlockPushReturnCode.decode(buf); + case 19: return RemoveShuffleMerge.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java new file mode 100644 index 000000000000..37d01bc2afa3 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle.protocol; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.spark.network.protocol.Encoders; + +/** + * Remove the merged data for a given shuffle. + * Returns {@link Boolean} + * + * @since 3.4.0 + */ +public class RemoveShuffleMerge extends BlockTransferMessage { + public final String appId; + public final int shuffleId; + + public RemoveShuffleMerge(String appId, int shuffleId) { + this.appId = appId; + this.shuffleId = shuffleId; + } + + @Override + protected Type type() { + return Type.REMOVE_SHUFFLE_MERGE; + } + + @Override + public int hashCode() { + return Objects.hashCode(appId, shuffleId); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("shuffleId", shuffleId) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other != null && other instanceof RemoveShuffleMerge) { + RemoveShuffleMerge o = (RemoveShuffleMerge) other; + return Objects.equal(appId, o.appId) + && shuffleId == o.shuffleId; + } + return false; + } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(appId) + 4; + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + buf.writeInt(shuffleId); + } + + public static RemoveShuffleMerge decode(ByteBuf buf) { + String appId = Encoders.Strings.decode(buf); + int shuffleId = buf.readInt(); + return new RemoveShuffleMerge(appId, shuffleId); + } +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cc991178481f..ef8869645873 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1398,9 +1398,7 @@ private[spark] class DAGScheduler( */ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.isShuffleMergeFinalizedMarked) - if (stage.shuffleDep.getMergerLocs.isEmpty) { - getAndSetShufflePushMergerLocations(stage) - } + getAndSetShufflePushMergerLocations(stage) val shuffleId = stage.shuffleDep.shuffleId val shuffleMergeId = stage.shuffleDep.shuffleMergeId @@ -1416,16 +1414,24 @@ private[spark] class DAGScheduler( } private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = { - val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( - stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) - if (mergerLocs.nonEmpty) { - stage.shuffleDep.setMergerLocs(mergerLocs) + stage.shuffleDep.synchronized { + val oldMergeLocs = stage.shuffleDep.getMergerLocs + if (oldMergeLocs.isEmpty) { + val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( + stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) + if (mergerLocs.nonEmpty) { + stage.shuffleDep.setMergerLocs(mergerLocs) + mapOutputTracker.registerShufflePushMergerLocations( + stage.shuffleDep.shuffleId, mergerLocs) + } + logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" + + s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" + + s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") + mergerLocs + } else { + oldMergeLocs + } } - - logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" + - s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" + - s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") - mergerLocs } /** Called when stage's parents are available and we can now do its task. */ @@ -2629,16 +2635,13 @@ private[spark] class DAGScheduler( shuffleIdToMapStage.filter { case (_, stage) => stage.shuffleDep.shuffleMergeAllowed && stage.shuffleDep.getMergerLocs.isEmpty && runningStages.contains(stage) - }.foreach { case(_, stage: ShuffleMapStage) => - if (getAndSetShufflePushMergerLocations(stage).nonEmpty) { - logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + - s" ${stage.shuffleDep.shuffleId} and shuffle merge" + - s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + - s" merger locations") - mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, - stage.shuffleDep.getMergerLocs) - } - } + }.foreach { case (_, stage: ShuffleMapStage) => + getAndSetShufflePushMergerLocations(stage) + logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + + s" ${stage.shuffleDep.shuffleId} and shuffle merge" + + s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + + s" merger locations") + } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index d30272c51be3..c48f833ecad8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -366,8 +366,16 @@ class BlockManagerMasterEndpoint( } }.getOrElse(Seq.empty) + val removeShuffleMergeFromShuffleServicesFutures = + externalBlockStoreClient.map { shuffleClient => + mapOutputTracker.getShufflePushMergerLocations(shuffleId).map { bmId => + Future[Boolean] { shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) } + } + }.getOrElse(Seq.empty) + Future.sequence(removeShuffleFromExecutorsFutures ++ - removeShuffleFromShuffleServicesFutures) + removeShuffleFromShuffleServicesFutures ++ + removeShuffleMergeFromShuffleServicesFutures) } /** From 6fb184de1dfedbf971a583add9ac600bb99d3d0b Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Mon, 19 Sep 2022 11:01:07 +0800 Subject: [PATCH 02/27] [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query finished --- .../shuffle/protocol/RemoveShuffleMerge.java | 3 +- .../apache/spark/scheduler/DAGScheduler.scala | 32 +++++++------------ .../storage/BlockManagerMasterEndpoint.scala | 13 +++++--- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java index 37d01bc2afa3..16964eeacdf7 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -60,8 +60,7 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof RemoveShuffleMerge) { RemoveShuffleMerge o = (RemoveShuffleMerge) other; - return Objects.equal(appId, o.appId) - && shuffleId == o.shuffleId; + return shuffleId == o.shuffleId && Objects.equal(appId, o.appId); } return false; } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index ef8869645873..7f36880eef38 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1398,7 +1398,7 @@ private[spark] class DAGScheduler( */ private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.isShuffleMergeFinalizedMarked) - getAndSetShufflePushMergerLocations(stage) + configureShufflePushMergerLocations(stage) val shuffleId = stage.shuffleDep.shuffleId val shuffleMergeId = stage.shuffleDep.shuffleMergeId @@ -1413,24 +1413,16 @@ private[spark] class DAGScheduler( } } - private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = { - stage.shuffleDep.synchronized { - val oldMergeLocs = stage.shuffleDep.getMergerLocs - if (oldMergeLocs.isEmpty) { - val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( - stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) - if (mergerLocs.nonEmpty) { - stage.shuffleDep.setMergerLocs(mergerLocs) - mapOutputTracker.registerShufflePushMergerLocations( - stage.shuffleDep.shuffleId, mergerLocs) - } - logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" + - s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" + - s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") - mergerLocs - } else { - oldMergeLocs - } + private def configureShufflePushMergerLocations(stage: ShuffleMapStage): Unit = { + if (stage.shuffleDep.getMergerLocs.nonEmpty) return + val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( + stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) + if (mergerLocs.nonEmpty) { + stage.shuffleDep.setMergerLocs(mergerLocs) + mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, mergerLocs) + logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" + + s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" + + s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") } } @@ -2636,7 +2628,7 @@ private[spark] class DAGScheduler( stage.shuffleDep.shuffleMergeAllowed && stage.shuffleDep.getMergerLocs.isEmpty && runningStages.contains(stage) }.foreach { case (_, stage: ShuffleMapStage) => - getAndSetShufflePushMergerLocations(stage) + configureShufflePushMergerLocations(stage) logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + s" ${stage.shuffleDep.shuffleId} and shuffle merge" + s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index c48f833ecad8..ef9980d9db2a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -367,10 +367,15 @@ class BlockManagerMasterEndpoint( }.getOrElse(Seq.empty) val removeShuffleMergeFromShuffleServicesFutures = - externalBlockStoreClient.map { shuffleClient => - mapOutputTracker.getShufflePushMergerLocations(shuffleId).map { bmId => - Future[Boolean] { shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) } - } + externalBlockStoreClient.map { + case shuffleClient if Utils.isPushBasedShuffleEnabled(conf, isDriver) => + mapOutputTracker.getShufflePushMergerLocations(shuffleId).map { bmId => + Future[Boolean] { + shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) + } + } + + case _ => Seq.empty }.getOrElse(Seq.empty) Future.sequence(removeShuffleFromExecutorsFutures ++ From 75e2013e899d7e700b08f57d20a887d2eb6fc93d Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 9 Oct 2022 13:42:34 +0800 Subject: [PATCH 03/27] [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query finished --- .../network/shuffle/BlockStoreClient.java | 3 +- .../network/shuffle/ExternalBlockHandler.java | 6 +-- .../shuffle/ExternalBlockStoreClient.java | 4 +- .../shuffle/MergedShuffleFileManager.java | 4 +- .../shuffle/NoOpMergedShuffleFileManager.java | 2 +- .../shuffle/RemoteBlockPushResolver.java | 38 +++++++++++-------- .../shuffle/protocol/RemoveShuffleMerge.java | 19 +++++++--- .../storage/BlockManagerMasterEndpoint.scala | 19 ++++++---- 8 files changed, 57 insertions(+), 38 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 95fe50aaf99c..32222e910df0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -262,10 +262,11 @@ public void getMergedBlockMeta( * @param host the host of the remote node. * @param port the port of the remote node. * @param shuffleId shuffle id. + * @param shuffleMergeId shuffle merge id. * * @since 3.4.0 */ - public boolean removeShuffleMerge(String host, int port, int shuffleId) { + public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) { throw new UnsupportedOperationException(); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 68faeb6c4458..b4e12a7bddc0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -227,9 +227,9 @@ protected void handleMessage( } else if (msgObj instanceof RemoveShuffleMerge) { RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj; checkAuth(client, msg.appId); - logger.info("Remove shuffle merge data for application %s shuffle %d", - msg.appId, msg.shuffleId); - mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId); + logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}", + msg.appId, msg.shuffleId, msg.shuffleMergeId); + mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, msg.shuffleMergeId); } else if (msgObj instanceof DiagnoseCorruption) { DiagnoseCorruption msg = (DiagnoseCorruption) msgObj; checkAuth(client, msg.appId); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 1b0152252bed..fac3989af36d 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -257,11 +257,11 @@ public void onFailure(Throwable e) { } @Override - public boolean removeShuffleMerge(String host, int port, int shuffleId) { + public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) { checkInit(); try { TransportClient client = clientFactory.createClient(host, port); - client.send(new RemoveShuffleMerge(appId, shuffleId).toByteBuffer()); + client.send(new RemoveShuffleMerge(appId, shuffleId, shuffleMergeId).toByteBuffer()); } catch (Exception e) { logger.error("Exception while sending RemoveShuffleMerge request to {}:{}", host, port, e); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index e66e285d0df0..12ddc82df9fe 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -125,8 +125,10 @@ MergedBlockMeta getMergedBlockMeta( * Remove shuffle merge data files. * * @param appId application ID + * @param shuffleId shuffle ID + * @param shuffleMergeId shuffle merge ID, -1 for all shuffle merges. */ - void removeShuffleMerge(String appId, int shuffleId); + void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId); /** * Optionally close any resources associated the MergedShuffleFileManager, such as the diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index 491977dd4bd6..20ace81f7a54 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -86,7 +86,7 @@ public String[] getMergedBlockDirs(String appId) { } @Override - public void removeShuffleMerge(String appId, int shuffleId) { + public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 828afe3f4c59..8c20353af979 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -397,7 +397,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { } @Override - public void removeShuffleMerge(String appId, int shuffleId) { + public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId); if (partitionsInfo != null) { @@ -667,9 +667,10 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. + Map shuffleMergePartitions = + mergePartitionsInfo.shuffleMergePartitions; submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions)); } else { // This block covers: // 1. finalization of determinate stage @@ -677,14 +678,17 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { // for which the message is received. shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions); } + } else { + mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true); } + mergePartitionsInfo.setFinalized(true); // Update the DB for the finalized shuffle writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. - return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); + return mergePartitionsInfo; }); Map shuffleMergePartitions = shuffleMergePartitionsRef.get(); MergeStatuses mergeStatuses; @@ -719,7 +723,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, partition.reduceId); } finally { - partition.closeAllFilesAndDeleteIfNeeded(false); + Boolean deleteFile = partition.mapTracker.getCardinality() == 0; + partition.closeAllFilesAndDeleteIfNeeded(deleteFile); } } } @@ -1472,17 +1477,14 @@ public String toString() { * required for the shuffles of indeterminate stages. */ public static class AppShuffleMergePartitionsInfo { - // ConcurrentHashMap doesn't allow null for keys or values which is why this is required. - // Marker to identify finalized shuffle partitions. - private static final Map SHUFFLE_FINALIZED_MARKER = - Collections.emptyMap(); private final int shuffleMergeId; private final Map shuffleMergePartitions; + private volatile Boolean finalized; public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { this.shuffleMergeId = shuffleMergeId; - this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER : - new ConcurrentHashMap<>(); + this.shuffleMergePartitions = new ConcurrentHashMap<>(); + this.finalized = shuffleFinalized; } @VisibleForTesting @@ -1490,8 +1492,12 @@ public Map getShuffleMergePartitions() { return shuffleMergePartitions; } - public boolean isFinalized() { - return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER; + public synchronized void setFinalized(boolean finalized) { + this.finalized = finalized; + } + + public synchronized boolean isFinalized() { + return this.finalized; } } @@ -1701,9 +1707,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) { try { if (dataChannel.isOpen()) { dataChannel.close(); - if (delete) { - dataFile.delete(); - } + } + if (delete) { + dataFile.delete(); } } catch (IOException ioe) { logger.warn("Error closing data channel for {} reduceId {}", diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java index 16964eeacdf7..7226c6d59034 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -32,10 +32,12 @@ public class RemoveShuffleMerge extends BlockTransferMessage { public final String appId; public final int shuffleId; + public final int shuffleMergeId; - public RemoveShuffleMerge(String appId, int shuffleId) { + public RemoveShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { this.appId = appId; this.shuffleId = shuffleId; + this.shuffleMergeId = shuffleMergeId; } @Override @@ -45,7 +47,7 @@ protected Type type() { @Override public int hashCode() { - return Objects.hashCode(appId, shuffleId); + return Objects.hashCode(appId, shuffleId, shuffleMergeId); } @Override @@ -53,6 +55,7 @@ public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("appId", appId) .append("shuffleId", shuffleId) + .append("shuffleMergeId", shuffleMergeId) .toString(); } @@ -60,25 +63,29 @@ public String toString() { public boolean equals(Object other) { if (other != null && other instanceof RemoveShuffleMerge) { RemoveShuffleMerge o = (RemoveShuffleMerge) other; - return shuffleId == o.shuffleId && Objects.equal(appId, o.appId); + return Objects.equal(appId, o.appId) + && shuffleId == o.shuffleId + && shuffleMergeId == o.shuffleMergeId; } return false; } @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId) + 4; + return Encoders.Strings.encodedLength(appId) + 4 + 4; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); buf.writeInt(shuffleId); + buf.writeInt(shuffleMergeId); } public static RemoveShuffleMerge decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); int shuffleId = buf.readInt(); - return new RemoveShuffleMerge(appId, shuffleId); + int shuffleMergeId = buf.readInt(); + return new RemoveShuffleMerge(appId, shuffleId, shuffleMergeId); } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index ef9980d9db2a..baa88758e96d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -321,6 +321,12 @@ class BlockManagerMasterEndpoint( } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { + val mergerLocations = + if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) { + mapOutputTracker.getShufflePushMergerLocations(shuffleId) + } else { + Seq.empty[BlockManagerId] + } val removeMsg = RemoveShuffle(shuffleId) val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => bm.storageEndpoint.ask[Boolean](removeMsg).recover { @@ -367,15 +373,12 @@ class BlockManagerMasterEndpoint( }.getOrElse(Seq.empty) val removeShuffleMergeFromShuffleServicesFutures = - externalBlockStoreClient.map { - case shuffleClient if Utils.isPushBasedShuffleEnabled(conf, isDriver) => - mapOutputTracker.getShufflePushMergerLocations(shuffleId).map { bmId => - Future[Boolean] { - shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) - } + externalBlockStoreClient.map { shuffleClient => + mergerLocations.map { bmId => + Future[Boolean] { + shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId, -1) } - - case _ => Seq.empty + } }.getOrElse(Seq.empty) Future.sequence(removeShuffleFromExecutorsFutures ++ From 5b39ba3a71e2967ad54f398b32f920781bd1e281 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 9 Oct 2022 13:59:14 +0800 Subject: [PATCH 04/27] [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query finished --- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 8c20353af979..104fd1e22e46 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -400,6 +400,8 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId); + logger.info("Start remove shuffle merge: app {} shuffleId {} shuffleMergeId {}", + appId, shuffleId, shuffleMergeId); if (partitionsInfo != null) { AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( From 7cff74d666e56fabea9a6ea69ff0a2fc1fe550b3 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 9 Oct 2022 17:24:45 +0800 Subject: [PATCH 05/27] Fix UT --- .../shuffle/RemoteBlockPushResolverSuite.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index eb2c1d9fa5cb..4d4b5cb83c68 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -267,8 +267,10 @@ public void testFailureAfterData() throws IOException { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); - assertEquals("num-chunks", 0, blockMeta.getNumChunks()); + RuntimeException e = assertThrows(RuntimeException.class, + () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0)); + assertTrue(e.getMessage().contains("Merged shuffle index file") && + e.getMessage().contains("not found")); } @Test @@ -281,8 +283,10 @@ public void testFailureAfterMultipleDataBlocks() throws IOException { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); - assertEquals("num-chunks", 0, blockMeta.getNumChunks()); + RuntimeException e = assertThrows(RuntimeException.class, + () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0)); + assertTrue(e.getMessage().contains("Merged shuffle index file") && + e.getMessage().contains("not found")); } @Test From d4132b32ea7af64c80b47a03623e5a1b48ac69c8 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 9 Oct 2022 18:50:32 +0800 Subject: [PATCH 06/27] Update code --- .../network/shuffle/ExternalBlockHandler.java | 2 +- .../shuffle/ExternalBlockStoreClient.java | 4 ++- .../shuffle/MergedShuffleFileManager.java | 7 ++-- .../shuffle/NoOpMergedShuffleFileManager.java | 3 +- .../shuffle/RemoteBlockPushResolver.java | 36 +++++++++++++------ .../shuffle/protocol/RemoveShuffleMerge.java | 18 +++++++--- .../apache/spark/scheduler/DAGScheduler.scala | 10 +++--- 7 files changed, 54 insertions(+), 26 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index b4e12a7bddc0..3d7c1b1ca0cc 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -229,7 +229,7 @@ protected void handleMessage( checkAuth(client, msg.appId); logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}", msg.appId, msg.shuffleId, msg.shuffleMergeId); - mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, msg.shuffleMergeId); + mergeManager.removeShuffleMerge(msg); } else if (msgObj instanceof DiagnoseCorruption) { DiagnoseCorruption msg = (DiagnoseCorruption) msgObj; checkAuth(client, msg.appId); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index fac3989af36d..b3395519cca9 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -261,7 +261,9 @@ public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuf checkInit(); try { TransportClient client = clientFactory.createClient(host, port); - client.send(new RemoveShuffleMerge(appId, shuffleId, shuffleMergeId).toByteBuffer()); + client.send( + new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId) + .toByteBuffer()); } catch (Exception e) { logger.error("Exception while sending RemoveShuffleMerge request to {}:{}", host, port, e); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index 12ddc82df9fe..1d64c91bbac4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; /** * The MergedShuffleFileManager is used to process push based shuffle when enabled. It works @@ -124,11 +125,9 @@ MergedBlockMeta getMergedBlockMeta( /** * Remove shuffle merge data files. * - * @param appId application ID - * @param shuffleId shuffle ID - * @param shuffleMergeId shuffle merge ID, -1 for all shuffle merges. + * @param removeShuffleMerge Remove shuffle merge RPC */ - void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId); + void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge); /** * Optionally close any resources associated the MergedShuffleFileManager, such as the diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index 20ace81f7a54..df69f494aee1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.TransportConf; /** @@ -86,7 +87,7 @@ public String[] getMergedBlockDirs(String appId) { } @Override - public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { + public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) { throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 104fd1e22e46..fab6d4465a40 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -57,6 +57,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -397,19 +398,32 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { } @Override - public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { + public void removeShuffleMerge(RemoveShuffleMerge msg) { + String appId = msg.appId; + int appAttemptId = msg.appAttemptId; + int shuffleId = msg.shuffleId; AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); - AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId); - logger.info("Start remove shuffle merge: app {} shuffleId {} shuffleMergeId {}", - appId, shuffleId, shuffleMergeId); - if (partitionsInfo != null) { - AppAttemptShuffleMergeId appAttemptShuffleMergeId = - new AppAttemptShuffleMergeId( - appId, appShuffleInfo.attemptId, shuffleId, partitionsInfo.shuffleMergeId); - submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, partitionsInfo.shuffleMergePartitions)); + if (appShuffleInfo.attemptId != appAttemptId) { + throw new IllegalArgumentException( + String.format("The attempt id %s in this RemoveShuffleMerge message does not match " + + "with the current attempt id %s stored in shuffle service for application %s", + appAttemptId, appShuffleInfo.attemptId, appId)); } + + appShuffleInfo.shuffles.compute(shuffleId, (shuffleIdKey, partitionsInfo) -> { + if (null != partitionsInfo) { + submitCleanupTask(() -> { + closeAndDeleteOutdatedPartitions( + new AppAttemptShuffleMergeId( + appId, appAttemptId, shuffleId, partitionsInfo.shuffleMergeId), + partitionsInfo.shuffleMergePartitions); + writeAppAttemptShuffleMergeInfoToDB( + new AppAttemptShuffleMergeId( + appId, appAttemptId, shuffleId, msg.shuffleMergeId)); + }); + } + return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); + }); } /** diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java index 7226c6d59034..b4a0ccde920c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -31,11 +31,17 @@ */ public class RemoveShuffleMerge extends BlockTransferMessage { public final String appId; + public final int appAttemptId; public final int shuffleId; public final int shuffleMergeId; - public RemoveShuffleMerge(String appId, int shuffleId, int shuffleMergeId) { + public RemoveShuffleMerge( + String appId, + int appAttemptId, + int shuffleId, + int shuffleMergeId) { this.appId = appId; + this.appAttemptId = appAttemptId; this.shuffleId = shuffleId; this.shuffleMergeId = shuffleMergeId; } @@ -47,13 +53,14 @@ protected Type type() { @Override public int hashCode() { - return Objects.hashCode(appId, shuffleId, shuffleMergeId); + return Objects.hashCode(appId, appAttemptId, shuffleId, shuffleMergeId); } @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) .append("appId", appId) + .append("attemptId", appAttemptId) .append("shuffleId", shuffleId) .append("shuffleMergeId", shuffleMergeId) .toString(); @@ -64,6 +71,7 @@ public boolean equals(Object other) { if (other != null && other instanceof RemoveShuffleMerge) { RemoveShuffleMerge o = (RemoveShuffleMerge) other; return Objects.equal(appId, o.appId) + && appAttemptId == o.appAttemptId && shuffleId == o.shuffleId && shuffleMergeId == o.shuffleMergeId; } @@ -72,20 +80,22 @@ public boolean equals(Object other) { @Override public int encodedLength() { - return Encoders.Strings.encodedLength(appId) + 4 + 4; + return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4; } @Override public void encode(ByteBuf buf) { Encoders.Strings.encode(buf, appId); + buf.writeInt(appAttemptId); buf.writeInt(shuffleId); buf.writeInt(shuffleMergeId); } public static RemoveShuffleMerge decode(ByteBuf buf) { String appId = Encoders.Strings.decode(buf); + int attemptId = buf.readInt(); int shuffleId = buf.readInt(); int shuffleMergeId = buf.readInt(); - return new RemoveShuffleMerge(appId, shuffleId, shuffleMergeId); + return new RemoveShuffleMerge(appId, attemptId, shuffleId, shuffleMergeId); } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7f36880eef38..c53730818a7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -2629,10 +2629,12 @@ private[spark] class DAGScheduler( runningStages.contains(stage) }.foreach { case (_, stage: ShuffleMapStage) => configureShufflePushMergerLocations(stage) - logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + - s" ${stage.shuffleDep.shuffleId} and shuffle merge" + - s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + - s" merger locations") + if (stage.shuffleDep.getMergerLocs.nonEmpty) { + logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + + s" ${stage.shuffleDep.shuffleId} and shuffle merge" + + s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + + s" merger locations") + } } } } From 3a0b6a45a2ae61bc892414bc7eb0b7696f5f694e Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Tue, 11 Oct 2022 16:32:10 +0800 Subject: [PATCH 07/27] Fix UT --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index a163fef693ea..fade0b86dd8f 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -1207,7 +1207,7 @@ private[spark] class MapOutputTrackerMaster( // This method is only called in local-mode. override def getShufflePushMergerLocations(shuffleId: Int): Seq[BlockManagerId] = { - shuffleStatuses(shuffleId).getShufflePushMergerLocations + shuffleStatuses.get(shuffleId).map(_.getShufflePushMergerLocations).getOrElse(Seq.empty) } override def stop(): Unit = { From 0a9730d276034a5529bac38563415b0b5cfa340c Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Wed, 12 Oct 2022 12:08:09 +0800 Subject: [PATCH 08/27] fix UT --- .../org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 16fa42056921..d55aed8d0908 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -323,7 +323,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { val dataFileReload3Again = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager3, partitionId3, 1) - dataFileReload3Again.length() should be ((4 * 5 + 1) * DUMMY_BLOCK_DATA.length) + dataFileReload3Again.length() should be (0) s3.stop() } From 0f24bb9031293bd053c44116d5c998f41c1b17f1 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Wed, 23 Nov 2022 10:42:28 +0800 Subject: [PATCH 09/27] Update comment --- .../apache/spark/network/shuffle/MergedShuffleFileManager.java | 3 ++- .../spark/network/shuffle/NoOpMergedShuffleFileManager.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index 1d64c91bbac4..ab498367d500 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -125,7 +125,8 @@ MergedBlockMeta getMergedBlockMeta( /** * Remove shuffle merge data files. * - * @param removeShuffleMerge Remove shuffle merge RPC + * @param removeShuffleMerge contains shuffle details (appId, shuffleId, etc) to uniquely + * identify a shuffle to be removed */ void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index df69f494aee1..7d8f9e27402a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -88,6 +88,6 @@ public String[] getMergedBlockDirs(String appId) { @Override public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) { - throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); } } From c848da6cf45a4ea63cdb5536a67f2d15687a5c7f Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 18 Dec 2022 11:31:11 +0800 Subject: [PATCH 10/27] update RemoteBlockPushResolver.removeShuffleMerge() code --- .../shuffle/RemoteBlockPushResolver.java | 129 +++++++++++++----- .../storage/BlockManagerMasterEndpoint.scala | 5 +- 2 files changed, 101 insertions(+), 33 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index fab6d4465a40..a1c74d2eb34c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -96,6 +96,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { public static final String MERGE_DIR_KEY = "mergeDir"; public static final String ATTEMPT_ID_KEY = "attemptId"; private static final int UNDEFINED_ATTEMPT_ID = -1; + + /** + * The flag for deleting the current merged shuffle data. + */ + public static final int DELETE_CURRENT_MERGED_SHUFFLE_ID = -1; + private static final String DB_KEY_DELIMITER = ";"; private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler(); // ByteBuffer to respond to client upon a successful merge of a pushed block @@ -399,30 +405,51 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { @Override public void removeShuffleMerge(RemoveShuffleMerge msg) { - String appId = msg.appId; - int appAttemptId = msg.appAttemptId; - int shuffleId = msg.shuffleId; - AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId); - if (appShuffleInfo.attemptId != appAttemptId) { + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId); + if (appShuffleInfo.attemptId != msg.appAttemptId) { throw new IllegalArgumentException( String.format("The attempt id %s in this RemoveShuffleMerge message does not match " + "with the current attempt id %s stored in shuffle service for application %s", - appAttemptId, appShuffleInfo.attemptId, appId)); - } - - appShuffleInfo.shuffles.compute(shuffleId, (shuffleIdKey, partitionsInfo) -> { - if (null != partitionsInfo) { + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } + appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> { + boolean deleteCurrent = + msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID || + msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId; + AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = + new AppAttemptShuffleMergeId( + msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); + AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( + msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId); + if(deleteCurrent) { + // request to clean up shuffle we are currently hosting + if (!mergePartitionsInfo.isFinalized()) { + submitCleanupTask(() -> { + closeAndDeleteOutdatedPartitions( + currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions); + writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); + }); + } else { + submitCleanupTask(() -> { + deleteMergedFiles(currentAppAttemptShuffleMergeId, + mergePartitionsInfo.getReduceIds()); + writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); + mergePartitionsInfo.setReduceIds(new int[0]); + }); + } + } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + + "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", + msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); + } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { + // cleanup request for newer shuffle - remove the outdated data we have. submitCleanupTask(() -> { closeAndDeleteOutdatedPartitions( - new AppAttemptShuffleMergeId( - appId, appAttemptId, shuffleId, partitionsInfo.shuffleMergeId), - partitionsInfo.shuffleMergePartitions); - writeAppAttemptShuffleMergeInfoToDB( - new AppAttemptShuffleMergeId( - appId, appAttemptId, shuffleId, msg.shuffleMergeId)); + currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions); + writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); }); } - return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); + return mergePartitionsInfo; }); } @@ -500,6 +527,39 @@ void closeAndDeleteOutdatedPartitions( }); } + void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) { + removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); + AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId); + int shuffleId = appAttemptShuffleMergeId.shuffleId; + int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId; + for (int reduceId : reduceIds) { + try { + File dataFile = + appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId); + dataFile.delete(); + } catch (Exception e) { + logger.warn("Fail to delete merged data file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } + try { + File indexFile = new File( + appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); + indexFile.delete(); + } catch (Exception e) { + logger.warn("Fail to delete merged index file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } + try { + File metaFile = + appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); + metaFile.delete(); + } catch (Exception e) { + logger.warn("Fail to delete merged meta file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } + } + } + /** * Remove the finalized shuffle partition information for a specific appAttemptShuffleMergeId * @param appAttemptShuffleMergeId @@ -683,10 +743,9 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. - Map shuffleMergePartitions = - mergePartitionsInfo.shuffleMergePartitions; submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions)); + closeAndDeleteOutdatedPartitions( + appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); } else { // This block covers: // 1. finalization of determinate stage @@ -694,17 +753,14 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { // for which the message is received. shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions); } - } else { - mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true); } - mergePartitionsInfo.setFinalized(true); // Update the DB for the finalized shuffle writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. - return mergePartitionsInfo; + return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); }); Map shuffleMergePartitions = shuffleMergePartitionsRef.get(); MergeStatuses mergeStatuses; @@ -747,6 +803,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); + appShuffleInfo.shuffles.get(msg.shuffleId).setReduceIds(Ints.toArray(reduceIds)); } logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId); @@ -1493,14 +1550,20 @@ public String toString() { * required for the shuffles of indeterminate stages. */ public static class AppShuffleMergePartitionsInfo { + // ConcurrentHashMap doesn't allow null for keys or values which is why this is required. + // Marker to identify finalized shuffle partitions. + private static final Map SHUFFLE_FINALIZED_MARKER = + Collections.emptyMap(); private final int shuffleMergeId; private final Map shuffleMergePartitions; - private volatile Boolean finalized; + + private int[] reduceIds; public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { this.shuffleMergeId = shuffleMergeId; - this.shuffleMergePartitions = new ConcurrentHashMap<>(); - this.finalized = shuffleFinalized; + this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER : + new ConcurrentHashMap<>(); + this.reduceIds = new int[0]; } @VisibleForTesting @@ -1508,12 +1571,16 @@ public Map getShuffleMergePartitions() { return shuffleMergePartitions; } - public synchronized void setFinalized(boolean finalized) { - this.finalized = finalized; + public boolean isFinalized() { + return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER; + } + + public void setReduceIds(int[] reduceIds) { + this.reduceIds = reduceIds; } - public synchronized boolean isFinalized() { - return this.finalized; + public int[] getReduceIds() { + return reduceIds; } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index baa88758e96d..b3842bba4856 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -32,7 +32,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{MapOutputTrackerMaster, SparkConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging} -import org.apache.spark.network.shuffle.ExternalBlockStoreClient +import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend} @@ -376,7 +376,8 @@ class BlockManagerMasterEndpoint( externalBlockStoreClient.map { shuffleClient => mergerLocations.map { bmId => Future[Boolean] { - shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId, -1) + shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId, + RemoteBlockPushResolver.DELETE_CURRENT_MERGED_SHUFFLE_ID) } } }.getOrElse(Seq.empty) From e54bb75af55b254adbcbc79629a7631e81b3bf46 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Tue, 20 Dec 2022 23:03:49 +0800 Subject: [PATCH 11/27] Save the latest shuffle merge id in appShuffleInfo.shuffles --- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index a1c74d2eb34c..836c03aa839f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -449,7 +449,7 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); }); } - return mergePartitionsInfo; + return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); }); } From 4d2b940253703c7559d03cd4c7843b9500794fc7 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Thu, 22 Dec 2022 16:07:39 +0800 Subject: [PATCH 12/27] Update code --- .../shuffle/RemoteBlockPushResolver.java | 76 ++++++++++++------- .../storage/BlockManagerMasterEndpoint.scala | 2 +- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 836c03aa839f..5a9e7d40ed96 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -100,7 +100,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { /** * The flag for deleting the current merged shuffle data. */ - public static final int DELETE_CURRENT_MERGED_SHUFFLE_ID = -1; + public static final int DELETE_ALL_MERGED_SHUFFLE = -1; private static final String DB_KEY_DELIMITER = ";"; private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler(); @@ -412,16 +412,27 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { + "with the current attempt id %s stored in shuffle service for application %s", msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); } - appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> { - boolean deleteCurrent = - msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID || + appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> { + if (mergePartitionsInfo == null) { + if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) { + return null; + } else { + writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId( + msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId)); + return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); + } + } + boolean deleteAllMergedShuffle = + msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE || msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId; + int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ? + msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId; AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId( msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( - msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId); - if(deleteCurrent) { + msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId); + if(deleteAllMergedShuffle) { // request to clean up shuffle we are currently hosting if (!mergePartitionsInfo.isFinalized()) { submitCleanupTask(() -> { @@ -431,17 +442,17 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { }); } else { submitCleanupTask(() -> { - deleteMergedFiles(currentAppAttemptShuffleMergeId, + deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, mergePartitionsInfo.getReduceIds()); writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); mergePartitionsInfo.setReduceIds(new int[0]); }); } - } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", - msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); - } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { + msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); + } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // cleanup request for newer shuffle - remove the outdated data we have. submitCleanupTask(() -> { closeAndDeleteOutdatedPartitions( @@ -449,7 +460,7 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); }); } - return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); + return new AppShuffleMergePartitionsInfo(shuffleMergeId, true); }); } @@ -527,35 +538,46 @@ void closeAndDeleteOutdatedPartitions( }); } - void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) { + void deleteMergedFiles( + AppAttemptShuffleMergeId appAttemptShuffleMergeId, + AppShuffleInfo appShuffleInfo, + int[] reduceIds) { removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); - AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId); int shuffleId = appAttemptShuffleMergeId.shuffleId; int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId; for (int reduceId : reduceIds) { try { File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId); - dataFile.delete(); + if(!dataFile.delete()) { + logger.warn("Fail to delete merged data file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } } catch (Exception e) { - logger.warn("Fail to delete merged data file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + logger.error(String.format("Fail to delete merged data file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId), e); } try { File indexFile = new File( appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); - indexFile.delete(); + if(!indexFile.delete()) { + logger.warn("Fail to delete merged index file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } } catch (Exception e) { - logger.warn("Fail to delete merged index file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + logger.error(String.format("Fail to delete merged index file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId), e); } try { File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); - metaFile.delete(); + if(!metaFile.delete()) { + logger.warn("Fail to delete merged meta file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId); + } } catch (Exception e) { - logger.warn("Fail to delete merged meta file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + logger.error(String.format("Fail to delete merged meta file for {} reduceId {}", + appAttemptShuffleMergeId, reduceId), e); } } } @@ -795,8 +817,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, partition.reduceId); } finally { - Boolean deleteFile = partition.mapTracker.getCardinality() == 0; - partition.closeAllFilesAndDeleteIfNeeded(deleteFile); + partition.closeAllFilesAndDeleteIfNeeded(false); } } } @@ -1557,13 +1578,12 @@ public static class AppShuffleMergePartitionsInfo { private final int shuffleMergeId; private final Map shuffleMergePartitions; - private int[] reduceIds; + private final AtomicReference reduceIds = new AtomicReference<>(new int[0]); public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { this.shuffleMergeId = shuffleMergeId; this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER : new ConcurrentHashMap<>(); - this.reduceIds = new int[0]; } @VisibleForTesting @@ -1576,11 +1596,11 @@ public boolean isFinalized() { } public void setReduceIds(int[] reduceIds) { - this.reduceIds = reduceIds; + this.reduceIds.set(reduceIds); } public int[] getReduceIds() { - return reduceIds; + return this.reduceIds.get(); } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index b3842bba4856..9f03228bb4f4 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -377,7 +377,7 @@ class BlockManagerMasterEndpoint( mergerLocations.map { bmId => Future[Boolean] { shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId, - RemoteBlockPushResolver.DELETE_CURRENT_MERGED_SHUFFLE_ID) + RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE) } } }.getOrElse(Seq.empty) From 5d3aa2ddc82df7e40870b2b07b263f2630bc23e3 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 23 Dec 2022 11:35:47 +0800 Subject: [PATCH 13/27] Fix UT --- .../shuffle/RemoteBlockPushResolver.java | 3 +- .../shuffle/RemoteBlockPushResolverSuite.java | 67 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 5a9e7d40ed96..f95c33e53015 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -817,7 +817,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, partition.reduceId); } finally { - partition.closeAllFilesAndDeleteIfNeeded(false); + Boolean deleteFile = partition.mapTracker.getCardinality() == 0; + partition.closeAllFilesAndDeleteIfNeeded(deleteFile); } } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 4d4b5cb83c68..9c75443e6a47 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -31,11 +31,13 @@ import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -1320,6 +1322,71 @@ public void testJsonSerializationOfPushShufflePartitionInfo() throws IOException RemoteBlockPushResolver.AppAttemptShuffleMergeId.class)); } + @Test + public void testRemoveShuffleMerge() throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + String testApp = "testRemoveShuffleMerge"; + RemoteBlockPushResolver pushResolver = new RemoteBlockPushResolver(conf, null) { + @Override + void closeAndDeleteOutdatedPartitions( + AppAttemptShuffleMergeId appAttemptShuffleMergeId, + Map partitions) { + super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions); + closed.release(); + } + + @Override + void deleteMergedFiles( + AppAttemptShuffleMergeId appAttemptShuffleMergeId, + AppShuffleInfo appShuffleInfo, + int[] reduceIds) { + super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, reduceIds); + closed.release(); + } + }; + pushResolver.registerExecutor(testApp, new ExecutorShuffleInfo( + prepareLocalDirs(localDirs, MERGE_DIRECTORY), 1, MERGE_DIRECTORY_META)); + + // 1. Check whether the data is cleaned up when merged shuffle is finalized + RemoteBlockPushResolver.AppShuffleInfo shuffleInfo = + pushResolver.validateAndGetAppShuffleInfo(testApp); + StreamCallbackWithID streamCallback1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0)); + streamCallback1.onData(streamCallback1.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback1.onComplete(streamCallback1.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1)); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); + assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertTrue(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); + assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); + + // 2. Check whether the data is cleaned up when merged shuffle is not finalized. + StreamCallbackWithID streamCallback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 2, 1, 0, 0, 0)); + streamCallback2.onData(streamCallback2.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback2.onComplete(streamCallback2.getID()); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + + // 3. Check whether the data is cleaned up when higher shuffleMergeId finalize request comes + StreamCallbackWithID streamCallback3 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 3, 1, 0, 0, 0)); + streamCallback3.onData(streamCallback3.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback3.onComplete(streamCallback3.getID()); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + } + private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { pushResolver = new RemoteBlockPushResolver(conf, null) { @Override From d034985907b0c508a97527abe0cf112e9a5fcd89 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 30 Dec 2022 22:12:46 +0800 Subject: [PATCH 14/27] Update code --- .../shuffle/NoOpMergedShuffleFileManager.java | 10 ++--- .../shuffle/RemoteBlockPushResolver.java | 44 +++++++++++-------- .../shuffle/RemoteBlockPushResolverSuite.java | 5 ++- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index 7d8f9e27402a..b9001ef6cbe3 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -44,12 +44,12 @@ public NoOpMergedShuffleFileManager(TransportConf transportConf, File recoveryFi @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { - throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); } @Override public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException { - throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); } @Override @@ -69,7 +69,7 @@ public ManagedBuffer getMergedBlockData( int shuffleMergeId, int reduceId, int chunkId) { - throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); } @Override @@ -78,12 +78,12 @@ public MergedBlockMeta getMergedBlockMeta( int shuffleId, int shuffleMergeId, int reduceId) { - throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); } @Override public String[] getMergedBlockDirs(String appId) { - throw new UnsupportedOperationException("Cannot handle shuffle block merge"); + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); } @Override diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index f95c33e53015..7cc53e94277b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -422,7 +422,7 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); } } - boolean deleteAllMergedShuffle = + boolean deleteCurrentMergedShuffle = msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE || msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId; int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ? @@ -432,7 +432,7 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId); - if(deleteAllMergedShuffle) { + if(deleteCurrentMergedShuffle) { // request to clean up shuffle we are currently hosting if (!mergePartitionsInfo.isFinalized()) { submitCleanupTask(() -> { @@ -443,9 +443,7 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { } else { submitCleanupTask(() -> { deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, - mergePartitionsInfo.getReduceIds()); - writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); - mergePartitionsInfo.setReduceIds(new int[0]); + mergePartitionsInfo.getReduceIds(), false); }); } } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { @@ -541,17 +539,24 @@ void closeAndDeleteOutdatedPartitions( void deleteMergedFiles( AppAttemptShuffleMergeId appAttemptShuffleMergeId, AppShuffleInfo appShuffleInfo, - int[] reduceIds) { - removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); + int[] reduceIds, + boolean deleteFromDB) { + if(deleteFromDB) { + removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); + } int shuffleId = appAttemptShuffleMergeId.shuffleId; int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId; + int dataFilesDeleteCnt = 0; + int indexFilesDeleteCnt = 0; + int metaFilesDeleteCnt = 0; for (int reduceId : reduceIds) { try { File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId); - if(!dataFile.delete()) { - logger.warn("Fail to delete merged data file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + if (dataFile.delete()) { + dataFilesDeleteCnt++; + } else { + logger.warn("Fail to delete merged file {} for {}", dataFile, appAttemptShuffleMergeId); } } catch (Exception e) { logger.error(String.format("Fail to delete merged data file for {} reduceId {}", @@ -560,9 +565,10 @@ void deleteMergedFiles( try { File indexFile = new File( appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); - if(!indexFile.delete()) { - logger.warn("Fail to delete merged index file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + if (indexFile.delete()) { + indexFilesDeleteCnt++; + } else { + logger.warn("Fail to delete merged file {} for {}", indexFile, appAttemptShuffleMergeId); } } catch (Exception e) { logger.error(String.format("Fail to delete merged index file for {} reduceId {}", @@ -571,15 +577,18 @@ void deleteMergedFiles( try { File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); - if(!metaFile.delete()) { - logger.warn("Fail to delete merged meta file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId); + if (metaFile.delete()) { + metaFilesDeleteCnt++; + } else { + logger.warn("Fail to delete merged file {} for {}", metaFile, appAttemptShuffleMergeId); } } catch (Exception e) { logger.error(String.format("Fail to delete merged meta file for {} reduceId {}", appAttemptShuffleMergeId, reduceId), e); } } + logger.info("Delete {} data files, {} index files, {} meta files for {}", + dataFilesDeleteCnt, indexFilesDeleteCnt, metaFilesDeleteCnt, appAttemptShuffleMergeId); } /** @@ -817,8 +826,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, partition.reduceId); } finally { - Boolean deleteFile = partition.mapTracker.getCardinality() == 0; - partition.closeAllFilesAndDeleteIfNeeded(deleteFile); + partition.closeAllFilesAndDeleteIfNeeded(false); } } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 9c75443e6a47..a6d33382c033 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -1339,8 +1339,9 @@ void closeAndDeleteOutdatedPartitions( void deleteMergedFiles( AppAttemptShuffleMergeId appAttemptShuffleMergeId, AppShuffleInfo appShuffleInfo, - int[] reduceIds) { - super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, reduceIds); + int[] reduceIds, + boolean deleteFromDB) { + super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, reduceIds, deleteFromDB); closed.release(); } }; From 11e7d9a519daf69fd48edfe4bb6d157e316ca9c8 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sat, 31 Dec 2022 10:26:40 +0800 Subject: [PATCH 15/27] Fix UT --- .../shuffle/RemoteBlockPushResolverSuite.java | 12 ++++-------- .../spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index a6d33382c033..58d55078f95c 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -269,10 +269,8 @@ public void testFailureAfterData() throws IOException { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - RuntimeException e = assertThrows(RuntimeException.class, - () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0)); - assertTrue(e.getMessage().contains("Merged shuffle index file") && - e.getMessage().contains("not found")); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @Test @@ -285,10 +283,8 @@ public void testFailureAfterMultipleDataBlocks() throws IOException { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - RuntimeException e = assertThrows(RuntimeException.class, - () -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0)); - assertTrue(e.getMessage().contains("Merged shuffle index file") && - e.getMessage().contains("not found")); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @Test diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index d55aed8d0908..67fa21665416 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -323,7 +323,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { val dataFileReload3Again = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager3, partitionId3, 1) - dataFileReload3Again.length() should be (0) + dataFileReload3Again.length() should be (294) s3.stop() } From 29f491803e54609053e89ffdc7b99ccaf433e05c Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sun, 1 Jan 2023 10:15:31 +0800 Subject: [PATCH 16/27] Format codestyle --- .../network/shuffle/NoOpMergedShuffleFileManager.java | 10 +++++----- .../spark/network/shuffle/RemoteBlockPushResolver.java | 2 +- .../network/shuffle/protocol/RemoveShuffleMerge.java | 1 + .../network/shuffle/RemoteBlockPushResolverSuite.java | 2 +- .../spark/network/yarn/YarnShuffleServiceSuite.scala | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index b9001ef6cbe3..7d8f9e27402a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -44,12 +44,12 @@ public NoOpMergedShuffleFileManager(TransportConf transportConf, File recoveryFi @Override public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) { - throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } @Override public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException { - throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } @Override @@ -69,7 +69,7 @@ public ManagedBuffer getMergedBlockData( int shuffleMergeId, int reduceId, int chunkId) { - throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } @Override @@ -78,12 +78,12 @@ public MergedBlockMeta getMergedBlockMeta( int shuffleId, int shuffleMergeId, int reduceId) { - throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } @Override public String[] getMergedBlockDirs(String appId) { - throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } @Override diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 7cc53e94277b..30d7158eef44 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -57,7 +57,6 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +71,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.shuffledb.DB; import org.apache.spark.network.shuffledb.DBBackend; import org.apache.spark.network.shuffledb.DBIterator; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java index b4a0ccde920c..3bcb57a70bcb 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; + import org.apache.spark.network.protocol.Encoders; /** diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 58d55078f95c..04bc34d6c1e4 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -37,7 +37,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.io.FileUtils; -import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,6 +57,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 67fa21665416..16fa42056921 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -323,7 +323,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { val dataFileReload3Again = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager3, partitionId3, 1) - dataFileReload3Again.length() should be (294) + dataFileReload3Again.length() should be ((4 * 5 + 1) * DUMMY_BLOCK_DATA.length) s3.stop() } From 3a7d99c38bb1ab78c08c7a52819779fb5ad4bf15 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Mon, 2 Jan 2023 11:38:41 +0800 Subject: [PATCH 17/27] move writeAppAttemptShuffleMergeInfoToDB to main thread --- .../shuffle/RemoteBlockPushResolver.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 30d7158eef44..d8c3cb068458 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -435,16 +435,13 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { if(deleteCurrentMergedShuffle) { // request to clean up shuffle we are currently hosting if (!mergePartitionsInfo.isFinalized()) { - submitCleanupTask(() -> { + submitCleanupTask(() -> closeAndDeleteOutdatedPartitions( - currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions); - writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); - }); + currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); } else { - submitCleanupTask(() -> { + submitCleanupTask(() -> deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, - mergePartitionsInfo.getReduceIds(), false); - }); + mergePartitionsInfo.getReduceIds(), false)); } } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + @@ -452,12 +449,11 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // cleanup request for newer shuffle - remove the outdated data we have. - submitCleanupTask(() -> { + submitCleanupTask(() -> closeAndDeleteOutdatedPartitions( - currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions); - writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); - }); + currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); } + writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); return new AppShuffleMergePartitionsInfo(shuffleMergeId, true); }); } From 613a99a6e3d0b718914e8013ef5c48b67e330b47 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sat, 7 Jan 2023 21:10:20 +0800 Subject: [PATCH 18/27] remove try catch for file.delete() --- .../shuffle/RemoteBlockPushResolver.java | 57 ++++++--------- .../shuffle/RemoteBlockPushResolverSuite.java | 73 ++++++++++++++++--- 2 files changed, 82 insertions(+), 48 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index d8c3cb068458..3eff50db59d6 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -449,9 +449,15 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // cleanup request for newer shuffle - remove the outdated data we have. - submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + if (!mergePartitionsInfo.isFinalized()) { + submitCleanupTask(() -> + closeAndDeleteOutdatedPartitions( + currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + } else { + submitCleanupTask(() -> + deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, + mergePartitionsInfo.getReduceIds(), false)); + } } writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); return new AppShuffleMergePartitionsInfo(shuffleMergeId, true); @@ -546,41 +552,20 @@ void deleteMergedFiles( int indexFilesDeleteCnt = 0; int metaFilesDeleteCnt = 0; for (int reduceId : reduceIds) { - try { - File dataFile = - appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId); - if (dataFile.delete()) { - dataFilesDeleteCnt++; - } else { - logger.warn("Fail to delete merged file {} for {}", dataFile, appAttemptShuffleMergeId); - } - } catch (Exception e) { - logger.error(String.format("Fail to delete merged data file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId), e); + File dataFile = + appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId); + if (dataFile.delete()) { + dataFilesDeleteCnt++; } - try { - File indexFile = new File( - appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); - if (indexFile.delete()) { - indexFilesDeleteCnt++; - } else { - logger.warn("Fail to delete merged file {} for {}", indexFile, appAttemptShuffleMergeId); - } - } catch (Exception e) { - logger.error(String.format("Fail to delete merged index file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId), e); + File indexFile = new File( + appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); + if (indexFile.delete()) { + indexFilesDeleteCnt++; } - try { - File metaFile = - appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); - if (metaFile.delete()) { - metaFilesDeleteCnt++; - } else { - logger.warn("Fail to delete merged file {} for {}", metaFile, appAttemptShuffleMergeId); - } - } catch (Exception e) { - logger.error(String.format("Fail to delete merged meta file for {} reduceId {}", - appAttemptShuffleMergeId, reduceId), e); + File metaFile = + appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); + if (metaFile.delete()) { + metaFilesDeleteCnt++; } } logger.info("Delete {} data files, {} index files, {} meta files for {}", diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 04bc34d6c1e4..4c0869402762 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -1343,14 +1343,15 @@ void deleteMergedFiles( }; pushResolver.registerExecutor(testApp, new ExecutorShuffleInfo( prepareLocalDirs(localDirs, MERGE_DIRECTORY), 1, MERGE_DIRECTORY_META)); - - // 1. Check whether the data is cleaned up when merged shuffle is finalized RemoteBlockPushResolver.AppShuffleInfo shuffleInfo = pushResolver.validateAndGetAppShuffleInfo(testApp); - StreamCallbackWithID streamCallback1 = pushResolver.receiveBlockDataAsStream( + + // 1. Check whether the data is cleaned up when merged shuffle is finalized + // 1.1 Cleaned up the merged files when msg.shuffleMergeId is current shuffleMergeId + StreamCallbackWithID streamCallback0 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0)); - streamCallback1.onData(streamCallback1.getID(), ByteBuffer.wrap(new byte[2])); - streamCallback1.onComplete(streamCallback1.getID()); + streamCallback0.onData(streamCallback0.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback0.onComplete(streamCallback0.getID()); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1)); assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); @@ -1362,26 +1363,74 @@ void deleteMergedFiles( assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); - // 2. Check whether the data is cleaned up when merged shuffle is not finalized. + // 1.2 Cleaned up the merged files when msg.shuffleMergeId is DELETE_ALL_MERGED_SHUFFLE + StreamCallbackWithID streamCallback1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 1, 1, 0, 0, 0)); + streamCallback1.onData(streamCallback1.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback1.onComplete(streamCallback1.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 1, 1)); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(1, 1, 0).exists()); + assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(1, 1, 0)).exists()); + assertTrue(shuffleInfo.getMergedShuffleDataFile(1, 1, 0).exists()); + pushResolver.removeShuffleMerge(new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 1, + RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(1, 1, 0).exists()); + assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertFalse(shuffleInfo.getMergedShuffleDataFile(1, 1, 0).exists()); + + // 1.3 Cleaned up the merged files when msg.shuffleMergeId < current shuffleMergeId StreamCallbackWithID streamCallback2 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 2, 1, 0, 0, 0)); streamCallback2.onData(streamCallback2.getID(), ByteBuffer.wrap(new byte[2])); streamCallback2.onComplete(streamCallback2.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 1)); assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); - pushResolver.removeShuffleMerge( - new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 1)); - closed.tryAcquire(10, TimeUnit.SECONDS); - assertFalse(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(2, 1, 0)).exists()); + assertTrue(shuffleInfo.getMergedShuffleDataFile(2, 1, 0).exists()); - // 3. Check whether the data is cleaned up when higher shuffleMergeId finalize request comes + RuntimeException e = assertThrows(RuntimeException.class, + () -> pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 0))); + assertEquals("Asked to remove old shuffle merged data for application " + testApp + + " shuffleId 2 shuffleMergeId 0, but current shuffleMergeId 1 ", e.getMessage()); + + // 1.4 Cleaned up the merged files when msg.shuffleMergeId > current shuffleMergeId StreamCallbackWithID streamCallback3 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 3, 1, 0, 0, 0)); streamCallback3.onData(streamCallback3.getID(), ByteBuffer.wrap(new byte[2])); streamCallback3.onComplete(streamCallback3.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 1)); assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2)); + assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(3, 1, 0)).exists()); + assertTrue(shuffleInfo.getMergedShuffleDataFile(3, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2)); closed.tryAcquire(10, TimeUnit.SECONDS); assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(3, 1, 0)).exists()); + assertFalse(shuffleInfo.getMergedShuffleDataFile(3, 1, 0).exists()); + + // 2. Check whether the data is cleaned up when merged shuffle is not finalized. + StreamCallbackWithID streamCallback4 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 4, 1, 0, 0, 0)); + streamCallback4.onData(streamCallback4.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback4.onComplete(streamCallback4.getID()); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(4, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 4, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(4, 1, 0).exists()); + + // 3. Check whether the data is cleaned up when higher shuffleMergeId finalize request comes + StreamCallbackWithID streamCallback5 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, NO_ATTEMPT_ID, 5, 1, 0, 0, 0)); + streamCallback5.onData(streamCallback5.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback5.onComplete(streamCallback5.getID()); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(5, 1, 0).exists()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 5, 2)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(5, 1, 0).exists()); } private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { From 96ba7e181cbae7059caa17265192f3701642de04 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Wed, 11 Jan 2023 20:34:31 +0800 Subject: [PATCH 19/27] Simplify the code and update some comments --- .../shuffle/RemoteBlockPushResolver.java | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 3eff50db59d6..52b60a904672 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -98,7 +98,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private static final int UNDEFINED_ATTEMPT_ID = -1; /** - * The flag for deleting the current merged shuffle data. + * The flag for deleting all merged shuffle data. */ public static final int DELETE_ALL_MERGED_SHUFFLE = -1; @@ -432,32 +432,24 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId); - if(deleteCurrentMergedShuffle) { - // request to clean up shuffle we are currently hosting - if (!mergePartitionsInfo.isFinalized()) { - submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); - } else { - submitCleanupTask(() -> - deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, - mergePartitionsInfo.getReduceIds(), false)); - } - } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { - throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + - "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", - msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); - } else if (shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { - // cleanup request for newer shuffle - remove the outdated data we have. + if(deleteCurrentMergedShuffle || shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { if (!mergePartitionsInfo.isFinalized()) { + // Clean up shuffle data before the shuffle was finalized. Close and delete all the open + // files. submitCleanupTask(() -> closeAndDeleteOutdatedPartitions( currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); } else { + // Current shuffle was finalized, delete all the merged files through reduceIds set + // in finalizeShuffleMerge method. submitCleanupTask(() -> deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, mergePartitionsInfo.getReduceIds(), false)); } + } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + + "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", + msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); } writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); return new AppShuffleMergePartitionsInfo(shuffleMergeId, true); From e06ef7688fc578c19ad5bfed032eedefbade965d Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Thu, 12 Jan 2023 19:08:52 +0800 Subject: [PATCH 20/27] Add UT --- .../apache/spark/MapOutputTrackerSuite.scala | 60 ++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index a13527f4b74c..591b055d19b5 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -17,12 +17,16 @@ package org.apache.spark +import java.util.{Collections => JCollections, HashSet => JHashSet} import java.util.concurrent.atomic.LongAdder +import scala.collection.JavaConverters._ +import scala.collection.concurrent import scala.collection.mutable.ArrayBuffer import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock import org.roaringbitmap.RoaringBitmap import org.apache.spark.LocalSparkContext._ @@ -30,10 +34,11 @@ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE} import org.apache.spark.internal.config.Tests.IS_TESTING +import org.apache.spark.network.shuffle.ExternalBlockStoreClient import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus} import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId} +import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, ShuffleMergedBlockId} class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { private val conf = new SparkConf @@ -913,6 +918,59 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { slaveRpcEnv.shutdown() } + test("SPARK-40480: shuffle remove should cleanup merged files as well") { + val newConf = new SparkConf + newConf.set("spark.shuffle.push.enabled", "true") + newConf.set("spark.shuffle.service.enabled", "true") + newConf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") + newConf.set(IS_TESTING, true) + + val SHUFFLE_ID = 10 + // needs TorrentBroadcast so need a SparkContext + withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => + val rpcEnv = sc.env.rpcEnv + val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + + val blockStoreClient = mock(classOf[ExternalBlockStoreClient]) + val blockManagerMasterEndpoint = new BlockManagerMasterEndpoint( + rpcEnv, + sc.isLocal, + sc.conf, + sc.listenerBus, + Some(blockStoreClient), + // We dont care about this ... + new concurrent.TrieMap[BlockManagerId, BlockManagerInfo](), + masterTracker, + sc.env.shuffleManager, + true + ) + rpcEnv.stop(sc.env.blockManager.master.driverEndpoint) + sc.env.blockManager.master.driverEndpoint = + rpcEnv.setupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME, + blockManagerMasterEndpoint) + + masterTracker.registerShuffle(SHUFFLE_ID, 10, 10) + val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", x)) + masterTracker.registerShufflePushMergerLocations(SHUFFLE_ID, mergerLocs) + + assert(masterTracker.getShufflePushMergerLocations(SHUFFLE_ID).map(_.host).toSet == + mergerLocs.map(_.host).toSet) + + val foundHosts = JCollections.synchronizedSet(new JHashSet[String]()) + when(blockStoreClient.removeShuffleMerge(any(), any(), any(), any())).thenAnswer( + (m: InvocationOnMock) => { + val host = m.getArgument(0).asInstanceOf[String] + val shuffleId = m.getArgument(2).asInstanceOf[Int] + assert(shuffleId == SHUFFLE_ID) + foundHosts.add(host) + true + }) + + sc.cleaner.get.doCleanupShuffle(SHUFFLE_ID, blocking = true) + assert(foundHosts.asScala == mergerLocs.map(_.host).toSet) + } + } + test("SPARK-34826: Adaptive shuffle mergers") { val newConf = new SparkConf newConf.set("spark.shuffle.push.based.enabled", "true") From 43085d12aad6cbd1c0cd9e5849da5df62e52573a Mon Sep 17 00:00:00 2001 From: wankun Date: Fri, 13 Jan 2023 11:40:45 +0800 Subject: [PATCH 21/27] Update common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java Co-authored-by: otterc --- .../shuffle/RemoteBlockPushResolver.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 52b60a904672..27487cfaada2 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -425,14 +425,13 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { boolean deleteCurrentMergedShuffle = msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE || msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId; - int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ? + int shuffleMergeIdToDelete = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ? msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId; - AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = - new AppAttemptShuffleMergeId( - msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); - AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId( - msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId); - if(deleteCurrentMergedShuffle || shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { + if (deleteCurrentMergedShuffle || + shuffleMergeIdToDelete > mergePartitionsInfo.shuffleMergeId) { + AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = + new AppAttemptShuffleMergeId( + msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId); if (!mergePartitionsInfo.isFinalized()) { // Clean up shuffle data before the shuffle was finalized. Close and delete all the open // files. @@ -446,13 +445,14 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo, mergePartitionsInfo.getReduceIds(), false)); } - } else if(shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + } else { throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " + "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ", - msg.appId, msg.shuffleId, shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); + msg.appId, msg.shuffleId, shuffleMergeIdToDelete, mergePartitionsInfo.shuffleMergeId)); } - writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); - return new AppShuffleMergePartitionsInfo(shuffleMergeId, true); + writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId( + msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeIdToDelete)); + return new AppShuffleMergePartitionsInfo(shuffleMergeIdToDelete, true); }); } From 371feae344c51e6a99c3d7f6a42023be893ade4a Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 13 Jan 2023 11:42:59 +0800 Subject: [PATCH 22/27] Format code --- .../apache/spark/network/shuffle/RemoteBlockPushResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 27487cfaada2..edb0b6f2d4d1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -535,7 +535,7 @@ void deleteMergedFiles( AppShuffleInfo appShuffleInfo, int[] reduceIds, boolean deleteFromDB) { - if(deleteFromDB) { + if (deleteFromDB) { removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); } int shuffleId = appAttemptShuffleMergeId.shuffleId; From 6975bcc11065b243948768f19e6cb21ad1f6848f Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 13 Jan 2023 11:49:48 +0800 Subject: [PATCH 23/27] Add a TODO for RemoveShuffleMerge RPC --- .../apache/spark/network/shuffle/ExternalBlockStoreClient.java | 1 + 1 file changed, 1 insertion(+) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index b3395519cca9..aab5cbb9e8af 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -264,6 +264,7 @@ public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuf client.send( new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId) .toByteBuffer()); + // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC. } catch (Exception e) { logger.error("Exception while sending RemoveShuffleMerge request to {}:{}", host, port, e); From 475f8bd97ef36985724ed7f03ac08fdf647fe56c Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 13 Jan 2023 12:16:02 +0800 Subject: [PATCH 24/27] Fix bug and backport UT --- .../storage/BlockManagerMasterEndpoint.scala | 31 +++++++------ .../apache/spark/MapOutputTrackerSuite.scala | 44 +++++++++---------- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 9f03228bb4f4..e516f1cb3faf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -57,6 +57,7 @@ class BlockManagerMasterEndpoint( isDriver: Boolean) extends IsolatedThreadSafeRpcEndpoint with Logging { + private val testing: Boolean = conf.get(config.Tests.IS_TESTING).getOrElse(false) // Mapping from executor id to the block manager's local disk directories. private val executorIdToLocalDirs = CacheBuilder @@ -321,20 +322,6 @@ class BlockManagerMasterEndpoint( } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { - val mergerLocations = - if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) { - mapOutputTracker.getShufflePushMergerLocations(shuffleId) - } else { - Seq.empty[BlockManagerId] - } - val removeMsg = RemoveShuffle(shuffleId) - val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => - bm.storageEndpoint.ask[Boolean](removeMsg).recover { - // use false as default value means no shuffle data were removed - handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) - } - }.toSeq - // Find all shuffle blocks on executors that are no longer running val blocksToDeleteByShuffleService = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] @@ -374,6 +361,12 @@ class BlockManagerMasterEndpoint( val removeShuffleMergeFromShuffleServicesFutures = externalBlockStoreClient.map { shuffleClient => + val mergerLocations = + if (Utils.isPushBasedShuffleEnabled(conf, isDriver)) { + mapOutputTracker.getShufflePushMergerLocations(shuffleId) + } else { + Seq.empty[BlockManagerId] + } mergerLocations.map { bmId => Future[Boolean] { shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId, @@ -382,6 +375,16 @@ class BlockManagerMasterEndpoint( } }.getOrElse(Seq.empty) + val removeMsg = RemoveShuffle(shuffleId) + val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => + bm.storageEndpoint.ask[Boolean](removeMsg).recover { + // use false as default value means no shuffle data were removed + handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + } + }.toSeq + if (testing) { + RpcUtils.INFINITE_TIMEOUT.awaitResult(Future.sequence(removeShuffleFromExecutorsFutures)) + } Future.sequence(removeShuffleFromExecutorsFutures ++ removeShuffleFromShuffleServicesFutures ++ removeShuffleMergeFromShuffleServicesFutures) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 591b055d19b5..dfad4a924d7c 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -21,7 +21,6 @@ import java.util.{Collections => JCollections, HashSet => JHashSet} import java.util.concurrent.atomic.LongAdder import scala.collection.JavaConverters._ -import scala.collection.concurrent import scala.collection.mutable.ArrayBuffer import org.mockito.ArgumentMatchers.any @@ -35,10 +34,10 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE} import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.network.shuffle.ExternalBlockStoreClient -import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} +import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatus, MergeStatus} import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo, BlockManagerMaster, BlockManagerMasterEndpoint, ShuffleBlockId, ShuffleMergedBlockId} +import org.apache.spark.storage.{BlockManagerId, BlockManagerMasterEndpoint, ShuffleBlockId, ShuffleMergedBlockId} class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { private val conf = new SparkConf @@ -918,6 +917,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { slaveRpcEnv.shutdown() } + private def fetchDeclaredField(value: AnyRef, fieldName: String): AnyRef = { + val field = value.getClass.getDeclaredField(fieldName) + field.setAccessible(true) + field.get(value) + } + + private def lookupBlockManagerMasterEndpoint(sc: SparkContext): BlockManagerMasterEndpoint = { + val rpcEnv = sc.env.rpcEnv + val dispatcher = fetchDeclaredField(rpcEnv, "dispatcher") + fetchDeclaredField(dispatcher, "endpointRefs"). + asInstanceOf[java.util.Map[RpcEndpoint, RpcEndpointRef]].asScala. + filter(_._1.isInstanceOf[BlockManagerMasterEndpoint]). + head._1.asInstanceOf[BlockManagerMasterEndpoint] + } + test("SPARK-40480: shuffle remove should cleanup merged files as well") { val newConf = new SparkConf newConf.set("spark.shuffle.push.enabled", "true") @@ -926,28 +940,14 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { newConf.set(IS_TESTING, true) val SHUFFLE_ID = 10 - // needs TorrentBroadcast so need a SparkContext withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) { sc => - val rpcEnv = sc.env.rpcEnv val masterTracker = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] val blockStoreClient = mock(classOf[ExternalBlockStoreClient]) - val blockManagerMasterEndpoint = new BlockManagerMasterEndpoint( - rpcEnv, - sc.isLocal, - sc.conf, - sc.listenerBus, - Some(blockStoreClient), - // We dont care about this ... - new concurrent.TrieMap[BlockManagerId, BlockManagerInfo](), - masterTracker, - sc.env.shuffleManager, - true - ) - rpcEnv.stop(sc.env.blockManager.master.driverEndpoint) - sc.env.blockManager.master.driverEndpoint = - rpcEnv.setupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME, - blockManagerMasterEndpoint) + val bmMaster = lookupBlockManagerMasterEndpoint(sc) + val field = bmMaster.getClass.getDeclaredField("externalBlockStoreClient") + field.setAccessible(true) + field.set(bmMaster, Some(blockStoreClient)) masterTracker.registerShuffle(SHUFFLE_ID, 10, 10) val mergerLocs = (1 to 10).map(x => BlockManagerId(s"exec-$x", s"host-$x", x)) @@ -973,7 +973,7 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { test("SPARK-34826: Adaptive shuffle mergers") { val newConf = new SparkConf - newConf.set("spark.shuffle.push.based.enabled", "true") + newConf.set("spark.shuffle.push.enabled", "true") newConf.set("spark.shuffle.service.enabled", "true") // needs TorrentBroadcast so need a SparkContext From 0f1f5ebc52d3facd28bc37dcd28a342a422de352 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Fri, 13 Jan 2023 15:43:50 +0800 Subject: [PATCH 25/27] Change log level to debug --- .../apache/spark/network/shuffle/ExternalBlockStoreClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index aab5cbb9e8af..737ae96a99f1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -266,7 +266,7 @@ public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuf .toByteBuffer()); // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC. } catch (Exception e) { - logger.error("Exception while sending RemoveShuffleMerge request to {}:{}", + logger.debug("Exception while sending RemoveShuffleMerge request to {}:{}", host, port, e); return false; } From 7284b8f8e5ea191d157bb77616036c1175f1d424 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sat, 14 Jan 2023 10:13:21 +0800 Subject: [PATCH 26/27] Change log level to debug --- .../apache/spark/network/shuffle/ExternalBlockStoreClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 737ae96a99f1..1451d5712812 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -264,7 +264,7 @@ public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuf client.send( new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId) .toByteBuffer()); - // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC. + // TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC } catch (Exception e) { logger.debug("Exception while sending RemoveShuffleMerge request to {}:{}", host, port, e); From 3181e640a4a8154a3b99e404ef89a8673e837af8 Mon Sep 17 00:00:00 2001 From: Kun Wan Date: Sat, 14 Jan 2023 15:21:24 +0800 Subject: [PATCH 27/27] Bug fix --- .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index e516f1cb3faf..681a812e880a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -57,7 +57,6 @@ class BlockManagerMasterEndpoint( isDriver: Boolean) extends IsolatedThreadSafeRpcEndpoint with Logging { - private val testing: Boolean = conf.get(config.Tests.IS_TESTING).getOrElse(false) // Mapping from executor id to the block manager's local disk directories. private val executorIdToLocalDirs = CacheBuilder @@ -382,9 +381,6 @@ class BlockManagerMasterEndpoint( handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) } }.toSeq - if (testing) { - RpcUtils.INFINITE_TIMEOUT.awaitResult(Future.sequence(removeShuffleFromExecutorsFutures)) - } Future.sequence(removeShuffleFromExecutorsFutures ++ removeShuffleFromShuffleServicesFutures ++ removeShuffleMergeFromShuffleServicesFutures)