@@ -218,21 +218,18 @@ private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
     private final int shuffleId;
     // An array containing mapId and reduceId pairs.
     private final int[] mapIdAndReduceIds;
+    private final int shuffleGenerationId;

     ManagedBufferIterator(String appId, String execId, String[] blockIds) {
       this.appId = appId;
       this.execId = execId;
-      String[] blockId0Parts = blockIds[0].split("_");
-      if (blockId0Parts.length != 4 || !blockId0Parts[0].equals("shuffle")) {
-        throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[0]);
-      }
+      String[] blockId0Parts = splitBlockId(blockIds[0]);
       this.shuffleId = Integer.parseInt(blockId0Parts[1]);
       mapIdAndReduceIds = new int[2 * blockIds.length];
+      this.shuffleGenerationId =
+        (blockId0Parts.length == 5) ? Integer.parseInt(blockId0Parts[4]) : -1;
       for (int i = 0; i < blockIds.length; i++) {
-        String[] blockIdParts = blockIds[i].split("_");
-        if (blockIdParts.length != 4 || !blockIdParts[0].equals("shuffle")) {
-          throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
-        }
+        String[] blockIdParts = splitBlockId(blockIds[i]);
         if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
           throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
             ", got:" + blockIds[i]);
@@ -242,6 +239,16 @@ private class ManagedBufferIterator implements Iterator<ManagedBuffer> {
       }
     }

+    private String[] splitBlockId(String blockId) {
+      String[] blockIdParts = blockId.split("_");
+      if ((blockIdParts.length != 4 && blockIdParts.length != 5)
+          || !blockIdParts[0].equals("shuffle")) {
+        throw new IllegalArgumentException(
+          "Unexpected shuffle block id format: " + blockId);
+      }
+      return blockIdParts;
+    }
+
     @Override
     public boolean hasNext() {
       return index < mapIdAndReduceIds.length;
@@ -250,7 +257,7 @@ public boolean hasNext() {
     @Override
     public ManagedBuffer next() {
       final ManagedBuffer block = blockManager.getBlockData(appId, execId, shuffleId,
-        mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1]);
+        mapIdAndReduceIds[index], mapIdAndReduceIds[index + 1], shuffleGenerationId);
       index += 2;
       metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
       return block;
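Reviewer note: the accepted block id grammar is now shuffle_<shuffleId>_<mapId>_<reduceId> or, for output of a retried indeterminate stage, shuffle_<shuffleId>_<mapId>_<reduceId>_<generationId>. A self-contained sketch of the parsing contract splitBlockId enforces (class and method names here are illustrative, not part of the patch):

public final class BlockIdParseSketch {
  public static void main(String[] args) {
    for (String id : new String[] { "shuffle_0_0_1", "shuffle_0_0_1_2" }) {
      String[] parts = id.split("_");
      if ((parts.length != 4 && parts.length != 5) || !parts[0].equals("shuffle")) {
        throw new IllegalArgumentException("Unexpected shuffle block id format: " + id);
      }
      // A missing fifth component means "no shuffle generation", encoded as -1.
      int generationId = (parts.length == 5) ? Integer.parseInt(parts[4]) : -1;
      System.out.printf("%s -> shuffleId=%s mapId=%s reduceId=%s generationId=%d%n",
          id, parts[1], parts[2], parts[3], generationId);
    }
  }
}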
@@ -161,6 +161,18 @@ public void registerExecutor(
     executors.put(fullId, executorInfo);
   }

+  /**
+   * Overload of getBlockData that defaults stageAttemptId to the invalid value -1.
+   */
+  public ManagedBuffer getBlockData(
+      String appId,
+      String execId,
+      int shuffleId,
+      int mapId,
+      int reduceId) {
+    return getBlockData(appId, execId, shuffleId, mapId, reduceId, -1);
+  }
+
   /**
    * Obtains a FileSegmentManagedBuffer from (shuffleId, mapId, reduceId). We make assumptions
    * about how the hash and sort based shuffles store their data.
@@ -170,13 +182,15 @@ public ManagedBuffer getBlockData(
       String execId,
       int shuffleId,
       int mapId,
-      int reduceId) {
+      int reduceId,
+      int stageAttemptId) {
     ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
-    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
+    return getSortBasedShuffleBlockData(
+      executor, shuffleId, mapId, reduceId, stageAttemptId);
   }

   /**
@@ -278,19 +292,25 @@ public boolean accept(File dir, String name) {
    * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
    * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
    * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
+   * When the shuffle data and index files are generated by an indeterminate stage,
+   * the ShuffleDataBlockId and ShuffleIndexBlockId are extended with the stage attempt id.
    */
   private ManagedBuffer getSortBasedShuffleBlockData(
-      ExecutorShuffleInfo executor, int shuffleId, int mapId, int reduceId) {
+      ExecutorShuffleInfo executor, int shuffleId,
+      int mapId, int reduceId, int stageAttemptId) {
+    String baseFileName = "shuffle_" + shuffleId + "_" + mapId + "_0";
+    if (stageAttemptId != -1) {
+      baseFileName = baseFileName + "_" + stageAttemptId;
+    }
     File indexFile = getFile(executor.localDirs, executor.subDirsPerLocalDir,
-        "shuffle_" + shuffleId + "_" + mapId + "_0.index");
+        baseFileName + ".index");

     try {
       ShuffleIndexInformation shuffleIndexInformation = shuffleIndexCache.get(indexFile);
       ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(reduceId);
       return new FileSegmentManagedBuffer(
         conf,
-        getFile(executor.localDirs, executor.subDirsPerLocalDir,
-            "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
+        getFile(executor.localDirs, executor.subDirsPerLocalDir, baseFileName + ".data"),
         shuffleIndexRecord.getOffset(),
         shuffleIndexRecord.getLength());
     } catch (ExecutionException e) {
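The resulting on-disk naming, condensed: the default stageAttemptId of -1 keeps the historical file names, and any other value appends one more underscore-separated component. A minimal sketch (helper name hypothetical, equivalent to the baseFileName logic above):

static String shuffleBaseName(int shuffleId, int mapId, int stageAttemptId) {
  String base = "shuffle_" + shuffleId + "_" + mapId + "_0";
  return stageAttemptId == -1 ? base : base + "_" + stageAttemptId;
}
// shuffleBaseName(3, 7, -1) -> "shuffle_3_7_0"   + ".index" / ".data"
// shuffleBaseName(3, 7,  1) -> "shuffle_3_7_0_1" + ".index" / ".data"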
@@ -85,8 +85,8 @@ public void testOpenShuffleBlocks() {

     ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
     ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-    when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0)).thenReturn(block0Marker);
-    when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1)).thenReturn(block1Marker);
+    when(blockResolver.getBlockData("app0", "exec1", 0, 0, 0, -1)).thenReturn(block0Marker);
+    when(blockResolver.getBlockData("app0", "exec1", 0, 0, 1, -1)).thenReturn(block1Marker);
     ByteBuffer openBlocks = new OpenBlocks("app0", "exec1",
       new String[] { "shuffle_0_0_0", "shuffle_0_0_1" })
       .toByteBuffer();
@@ -109,8 +109,8 @@ public void testOpenShuffleBlocks() {
     assertEquals(block0Marker, buffers.next());
     assertEquals(block1Marker, buffers.next());
     assertFalse(buffers.hasNext());
-    verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
-    verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0, -1);
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1, -1);

     // Verify open block request latency metrics
     Timer openBlockRequestLatencyMillis = (Timer) ((ExternalShuffleBlockHandler) handler)
@@ -53,7 +53,10 @@ public static void beforeAll() throws IOException {
     // Write some sort data.
     dataContext.insertSortShuffleData(0, 0, new byte[][] {
       sortBlock0.getBytes(StandardCharsets.UTF_8),
-      sortBlock1.getBytes(StandardCharsets.UTF_8)});
+      sortBlock1.getBytes(StandardCharsets.UTF_8)}, false);
+    dataContext.insertSortShuffleData(0, 0, new byte[][] {
+      sortBlock0.getBytes(StandardCharsets.UTF_8),
+      sortBlock1.getBytes(StandardCharsets.UTF_8)}, true);
   }

   @AfterClass
@@ -113,6 +116,27 @@ public void testSortShuffleBlocks() throws IOException {
     }
   }

+  @Test
+  public void testExtendedSortShuffleBlocks() throws IOException {
+    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
+    resolver.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo(SORT_MANAGER));
+
+    try (InputStream block0Stream = resolver.getBlockData(
+        "app0", "exec0", 0, 0, 0, 0).createInputStream()) {
+      String block0 =
+        CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
+      assertEquals(sortBlock0, block0);
+    }
+
+    try (InputStream block1Stream = resolver.getBlockData(
+        "app0", "exec0", 0, 0, 1, 0).createInputStream()) {
+      String block1 =
+        CharStreams.toString(new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
+      assertEquals(sortBlock1, block1);
+    }
+  }
+
   @Test
   public void jsonSerializationOfExecutorRegistration() throws IOException {
     ObjectMapper mapper = new ObjectMapper();
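The new test passes stageAttemptId = 0 because beforeAll above now writes the same payload twice, once plain and once with extendedBlockId = true, which suffixes "_0" to the block id. The three lookup forms therefore differ only in which files they resolve (a sketch, assuming the PR's classes on the classpath):

resolver.getBlockData("app0", "exec0", 0, 0, 0);     // reads shuffle_0_0_0.{index,data}
resolver.getBlockData("app0", "exec0", 0, 0, 0, -1); // identical lookup via the 6-arg form
resolver.getBlockData("app0", "exec0", 0, 0, 0, 0);  // reads shuffle_0_0_0_0.{index,data}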
@@ -142,7 +142,7 @@ private static TestShuffleDataContext createSomeData() throws IOException {
     dataContext.create();
     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
       "ABC".getBytes(StandardCharsets.UTF_8),
-      "DEF".getBytes(StandardCharsets.UTF_8)});
+      "DEF".getBytes(StandardCharsets.UTF_8)}, false);
     return dataContext;
   }
 }
@@ -84,7 +84,8 @@ public static void beforeAll() throws IOException {

     dataContext0 = new TestShuffleDataContext(2, 5);
     dataContext0.create();
-    dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
+    dataContext0.insertSortShuffleData(0, 0, exec0Blocks, false);
+    dataContext0.insertSortShuffleData(0, 0, exec0Blocks, true);

     conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
     handler = new ExternalShuffleBlockHandler(conf, null);
@@ -191,6 +192,28 @@ public void testFetchThreeSort() throws Exception {
     exec0Fetch.releaseBuffers();
   }

+  @Test
+  public void testFetchOneExtendedSort() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0_0" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_0"), exec0Fetch.successBlocks);
+    assertTrue(exec0Fetch.failedBlocks.isEmpty());
+    assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks[0]));
+    exec0Fetch.releaseBuffers();
+  }
+
+  @Test
+  public void testFetchThreeExtendedSort() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult exec0Fetch = fetchBlocks("exec-0",
+      new String[] { "shuffle_0_0_0_0", "shuffle_0_0_1_0", "shuffle_0_0_2_0" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0_0", "shuffle_0_0_1_0", "shuffle_0_0_2_0"),
+      exec0Fetch.successBlocks);
+    assertTrue(exec0Fetch.failedBlocks.isEmpty());
+    assertBufferListsEqual(exec0Fetch.buffers, Arrays.asList(exec0Blocks));
+    exec0Fetch.releaseBuffers();
+  }
+
   @Test (expected = RuntimeException.class)
   public void testRegisterInvalidExecutor() throws Exception {
     registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
@@ -211,7 +211,7 @@ private static void createShuffleFiles(TestShuffleDataContext dataContext) throws IOException {
     Random rand = new Random(123);
     dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
       "ABC".getBytes(StandardCharsets.UTF_8),
-      "DEF".getBytes(StandardCharsets.UTF_8)});
+      "DEF".getBytes(StandardCharsets.UTF_8)}, false);
   }

   private static void createNonShuffleFiles(TestShuffleDataContext dataContext) throws IOException {
@@ -68,8 +68,10 @@ public void cleanup() {
   }

   /** Creates reducer blocks in a sort-based data format within our local dirs. */
-  public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
+  public void insertSortShuffleData(
+      int shuffleId, int mapId, byte[][] blocks, boolean extendedBlockId) throws IOException {
     String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
+    if (extendedBlockId) blockId += "_0";

     OutputStream dataStream = null;
     DataOutputStream indexStream = null;
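Note the flag hard-codes a "_0" suffix, i.e. generation 0, which is exactly what the suites above fetch. Assuming TestShuffleDataContext's usual blockId + ".index" / blockId + ".data" file layout, the two modes produce:

// insertSortShuffleData(0, 0, blocks, false) -> blockId "shuffle_0_0_0"
//   -> files shuffle_0_0_0.index, shuffle_0_0_0.data
// insertSortShuffleData(0, 0, blocks, true)  -> blockId "shuffle_0_0_0_0"
//   -> files shuffle_0_0_0_0.index, shuffle_0_0_0_0.data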
@@ -38,6 +38,7 @@
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
 import org.apache.spark.scheduler.MapStatus;
 import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.Serializer;
@@ -83,6 +84,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final int mapId;
   private final Serializer serializer;
   private final IndexShuffleBlockResolver shuffleBlockResolver;
+  private final Option<Object> stageAttemptId;

   /** Array of file writers, one for each partition */
   private DiskBlockObjectWriter[] partitionWriters;
@@ -102,6 +104,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       IndexShuffleBlockResolver shuffleBlockResolver,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
+      TaskContext taskContext,
       SparkConf conf,
       ShuffleWriteMetricsReporter writeMetrics) {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
@@ -116,14 +119,16 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.writeMetrics = writeMetrics;
     this.serializer = dep.serializer();
     this.shuffleBlockResolver = shuffleBlockResolver;
+    this.stageAttemptId = taskContext.getShuffleGenerationId(dep.shuffleId());
   }

   @Override
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
     if (!records.hasNext()) {
       partitionLengths = new long[numPartitions];
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
+      shuffleBlockResolver.writeIndexFileAndCommit(
+        shuffleId, mapId, partitionLengths, null, stageAttemptId);
       mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
       return;
     }
@@ -156,11 +161,12 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       }
     }

-    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId, stageAttemptId);
     File tmp = Utils.tempFileWith(output);
     try {
       partitionLengths = writePartitionedFile(tmp);
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(
+        shuffleId, mapId, partitionLengths, tmp, stageAttemptId);
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
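stageAttemptId is threaded through as a scala.Option<Object> obtained from the new TaskContext.getShuffleGenerationId. The matching Scala-side changes to IndexShuffleBlockResolver.getDataFile and writeIndexFileAndCommit are not part of this excerpt; a hypothetical sketch (helper name assumed) of how the Option could map onto the "_<generation>" suffix the external shuffle service expects:

// Hypothetical sketch, using scala.Option from the Scala standard library:
// an empty Option keeps the classic "shuffle_<id>_<map>_0" names; a defined one
// appends the generation, mirroring getSortBasedShuffleBlockData on the service side.
static String attemptSuffix(scala.Option<Object> stageAttemptId) {
  return stageAttemptId.isDefined() ? "_" + stageAttemptId.get() : "";
}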
@@ -82,6 +82,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final int initialSortBufferSize;
   private final int inputBufferSizeInBytes;
   private final int outputBufferSizeInBytes;
+  private final Option<Object> stageAttemptId;

   @Nullable private MapStatus mapStatus;
   @Nullable private ShuffleExternalSorter sorter;
@@ -150,6 +151,7 @@ public UnsafeShuffleWriter(
     this.outputBufferSizeInBytes =
       (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     open();
+    this.stageAttemptId = taskContext.getShuffleGenerationId(dep.shuffleId());
   }

   private void updatePeakMemoryUsed() {
@@ -231,7 +233,8 @@ void closeAndWriteOutput() throws IOException {
     final SpillInfo[] spills = sorter.closeAndGetSpills();
     sorter = null;
     final long[] partitionLengths;
-    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+    final File output = shuffleBlockResolver.getDataFile(
+      shuffleId, mapId, stageAttemptId);
     final File tmp = Utils.tempFileWith(output);
     try {
       try {
@@ -243,7 +246,8 @@ void closeAndWriteOutput() throws IOException {
           }
         }
       }
-      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
+      shuffleBlockResolver.writeIndexFileAndCommit(
+        shuffleId, mapId, partitionLengths, tmp, stageAttemptId);
     } finally {
       if (tmp.exists() && !tmp.delete()) {
         logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());