From 418ba98ed08bbaa64daf44af90f41f9ae7aab07a Mon Sep 17 00:00:00 2001 From: "mudit.sharma" Date: Sat, 14 Oct 2023 14:51:21 +0530 Subject: [PATCH] TEZ-4518: Added capability to limit number of spill files being generated --- .../library/api/TezRuntimeConfiguration.java | 6 ++ .../common/sort/impl/PipelinedSorter.java | 32 +++++++- .../common/sort/impl/dflt/DefaultSorter.java | 30 +++++++- .../common/sort/impl/TestPipelinedSorter.java | 77 +++++++++++++++++++ .../sort/impl/dflt/TestDefaultSorter.java | 73 ++++++++++++++++++ 5 files changed, 212 insertions(+), 6 deletions(-) diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java index de28286d9b..2a6cd07ac8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java @@ -122,6 +122,11 @@ private TezRuntimeConfiguration() {} public static final int TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT = 1024 * 1024; + @ConfigurationProperty(type = "integer") + public static final String TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT = TEZ_RUNTIME_PREFIX + + "sort.spill.files.count.limit"; + public static final int TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT = -1; + // TODO Use the default value @ConfigurationProperty(type = "integer") @@ -616,6 +621,7 @@ private TezRuntimeConfiguration() {} TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_PERCENT); TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IO_SORT_MB); TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES); + TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT); TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS); TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS); TEZ_RUNTIME_KEYS.add( diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 08786c9b2c..8929081df8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -38,7 +38,6 @@ import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.library.api.IOInterruptedException; import org.slf4j.Logger; @@ -69,6 +68,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT; import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -110,6 +110,8 @@ public class PipelinedSorter extends ExternalSorter { private final ArrayList indexCacheList = new ArrayList(); + private final int spillFilesCountLimit; + private final boolean pipelinedShuffle; private long currentAllocatableMemory; @@ -130,6 +132,8 @@ public class PipelinedSorter extends ExternalSorter { */ private final List finalEvents; + private static final int SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1; + // TODO Set additional countesr - total bytes written, spills etc. 
public PipelinedSorter(OutputContext outputContext, Configuration conf, int numOutputs, @@ -171,6 +175,14 @@ public PipelinedSorter(OutputContext outputContext, Configuration conf, int numO pipelinedShuffle = !isFinalMergeEnabled() && confPipelinedShuffle; auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT); + + spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, + TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT); + Preconditions.checkArgument(spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE + || spillFilesCountLimit > 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT + + " should be greater than 0 or unbounded"); + //sanity checks final long sortmb = this.availableMemoryMb; @@ -542,7 +554,7 @@ private void spillSingleRecord(final Object key, final Object value, spillRec.writeToFile(indexFilename, conf, localFs); //TODO: honor cache limits indexCacheList.add(spillRec); - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen()); //No final merge. Set the number of files offered via shuffle-handler @@ -633,7 +645,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException { spillRec.writeToFile(indexFilename, conf, localFs); //TODO: honor cache limits indexCacheList.add(spillRec); - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { fileOutputByteCounter.increment(rfs.getFileStatus(filename).getLen()); //No final merge. Set the number of files offered via shuffle-handler @@ -1503,4 +1515,18 @@ public TezRawKeyValueIterator filter(int partition) { } } + + /** + * Increments numSpills local counter by taking into consideration + * the max limit on spill files being generated by the job. 
+ * If the limit is exceeded, this method throws an IOException. + */ + private void incrementNumSpills() throws IOException { + ++numSpills; + if(spillFilesCountLimit != SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) { + throw new IOException("Too many spill files got created, control it with " + + "tez.runtime.sort.spill.files.count.limit, current value: " + spillFilesCountLimit + + ", current spill count: " + numSpills); + } + } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 7c678749b2..29ad8414f1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -63,6 +63,7 @@ import org.apache.tez.common.Preconditions; +import static org.apache.tez.runtime.library.api.TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT; import static org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.ensureSpillFilePermissions; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -123,12 +124,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab final ArrayList indexCacheList = new ArrayList(); private final int indexCacheMemoryLimit; + private final int spillFilesCountLimit; private int totalIndexCacheMemory; private long totalKeys = 0; private long sameKey = 0; public static final int MAX_IO_SORT_MB = 1800; + private static final int SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1; public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs, @@ -148,6 +151,13 @@ public DefaultSorter(OutputContext outputContext, Configuration conf, int numOut indexCacheMemoryLimit = 
this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES_DEFAULT); + spillFilesCountLimit = this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, + TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT_DEFAULT); + Preconditions.checkArgument(spillFilesCountLimit == SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE + || spillFilesCountLimit > 0, + TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT + + " should be greater than 0 or unbounded"); + boolean confPipelinedShuffle = this.conf.getBoolean(TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED, TezRuntimeConfiguration .TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED_DEFAULT); @@ -978,7 +988,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun } LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills + " at " + filename.toString()); - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { numShuffleChunks.setValue(numSpills); } else if (numSpills > 1) { @@ -1057,7 +1067,7 @@ private void spillSingleRecord(final Object key, final Object value, totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { numShuffleChunks.setValue(numSpills); } else if (numSpills > 1) { @@ -1312,7 +1322,7 @@ private void mergeParts() throws IOException, InterruptedException { } finally { finalOut.close(); } - ++numSpills; + incrementNumSpills(); if (!isFinalMergeEnabled()) { List events = Lists.newLinkedList(); maybeSendEventForSpill(events, true, sr, 0, true); @@ -1399,4 +1409,18 @@ private void mergeParts() throws IOException, InterruptedException { } } } + + /** + * Increments numSpills local counter by taking into consideration + * the max limit on spill files being generated by the job. 
+ * If the limit is exceeded, this method throws an IOException. + */ + private void incrementNumSpills() throws IOException { + ++numSpills; + if(spillFilesCountLimit != SORT_SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE && numSpills > spillFilesCountLimit) { + throw new IOException("Too many spill files got created, control it with " + + "tez.runtime.sort.spill.files.count.limit, current value: " + spillFilesCountLimit + + ", current spill count: " + numSpills); + } + } } diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java index 84ec143808..94af738db4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java @@ -54,9 +54,14 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -116,6 +121,9 @@ public void setup() throws IOException { this.outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService); } + @Rule + public ExpectedException exception = ExpectedException.none(); + public static Configuration getConf() { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); @@ -858,6 +866,75 @@ public void testWithLargeKeyValueWithMinBlockSize() throws IOException { basicTest(1, 5, (2 << 20), (48 * 1024l * 1024l), 16 << 20); } + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountLimitInvalidValue() throws 
IOException, NoSuchMethodException, + NoSuchFieldException, IllegalAccessException { + this.numOutputs = 10; + + // 128 MB. Do not pre-allocate. + // Get a 32 MB buffer first, then another buffer with 96 MB on filling up + // the 32 MB buffer. + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, -2); + + exception.expect(IllegalArgumentException.class); + exception.expectMessage("tez.runtime.sort.spill.files.count.limit should be greater than 0 or unbounded"); + + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128L << 20)); + + closeSorter(sorter); + } + + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException, + NoSuchFieldException, IllegalAccessException { + this.numOutputs = 10; + + // 128 MB. Do not pre-allocate. + // Get a 32 MB buffer first, then another buffer with 96 MB on filling up + // the 32 MB buffer. 
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 128); + conf.setBoolean(TezRuntimeConfiguration + .TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY, true); + conf.setInt(TezRuntimeConfiguration + .TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, 2); + + PipelinedSorter sorter = new PipelinedSorter(this.outputContext, conf, + numOutputs, (128L << 20)); + + Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills"); + numSpillsField.setAccessible(true); + numSpillsField.set(sorter, 2); + + Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills"); + method.setAccessible(true); + boolean gotExceptionWithMessage = false; + try { + method.invoke(sorter); + } catch(InvocationTargetException e) { + Throwable targetException = e.getTargetException(); + if (targetException != null) { + String errorMessage = targetException.getMessage(); + if (errorMessage != null) { + if(errorMessage.equals("Too many spill files got created, control it with " + + "tez.runtime.sort.spill.files.count.limit, current value: 2, current spill count: 3")) { + gotExceptionWithMessage = true; + } + } + } + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + Assert.assertTrue(gotExceptionWithMessage); + } + private void verifyOutputPermissions(String spillId) throws IOException { String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java index a56536dfe8..ca26bc7742 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/dflt/TestDefaultSorter.java @@ -36,6 +36,9 @@ import static 
org.mockito.internal.verification.VerificationModeFactory.times; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; @@ -81,7 +84,9 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -129,6 +134,9 @@ public void reset() throws IOException { localFs.mkdirs(workingDir); } + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test(timeout = 5000) public void testSortSpillPercent() throws Exception { OutputContext context = createTezOutputContext(); @@ -577,6 +585,71 @@ public void testWithMultipleSpillsWithFinalMergeDisabled() throws IOException { verifyCounters(sorter, context); } + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountLimitInvalidValue() throws IOException, NoSuchMethodException, + NoSuchFieldException, IllegalAccessException { + OutputContext context = createTezOutputContext(); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, -2); + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + + exception.expect(IllegalArgumentException.class); + exception.expectMessage("tez.runtime.sort.spill.files.count.limit should be greater than 0 or unbounded"); + + 
SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + sorterWrapper.getSorter(); + sorterWrapper.close(); + } + + @Test(timeout = 60000) + @SuppressWarnings("unchecked") + public void testSpillFilesCountBreach() throws IOException, NoSuchMethodException, + NoSuchFieldException, IllegalAccessException { + OutputContext context = createTezOutputContext(); + + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT, false); + conf.setLong(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, 4); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES, 1); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SORT_SPILL_FILES_COUNT_LIMIT, 2); + MemoryUpdateCallbackHandler handler = new MemoryUpdateCallbackHandler(); + context.requestInitialMemory(ExternalSorter.getInitialMemoryRequirement(conf, + context.getTotalMemoryAvailableToTask()), handler); + SorterWrapper sorterWrapper = new SorterWrapper(context, conf, 1, handler.getMemoryAssigned()); + DefaultSorter sorter = sorterWrapper.getSorter(); + + Field numSpillsField = ExternalSorter.class.getDeclaredField("numSpills"); + numSpillsField.setAccessible(true); + numSpillsField.set(sorter, 2); + + Method method = sorter.getClass().getDeclaredMethod("incrementNumSpills"); + method.setAccessible(true); + boolean gotExceptionWithMessage = false; + try { + method.invoke(sorter); + } catch(InvocationTargetException e) { + Throwable targetException = e.getTargetException(); + if (targetException != null) { + String errorMessage = targetException.getMessage(); + if (errorMessage != null) { + if(errorMessage.equals("Too many spill files got created, control it with " + + "tez.runtime.sort.spill.files.count.limit, current value: 2, current spill count: 3")) { + gotExceptionWithMessage = true; + } + } + } + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + + Assert.assertTrue(gotExceptionWithMessage); + } + private void 
verifyOutputPermissions(String spillId) throws IOException { String subpath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + spillId + "/" + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING;