diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index fd287b022618..d305dfa8e83c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -398,4 +398,14 @@ public long mergedIndexCacheSize() { return JavaUtils.byteStringAsBytes( conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m")); } + + /** + * The threshold for the number of IOExceptions while merging shuffle blocks to a shuffle partition. + * When the number of IOExceptions while writing to the merged shuffle data/index/meta file exceeds + * this threshold, the shuffle server will respond back to the client to stop pushing shuffle + * blocks for this shuffle partition. + */ + public int ioExceptionsThresholdDuringMerge() { + return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java index d13a0272744a..968777fba785 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java @@ -71,6 +71,15 @@ class BlockPushErrorHandler implements ErrorHandler { public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX = "Couldn't find an opportunity to write block"; + /** + * String constant used for generating exception messages indicating the server encountered + * IOExceptions multiple times, greater than the configured threshold, while trying to merge + * shuffle blocks of the same shuffle partition. When the client receives this response, + * it will stop pushing any more blocks for the same shuffle partition. + */ + public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX = + "IOExceptions exceeded the threshold"; + @Override public boolean shouldRetryError(Throwable t) { // If it is a connection time out or a connection closed exception, no need to retry. 
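Note: the hunks above only add the server-side config (`spark.shuffle.server.ioExceptionsThresholdDuringMerge`, default 4) and the message prefix the server uses when it aborts merging a shuffle partition. As a rough, self-contained sketch of how a client could consume that prefix — this is not part of the patch, and the class and method names below are invented for illustration:

```java
// Illustrative only: the client-side handling is not shown in this hunk. PrefixCheck and
// shouldStopPushingToPartition are hypothetical names; only the prefix string and the sample
// error message come from the patch and its tests.
public class PrefixCheck {
  // Mirrors ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX.
  static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
      "IOExceptions exceeded the threshold";

  // Returns true when the server's error message indicates the merge of this shuffle
  // partition has been aborted, so the client should stop pushing blocks for it.
  static boolean shouldStopPushingToPartition(Throwable t) {
    return t.getMessage() != null
        && t.getMessage().contains(IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX);
  }

  public static void main(String[] args) {
    Throwable serverError = new RuntimeException(
        "IOExceptions exceeded the threshold when merging shufflePush_0_5_0");
    System.out.println(shouldStopPushingToPartition(serverError)); // prints: true
  }
}
```

Matching on a message prefix (like the existing BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) presumably keeps the check usable after the server-side RuntimeException has been flattened into an error string by the RPC layer.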
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 76abb05c99bb..0e2355646465 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -17,15 +17,16 @@ package org.apache.spark.network.shuffle; +import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -45,6 +46,8 @@ import com.google.common.cache.LoadingCache; import com.google.common.cache.Weigher; import com.google.common.collect.Maps; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { private final Executor directoryCleaner; private final TransportConf conf; private final int minChunkSize; + private final int ioExceptionsThresholdDuringMerge; private final ErrorHandler.BlockPushErrorHandler errorHandler; @SuppressWarnings("UnstableApiUsage") @@ -92,6 +96,7 @@ public RemoteBlockPushResolver(TransportConf conf) { // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner")); this.minChunkSize = conf.minChunkSizeInMergedShuffleFile(); + this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge(); CacheLoader indexCacheLoader = new CacheLoader() { public ShuffleIndexInformation load(File file) throws IOException { @@ -132,7 +137,7 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( if (dataFile.exists()) { return null; } else { - return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile); + return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile); } } catch (IOException e) { logger.error( @@ -146,6 +151,17 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( }); } + @VisibleForTesting + AppShufflePartitionInfo newAppShufflePartitionInfo( + AppShuffleId appShuffleId, + int reduceId, + File dataFile, + File indexFile, + File metaFile) throws IOException { + return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, + new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile)); + } + @Override public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) { AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId); @@ -370,26 +386,19 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]); } else { Collection partitionsToFinalize = shufflePartitions.values(); - int totalPartitions = partitionsToFinalize.size(); - RoaringBitmap[] bitmaps = new RoaringBitmap[totalPartitions]; - int[] reduceIds = new int[totalPartitions]; - long[] sizes = new long[totalPartitions]; + List bitmaps = new 
ArrayList<>(partitionsToFinalize.size()); + List reduceIds = new ArrayList<>(partitionsToFinalize.size()); + List sizes = new ArrayList<>(partitionsToFinalize.size()); Iterator partitionsIter = partitionsToFinalize.iterator(); - int idx = 0; while (partitionsIter.hasNext()) { AppShufflePartitionInfo partition = partitionsIter.next(); synchronized (partition) { - // Get rid of any partial block data at the end of the file. This could either - // be due to failure or a request still being processed when the shuffle - // merge gets finalized. try { - partition.dataChannel.truncate(partition.getPosition()); - if (partition.getPosition() != partition.getLastChunkOffset()) { - partition.updateChunkInfo(partition.getPosition(), partition.lastMergedMapIndex); - } - bitmaps[idx] = partition.mapTracker; - reduceIds[idx] = partition.reduceId; - sizes[idx++] = partition.getPosition(); + // This can throw IOException which will mark this shuffle partition as not merged. + partition.finalizePartition(); + bitmaps.add(partition.mapTracker); + reduceIds.add(partition.reduceId); + sizes.add(partition.getLastChunkOffset()); } catch (IOException ioe) { logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId, msg.shuffleId, partition.reduceId, ioe); @@ -401,7 +410,9 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc } } } - mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps, reduceIds, sizes); + mergeStatuses = new MergeStatuses(msg.shuffleId, + bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), + Longs.toArray(sizes)); } partitions.remove(appShuffleId); logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); @@ -450,6 +461,7 @@ private PushBlockStreamCallback( this.streamId = streamId; this.partitionInfo = Preconditions.checkNotNull(partitionInfo); this.mapIndex = mapIndex; + abortIfNecessary(); } @Override @@ -466,11 +478,11 @@ public String getID() { private void writeBuf(ByteBuffer buf) throws IOException { while (buf.hasRemaining()) { if (partitionInfo.isEncounteredFailure()) { - long updatedPos = partitionInfo.getPosition() + length; + long updatedPos = partitionInfo.getDataFilePos() + length; logger.debug( "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, - partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos); + partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos); length += partitionInfo.dataChannel.write(buf, updatedPos); } else { length += partitionInfo.dataChannel.write(buf); @@ -510,15 +522,35 @@ private boolean isDuplicateBlock() { * This is only invoked when the stream is able to write. The stream first writes any deferred * block parts buffered in memory. */ - private void writeAnyDeferredBufs() throws IOException { - if (deferredBufs != null && !deferredBufs.isEmpty()) { - for (ByteBuffer deferredBuf : deferredBufs) { - writeBuf(deferredBuf); - } + private void writeDeferredBufs() throws IOException { + for (ByteBuffer deferredBuf : deferredBufs) { + writeBuf(deferredBuf); + } + deferredBufs = null; + } + + /** + * This throws RuntimeException if the number of IOExceptions has exceeded the threshold. 
+ */ + private void abortIfNecessary() { + if (partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) { deferredBufs = null; + throw new RuntimeException(String.format("%s when merging %s", + ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX, + streamId)); } } + /** + * This increments the number of IOExceptions and throws RuntimeException if it exceeds the + * threshold which will abort the merge of a particular shuffle partition. + */ + private void incrementIOExceptionsAndAbortIfNecessary() { + // Update the count of IOExceptions + partitionInfo.incrementIOExceptions(); + abortIfNecessary(); + } + @Override public void onData(String streamId, ByteBuffer buf) throws IOException { // When handling the block data using StreamInterceptor, it can help to reduce the amount @@ -556,6 +588,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { deferredBufs = null; return; } + abortIfNecessary(); logger.trace("{} shuffleId {} reduceId {} onData writable", partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId, partitionInfo.reduceId); @@ -565,8 +598,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { // If we got here, it's safe to write the block data to the merged shuffle file. We // first write any deferred block. - writeAnyDeferredBufs(); - writeBuf(buf); + try { + if (deferredBufs != null && !deferredBufs.isEmpty()) { + writeDeferredBufs(); + } + writeBuf(buf); + } catch (IOException ioe) { + incrementIOExceptionsAndAbortIfNecessary(); + // If the above doesn't throw a RuntimeException, then we propagate the IOException + // back to the client so the block could be retried. + throw ioe; + } // If we got here, it means we successfully write the current chunk of block to merged // shuffle file. If we encountered failure while writing the previous block, we should // reset the file channel position and the status of partitionInfo to indicate that we @@ -574,7 +616,7 @@ public void onData(String streamId, ByteBuffer buf) throws IOException { // position tracked by partitionInfo here. That is only updated while the entire block // is successfully written to merged shuffle file. if (partitionInfo.isEncounteredFailure()) { - partitionInfo.dataChannel.position(partitionInfo.getPosition() + length); + partitionInfo.dataChannel.position(partitionInfo.getDataFilePos() + length); partitionInfo.setEncounteredFailure(false); } } else { @@ -636,15 +678,33 @@ public void onComplete(String streamId) throws IOException { return; } if (partitionInfo.getCurrentMapIndex() < 0) { - writeAnyDeferredBufs(); + try { + if (deferredBufs != null && !deferredBufs.isEmpty()) { + abortIfNecessary(); + writeDeferredBufs(); + } + } catch (IOException ioe) { + incrementIOExceptionsAndAbortIfNecessary(); + // If the above doesn't throw a RuntimeException, then we propagate the IOException + // back to the client so the block could be retried. 
+ throw ioe; + } } - long updatedPos = partitionInfo.getPosition() + length; + long updatedPos = partitionInfo.getDataFilePos() + length; boolean indexUpdated = false; if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) { - partitionInfo.updateChunkInfo(updatedPos, mapIndex); - indexUpdated = true; + try { + partitionInfo.updateChunkInfo(updatedPos, mapIndex); + indexUpdated = true; + } catch (IOException ioe) { + incrementIOExceptionsAndAbortIfNecessary(); + // If the above doesn't throw a RuntimeException, then we do not propagate the + // IOException to the client. This may increase the chunk size however the increase is + // still limited because of the limit on the number of IOExceptions for a + // particular shuffle partition. + } } - partitionInfo.setPosition(updatedPos); + partitionInfo.setDataFilePos(updatedPos); partitionInfo.setCurrentMapIndex(-1); // update merged results @@ -687,6 +747,11 @@ public void onFailure(String streamId, Throwable throwable) throws IOException { } } } + + @VisibleForTesting + AppShufflePartitionInfo getPartitionInfo() { + return partitionInfo; + } } /** @@ -736,7 +801,7 @@ public static class AppShufflePartitionInfo { // The merged shuffle data file channel public FileChannel dataChannel; // Location offset of the last successfully merged block for this shuffle partition - private long position; + private long dataFilePos; // Indicating whether failure was encountered when merging the previous block private boolean encounteredFailure; // Track the map index whose block is being merged for this shuffle partition @@ -744,44 +809,46 @@ public static class AppShufflePartitionInfo { // Bitmap tracking which mapper's blocks have been merged for this shuffle partition private RoaringBitmap mapTracker; // The index file for a particular merged shuffle contains the chunk offsets. - private RandomAccessFile indexFile; + private MergeShuffleFile indexFile; // The meta file for a particular merged shuffle contains all the map indices that belong to // every chunk. The entry per chunk is a serialized bitmap. 
- private RandomAccessFile metaFile; + private MergeShuffleFile metaFile; // The offset for the last chunk tracked in the index file for this shuffle partition private long lastChunkOffset; private int lastMergedMapIndex = -1; // Bitmap tracking which mapper's blocks are in the current shuffle chunk private RoaringBitmap chunkTracker; + private int numIOExceptions = 0; + private boolean indexMetaUpdateFailed; AppShufflePartitionInfo( AppShuffleId appShuffleId, int reduceId, File dataFile, - File indexFile, - File metaFile) throws IOException { + MergeShuffleFile indexFile, + MergeShuffleFile metaFile) throws IOException { this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id"); this.reduceId = reduceId; this.dataChannel = new FileOutputStream(dataFile).getChannel(); - this.indexFile = new RandomAccessFile(indexFile, "rw"); - this.metaFile = new RandomAccessFile(metaFile, "rw"); + this.indexFile = indexFile; + this.metaFile = metaFile; this.currentMapIndex = -1; // Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex() updateChunkInfo(0L, -1); - this.position = 0; + this.dataFilePos = 0; this.encounteredFailure = false; this.mapTracker = new RoaringBitmap(); this.chunkTracker = new RoaringBitmap(); } - public long getPosition() { - return position; + public long getDataFilePos() { + return dataFilePos; } - public void setPosition(long position) { + public void setDataFilePos(long dataFilePos) { logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId, - appShuffleId.shuffleId, reduceId, this.position, position); - this.position = position; + appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos); + this.dataFilePos = dataFilePos; } boolean isEncounteredFailure() { @@ -825,25 +892,29 @@ void resetChunkTracker() { * @param mapIndex the map index to be added to chunk tracker. */ void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException { - long idxStartPos = -1; try { - // update the chunk tracker to meta file before index file + logger.trace("{} shuffleId {} reduceId {} index current {} updated {}", + appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset); + if (indexMetaUpdateFailed) { + indexFile.getChannel().position(indexFile.getPos()); + } + indexFile.getDos().writeLong(chunkOffset); + // Chunk bitmap should be written to the meta file after the index file because if there are + // any exceptions during writing the offset to the index file, meta file should not be + // updated. If the update to the index file is successful but the update to meta file isn't + // then the index file position is not updated. writeChunkTracker(mapIndex); - idxStartPos = indexFile.getFilePointer(); - logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, - chunkOffset); - indexFile.writeLong(chunkOffset); + indexFile.updatePos(8); + this.lastChunkOffset = chunkOffset; + indexMetaUpdateFailed = false; } catch (IOException ioe) { - if (idxStartPos != -1) { - // reset the position to avoid corrupting index files during exception. 
- logger.warn("{} shuffleId {} reduceId {} reset index to position {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos); - indexFile.seek(idxStartPos); - } + logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appShuffleId.appId, + appShuffleId.shuffleId, reduceId); + indexMetaUpdateFailed = true; + // Any exception here is propagated to the caller and the caller can decide whether to + // abort or not. throw ioe; } - this.lastChunkOffset = chunkOffset; } private void writeChunkTracker(int mapIndex) throws IOException { @@ -851,17 +922,38 @@ private void writeChunkTracker(int mapIndex) throws IOException { return; } chunkTracker.add(mapIndex); - long metaStartPos = metaFile.getFilePointer(); - try { - logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex); - chunkTracker.serialize(metaFile); - } catch (IOException ioe) { - logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of meta file to {}", - appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex, metaStartPos); - metaFile.seek(metaStartPos); - throw ioe; + logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", + appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex); + if (indexMetaUpdateFailed) { + metaFile.getChannel().position(metaFile.getPos()); + } + chunkTracker.serialize(metaFile.getDos()); + metaFile.updatePos(metaFile.getChannel().position() - metaFile.getPos()); + } + + private void incrementIOExceptions() { + numIOExceptions++; + } + + private boolean shouldAbort(int ioExceptionsThresholdDuringMerge) { + return numIOExceptions > ioExceptionsThresholdDuringMerge; + } + + private void finalizePartition() throws IOException { + if (dataFilePos != lastChunkOffset) { + try { + updateChunkInfo(dataFilePos, lastMergedMapIndex); + } catch (IOException ioe) { + // Any exceptions here while updating the meta files can be ignored. If the files + // aren't successfully updated they will be truncated. + } } + // Get rid of any partial block data at the end of the file. This could either + // be due to failure, or a request still being processed when the shuffle + // merge gets finalized, or any exceptions while updating index/meta files. + dataChannel.truncate(lastChunkOffset); + indexFile.getChannel().truncate(indexFile.getPos()); + metaFile.getChannel().truncate(metaFile.getPos()); } void closeAllFiles() { @@ -877,7 +969,6 @@ void closeAllFiles() { } if (metaFile != null) { try { - // if the stream is closed, channel get's closed as well. 
metaFile.close(); } catch (IOException ioe) { logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", @@ -902,6 +993,26 @@ void closeAllFiles() { protected void finalize() throws Throwable { closeAllFiles(); } + + @VisibleForTesting + MergeShuffleFile getIndexFile() { + return indexFile; + } + + @VisibleForTesting + MergeShuffleFile getMetaFile() { + return metaFile; + } + + @VisibleForTesting + FileChannel getDataChannel() { + return dataChannel; + } + + @VisibleForTesting + int getNumIOExceptions() { + return numIOExceptions; + } } /** @@ -931,4 +1042,52 @@ private AppPathsInfo( } } } + + @VisibleForTesting + static class MergeShuffleFile { + private FileChannel channel; + private DataOutputStream dos; + private long pos; + + @VisibleForTesting + MergeShuffleFile(File file) throws IOException { + FileOutputStream fos = new FileOutputStream(file); + channel = fos.getChannel(); + dos = new DataOutputStream(fos); + } + + @VisibleForTesting + MergeShuffleFile(FileChannel channel, DataOutputStream dos) { + this.channel = channel; + this.dos = dos; + } + + private void updatePos(long numBytes) { + pos += numBytes; + } + + void close() throws IOException { + try { + dos.close(); + } finally { + dos = null; + channel = null; + } + } + + @VisibleForTesting + DataOutputStream getDos() { + return dos; + } + + @VisibleForTesting + FileChannel getChannel() { + return channel; + } + + @VisibleForTesting + long getPos() { + return pos; + } + } } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 0f200dc72196..8c6f7434748e 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -17,9 +17,12 @@ package org.apache.spark.network.shuffle; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -42,6 +45,7 @@ import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.client.StreamCallbackWithID; +import org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile; import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; @@ -411,6 +415,347 @@ void deleteExecutorDirs(Path[] dirs) { } } + @Test + public void testRecoverIndexFileAfterIOExceptions() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the index stream so it throws IOException + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new 
PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index file will be unsuccessful. + callback2.onComplete(callback2.getID()); + assertEquals("index position", 16, testIndexFile.getPos()); + // Restore the index stream so it can write successfully again. + testIndexFile.restore(); + StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); + callback3.onComplete(callback3.getID()); + assertEquals("index position", 24, testIndexFile.getPos()); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); + } + + @Test + public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the index stream so it throws IOException + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index file will be unsuccessful. + callback2.onComplete(callback2.getID()); + assertEquals("index position", 16, testIndexFile.getPos()); + // The last update to index was unsuccessful however any further updates will be successful. + // Restore the index stream so it can write successfully again. 
+ testIndexFile.restore(); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + assertEquals("index position", 24, testIndexFile.getPos()); + validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}}); + } + + @Test + public void testRecoverMetaFileAfterIOExceptions() throws IOException { + useTestFiles(false, true); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the meta stream so it throws IOException + TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile) partitionInfo.getMetaFile(); + long metaPosBeforeClose = testMetaFile.getPos(); + testMetaFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index and meta file will be unsuccessful. + callback2.onComplete(callback2.getID()); + assertEquals("index position", 16, partitionInfo.getIndexFile().getPos()); + assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos()); + // Restore the meta stream so it can write successfully again. 
+ testMetaFile.restore(); + StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2])); + callback3.onComplete(callback3.getID()); + assertEquals("index position", 24, partitionInfo.getIndexFile().getPos()); + assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + validateMergeStatuses(statuses, new int[] {0}, new long[] {11}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}}); + } + + @Test + public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException { + useTestFiles(false, true); + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4])); + callback1.onComplete(callback1.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo(); + // Close the meta stream so it throws IOException + TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile) partitionInfo.getMetaFile(); + long metaPosBeforeClose = testMetaFile.getPos(); + testMetaFile.close(); + StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 1, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any IOExceptions because number of IOExceptions are less than + // the threshold but the update to index and meta file will be unsuccessful. + callback2.onComplete(callback2.getID()); + MergeShuffleFile indexFile = partitionInfo.getIndexFile(); + assertEquals("index position", 16, indexFile.getPos()); + assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos()); + // Restore the meta stream so it can write successfully again. 
+ testMetaFile.restore(); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + assertEquals("index position", 24, indexFile.getPos()); + assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose); + validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); + validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}}); + } + + @Test (expected = RuntimeException.class) + public void testIOExceptionsExceededThreshold() throws IOException { + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + callback.onComplete(callback.getID()); + // Close the data stream so it throws continuous IOException + partitionInfo.getDataChannel().close(); + for (int i = 1; i < 5; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + try { + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2])); + } catch (IOException ioe) { + // this will throw IOException so the client can retry. + callback1.onFailure(callback1.getID(), ioe); + } + } + assertEquals(4, partitionInfo.getNumIOExceptions()); + // After 4 IOException, the server will respond with IOExceptions exceeded threshold + try { + RemoteBlockPushResolver.PushBlockStreamCallback callback2 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1])); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + callback.onComplete(callback.getID()); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + for (int i = 1; i < 5; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any exceptions but the exception count is increased. + callback1.onComplete(callback1.getID()); + } + assertEquals(4, partitionInfo.getNumIOExceptions()); + // After 4 IOException, the server will respond with IOExceptions exceeded threshold for any + // new request for this partition. 
+ try { + RemoteBlockPushResolver.PushBlockStreamCallback callback2 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4])); + callback2.onComplete(callback2.getID()); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testRequestForAbortedShufflePartitionThrowsException() { + try { + testIOExceptionsDuringMetaUpdateIncreasesExceptionCount(); + } catch (Throwable t) { + // No more blocks can be merged to this partition. + } + try { + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 10, 0, 0)); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testPendingBlockIsAbortedImmediately() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + for (int i = 1; i < 6; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + try { + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any exceptions but the exception count is increased. + callback1.onComplete(callback1.getID()); + } catch (Throwable t) { + callback1.onFailure(callback1.getID(), t); + } + } + assertEquals(5, partitionInfo.getNumIOExceptions()); + // The server will respond with IOExceptions exceeded threshold for any additional attempts + // to write. + try { + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0", + t.getMessage()); + throw t; + } + } + + @Test (expected = RuntimeException.class) + public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException { + useTestFiles(true, false); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + testIndexFile.close(); + for (int i = 1; i < 5; i++) { + RemoteBlockPushResolver.PushBlockStreamCallback callback1 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, i, 0, 0)); + try { + callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5])); + // This will complete without any exceptions but the exception count is increased. 
+ callback1.onComplete(callback1.getID()); + } catch (Throwable t) { + callback1.onFailure(callback1.getID(), t); + } + } + assertEquals(4, partitionInfo.getNumIOExceptions()); + RemoteBlockPushResolver.PushBlockStreamCallback callback2 = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 5, 0, 0)); + callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5])); + // This is deferred + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4])); + // Callback2 completes which will throw another exception. + try { + callback2.onComplete(callback2.getID()); + } catch (Throwable t) { + callback2.onFailure(callback2.getID(), t); + } + assertEquals(5, partitionInfo.getNumIOExceptions()); + // Restore index file so that any further writes to it are successful and any exceptions are + // due to IOExceptions exceeding threshold. + testIndexFile.restore(); + try { + callback.onComplete(callback.getID()); + } catch (Throwable t) { + assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0", + t.getMessage()); + throw t; + } + } + + @Test + public void testFailureWhileTruncatingFiles() throws IOException { + useTestFiles(true, false); + PushBlock[] pushBlocks = new PushBlock[] { + new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[2])), + new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[3])), + new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])), + new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3])) + }; + pushBlockHelper(TEST_APP, pushBlocks); + RemoteBlockPushResolver.PushBlockStreamCallback callback = + (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 0, 2, 0, 0)); + callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2])); + callback.onComplete(callback.getID()); + RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo(); + TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile(); + // Close the index file so truncate throws IOException + testIndexFile.close(); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, 0)); + validateMergeStatuses(statuses, new int[] {1}, new long[] {8}); + MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1); + validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}}); + } + + private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException { + pushResolver = new RemoteBlockPushResolver(conf) { + @Override + AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId, + File dataFile, File indexFile, File metaFile) throws IOException { + MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile) + : new MergeShuffleFile(indexFile); + MergeShuffleFile mergedMetaFile = useTestMetaFile ? 
new TestMergeShuffleFile(metaFile) : + new MergeShuffleFile(metaFile); + return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndexFile, + mergedMetaFile); + } + }; + registerExecutor(TEST_APP, prepareLocalDirs(localDirs)); + } + private Path[] createLocalDirs(int numLocalDirs) throws IOException { Path[] localDirs = new Path[numLocalDirs]; for (int i = 0; i < localDirs.length; i++) { @@ -493,4 +838,39 @@ private static class PushBlock { this.buffer = buffer; } } + + private static class TestMergeShuffleFile extends MergeShuffleFile { + private DataOutputStream activeDos; + private File file; + private FileChannel channel; + + private TestMergeShuffleFile(File file) throws IOException { + super(null, null); + this.file = file; + FileOutputStream fos = new FileOutputStream(file); + channel = fos.getChannel(); + activeDos = new DataOutputStream(fos); + } + + @Override + DataOutputStream getDos() { + return activeDos; + } + + @Override + FileChannel getChannel() { + return channel; + } + + @Override + void close() throws IOException { + activeDos.close(); + } + + void restore() throws IOException { + FileOutputStream fos = new FileOutputStream(file, true); + channel = fos.getChannel(); + activeDos = new DataOutputStream(fos); + } + } }
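Note: TestMergeShuffleFile above injects IOExceptions by closing the wrapped stream and later reopening the same file in append mode via restore(). A minimal standalone sketch of that close/restore trick, independent of the suite (the class name and temp-file usage below are made up for illustration):

```java
// Illustrative only: shows the fault-injection mechanics the test helper relies on, not code
// from the patch. Closing the underlying stream makes the next write throw IOException, and
// reopening the file with append=true resumes writing after the bytes already on disk.
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class CloseRestoreDemo {
  public static void main(String[] args) throws IOException {
    File file = File.createTempFile("merge-shuffle", ".index");
    file.deleteOnExit();

    DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
    dos.writeLong(0L);     // successful write: 8 bytes on disk
    dos.close();           // simulate the failure window: stream is closed

    try {
      dos.writeLong(8L);   // throws IOException because the stream is closed
    } catch (IOException expected) {
      System.out.println("write failed as expected: " + expected.getMessage());
    }

    // "restore": reopen in append mode so new writes continue after the 8 bytes already written
    DataOutputStream restored = new DataOutputStream(new FileOutputStream(file, true));
    restored.writeLong(8L);
    restored.close();

    System.out.println("file length = " + file.length()); // prints: file length = 16
  }
}
```

Because the failed writes put nothing on disk, the end of the reopened file lines up with the logical position tracked by MergeShuffleFile.getPos(), which is what the index/meta recovery assertions in the tests above rely on.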