From 6d074f6e3ad41f427e6dcb9f5a72674798a40b5e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 25 May 2016 23:29:09 -0700 Subject: [PATCH 1/7] manage the temporary memory of timsort --- .../shuffle/sort/ShuffleInMemorySorter.java | 24 +++++++----- .../shuffle/sort/ShuffleSortDataFormat.java | 12 +++--- .../spark/unsafe/map/BytesToBytesMap.java | 2 +- .../unsafe/sort/UnsafeInMemorySorter.java | 27 +++++++------- .../unsafe/sort/UnsafeSortDataFormat.java | 12 +++--- .../sort/UnsafeShuffleWriterSuite.java | 37 +++++++++++-------- .../map/AbstractBytesToBytesMapSuite.java | 2 +- .../unsafe/sort/RadixSortSuite.scala | 3 +- .../sql/execution/UnsafeKVExternalSorter.java | 8 +++- .../execution/benchmark/SortBenchmark.scala | 3 +- 10 files changed, 74 insertions(+), 56 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index 75a0e807d76f5..0cd1eaa8bcae2 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -22,12 +22,12 @@ import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.Sorter; import org.apache.spark.util.collection.unsafe.sort.RadixSort; final class ShuffleInMemorySorter { - private final Sorter sorter; private static final class SortComparator implements Comparator { @Override public int compare(PackedRecordPointer left, PackedRecordPointer right) { @@ -53,11 +53,6 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { */ private final boolean useRadixSort; - /** - * Set to 2x for radix sort to reserve extra memory for sorting, otherwise 1x. - */ - private final int memoryAllocationFactor; - /** * The position in the pointer array where new records can be inserted. */ @@ -70,9 +65,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { assert (initialSize > 0); this.initialSize = initialSize; this.useRadixSort = useRadixSort; - this.memoryAllocationFactor = useRadixSort ? 2 : 1; this.array = consumer.allocateArray(initialSize); - this.sorter = new Sorter<>(ShuffleSortDataFormat.INSTANCE); } public void free() { @@ -101,14 +94,17 @@ public void expandPointerArray(LongArray newArray) { array.getBaseOffset(), newArray.getBaseObject(), newArray.getBaseOffset(), - array.size() * (8 / memoryAllocationFactor) + pos * 8 ); consumer.freeArray(array); array = newArray; } public boolean hasSpaceForAnotherRecord() { - return pos < array.size() / memoryAllocationFactor; + // Radix sort requires same amount of used memory as buffer, Tim sort requires + // half of the used memory as buffer, so we always preserve half of the whole + // array as buffer for sorting. + return pos < array.size() / 2; } public long getMemoryUsage() { @@ -170,6 +166,14 @@ public ShuffleSorterIterator getSortedIterator() { PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX, PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false); } else { + MemoryBlock unused = new MemoryBlock( + array.getBaseObject(), + array.getBaseOffset() + pos * 8, + (array.size() - pos) * 8); + LongArray buffer = new LongArray(unused); + Sorter sorter = + new Sorter<>(new ShuffleSortDataFormat(buffer)); + sorter.sort(array, 0, pos, SORT_COMPARATOR); } return new ShuffleSorterIterator(pos, array, offset); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 8f4e3229976dc..4e4bed5427698 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -19,14 +19,15 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; final class ShuffleSortDataFormat extends SortDataFormat { - public static final ShuffleSortDataFormat INSTANCE = new ShuffleSortDataFormat(); + private final LongArray buffer; - private ShuffleSortDataFormat() { } + ShuffleSortDataFormat(LongArray buffer) { + this.buffer = buffer; + } @Override public PackedRecordPointer getKey(LongArray data, int pos) { @@ -70,8 +71,7 @@ public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int @Override public LongArray allocate(int length) { - // This buffer is used temporary (usually small), so it's fine to allocated from JVM heap. - return new LongArray(MemoryBlock.fromLongArray(new long[length])); + assert (length <= buffer.size()); + return buffer; } - } diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 6c00608302c4e..8268689e0f084 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -221,7 +221,7 @@ public BytesToBytesMap( SparkEnv.get() != null ? SparkEnv.get().blockManager() : null, SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null, initialCapacity, - 0.70, + 0.5, pageSizeBytes, enablePerfMetrics); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 0cce792f33d34..a06d08f577137 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -25,6 +25,7 @@ import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.Sorter; /** @@ -69,8 +70,6 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { private final MemoryConsumer consumer; private final TaskMemoryManager memoryManager; @Nullable - private final Sorter sorter; - @Nullable private final Comparator sortComparator; /** @@ -79,11 +78,6 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { @Nullable private final PrefixComparators.RadixSortSupport radixSortSupport; - /** - * Set to 2x for radix sort to reserve extra memory for sorting, otherwise 1x. - */ - private final int memoryAllocationFactor; - /** * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. @@ -121,7 +115,6 @@ public UnsafeInMemorySorter( this.memoryManager = memoryManager; this.initialSize = array.size(); if (recordComparator != null) { - this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); if (canUseRadixSort && prefixComparator instanceof PrefixComparators.RadixSortSupport) { this.radixSortSupport = (PrefixComparators.RadixSortSupport)prefixComparator; @@ -129,11 +122,9 @@ public UnsafeInMemorySorter( this.radixSortSupport = null; } } else { - this.sorter = null; this.sortComparator = null; this.radixSortSupport = null; } - this.memoryAllocationFactor = this.radixSortSupport != null ? 2 : 1; this.array = array; } @@ -174,7 +165,10 @@ public long getMemoryUsage() { } public boolean hasSpaceForAnotherRecord() { - return pos + 1 < (array.size() / memoryAllocationFactor); + // Radix sort requires same amount of used memory as buffer, Tim sort requires + // half of the used memory as buffer, so we always preserve half of the whole + // array as buffer for sorting. + return pos + 1 < (array.size() / 2); } public void expandPointerArray(LongArray newArray) { @@ -186,7 +180,7 @@ public void expandPointerArray(LongArray newArray) { array.getBaseOffset(), newArray.getBaseObject(), newArray.getBaseOffset(), - array.size() * (8 / memoryAllocationFactor)); + pos * 8); consumer.freeArray(array); array = newArray; } @@ -275,13 +269,20 @@ public void loadNext() { public SortedIterator getSortedIterator() { int offset = 0; long start = System.nanoTime(); - if (sorter != null) { + if (sortComparator != null) { if (this.radixSortSupport != null) { // TODO(ekl) we should handle NULL values before radix sort for efficiency, since they // force a full-width sort (and we cannot radix-sort nullable long fields at all). offset = RadixSort.sortKeyPrefixArray( array, pos / 2, 0, 7, radixSortSupport.sortDescending(), radixSortSupport.sortSigned()); } else { + MemoryBlock unused = new MemoryBlock( + array.getBaseObject(), + array.getBaseOffset() + pos * 8, + (array.size() - pos) * 8); + LongArray buffer = new LongArray(unused); + Sorter sorter = + new Sorter<>(new UnsafeSortDataFormat(buffer)); sorter.sort(array, 0, pos / 2, sortComparator); } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index d19b71fbc1bcb..600d680eec77a 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -19,7 +19,6 @@ import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; -import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.collection.SortDataFormat; /** @@ -32,9 +31,11 @@ public final class UnsafeSortDataFormat extends SortDataFormat { - public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat(); + private final LongArray buffer; - private UnsafeSortDataFormat() { } + public UnsafeSortDataFormat(LongArray buffer) { + this.buffer = buffer; + } @Override public RecordPointerAndKeyPrefix getKey(LongArray data, int pos) { @@ -83,9 +84,8 @@ public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int @Override public LongArray allocate(int length) { - assert (length < Integer.MAX_VALUE / 2) : "Length " + length + " is too large"; - // This is used as temporary buffer, it's fine to allocate from JVM heap. - return new LongArray(MemoryBlock.fromLongArray(new long[length * 2])); + assert (length * 2 <= buffer.size()); + return buffer; } } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index f9dc20d8b751b..a6573b16faefb 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -21,12 +21,15 @@ import java.nio.ByteBuffer; import java.util.*; -import scala.*; +import scala.Option; +import scala.Product2; +import scala.Tuple2; +import scala.Tuple2$; import scala.collection.Iterator; import scala.runtime.AbstractFunction1; -import com.google.common.collect.Iterators; import com.google.common.collect.HashMultiset; +import com.google.common.collect.Iterators; import com.google.common.io.ByteStreams; import org.junit.After; import org.junit.Before; @@ -35,29 +38,33 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.*; -import static org.mockito.Answers.RETURNS_SMART_NULLS; -import static org.mockito.Mockito.*; -import org.apache.spark.*; +import org.apache.spark.HashPartitioner; +import org.apache.spark.ShuffleDependency; +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.executor.TaskMetrics; import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.LZ4CompressionCodec; import org.apache.spark.io.LZFCompressionCodec; import org.apache.spark.io.SnappyCompressionCodec; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.executor.TaskMetrics; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.memory.TestMemoryManager; import org.apache.spark.network.util.LimitedInputStream; -import org.apache.spark.serializer.*; import org.apache.spark.scheduler.MapStatus; +import org.apache.spark.serializer.*; import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.storage.*; -import org.apache.spark.memory.TestMemoryManager; -import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.util.Utils; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.*; +import static org.mockito.Answers.RETURNS_SMART_NULLS; +import static org.mockito.Mockito.*; + public class UnsafeShuffleWriterSuite { static final int NUM_PARTITITONS = 4; @@ -395,7 +402,7 @@ public void writeEnoughDataToTriggerSpill() throws Exception { public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception { conf.set("spark.shuffle.sort.useRadixSort", "false"); writeEnoughRecordsToTriggerSortBufferExpansionAndSpill(); - assertEquals(2, spillFilesCreated.size()); + assertEquals(3, spillFilesCreated.size()); } @Test diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 84b82f5a4742c..fc127f07c8d69 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -589,7 +589,7 @@ public void spillInIterator() throws IOException { @Test public void multipleValuesForSameKey() { BytesToBytesMap map = - new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false); + new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false); try { int i; for (i = 0; i < 1024; i++) { diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala index def0752b46f6a..1d26d4a8307cf 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala @@ -93,7 +93,8 @@ class RadixSortSuite extends SparkFunSuite with Logging { } private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) { - new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt))) + new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort( buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] { override def compare( r1: RecordPointerAndKeyPrefix, diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 38dbfef76caee..bb823cd07be5e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -73,6 +73,8 @@ public UnsafeKVExternalSorter( PrefixComparator prefixComparator = SortPrefixUtils.getPrefixComparator(keySchema); BaseOrdering ordering = GenerateOrdering.create(keySchema); KVComparator recordComparator = new KVComparator(ordering, keySchema.length()); + boolean canUseRadixSort = keySchema.length() == 1 && + SortPrefixUtils.canSortFullyWithPrefix(keySchema.apply(0)); TaskMemoryManager taskMemoryManager = taskContext.taskMemoryManager(); @@ -86,14 +88,16 @@ public UnsafeKVExternalSorter( prefixComparator, /* initialSize */ 4096, pageSizeBytes, - keySchema.length() == 1 && SortPrefixUtils.canSortFullyWithPrefix(keySchema.apply(0))); + canUseRadixSort); } else { + // The array will be used to do in-place sort, which require half of the space to be empty. + assert(map.numKeys() <= map.getArray().size() / 2); // During spilling, the array in map will not be used, so we can borrow that and use it // as the underline array for in-memory sorter (it's always large enough). // Since we will not grow the array, it's fine to pass `null` as consumer. final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter( null, taskMemoryManager, recordComparator, prefixComparator, map.getArray(), - false /* TODO(ekl) we can only radix sort if the BytesToBytes load factor is <= 0.5 */); + canUseRadixSort); // We cannot use the destructive iterator here because we are reusing the existing memory // pages in BytesToBytesMap to hold records during sorting. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala index 0e1868dd66565..9964b7373fc20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala @@ -36,7 +36,8 @@ import org.apache.spark.util.random.XORShiftRandom class SortBenchmark extends BenchmarkBase { private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) { - new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt))) + new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort( buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] { override def compare( r1: RecordPointerAndKeyPrefix, From f651956723cc36465458ee3a802e331dfc108968 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 26 May 2016 12:22:57 -0700 Subject: [PATCH 2/7] address comments --- .../shuffle/sort/ShuffleInMemorySorter.java | 21 ++++++++++++++----- .../shuffle/sort/ShuffleSortDataFormat.java | 3 ++- .../spark/unsafe/map/BytesToBytesMap.java | 1 + .../unsafe/sort/UnsafeInMemorySorter.java | 21 ++++++++++++++----- .../unsafe/sort/UnsafeSortDataFormat.java | 3 ++- 5 files changed, 37 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index 0cd1eaa8bcae2..444ce478d1ed1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -58,6 +58,11 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { */ private int pos = 0; + /** + * How many records could be inserted, because part of the array should be left for sorting. + */ + private int capacity = 0; + private int initialSize; ShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort) { @@ -66,6 +71,13 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { this.initialSize = initialSize; this.useRadixSort = useRadixSort; this.array = consumer.allocateArray(initialSize); + this.capacity = calcCapacity(); + } + + private int calcCapacity() { + // Radix sort requires same amount of used memory as buffer, Tim sort requires + // half of the used memory as buffer. + return (int) (array.size() / (useRadixSort ? 2 : 1.5)); } public void free() { @@ -82,7 +94,8 @@ public int numRecords() { public void reset() { if (consumer != null) { consumer.freeArray(array); - this.array = consumer.allocateArray(initialSize); + array = consumer.allocateArray(initialSize); + capacity = calcCapacity(); } pos = 0; } @@ -98,13 +111,11 @@ public void expandPointerArray(LongArray newArray) { ); consumer.freeArray(array); array = newArray; + capacity = calcCapacity(); } public boolean hasSpaceForAnotherRecord() { - // Radix sort requires same amount of used memory as buffer, Tim sort requires - // half of the used memory as buffer, so we always preserve half of the whole - // array as buffer for sorting. - return pos < array.size() / 2; + return pos < capacity; } public long getMemoryUsage() { diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java index 4e4bed5427698..ce38aa54706ba 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java @@ -71,7 +71,8 @@ public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int @Override public LongArray allocate(int length) { - assert (length <= buffer.size()); + assert (length <= buffer.size()) : + "the buffer is smaller than required: " + buffer.size() + " < " + length; return buffer; } } diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 8268689e0f084..dc04025692909 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -221,6 +221,7 @@ public BytesToBytesMap( SparkEnv.get() != null ? SparkEnv.get().blockManager() : null, SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null, initialCapacity, + // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5. 0.5, pageSizeBytes, enablePerfMetrics); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index a06d08f577137..e92bc91b605de 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -89,6 +89,11 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { */ private int pos = 0; + /** + * How many records could be inserted, because part of the array should be left for sorting. + */ + private int capacity = 0; + private long initialSize; private long totalSortTimeNanos = 0L; @@ -126,6 +131,13 @@ public UnsafeInMemorySorter( this.radixSortSupport = null; } this.array = array; + this.capacity = calcCapacity(); + } + + private int calcCapacity() { + // Radix sort requires same amount of used memory as buffer, Tim sort requires + // half of the used memory as buffer. + return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5)); } /** @@ -141,7 +153,8 @@ public void free() { public void reset() { if (consumer != null) { consumer.freeArray(array); - this.array = consumer.allocateArray(initialSize); + array = consumer.allocateArray(initialSize); + capacity = calcCapacity(); } pos = 0; } @@ -165,10 +178,7 @@ public long getMemoryUsage() { } public boolean hasSpaceForAnotherRecord() { - // Radix sort requires same amount of used memory as buffer, Tim sort requires - // half of the used memory as buffer, so we always preserve half of the whole - // array as buffer for sorting. - return pos + 1 < (array.size() / 2); + return pos + 1 < capacity; } public void expandPointerArray(LongArray newArray) { @@ -183,6 +193,7 @@ public void expandPointerArray(LongArray newArray) { pos * 8); consumer.freeArray(array); array = newArray; + capacity = calcCapacity(); } /** diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java index 600d680eec77a..354e97b9c4aa6 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java @@ -84,7 +84,8 @@ public void copyRange(LongArray src, int srcPos, LongArray dst, int dstPos, int @Override public LongArray allocate(int length) { - assert (length * 2 <= buffer.size()); + assert (length * 2 <= buffer.size()) : + "the buffer is smaller than required: " + buffer.size() + " < " + (length * 2); return buffer; } From c72cdea99972204899de52ed3ed4cc206a128081 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 26 May 2016 14:10:40 -0700 Subject: [PATCH 3/7] fix tests --- .../org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index a6573b16faefb..7dd61f85abefd 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -402,7 +402,7 @@ public void writeEnoughDataToTriggerSpill() throws Exception { public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception { conf.set("spark.shuffle.sort.useRadixSort", "false"); writeEnoughRecordsToTriggerSortBufferExpansionAndSpill(); - assertEquals(3, spillFilesCreated.size()); + assertEquals(2, spillFilesCreated.size()); } @Test From 2ad469d20d3eda509fdf8e4245e09f5b5bf4ca46 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 26 May 2016 16:59:15 -0700 Subject: [PATCH 4/7] avoid overflow --- .../apache/spark/shuffle/sort/ShuffleInMemorySorter.java | 6 +++--- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index 444ce478d1ed1..e8d970ca65277 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -107,7 +107,7 @@ public void expandPointerArray(LongArray newArray) { array.getBaseOffset(), newArray.getBaseObject(), newArray.getBaseOffset(), - pos * 8 + pos * 8L ); consumer.freeArray(array); array = newArray; @@ -179,8 +179,8 @@ public ShuffleSorterIterator getSortedIterator() { } else { MemoryBlock unused = new MemoryBlock( array.getBaseObject(), - array.getBaseOffset() + pos * 8, - (array.size() - pos) * 8); + array.getBaseOffset() + pos * 8L, + (array.size() - pos) * 8L); LongArray buffer = new LongArray(unused); Sorter sorter = new Sorter<>(new ShuffleSortDataFormat(buffer)); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index e92bc91b605de..0c9eb04445165 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -190,7 +190,7 @@ public void expandPointerArray(LongArray newArray) { array.getBaseOffset(), newArray.getBaseObject(), newArray.getBaseOffset(), - pos * 8); + pos * 8L); consumer.freeArray(array); array = newArray; capacity = calcCapacity(); @@ -289,8 +289,8 @@ public SortedIterator getSortedIterator() { } else { MemoryBlock unused = new MemoryBlock( array.getBaseObject(), - array.getBaseOffset() + pos * 8, - (array.size() - pos) * 8); + array.getBaseOffset() + pos * 8L, + (array.size() - pos) * 8L); LongArray buffer = new LongArray(unused); Sorter sorter = new Sorter<>(new UnsafeSortDataFormat(buffer)); From 8b4e0337da43d65bc5b8b80862bc257a001fd16f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 26 May 2016 22:59:24 -0700 Subject: [PATCH 5/7] fix overflow --- .../org/apache/spark/unsafe/memory/MemoryBlock.java | 2 +- .../spark/shuffle/sort/ShuffleInMemorySorter.java | 10 +++++----- .../collection/unsafe/sort/UnsafeInMemorySorter.java | 10 +++++----- .../spark/sql/execution/joins/HashedRelation.scala | 12 ++++++------ 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java index e3e79471154df..1bc924d424c02 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java @@ -51,6 +51,6 @@ public long size() { * Creates a memory block pointing to the memory used by the long array. */ public static MemoryBlock fromLongArray(final long[] array) { - return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8); + return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index e8d970ca65277..a0c7d45dc168a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -61,7 +61,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { /** * How many records could be inserted, because part of the array should be left for sorting. */ - private int capacity = 0; + private int usableCapacity = 0; private int initialSize; @@ -71,7 +71,7 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { this.initialSize = initialSize; this.useRadixSort = useRadixSort; this.array = consumer.allocateArray(initialSize); - this.capacity = calcCapacity(); + this.usableCapacity = calcCapacity(); } private int calcCapacity() { @@ -95,7 +95,7 @@ public void reset() { if (consumer != null) { consumer.freeArray(array); array = consumer.allocateArray(initialSize); - capacity = calcCapacity(); + usableCapacity = calcCapacity(); } pos = 0; } @@ -111,11 +111,11 @@ public void expandPointerArray(LongArray newArray) { ); consumer.freeArray(array); array = newArray; - capacity = calcCapacity(); + usableCapacity = calcCapacity(); } public boolean hasSpaceForAnotherRecord() { - return pos < capacity; + return pos < usableCapacity; } public long getMemoryUsage() { diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 0c9eb04445165..e587217553177 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -92,7 +92,7 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { /** * How many records could be inserted, because part of the array should be left for sorting. */ - private int capacity = 0; + private int usableCapacity = 0; private long initialSize; @@ -131,7 +131,7 @@ public UnsafeInMemorySorter( this.radixSortSupport = null; } this.array = array; - this.capacity = calcCapacity(); + this.usableCapacity = calcCapacity(); } private int calcCapacity() { @@ -154,7 +154,7 @@ public void reset() { if (consumer != null) { consumer.freeArray(array); array = consumer.allocateArray(initialSize); - capacity = calcCapacity(); + usableCapacity = calcCapacity(); } pos = 0; } @@ -178,7 +178,7 @@ public long getMemoryUsage() { } public boolean hasSpaceForAnotherRecord() { - return pos + 1 < capacity; + return pos + 1 < usableCapacity; } public void expandPointerArray(LongArray newArray) { @@ -193,7 +193,7 @@ public void expandPointerArray(LongArray newArray) { pos * 8L); consumer.freeArray(array); array = newArray; - capacity = calcCapacity(); + usableCapacity = calcCapacity(); } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index cd6b97a855412..412e8c54ca308 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -540,7 +540,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap Platform.copyMemory(page, Platform.LONG_ARRAY_OFFSET, newPage, Platform.LONG_ARRAY_OFFSET, cursor - Platform.LONG_ARRAY_OFFSET) page = newPage - freeMemory(used * 8) + freeMemory(used * 8L) } // copy the bytes of UnsafeRow @@ -599,7 +599,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap i += 2 } old_array = null // release the reference to old array - freeMemory(n * 8) + freeMemory(n * 8L) } /** @@ -610,7 +610,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap // Convert to dense mode if it does not require more memory or could fit within L1 cache if (range < array.length || range < 1024) { try { - ensureAcquireMemory((range + 1) * 8) + ensureAcquireMemory((range + 1) * 8L) } catch { case e: SparkException => // there is no enough memory to convert @@ -628,7 +628,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap val old_length = array.length array = denseArray isDense = true - freeMemory(old_length * 8) + freeMemory(old_length * 8L) } } @@ -637,11 +637,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap */ def free(): Unit = { if (page != null) { - freeMemory(page.length * 8) + freeMemory(page.length * 8L) page = null } if (array != null) { - freeMemory(array.length * 8) + freeMemory(array.length * 8L) array = null } } From 2ad8eb8f9c21c1309ab18cc4e81897bdf54afa86 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 31 May 2016 12:00:24 -0700 Subject: [PATCH 6/7] fix new test --- .../apache/spark/util/collection/ExternalSorterSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 699f7fa1f2727..6bcc601e13ecc 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -106,8 +106,10 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { // that can trigger copyRange() in TimSort.mergeLo() or TimSort.mergeHi() val ref = Array.tabulate[Long](size) { i => if (i < size / 2) size / 2 + i else i } val buf = new LongArray(MemoryBlock.fromLongArray(ref)) + val tmp = new Array[Long](size/2) + val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp)) - new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort( buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] { override def compare( r1: RecordPointerAndKeyPrefix, From a929a0600b5cd78917989a5559275a4a4e9f1812 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 3 Jun 2016 11:29:44 -0700 Subject: [PATCH 7/7] address comments --- .../spark/shuffle/sort/ShuffleInMemorySorter.java | 11 +++++++---- .../collection/unsafe/sort/UnsafeInMemorySorter.java | 11 +++++++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java index a0c7d45dc168a..dc36809d8911f 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java @@ -44,6 +44,9 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { * An array of record pointers and partition ids that have been encoded by * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating * records. + * + * Only part of the array will be used to store the pointers, the rest part is preserved as + * temporary buffer for sorting. */ private LongArray array; @@ -71,10 +74,10 @@ public int compare(PackedRecordPointer left, PackedRecordPointer right) { this.initialSize = initialSize; this.useRadixSort = useRadixSort; this.array = consumer.allocateArray(initialSize); - this.usableCapacity = calcCapacity(); + this.usableCapacity = getUsableCapacity(); } - private int calcCapacity() { + private int getUsableCapacity() { // Radix sort requires same amount of used memory as buffer, Tim sort requires // half of the used memory as buffer. return (int) (array.size() / (useRadixSort ? 2 : 1.5)); @@ -95,7 +98,7 @@ public void reset() { if (consumer != null) { consumer.freeArray(array); array = consumer.allocateArray(initialSize); - usableCapacity = calcCapacity(); + usableCapacity = getUsableCapacity(); } pos = 0; } @@ -111,7 +114,7 @@ public void expandPointerArray(LongArray newArray) { ); consumer.freeArray(array); array = newArray; - usableCapacity = calcCapacity(); + usableCapacity = getUsableCapacity(); } public boolean hasSpaceForAnotherRecord() { diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index e587217553177..c7b070f519f88 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -81,6 +81,9 @@ public int compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) { /** * Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix. + * + * Only part of the array will be used to store the pointers, the rest part is preserved as + * temporary buffer for sorting. */ private LongArray array; @@ -131,10 +134,10 @@ public UnsafeInMemorySorter( this.radixSortSupport = null; } this.array = array; - this.usableCapacity = calcCapacity(); + this.usableCapacity = getUsableCapacity(); } - private int calcCapacity() { + private int getUsableCapacity() { // Radix sort requires same amount of used memory as buffer, Tim sort requires // half of the used memory as buffer. return (int) (array.size() / (radixSortSupport != null ? 2 : 1.5)); @@ -154,7 +157,7 @@ public void reset() { if (consumer != null) { consumer.freeArray(array); array = consumer.allocateArray(initialSize); - usableCapacity = calcCapacity(); + usableCapacity = getUsableCapacity(); } pos = 0; } @@ -193,7 +196,7 @@ public void expandPointerArray(LongArray newArray) { pos * 8L); consumer.freeArray(array); array = newArray; - usableCapacity = calcCapacity(); + usableCapacity = getUsableCapacity(); } /**