27 commits
b91a344 [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini… (wankunde, Sep 18, 2022)
6fb184d [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini… (wankunde, Sep 19, 2022)
75e2013 [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini… (wankunde, Oct 9, 2022)
5b39ba3 [SPARK-40480][SHUFFLE]Remove push-based shuffle data after query fini… (wankunde, Oct 9, 2022)
7cff74d Fix UT (wankunde, Oct 9, 2022)
d4132b3 Update code (wankunde, Oct 9, 2022)
3a0b6a4 Fix UT (wankunde, Oct 11, 2022)
0a9730d fix UT (wankunde, Oct 12, 2022)
0f24bb9 Update comment (wankunde, Nov 23, 2022)
c848da6 update RemoteBlockPushResolver.removeShuffleMerge() code (wankunde, Dec 18, 2022)
e54bb75 Save the latest shuffle merge id in appShuffleInfo.shuffles (wankunde, Dec 20, 2022)
4d2b940 Update code (wankunde, Dec 22, 2022)
5d3aa2d Fix UT (wankunde, Dec 23, 2022)
d034985 Update code (wankunde, Dec 30, 2022)
11e7d9a Fix UT (wankunde, Dec 31, 2022)
29f4918 Format codestyle (wankunde, Jan 1, 2023)
3a7d99c move writeAppAttemptShuffleMergeInfoToDB to main thread (wankunde, Jan 2, 2023)
613a99a remove try catch for file.delete() (wankunde, Jan 7, 2023)
96ba7e1 Simplify the code and update some comments (wankunde, Jan 11, 2023)
e06ef76 Add UT (wankunde, Jan 12, 2023)
43085d1 Update common/network-shuffle/src/main/java/org/apache/spark/network/… (wankunde, Jan 13, 2023)
371feae Format code (wankunde, Jan 13, 2023)
6975bcc Add a TODO for RemoveShuffleMerge RPC (wankunde, Jan 13, 2023)
475f8bd Fix bug and backport UT (wankunde, Jan 13, 2023)
0f1f5eb Change log level to debug (wankunde, Jan 13, 2023)
7284b8f Change log level to debug (wankunde, Jan 14, 2023)
3181e64 Bug fix (wankunde, Jan 14, 2023)
@@ -255,4 +255,18 @@ public void getMergedBlockMeta(
MergedBlocksMetaListener listener) {
throw new UnsupportedOperationException();
}

/**
* Remove the shuffle merge data in shuffle services
*
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param shuffleId shuffle id.
* @param shuffleMergeId shuffle merge id.
*
* @since 3.4.0
*/
public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
throw new UnsupportedOperationException();
}
}
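A rough usage sketch from the editor, not code in this PR: once a shuffle is no longer needed, a caller could ask each shuffle service that merged data for it to drop that data. The host list, port, and ids below are placeholder values, and the BlockStoreClient argument is assumed to be an already initialized subclass that supports push-based shuffle.

// Editor's sketch, not part of this change: request removal of merged data for one shuffle
// on every known merger location. Hosts, port, and ids are placeholder values.
static void removeMergedDataForShuffle(BlockStoreClient client) {
  String[] mergerHosts = {"shuffle-host-1", "shuffle-host-2"};
  int shuffleServicePort = 7337;   // placeholder; typically the external shuffle service port
  int shuffleId = 3;
  int shuffleMergeId = 0;
  for (String host : mergerHosts) {
    // removeShuffleMerge only reports whether the request could be sent; the RPC is one-way.
    if (!client.removeShuffleMerge(host, shuffleServicePort, shuffleId, shuffleMergeId)) {
      System.err.println("Could not send RemoveShuffleMerge to " + host + ":" + shuffleServicePort);
    }
  }
}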
@@ -224,6 +224,12 @@ protected void handleMessage(
} finally {
responseDelayContext.stop();
}
} else if (msgObj instanceof RemoveShuffleMerge) {
RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
checkAuth(client, msg.appId);
logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
msg.appId, msg.shuffleId, msg.shuffleMergeId);
mergeManager.removeShuffleMerge(msg);
} else if (msgObj instanceof DiagnoseCorruption) {
DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
checkAuth(client, msg.appId);
@@ -256,6 +256,23 @@ public void onFailure(Throwable e) {
}
}

@Override
public boolean removeShuffleMerge(String host, int port, int shuffleId, int shuffleMergeId) {
checkInit();
try {
TransportClient client = clientFactory.createClient(host, port);
client.send(
new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
.toByteBuffer());
// TODO(SPARK-42025): Add some error logs for RemoveShuffleMerge RPC
} catch (Exception e) {
logger.debug("Exception while sending RemoveShuffleMerge request to {}:{}",
host, port, e);
return false;
}
return true;
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
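The TODO above (SPARK-42025) calls for error reporting on this RPC. One possible shape for that, sketched here by the editor and not part of this PR, is to replace the one-way send() with sendRpc() plus a callback from org.apache.spark.network.client; the server-side handler would also need to reply for the success path to fire.

// Editor's sketch for the SPARK-42025 follow-up, not part of this change.
// Assumes TransportClient and RpcResponseCallback from org.apache.spark.network.client
// and java.nio.ByteBuffer are on the imports.
TransportClient client = clientFactory.createClient(host, port);
client.sendRpc(
  new RemoveShuffleMerge(appId, comparableAppAttemptId, shuffleId, shuffleMergeId)
    .toByteBuffer(),
  new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      logger.debug("Removed merged shuffle data for shuffle {} shuffleMerge {} on {}:{}",
        shuffleId, shuffleMergeId, host, port);
    }

    @Override
    public void onFailure(Throwable t) {
      logger.warn("Failed to remove merged shuffle data on {}:{}", host, port, t);
    }
  });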
@@ -26,6 +26,7 @@
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;

/**
* The MergedShuffleFileManager is used to process push based shuffle when enabled. It works
@@ -121,6 +122,14 @@ MergedBlockMeta getMergedBlockMeta(
*/
String[] getMergedBlockDirs(String appId);

/**
* Remove shuffle merge data files.
*
* @param removeShuffleMerge contains shuffle details (appId, shuffleId, etc) to uniquely
* identify a shuffle to be removed
*/
void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge);

/**
* Optionally close any resources associated the MergedShuffleFileManager, such as the
* leveldb for state persistence.
@@ -26,6 +26,7 @@
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.util.TransportConf;

/**
@@ -84,4 +85,9 @@ public MergedBlockMeta getMergedBlockMeta(
public String[] getMergedBlockDirs(String appId) {
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
}

@Override
public void removeShuffleMerge(RemoveShuffleMerge removeShuffleMerge) {
throw new UnsupportedOperationException("Cannot handle merged shuffle remove");
}
}
@@ -71,6 +71,7 @@
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.DBIterator;
@@ -95,6 +96,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
public static final String MERGE_DIR_KEY = "mergeDir";
public static final String ATTEMPT_ID_KEY = "attemptId";
private static final int UNDEFINED_ATTEMPT_ID = -1;

/**
* The flag for deleting all merged shuffle data.
*/
public static final int DELETE_ALL_MERGED_SHUFFLE = -1;

private static final String DB_KEY_DELIMITER = ";";
private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
// ByteBuffer to respond to client upon a successful merge of a pushed block
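As an editor's illustration of this sentinel, not code from the PR: a caller that wants every merged file for a shuffle removed, regardless of which shuffleMergeId is current, passes DELETE_ALL_MERGED_SHUFFLE instead of a concrete merge id, and removeShuffleMerge below resolves it against the latest shuffleMergeId recorded for that shuffle. The application and attempt ids here are placeholders.

// Editor's illustration: drop all merged data for shuffle 5 of this application.
RemoveShuffleMerge removeAll = new RemoveShuffleMerge(
  "app-20230114120000-0001", 1, 5, RemoteBlockPushResolver.DELETE_ALL_MERGED_SHUFFLE);
mergeManager.removeShuffleMerge(removeAll);  // mergeManager is a RemoteBlockPushResolver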
@@ -396,6 +403,59 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}
}

@Override
public void removeShuffleMerge(RemoveShuffleMerge msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
throw new IllegalArgumentException(
String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+ "with the current attempt id %s stored in shuffle service for application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
if (mergePartitionsInfo == null) {
if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
return null;
} else {
writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
}
}
boolean deleteCurrentMergedShuffle =
msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
int shuffleMergeIdToDelete = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
if (deleteCurrentMergedShuffle ||
shuffleMergeIdToDelete > mergePartitionsInfo.shuffleMergeId) {
AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
if (!mergePartitionsInfo.isFinalized()) {
// Clean up shuffle data before the shuffle was finalized. Close and delete all the open
// files.
submitCleanupTask(() ->
closeAndDeleteOutdatedPartitions(
currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
} else {
// Current shuffle was finalized, delete all the merged files through reduceIds set
// in finalizeShuffleMerge method.
submitCleanupTask(() ->
deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
mergePartitionsInfo.getReduceIds(), false));
}
} else {
throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
"application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
msg.appId, msg.shuffleId, shuffleMergeIdToDelete, mergePartitionsInfo.shuffleMergeId));
}
writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeIdToDelete));
return new AppShuffleMergePartitionsInfo(shuffleMergeIdToDelete, true);
});
}

/**
* Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
* If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
@@ -470,6 +530,40 @@ void closeAndDeleteOutdatedPartitions(
});
}

void deleteMergedFiles(
AppAttemptShuffleMergeId appAttemptShuffleMergeId,
AppShuffleInfo appShuffleInfo,
int[] reduceIds,
boolean deleteFromDB) {
if (deleteFromDB) {
removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
}
int shuffleId = appAttemptShuffleMergeId.shuffleId;
int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
int dataFilesDeleteCnt = 0;
int indexFilesDeleteCnt = 0;
int metaFilesDeleteCnt = 0;
for (int reduceId : reduceIds) {
File dataFile =
appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
if (dataFile.delete()) {
dataFilesDeleteCnt++;
}
File indexFile = new File(
appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
if (indexFile.delete()) {
indexFilesDeleteCnt++;
}
File metaFile =
appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
if (metaFile.delete()) {
metaFilesDeleteCnt++;
}
}
logger.info("Delete {} data files, {} index files, {} meta files for {}",
dataFilesDeleteCnt, indexFilesDeleteCnt, metaFilesDeleteCnt, appAttemptShuffleMergeId);
}

/**
* Remove the finalized shuffle partition information for a specific appAttemptShuffleMergeId
* @param appAttemptShuffleMergeId
@@ -712,6 +806,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
Longs.toArray(sizes));
appShuffleInfo.shuffles.get(msg.shuffleId).setReduceIds(Ints.toArray(reduceIds));
}
logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed",
msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
@@ -1465,6 +1560,8 @@ public static class AppShuffleMergePartitionsInfo {
private final int shuffleMergeId;
private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;

private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]);

public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) {
this.shuffleMergeId = shuffleMergeId;
this.shuffleMergePartitions = shuffleFinalized ? SHUFFLE_FINALIZED_MARKER :
@@ -1479,6 +1576,14 @@ public Map<Integer, AppShufflePartitionInfo> getShuffleMergePartitions() {
public boolean isFinalized() {
return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER;
}

public void setReduceIds(int[] reduceIds) {
this.reduceIds.set(reduceIds);
}

public int[] getReduceIds() {
return this.reduceIds.get();
}
}

/**
@@ -1687,9 +1792,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) {
try {
if (dataChannel.isOpen()) {
dataChannel.close();
- if (delete) {
-   dataFile.delete();
- }
}
+ if (delete) {
+   dataFile.delete();
+ }
} catch (IOException ioe) {
logger.warn("Error closing data channel for {} reduceId {}",
@@ -50,7 +50,7 @@ public enum Type {
FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11),
PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14),
FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17),
- PUSH_BLOCK_RETURN_CODE(18);
+ PUSH_BLOCK_RETURN_CODE(18), REMOVE_SHUFFLE_MERGE(19);

private final byte id;

@@ -88,6 +88,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
case 16: return DiagnoseCorruption.decode(buf);
case 17: return CorruptionCause.decode(buf);
case 18: return BlockPushReturnCode.decode(buf);
case 19: return RemoveShuffleMerge.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle.protocol;

import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import org.apache.spark.network.protocol.Encoders;

/**
* Remove the merged data for a given shuffle.
* Returns {@link Boolean}
*
* @since 3.4.0
*/
public class RemoveShuffleMerge extends BlockTransferMessage {
public final String appId;
public final int appAttemptId;
public final int shuffleId;
public final int shuffleMergeId;

public RemoveShuffleMerge(
String appId,
int appAttemptId,
int shuffleId,
int shuffleMergeId) {
this.appId = appId;
this.appAttemptId = appAttemptId;
this.shuffleId = shuffleId;
this.shuffleMergeId = shuffleMergeId;
}

@Override
protected Type type() {
return Type.REMOVE_SHUFFLE_MERGE;
}

@Override
public int hashCode() {
return Objects.hashCode(appId, appAttemptId, shuffleId, shuffleMergeId);
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("appId", appId)
.append("attemptId", appAttemptId)
.append("shuffleId", shuffleId)
.append("shuffleMergeId", shuffleMergeId)
.toString();
}

@Override
public boolean equals(Object other) {
if (other != null && other instanceof RemoveShuffleMerge) {
RemoveShuffleMerge o = (RemoveShuffleMerge) other;
return Objects.equal(appId, o.appId)
&& appAttemptId == o.appAttemptId
&& shuffleId == o.shuffleId
&& shuffleMergeId == o.shuffleMergeId;
}
return false;
}

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4;
}

@Override
public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
buf.writeInt(appAttemptId);
buf.writeInt(shuffleId);
buf.writeInt(shuffleMergeId);
}

public static RemoveShuffleMerge decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
int attemptId = buf.readInt();
int shuffleId = buf.readInt();
int shuffleMergeId = buf.readInt();
return new RemoveShuffleMerge(appId, attemptId, shuffleId, shuffleMergeId);
}
}
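For reference, a minimal round trip over this wire format, written by the editor as a sketch rather than taken from the PR's tests. It relies only on the encode/decode methods above and Netty's Unpooled buffers, and the ids are placeholder values.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RemoveShuffleMergeRoundTrip {
  public static void main(String[] args) {
    // Encode the appId plus three ints into a heap buffer, then decode them back.
    RemoveShuffleMerge original = new RemoveShuffleMerge("app-1", 0, 3, 2);
    ByteBuf buf = Unpooled.buffer(original.encodedLength());
    original.encode(buf);
    RemoveShuffleMerge decoded = RemoveShuffleMerge.decode(buf);
    System.out.println(original.equals(decoded));  // expected: true
  }
}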